This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit f15384b6b16ae9e52d34155186e3b2d59dc00654
Author: Domino Valdano <[email protected]>
AuthorDate: Wed May 15 12:15:04 2019 -0700

    DL: Refactor gp_segment_id_col
    
    This was getting passed around as a param to a lot of different
    functions, but all it was really doing was serving as a flag
    to say whether we're running on greenplum or postgres.  Now this
    is just determined right before the query by calling is_platform_pg().
    
    Other minor changes:
      - Removed extraneous error check in fit_transition
      - Renamed rows_per_seg to images_per_seg param in compute_metrics.
      - Remove STRICT property from internal_keras_eval_transition()
           and internal_keras_eval_final().  We think this may result
           in slightly better error detection.
    
    Co-authored-by: Orhan Kislal <[email protected]>
---
 .../modules/deep_learning/madlib_keras.py_in       | 85 ++++++++++------------
 .../modules/deep_learning/madlib_keras.sql_in      |  4 +-
 .../deep_learning/madlib_keras_predict.py_in       |  2 +-
 3 files changed, 42 insertions(+), 49 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index be3b4e0..6393dd8 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -114,20 +114,19 @@ def fit(schema_madlib, source_table, 
model,model_arch_table,
     #TODO: Refactor the pg related logic in a future PR when we think
     # about making the fit function easier to read and maintain.
     if is_platform_pg():
+        gp_segment_id_col = '0'
         set_keras_session(gpus_per_host, segments_per_host)
     else:
         # we want to disable gpu on gpdb's master node because GPUs will only 
be used
         # for segment nodes.
         set_cuda_env('-1')
+        gp_segment_id_col = 'gp_segment_id'
 
     # Compute total images on each segment
-    gp_segment_id_col,\
-    seg_ids_train,\
-    images_per_seg_train = get_images_per_seg(source_table, dependent_varname)
+    seg_ids_train, images_per_seg_train = get_images_per_seg(source_table, 
dependent_varname)
 
     if validation_table:
-        _, seg_ids_val,\
-        images_per_seg_val = get_images_per_seg(validation_table, 
dependent_varname)
+        seg_ids_val, images_per_seg_val = get_images_per_seg(validation_table, 
dependent_varname)
 
     # Convert model from json and initialize weights
     master_model = model_from_json(model_arch)
@@ -194,8 +193,7 @@ def fit(schema_madlib, source_table, model,model_arch_table,
                 schema_madlib, source_table, dependent_varname,
                 independent_varname, compile_params_to_pass, model_arch,
                 serialized_weights, gpus_per_host, segments_per_host, 
seg_ids_train,
-                images_per_seg_train, gp_segment_id_col,
-                training_metrics, training_loss,
+                images_per_seg_train, training_metrics, training_loss,
                 i, "Training")
             metrics_iters.append(i)
             if validation_set_provided:
@@ -204,8 +202,7 @@ def fit(schema_madlib, source_table, model,model_arch_table,
                     schema_madlib, validation_table, dependent_varname,
                     independent_varname, compile_params_to_pass, model_arch,
                     serialized_weights, gpus_per_host, segments_per_host, 
seg_ids_val,
-                    images_per_seg_val, gp_segment_id_col,
-                    validation_metrics, validation_loss,
+                    images_per_seg_val, validation_metrics, validation_loss,
                     i, "Validation")
             metrics_elapsed_end_time = time.time()
             metrics_elapsed_time.append(
@@ -344,8 +341,8 @@ def get_metrics_sql_string(metrics_list, 
is_metrics_specified):
 def compute_loss_and_metrics(schema_madlib, table, dependent_varname,
                              independent_varname, compile_params, model_arch,
                              serialized_weights, gpus_per_host, 
segments_per_host,
-                             seg_ids, rows_per_seg,
-                             gp_segment_id_col, metrics_list, loss_list,
+                             seg_ids, images_per_seg_val,
+                             metrics_list, loss_list,
                              curr_iter, dataset_name):
     """
     Compute the loss and metric using a given model (serialized_weights) on the
@@ -353,17 +350,16 @@ def compute_loss_and_metrics(schema_madlib, table, 
dependent_varname,
     """
     start_val = time.time()
     evaluate_result = get_loss_metric_from_keras_eval(schema_madlib,
-                                                      table,
-                                                      dependent_varname,
-                                                      independent_varname,
-                                                      compile_params,
-                                                      model_arch,
-                                                      serialized_weights,
-                                                      gpus_per_host,
-                                                      segments_per_host,
-                                                      seg_ids,
-                                                      rows_per_seg,
-                                                      gp_segment_id_col)
+                                                   table,
+                                                   dependent_varname,
+                                                   independent_varname,
+                                                   compile_params,
+                                                   model_arch,
+                                                   serialized_weights,
+                                                   gpus_per_host,
+                                                   segments_per_host,
+                                                   seg_ids,
+                                                   images_per_seg_val)
     end_val = time.time()
     plpy.info("Time for evaluation in iteration {0}: {1} sec.". format(
         curr_iter, end_val - start_val))
@@ -404,8 +400,6 @@ def get_images_per_seg(source_table, dependent_varname):
     :param source_table:
     :param dependent_var:
     :return: Returns a string and two arrays
-    1. The appropriate string to use for querying segment number
-    ("gp_segment_id" for gpdb or "-1" for postgres).
     1. An array containing all the segment numbers in ascending order
     1. An array containing the total images on each of the segments in the
     segment array.
@@ -417,7 +411,6 @@ def get_images_per_seg(source_table, dependent_varname):
             """.format(dependent_varname, source_table))
         images_per_seg = [int(res[0]['images_per_seg'])]
         seg_ids = [0]
-        gp_segment_id_col = -1
     else:
         images_per_seg = plpy.execute(
             """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS 
images_per_seg
@@ -428,8 +421,7 @@ def get_images_per_seg(source_table, dependent_varname):
                    for each_segment in images_per_seg]
         images_per_seg = [int(each_segment["images_per_seg"])
             for each_segment in images_per_seg]
