This is an automated email from the ASF dual-hosted git repository. njayaram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
commit f15384b6b16ae9e52d34155186e3b2d59dc00654 Author: Domino Valdano <[email protected]> AuthorDate: Wed May 15 12:15:04 2019 -0700 DL: Refactor gp_segment_id_col This was getting passed around as a param to a lot of different functions, but all it was really doing was serving as a flag to say whether we're running on greenplum or postgres. Now this is just determined right before the query by calling is_platform_pg(). Other minor changes: - Removed extraneous error check in fit_transition - Renamed rows_per_seg to images_per_seg param in compute_metrics. - Removed STRICT property from internal_keras_eval_transition() and internal_keras_eval_final(). We think this may result in slightly better error detection. Co-authored-by: Orhan Kislal <[email protected]> --- .../modules/deep_learning/madlib_keras.py_in | 85 ++++++++++------------ .../modules/deep_learning/madlib_keras.sql_in | 4 +- .../deep_learning/madlib_keras_predict.py_in | 2 +- 3 files changed, 42 insertions(+), 49 deletions(-) diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in index be3b4e0..6393dd8 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in @@ -114,20 +114,19 @@ def fit(schema_madlib, source_table, model,model_arch_table, #TODO: Refactor the pg related logic in a future PR when we think # about making the fit function easier to read and maintain. if is_platform_pg(): + gp_segment_id_col = '0' set_keras_session(gpus_per_host, segments_per_host) else: # we want to disable gpu on gpdb's master node because GPUs will only be used # for segment nodes. 
set_cuda_env('-1') + gp_segment_id_col = 'gp_segment_id' # Compute total images on each segment - gp_segment_id_col,\ - seg_ids_train,\ - images_per_seg_train = get_images_per_seg(source_table, dependent_varname) + seg_ids_train, images_per_seg_train = get_images_per_seg(source_table, dependent_varname) if validation_table: - _, seg_ids_val,\ - images_per_seg_val = get_images_per_seg(validation_table, dependent_varname) + seg_ids_val, images_per_seg_val = get_images_per_seg(validation_table, dependent_varname) # Convert model from json and initialize weights master_model = model_from_json(model_arch) @@ -194,8 +193,7 @@ def fit(schema_madlib, source_table, model,model_arch_table, schema_madlib, source_table, dependent_varname, independent_varname, compile_params_to_pass, model_arch, serialized_weights, gpus_per_host, segments_per_host, seg_ids_train, - images_per_seg_train, gp_segment_id_col, - training_metrics, training_loss, + images_per_seg_train, training_metrics, training_loss, i, "Training") metrics_iters.append(i) if validation_set_provided: @@ -204,8 +202,7 @@ def fit(schema_madlib, source_table, model,model_arch_table, schema_madlib, validation_table, dependent_varname, independent_varname, compile_params_to_pass, model_arch, serialized_weights, gpus_per_host, segments_per_host, seg_ids_val, - images_per_seg_val, gp_segment_id_col, - validation_metrics, validation_loss, + images_per_seg_val, validation_metrics, validation_loss, i, "Validation") metrics_elapsed_end_time = time.time() metrics_elapsed_time.append( @@ -344,8 +341,8 @@ def get_metrics_sql_string(metrics_list, is_metrics_specified): def compute_loss_and_metrics(schema_madlib, table, dependent_varname, independent_varname, compile_params, model_arch, serialized_weights, gpus_per_host, segments_per_host, - seg_ids, rows_per_seg, - gp_segment_id_col, metrics_list, loss_list, + seg_ids, images_per_seg_val, + metrics_list, loss_list, curr_iter, dataset_name): """ Compute the loss and metric using a 
given model (serialized_weights) on the @@ -353,17 +350,16 @@ def compute_loss_and_metrics(schema_madlib, table, dependent_varname, """ start_val = time.time() evaluate_result = get_loss_metric_from_keras_eval(schema_madlib, - table, - dependent_varname, - independent_varname, - compile_params, - model_arch, - serialized_weights, - gpus_per_host, - segments_per_host, - seg_ids, - rows_per_seg, - gp_segment_id_col) + table, + dependent_varname, + independent_varname, + compile_params, + model_arch, + serialized_weights, + gpus_per_host, + segments_per_host, + seg_ids, + images_per_seg_val) end_val = time.time() plpy.info("Time for evaluation in iteration {0}: {1} sec.". format( curr_iter, end_val - start_val)) @@ -404,8 +400,6 @@ def get_images_per_seg(source_table, dependent_varname): :param source_table: :param dependent_var: :return: Returns a string and two arrays - 1. The appropriate string to use for querying segment number - ("gp_segment_id" for gpdb or "-1" for postgres). 1. An array containing all the segment numbers in ascending order 1. An array containing the total images on each of the segments in the segment array. 
@@ -417,7 +411,6 @@ def get_images_per_seg(source_table, dependent_varname): """.format(dependent_varname, source_table)) images_per_seg = [int(res[0]['images_per_seg'])] seg_ids = [0] - gp_segment_id_col = -1 else: images_per_seg = plpy.execute( """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS images_per_seg @@ -428,8 +421,7 @@ def get_images_per_seg(source_table, dependent_varname): for each_segment in images_per_seg] images_per_seg = [int(each_segment["images_per_seg"]) for each_segment in images_per_seg] - gp_segment_id_col = 'gp_segment_id' - return gp_segment_id_col, seg_ids, images_per_seg + return seg_ids, images_per_seg def fit_transition(state, dependent_var, independent_var, model_architecture, compile_params, fit_params, current_seg_id, seg_ids, @@ -478,15 +470,14 @@ def fit_transition(state, dependent_var, independent_var, model_architecture, total_images = images_per_seg[0] else: total_images = images_per_seg[seg_ids.index(current_seg_id)] + if total_images == 0: - plpy.error('Got 0 rows. Expected at least 1.') + plpy.error('Total images is 0 in fit_transition on segment {0}'.format(current_seg_id)) # Re-serialize the weights # Update image count, check if we are done if agg_image_count == total_images: - if total_images == 0: - plpy.error('Total images is 0') - # Once done with all images on a segment, we update weights + # Once done with all images on a segment, we update weights # with the total number of images here instead of the merge function. # The merge function only deals with aggregating them. 
updated_weights = [ total_images * w for w in updated_weights ] @@ -580,8 +571,10 @@ def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table, def get_loss_metric_from_keras_eval(schema_madlib, table, dependent_varname, independent_varname, compile_params, model_arch, serialized_weights, gpus_per_host, - segments_per_host, seg_ids, images_per_seg, - gp_segment_id_col): + segments_per_host, seg_ids, images_per_seg): + + gp_segment_id_col = '0' if is_platform_pg() else 'gp_segment_id' + """ This function will call the internal keras evaluate function to get the loss and accuracy of each tuple which then gets averaged to get the final result. @@ -591,19 +584,19 @@ def get_loss_metric_from_keras_eval(schema_madlib, table, dependent_varname, -- The right solution is either to change the datatype of the agg function from -- SMALLINT to INTEGER, or change the output of minibatch util to produce SMALLINT -- For the first, we should change fit_step also - SELECT ({schema_madlib}.internal_keras_evaluate( - {dependent_varname}::SMALLINT[], - {independent_varname}::REAL[], - $MAD${model_arch}$MAD$, - $1, - {compile_params}, - {gp_segment_id_col}, - ARRAY{seg_ids}, - ARRAY{images_per_seg}, - {gpus_per_host}, - {segments_per_host} - )) AS loss_metric - FROM {table} + select ({schema_madlib}.internal_keras_evaluate( + {dependent_varname}::SMALLINT[], + {independent_varname}::REAL[], + $MAD${model_arch}$MAD$, + $1, + {compile_params}, + {gp_segment_id_col}, + ARRAY{seg_ids}, + ARRAY{images_per_seg}, + {gpus_per_host}, + {segments_per_host} + )) as loss_metric + from {table} """.format(**locals()), ["bytea"]) res = plpy.execute(evaluate_query, [serialized_weights]) loss_metric = res[0]['loss_metric'] diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in index 4db18b8..f18c63d 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in +++ 
b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in @@ -294,7 +294,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_transition( ) RETURNS REAL[3] AS $$ PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras') return madlib_keras.internal_keras_eval_transition(**globals()) -$$ LANGUAGE plpythonu STRICT +$$ LANGUAGE plpythonu m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_merge( @@ -312,7 +312,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_final( PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras') return madlib_keras.internal_keras_eval_final(**globals()) $$ LANGUAGE plpythonu -m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `') STRICT; +m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.internal_keras_evaluate( SMALLINT[], diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in index b4d21fb..deff6cf 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in @@ -131,7 +131,7 @@ def get_images_per_seg_for_non_minibatched_data(table_name): FROM {0} """.format(table_name)) seg_ids = [0] - gp_segment_id_col = -1 + gp_segment_id_col = '0' else: # Compute total buffers on each segment images_per_seg = plpy.execute(
