kaknikhil commented on a change in pull request #443: DL: Add training for multiple models
URL: https://github.com/apache/madlib/pull/443#discussion_r325819494
##########
File path: src/ports/postgres/modules/deep_learning/madlib_keras.py_in
##########
@@ -387,70 +396,91 @@ def should_compute_metrics_this_iter(curr_iter, metrics_compute_frequency,
return (curr_iter)%metrics_compute_frequency == 0 or \
curr_iter == num_iterations
+def init_model(model_architecture, compile_params):
+ """
+ Should only be called at the first row of first iteration.
+ """
+ segment_model = model_from_json(model_architecture)
+ compile_model(segment_model, compile_params)
+ return segment_model
+
+def update_model(segment_model, prev_serialized_weights):
+ """
+ Happens at first row of each iteration.
+ """
+ model_shapes = get_model_shapes(segment_model)
+ model_weights = madlib_keras_serializer.deserialize_as_nd_weights(
+ prev_serialized_weights, model_shapes)
+ segment_model.set_weights(model_weights)
+
def fit_transition(state, dependent_var, independent_var, dependent_var_shape,
independent_var_shape, model_architecture,
compile_params, fit_params, current_seg_id, seg_ids,
images_per_seg, gpus_per_host, segments_per_host,
- prev_serialized_weights, **kwargs):
+ prev_serialized_weights, is_final_iteration=True,
+ is_multiple_model=False, **kwargs):
if not independent_var or not dependent_var:
return state
-
start_transition = time.time()
SD = kwargs['SD']
device_name = get_device_name_and_set_cuda_env(gpus_per_host,
current_seg_id)
- # Set up system if this is the first buffer on segment'
- if not state:
- set_keras_session(device_name, gpus_per_host, segments_per_host)
- segment_model = model_from_json(model_architecture)
- compile_and_set_weights(segment_model, compile_params, device_name,
- prev_serialized_weights)
-
- SD['segment_model'] = segment_model
- agg_image_count = 0
+ if is_multiple_model:
+ prev_serialized_weights = madlib_keras_serializer.\
+ get_serialized_1d_weights_from_state(prev_serialized_weights)
+ # If a live session is present, re-use it. Otherwise, recreate it.
+ if SD_NAMES.SESS in SD :
+ if SD_NAMES.SEGMENT_MODEL not in SD:
+ plpy.error("Session and model should exist in SD after the first row"
+ "of the first iteration")
+ sess = SD[SD_NAMES.SESS]
+ segment_model = SD[SD_NAMES.SEGMENT_MODEL]
+ K.set_session(sess)
else:
- segment_model = SD['segment_model']
- agg_image_count = madlib_keras_serializer.get_image_count_from_state(state)
+ sess = get_keras_session(device_name, gpus_per_host, segments_per_host)
+ SD[SD_NAMES.SESS] = sess
+ K.set_session(sess)
+ segment_model = init_model(model_architecture, compile_params)
+ SD[SD_NAMES.SEGMENT_MODEL] = segment_model
+ agg_image_count = madlib_keras_serializer.get_image_count_from_state(state)
+ if not state: # first row each iteration
+ update_model(segment_model, prev_serialized_weights)
# Prepare the data
x_train = np_array_float32(independent_var, independent_var_shape)
y_train = np_array_int16(dependent_var, dependent_var_shape)
# Fit segment model on data
start_fit = time.time()
- with K.tf.device(device_name):
- #TODO consider not doing this every time
- fit_params = parse_and_validate_fit_params(fit_params)
- history = segment_model.fit(x_train, y_train, **fit_params)
+ #TODO consider not doing this every time
+ fit_params = parse_and_validate_fit_params(fit_params)
+ history = segment_model.fit(x_train, y_train, **fit_params)
end_fit = time.time()
-
image_count = len(x_train)
+
# Aggregating number of images, loss and accuracy
agg_image_count += image_count
-
- with K.tf.device(device_name):
- updated_weights = segment_model.get_weights()
-
+ updated_weights = segment_model.get_weights()
total_images = get_image_count_per_seg_from_array(current_seg_id, seg_ids,
images_per_seg)
- # Re-serialize the weights
- # Update image count, check if we are done
+ # last row of the last iteration
if agg_image_count == total_images:
- # Once done with all images on a segment, we update weights
- # with the total number of images here instead of the merge function.
- # The merge function only deals with aggregating them.
- updated_weights = [ total_images * w for w in updated_weights ]
- # In GPDB, each segment would have a keras session, so clear
- # them after the last buffer is processed.
- clear_keras_session()
+ if not is_multiple_model:
Review comment:
Should we add a comment saying something along the lines of `For multi-model,
we don't need to update weights with the total number of images because
there is no merge function`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services