-        gp_segment_id_col = 'gp_segment_id'
-    return gp_segment_id_col, seg_ids, images_per_seg
+    return seg_ids, images_per_seg
 
 def fit_transition(state, dependent_var, independent_var, model_architecture,
                    compile_params, fit_params, current_seg_id, seg_ids,
@@ -478,15 +470,14 @@ def fit_transition(state, dependent_var, independent_var, 
model_architecture,
         total_images = images_per_seg[0]
     else:
         total_images = images_per_seg[seg_ids.index(current_seg_id)]
+
     if total_images == 0:
-        plpy.error('Got 0 rows. Expected at least 1.')
+        plpy.error('Total images is 0 in fit_transition on segment 
{0}'.format(current_seg_id))
 
     # Re-serialize the weights
     # Update image count, check if we are done
     if agg_image_count == total_images:
-        if total_images == 0:
-            plpy.error('Total images is 0')
-        # Once done with all images on a segment, we update weights
+       # Once done with all images on a segment, we update weights
         # with the total number of images here instead of the merge function.
         # The merge function only deals with aggregating them.
         updated_weights = [ total_images * w for w in updated_weights ]
@@ -580,8 +571,10 @@ def evaluate1(schema_madlib, model_table, test_table, 
id_col, model_arch_table,
 def get_loss_metric_from_keras_eval(schema_madlib, table, dependent_varname,
                                  independent_varname, compile_params,
                                  model_arch, serialized_weights, gpus_per_host,
-                                 segments_per_host, seg_ids, images_per_seg,
-                                 gp_segment_id_col):
+                                 segments_per_host, seg_ids, images_per_seg):
+
+    gp_segment_id_col = '0' if is_platform_pg() else 'gp_segment_id'
+
     """
     This function will call the internal keras evaluate function to get the 
loss
     and accuracy of each tuple which then gets averaged to get the final 
result.
@@ -591,19 +584,19 @@ def get_loss_metric_from_keras_eval(schema_madlib, table, 
dependent_varname,
     --  The right solution is either to change the datatype of the agg 
function from
     --  SMALLINT to INTEGER, or change the output of minibatch util to produce 
SMALLINT
     --  For the first, we should change fit_step also
-    SELECT ({schema_madlib}.internal_keras_evaluate(
-                {dependent_varname}::SMALLINT[],
-                {independent_varname}::REAL[],
-                $MAD${model_arch}$MAD$,
-                $1,
-                {compile_params},
-                {gp_segment_id_col},
-                ARRAY{seg_ids},
-                ARRAY{images_per_seg},
-                {gpus_per_host},
-                {segments_per_host}
-                )) AS loss_metric
-    FROM {table}
+    select ({schema_madlib}.internal_keras_evaluate(
+                                            {dependent_varname}::SMALLINT[],
+                                            {independent_varname}::REAL[],
+                                            $MAD${model_arch}$MAD$,
+                                            $1,
+                                            {compile_params},
+                                            {gp_segment_id_col},
+                                            ARRAY{seg_ids},
+                                            ARRAY{images_per_seg},
+                                            {gpus_per_host},
+                                            {segments_per_host}
+                                            )) as loss_metric
+        from {table}
     """.format(**locals()), ["bytea"])
     res = plpy.execute(evaluate_query, [serialized_weights])
     loss_metric = res[0]['loss_metric']
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 4db18b8..f18c63d 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -294,7 +294,7 @@ CREATE OR REPLACE FUNCTION 
MADLIB_SCHEMA.internal_keras_eval_transition(
 ) RETURNS REAL[3] AS $$
 PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras')
     return madlib_keras.internal_keras_eval_transition(**globals())
-$$ LANGUAGE plpythonu STRICT
+$$ LANGUAGE plpythonu
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_merge(
@@ -312,7 +312,7 @@ CREATE OR REPLACE FUNCTION 
MADLIB_SCHEMA.internal_keras_eval_final(
 PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras')
     return madlib_keras.internal_keras_eval_final(**globals())
 $$ LANGUAGE plpythonu
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `') STRICT;
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
 
 DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.internal_keras_evaluate(
                                        SMALLINT[],
diff --git 
a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in
index b4d21fb..deff6cf 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in
@@ -131,7 +131,7 @@ def get_images_per_seg_for_non_minibatched_data(table_name):
                 FROM {0}
             """.format(table_name))
         seg_ids = [0]
-        gp_segment_id_col = -1
+        gp_segment_id_col = '0'
     else:
         # Compute total buffers on each segment
         images_per_seg = plpy.execute(

Reply via email to