This is an automated email from the ASF dual-hosted git repository.

njayaram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 0851fdf09824c93571aaa310a931f763defc2406
Author: Domino Valdano <[email protected]>
AuthorDate: Thu May 9 12:15:03 2019 -0700

    DL: Convert the keras_eval function from UDF to UDA
    
    JIRA:MADLIB-1332
    This commit converts the internal_keras_evaluate function from a UDF
    into a UDA. The input validation table is now expected to be batched
    instead of unbatched.
    
    This commit also does the following:
    1. Rename variables for consistency & clean up.
    2. total_images_per_seg is now images_per_seg, all_seg_ids is seg_ids.
    3. Update dev check for evaluate.
    4. Add unit tests for merge & final eval functions.
    5. Update error-checking for fit and evaluate UDAs
    
    Closes #389
    
    Co-authored-by: Jingyi Mei <[email protected]>
---
 .../modules/deep_learning/madlib_keras.py_in       | 196 ++++++++-------
 .../modules/deep_learning/madlib_keras.sql_in      |  91 +++++--
 .../deep_learning/madlib_keras_validator.py_in     |   2 +-
 .../modules/deep_learning/test/madlib_keras.sql_in |  10 +-
 .../test/unit_tests/test_madlib_keras.py_in        | 277 ++++++++++++++++++++-
 5 files changed, 450 insertions(+), 126 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index d08bad6..e9b3654 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -40,7 +40,6 @@ from input_data_preprocessor import 
MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
 from input_data_preprocessor import MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL
 from madlib_keras_helper import CLASS_VALUES_COLNAME
 from madlib_keras_helper import DEPENDENT_VARTYPE_COLNAME
-from madlib_keras_helper import expand_input_dims
 from madlib_keras_helper import NORMALIZING_CONST_COLNAME
 from madlib_keras_validator import FitInputValidator
 from madlib_keras_wrapper import *
@@ -54,7 +53,6 @@ from utilities.utilities import madlib_version
 from utilities.validate_args import get_col_value_and_type
 from utilities.validate_args import quote_ident
 
-
 def fit(schema_madlib, source_table, model,model_arch_table,
         model_arch_id, compile_params, fit_params, num_iterations,
         gpus_per_host = 0, validation_table=None, name="",
@@ -113,10 +111,11 @@ def fit(schema_madlib, source_table, 
model,model_arch_table,
     # Compute total images on each segment
     gp_segment_id_col,\
     seg_ids_train,\
-    total_images_per_seg = get_images_per_seg(source_table, dependent_varname)
+    images_per_seg_train = get_images_per_seg(source_table, dependent_varname)
 
     if validation_table:
-        seg_ids_val, rows_per_seg_val = 
get_rows_per_seg_from_db(validation_table)
+        _, seg_ids_val,\
+        images_per_seg_val = get_images_per_seg(validation_table, 
dependent_varname)
 
     # Convert model from json and initialize weights
     master_model = model_from_json(model_arch)
@@ -137,9 +136,6 @@ def fit(schema_madlib, source_table, model,model_arch_table,
     validation_set_provided = bool(validation_table)
     validation_aggregate_accuracy = []; validation_aggregate_loss = []
 
-    total_images_per_seg = [int(each_segment["total_images_per_seg"])
-        for each_segment in total_images_per_seg]
-
     # Prepare the SQL for running distributed training via UDA
     compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
     fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
@@ -150,7 +146,7 @@ def fit(schema_madlib, source_table, model,model_arch_table,
             {gp_segment_id_col},
             {num_classes}::INTEGER,
             ARRAY{seg_ids_train},
-            ARRAY{total_images_per_seg},
+            ARRAY{images_per_seg_train},
             $MAD${model_arch}$MAD$::TEXT,
             {compile_params_to_pass}::TEXT,
             {fit_params_to_pass}::TEXT,
@@ -197,7 +193,7 @@ def fit(schema_madlib, source_table, model,model_arch_table,
                                                            gpus_per_host,
                                                            segments_per_host,
                                                            seg_ids_val,
-                                                           rows_per_seg_val,
+                                                           images_per_seg_val,
                                                            gp_segment_id_col)
             end_val = time.time()
             plpy.info("Time for validation in iteration {0}: {1} sec". 
format(i + 1, end_val - start_val))
@@ -327,59 +323,30 @@ def get_images_per_seg(source_table, dependent_varname):
     segment array.
     """
     if is_platform_pg():
-        total_images_per_seg = plpy.execute(
-            """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg
+        res = plpy.execute(
+            """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS images_per_seg
                 FROM {1}
             """.format(dependent_varname, source_table))
-        seg_ids_train = "[]::integer[]"
-        gp_segment_id_col =  -1
+        images_per_seg = [int(res[0]['images_per_seg'])]
+        seg_ids = [0]
+        gp_segment_id_col = -1
     else:
-        total_images_per_seg = plpy.execute(
-            """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS 
total_images_per_seg
+        images_per_seg = plpy.execute(
+            """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS 
images_per_seg
                 FROM {1}
                 GROUP BY gp_segment_id
             """.format(dependent_varname, source_table))
-        seg_ids_train = [int(each_segment["gp_segment_id"])
-                   for each_segment in total_images_per_seg]
+        seg_ids = [int(each_segment["gp_segment_id"])
+                   for each_segment in images_per_seg]
+        images_per_seg = [int(each_segment["images_per_seg"])
+            for each_segment in images_per_seg]
         gp_segment_id_col = 'gp_segment_id'
-    return gp_segment_id_col, seg_ids_train, total_images_per_seg
-
-def get_rows_per_seg_from_db(table_name):
-    """
-    This function queries the given table and returns the total rows per 
segment.
-    Since we cannot pass a dictionary to the keras fit step function we create 
arrays
-    out of the segment numbers and the rows per segment values.
-    This function assumes that the table is not empty.
-    :param table_name:
-    :return: Returns two arrays
-    1. An array containing all the segment numbers in ascending order
-    1. An array containing the total rows for each of the segments in the
-    segment array
-    """
-    if is_platform_pg():
-        rows = plpy.execute(
-            """ SELECT count(*) AS rows_per_seg
-                FROM {0}
-            """.format(table_name))
-        seg_ids = "[]::integer[]"
-    else:
-        # Compute total buffers on each segment
-        rows = plpy.execute(
-            """ SELECT gp_segment_id, count(*) AS rows_per_seg
-                FROM {0}
-                GROUP BY gp_segment_id
-            """.format(table_name))
-        seg_ids = [int(row["gp_segment_id"]) for row in rows]
-
-    rows = [int(row["rows_per_seg"]) for row in rows]
-    return seg_ids, rows
-
+    return gp_segment_id_col, seg_ids, images_per_seg
 
 def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
-                   all_seg_ids, total_images_per_seg, architecture,
+                   seg_ids, images_per_seg, architecture,
                    compile_params, fit_params, gpus_per_host, 
segments_per_host,
                    previous_state, **kwargs):
-
     """
 
     :param state:
@@ -387,8 +354,8 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, 
num_classes,
     :param dep_var:
     :param current_seg_id:
     :param num_classes:
-    :param all_seg_ids:
-    :param total_images_per_seg:
+    :param seg_ids:
+    :param images_per_seg:
     :param architecture:
     :param compile_params:
     :param fit_params:
@@ -447,9 +414,9 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, 
num_classes,
     with K.tf.device(device_name):
         updated_weights = segment_model.get_weights()
     if is_platform_pg():
-        total_images = total_images_per_seg[0]
+        total_images = images_per_seg[0]
     else:
-        total_images = total_images_per_seg[all_seg_ids.index(current_seg_id)]
+        total_images = images_per_seg[seg_ids.index(current_seg_id)]
     if total_images == 0:
         plpy.error('Got 0 rows. Expected at least 1.')
 
@@ -495,9 +462,6 @@ def fit_merge(state1, state2, **kwargs):
 
     # Compute total image counts
     image_count = (image_count1 + image_count2) * 1.0
-    plpy.info("FIT_MERGE: Mergeing {0} + {1} = {2} 
images".format(image_count1,image_count2,image_count))
-    if image_count == 0:
-        plpy.error('total images in merge is 0')
 
     # Aggregate the losses
     total_loss = loss1 + loss2
@@ -518,6 +482,9 @@ def fit_final(state, **kwargs):
         return state
 
     loss, accuracy, image_count, weights = 
madlib_keras_serializer.deserialize_weights_merge(state)
+    if image_count == 0:
+        plpy.error("fit_final: Total images processed is 0")
+
     # Averaging the accuracy, loss and weights
     loss /= image_count
     accuracy /= image_count
@@ -558,42 +525,45 @@ def evaluate1(schema_madlib, model_table, test_table, 
id_col, model_arch_table,
     plpy.info('evaluate result acc is {}'.format(loss_acc[1]))
 
 def get_loss_acc_from_keras_eval(schema_madlib, table, dependent_varname,
-                                 independent_varname, compile_params, 
model_arch,
-                                 model_data, gpus_per_host, segments_per_host,
-                                 seg_ids_val,
-                                 rows_per_seg_val, gp_segment_id_col):
+                                 independent_varname, compile_params,
+                                 model_arch, model_data, gpus_per_host,
+                                 segments_per_host, seg_ids, images_per_seg,
+                                 gp_segment_id_col):
     """
     This function will call the internal keras evaluate function to get the 
loss
     and accuracy of each tuple which then gets averaged to get the final 
result.
     """
     evaluate_query = plpy.prepare("""
-    select {schema_madlib}.array_avg(loss_acc, True) as final_loss_acc from
-    (
-        select ({schema_madlib}.internal_keras_evaluate({dependent_varname},
-                                            {independent_varname},
+    -- TODO:  really, we should not be casting integers and big integers to 
smallint's
+    --  The right solution is either to change the datatype of the agg 
function from
+    --  SMALLINT to INTEGER, or change the output of minibatch util to produce 
SMALLINT
+    --  For the first, we should change fit_step also
+    select 
({schema_madlib}.internal_keras_evaluate({dependent_varname}::SMALLINT[],
+                                            {independent_varname}::REAL[],
                                             $MAD${model_arch}$MAD$,
-                                            $1, {compile_params},
+                                            $1,
+                                            {compile_params},
                                             {gpus_per_host},
                                             {segments_per_host},
-                                            ARRAY{seg_ids_val},
-                                            ARRAY{rows_per_seg_val},
+                                            ARRAY{seg_ids},
+                                            ARRAY{images_per_seg},
                                             {gp_segment_id_col})) as loss_acc
         from {table}
-    ) q""".format(**locals()), ["bytea"])
+    """.format(**locals()), ["bytea"])
     res = plpy.execute(evaluate_query, [model_data])
-    loss_acc = res[0]['final_loss_acc']
+    loss_acc = res[0]['loss_acc']
     return loss_acc
 
-
-def internal_keras_evaluate(dependent_var, independent_var, model_architecture,
-                            model_data, compile_params, gpus_per_host,
-                            segments_per_host, seg_ids_val,
-                            rows_per_seg_val, current_seg, **kwargs):
+def internal_keras_eval_transition(state, dependent_var, independent_var,
+                                   model_architecture, model_data, 
compile_params,
+                                   gpus_per_host, segments_per_host, seg_ids,
+                                   images_per_seg, current_seg_id, **kwargs):
     SD = kwargs['SD']
-    device_name = get_device_name_and_set_cuda_env(gpus_per_host,
-                                                   current_seg)
+    device_name = get_device_name_and_set_cuda_env(gpus_per_host, 
current_seg_id)
 
-    if 'segment_model' not in SD:
+    agg_loss, agg_accuracy, agg_image_count = state
+
+    if not agg_image_count:
         if not is_platform_pg():
             set_keras_session(gpus_per_host, segments_per_host)
         model = model_from_json(model_architecture)
@@ -604,32 +574,70 @@ def internal_keras_evaluate(dependent_var, 
independent_var, model_architecture,
         with K.tf.device(device_name):
             compile_model(model, compile_params)
         SD['segment_model'] = model
-        SD['row_count'] = 0
+        # These should already be 0, but just in case make sure
+        agg_accuracy = 0
+        agg_loss = 0
     else:
+        # Same model every time, no need to re-compile or update weights
         model = SD['segment_model']
-    SD['row_count'] += 1
 
-    # Since the training data is batched but the validation data isn't,
-    # we have to make sure that the validation data np array has the same
-    # number of dimensions as training data. So we prepend a dimension to
-    # both x and y np arrays using expand_dims.
-    independent_var = expand_input_dims(independent_var, target_type='float32')
-    dependent_var = expand_input_dims(dependent_var)
+    x_val = np.array(independent_var)
+    y_val = np.array(dependent_var)
 
     with K.tf.device(device_name):
-        res = model.evaluate(independent_var, dependent_var)
+        res = model.evaluate(x_val, y_val)
+
+    loss, accuracy = res
+
+    image_count = len(dependent_var)
+
+    agg_image_count += image_count
+    agg_loss += (image_count * loss)
+    agg_accuracy += (image_count * accuracy)
+
     if is_platform_pg():
-        total_rows = rows_per_seg_val[0]
+        total_images = images_per_seg[0]
     else:
-        total_rows = rows_per_seg_val[seg_ids_val.index(current_seg)]
+        total_images = images_per_seg[seg_ids.index(current_seg_id)]
 
-    if is_last_row_in_seg(SD['row_count'], total_rows):
+    if agg_image_count == total_images:
         SD.pop('segment_model', None)
         if not is_platform_pg():
             clear_keras_session()
+    elif agg_image_count > total_images:
+        plpy.error("Evaluated too many images.")
+
+    state[0] = agg_loss
+    state[1] = agg_accuracy
+    state[2] = agg_image_count
+
+    return state
 
-    return res
+def internal_keras_eval_merge(state1, state2, **kwargs):
+    # If either state is None, return the other one
+    if not state1 or not state2:
+        return state1 or state2
 
+    loss1, accuracy1, image_count1 = state1
+    loss2, accuracy2, image_count2 = state2
+
+    merged_loss = loss1 + loss2
+    merged_accuracy = accuracy1 + accuracy2
+
+    total_image_count = image_count1 + image_count2
+
+    merged_state = [ merged_loss, merged_accuracy , total_image_count ]
+
+    return merged_state
+
+def internal_keras_eval_final(state, **kwargs):
+    loss, accuracy, image_count = state
+
+    if image_count == 0:
+        plpy.error("internal_keras_eval_final: Total images processed is 0")
+
+    loss /= image_count
+    accuracy /= image_count
 
-def is_last_row_in_seg(row_count, total_rows):
-    return row_count == total_rows
+    state = loss, accuracy, image_count
+    return state
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 1844a29..2d7170b 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -96,8 +96,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.fit_transition(
     dep_var                    SMALLINT[],
     current_seg_id             INTEGER,
     num_classes                INTEGER,
-    all_seg_ids                INTEGER[],
-    total_images_per_seg       INTEGER[],
+    seg_ids                    INTEGER[],
+    images_per_seg             INTEGER[],
     architecture               TEXT,
     compile_params             TEXT,
     fit_params                 TEXT,
@@ -147,8 +147,8 @@ CREATE AGGREGATE MADLIB_SCHEMA.fit_step(
     /* dep_var */                SMALLINT[],
     /* current_seg_id */         INTEGER,
     /* num_classes */            INTEGER,
-    /* all_seg_ids*/             INTEGER[],
-    /* total_buffers_per_seg*/   INTEGER[],
+    /* seg_ids*/                 INTEGER[],
+    /* images_per_seg*/          INTEGER[],
     /* architecture */           TEXT,
     /* compile_params */         TEXT,
     /* fit_params */             TEXT,
@@ -257,20 +257,69 @@ CREATE OR REPLACE FUNCTION 
MADLIB_SCHEMA.madlib_keras_evaluate1(
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
-CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_evaluate(
-   dependent_var double precision [],
-   independent_var double precision [],
-   model_architecture TEXT,
-   model_data bytea,
-   compile_params TEXT,
-   gpus_per_host INTEGER,
-   segments_per_host INTEGER,
-   seg_ids_val INTEGER[],
-   rows_per_seg_val INTEGER[],
-   current_seg INTEGER
-) RETURNS DOUBLE PRECISION[] AS $$
-    PythonFunctionBodyOnly(`deep_learning', `madlib_keras')
-    with AOControl(False):
-        return madlib_keras.internal_keras_evaluate(**globals())
-$$ LANGUAGE plpythonu VOLATILE
-m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_transition(
+    state                              REAL[3],
+    dependent_var                      SMALLINT[],
+    independent_var                    REAL[],
+    model_architecture                 TEXT,
+    model_data                         BYTEA,
+    compile_params                     TEXT,
+    gpus_per_host                      INTEGER,
+    segments_per_host                  INTEGER,
+    seg_ids                            INTEGER[],
+    images_per_seg                     INTEGER[],
+    current_seg_id                     INTEGER
+) RETURNS REAL[3] AS $$
+PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras')
+    return madlib_keras.internal_keras_eval_transition(**globals())
+$$ LANGUAGE plpythonu STRICT
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_merge(
+    state1          REAL[3],
+    state2          REAL[3]
+) RETURNS REAL[3] AS $$
+PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras')
+    return madlib_keras.internal_keras_eval_merge(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_final(
+    state REAL[3]
+) RETURNS REAL[2] AS $$
+PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras')
+    return madlib_keras.internal_keras_eval_final(**globals())
+$$ LANGUAGE plpythonu
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `') STRICT;
+
+DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.internal_keras_evaluate(
+                                       SMALLINT[],
+                                       REAL[],
+                                       TEXT,
+                                       BYTEA,
+                                       TEXT,
+                                       BOOLEAN,
+                                       INTEGER[],
+                                       INTEGER[],
+                                       INTEGER
+);
+
+CREATE AGGREGATE MADLIB_SCHEMA.internal_keras_evaluate(
+    /* dependent_var */                SMALLINT[],
+    /* independent_var */              REAL[],
+    /* model_architecture */           TEXT,
+    /* model_data */                   BYTEA,
+    /* compile_params */               TEXT,
+    /* gpus_per_host */                INTEGER,
+    /* segments_per_host */            INTEGER,
+    /* seg_ids */                      INTEGER[],
+    /* images_per_seg*/                INTEGER[],
+    /* current_seg_id */               INTEGER
+)(
+    STYPE=REAL[3],
+    INITCOND='{0,0,0}',
+    SFUNC=MADLIB_SCHEMA.internal_keras_eval_transition,
+    m4_ifdef(`__POSTGRESQL__', `', 
`prefunc=MADLIB_SCHEMA.internal_keras_eval_merge,')
+    FINALFUNC=MADLIB_SCHEMA.internal_keras_eval_final
+);
+
diff --git 
a/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
index ee667d0..d66219c 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
@@ -235,4 +235,4 @@ class FitInputValidator:
         if self.validation_table:
             _validate_input_shapes(
                 self.validation_table, self.independent_varname,
-                input_shape, 1)
+                input_shape, 2)
diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in 
b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
index 9d34303..404c39c 100644
--- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in
@@ -26,10 +26,10 @@ copy cifar_10_sample from stdin delimiter '|';
 \.
 
 drop table if exists cifar_10_sample_val;
-create table cifar_10_sample_val(id INTEGER,dependent_var SMALLINT[], 
independent_var  REAL[] );
+create table cifar_10_sample_val(independent_var REAL[], dependent_var 
INTEGER[], buffer_id SMALLINT);
 copy cifar_10_sample_val from stdin delimiter '|';
-1|{0,1}|{{{248,248,250},{245,245,246},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247}},{{248,248,250},{245,245,247},{245
 [...]
-2|{1,0}|{{{103,115,158},{102,114,156},{102,114,156},{103,115,155},{103,115,155},{103,115,154},{103,115,155},{104,116,157},{104,116,158},{104,115,157},{103,115,157},{104,115,158},{103,114,157},{103,114,156},{106,117,157},{104,115,156},{103,114,156},{104,114,157},{103,114,157},{108,117,154},{104,110,147},{78,83,124},{98,108,150},{104,115,158},{107,115,155},{123,132,163},{138,145,172},{138,144,172},{141,145,173},{128,132,163},{106,114,152},{102,113,155}},{{104,116,158},{103,115,157},{103,11
 [...]
+{{{{0.494118,0.462745,0.431373},{0.478431,0.45098,0.423529},{0.494118,0.466667,0.435294},{0.498039,0.466667,0.427451},{0.509804,0.478431,0.435294},{0.509804,0.478431,0.435294},{0.517647,0.486275,0.443137},{0.521569,0.490196,0.447059},{0.509804,0.478431,0.435294},{0.517647,0.486275,0.443137},{0.52549,0.494118,0.45098},{0.513726,0.482353,0.439216},{0.513726,0.482353,0.439216},{0.52549,0.494118,0.45098},{0.521569,0.490196,0.447059},{0.533333,0.501961,0.458824},{0.537255,0.505882,0.462745},{
 [...]
+{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216},{0.85098,0.858
 [...]
 \.
 -- normalize the indep variable
 -- TODO Calling this function makes keras.fit fail with the exception 
(investigate later)
@@ -74,7 +74,7 @@ INSERT INTO cifar_10_sample_batched_summary values (
     'x',
     'smallint',
     ARRAY[0,1],
-    2,
+    1,
     255.0);
 
 DROP TABLE IF EXISTS model_arch;
@@ -331,7 +331,7 @@ SELECT madlib_keras_fit(
 -- induce failure by passing a non numeric column
 create table cifar_10_sample_val_failure as select * from cifar_10_sample_val;
 alter table cifar_10_sample_val_failure rename dependent_var to 
dependent_var_original;
-alter table cifar_10_sample_val_failure rename id to dependent_var;
+alter table cifar_10_sample_val_failure rename buffer_id to dependent_var;
 DROP TABLE IF EXISTS keras_out, keras_out_summary;
 select assert(trap_error($TRAP$madlib_keras_fit(
            'cifar_10_sample_batched',
diff --git 
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
 
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index d6a8e2b..8e65662 100644
--- 
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ 
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -163,6 +163,7 @@ class MadlibKerasFitTestCase(unittest.TestCase):
                                              '/cpu:0', state.tostring(), 
self.model_shapes)
         k = {'SD': {'model_shapes': self.model_shapes}}
         k['SD']['segment_model'] = self.model
+
         new_model_state = self.subject.fit_transition(
             state.tostring(), self.independent_var, self.dependent_var, 0, 2, 
self.all_seg_ids, self.total_images_per_seg,
             self.model.to_json(), None, self.fit_params, 0, 4, 
'dummy_previous_state', **k)
@@ -192,8 +193,7 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
-        multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in 
self.model_weights]
-        multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+        multiplied_weights = 
mult(self.total_images_per_seg[0],self.model_weights)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), 
self.model_shapes)
@@ -229,8 +229,7 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         state.extend(self.model_weights)
         state = np.array(state, dtype=np.float32)
 
-        multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in 
self.model_weights]
-        multiplied_weights = np.rint(multiplied_weights).astype(np.int)
+        multiplied_weights = 
mult(self.total_images_per_seg[0],self.model_weights)
 
         self.subject.compile_and_set_weights(self.model, self.compile_params,
                                              '/cpu:0', state.tostring(), 
self.model_shapes)
@@ -384,10 +383,17 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         self.assertAlmostEqual(self.accuracy, agg_accuracy,2)
         self.assertTrue((self.model_weights == weights).all())
 
-    def fit_final_none(self):
+    def test_fit_final_none(self):
         result = self.subject.fit_final(None)
         self.assertEqual(result, None)
 
+    def test_fit_final_image_count_zero(self):
+        input_state = [0, 0, 0]
+        input_state.extend(self.model_weights)
+        input_state = np.array(input_state, dtype=np.float32)
+
+        with self.assertRaises(plpy.PLPYException):
+            result = self.subject.fit_final(input_state.tostring())
 
 class MadlibKerasWrapperTestCase(unittest.TestCase):
     def setUp(self):
@@ -413,6 +419,7 @@ class MadlibKerasWrapperTestCase(unittest.TestCase):
 
         seg_id = -1
         gpus_per_host = 3
+
         self.assertEqual('/gpu:0', 
self.subject.get_device_name_and_set_cuda_env(
             gpus_per_host, seg_id ))
         self.assertEqual('0,1,2', os.environ['CUDA_VISIBLE_DEVICES'])
@@ -803,6 +810,266 @@ class MadlibKerasHelperTestCase(unittest.TestCase):
                          self.subject.strip_trailing_nulls_from_class_values(
                 [None, None]))
 
+
class MadlibKerasEvaluationTestCase(unittest.TestCase):
    """Unit tests for the internal_keras_eval_* UDA support functions
    (transition / merge / final) in madlib_keras.

    The aggregate state is a triple [agg_loss, agg_accuracy, image_count]
    where loss and accuracy are accumulated weighted by the number of
    images evaluated so far; `final` divides by the image count.
    """

    def setUp(self):
        # Stub out plpy and the preprocessing module so madlib_keras can be
        # imported outside of a database backend.
        self.plpy_mock = Mock(spec='error')
        patches = {
            'plpy': plpy,
            'utilities.minibatch_preprocessing': Mock()
        }

        self.plpy_mock_execute = MagicMock()
        plpy.execute = self.plpy_mock_execute

        self.module_patcher = patch.dict('sys.modules', patches)
        self.module_patcher.start()
        import madlib_keras
        self.subject = madlib_keras

        # Minimal one-layer model so the compile/evaluate paths run quickly.
        self.model = Sequential()
        self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
                              input_shape=(1, 1, 1,), padding='same'))
        self.model.add(Flatten())

        self.compile_params = "optimizer=SGD(lr=0.01, decay=1e-6, nesterov=True), loss='categorical_crossentropy', metrics=['accuracy']"
        self.model_weights = [3, 4, 5, 6]
        self.model_shapes = [a.shape for a in self.model.get_weights()]

        self.loss = 0.5947071313858032
        self.accuracy = 1.0
        self.all_seg_ids = [0, 1, 2]

        self.independent_var = [[[[0.5]]]] * 10
        self.dependent_var = [[0, 1]] * 10
        # We test on segment 0, which has 3 buffers filled with 10 identical
        # images each, or 30 images total
        self.total_images_per_seg = [3 * len(self.dependent_var), 20, 40]

    def tearDown(self):
        self.module_patcher.stop()

    def _test_internal_keras_eval_transition_first_buffer(self, is_platform_pg):
        """First buffer: the keras session is set up (gpdb only), the model is
        cached in SD, and the running state starts accumulating."""
        self.subject.K.set_session = Mock()
        self.subject.clear_keras_session = Mock()
        self.subject.is_platform_pg = Mock(return_value=is_platform_pg)
        ending_image_count = len(self.dependent_var)

        k = {'SD': {}}
        state = [0, 0, 0]

        # First three slots of the serialized blob are bookkeeping values the
        # transition function ignores; only the weights that follow matter.
        serialized_weights = [0, 0, 0]  # not used
        serialized_weights.extend(self.model_weights)
        serialized_weights = np.array(serialized_weights,
                                      dtype=np.float32).tostring()

        new_state = self.subject.internal_keras_eval_transition(
            state, self.dependent_var, self.independent_var,
            self.model.to_json(), serialized_weights, self.compile_params,
            0, 3, self.all_seg_ids, self.total_images_per_seg, 0, **k)

        agg_loss, agg_accuracy, image_count = new_state

        self.assertEqual(ending_image_count, image_count)
        # Call set_session once for gpdb (but not for postgres)
        self.assertEqual(0 if is_platform_pg else 1,
                         self.subject.K.set_session.call_count)
        # loss and accuracy are accumulated weighted by the image count
        self.assertAlmostEqual(self.loss * image_count, agg_loss, 4)
        self.assertAlmostEqual(self.accuracy * image_count, agg_accuracy, 4)
        # Clear session and sess.close must not get called for the first buffer
        self.assertEqual(0, self.subject.clear_keras_session.call_count)
        self.assertTrue(k['SD']['segment_model'])

    def _test_internal_keras_eval_transition_middle_buffer(self, is_platform_pg):
        """Middle buffer: the cached model in SD is reused; neither session
        setup nor teardown may happen."""
        # TODO should we mock tensorflow's close_session and keras'
        # clear_session instead of mocking the function `clear_keras_session`
        self.subject.K.set_session = Mock()
        self.subject.clear_keras_session = Mock()
        self.subject.is_platform_pg = Mock(return_value=is_platform_pg)

        starting_image_count = len(self.dependent_var)
        ending_image_count = starting_image_count + len(self.dependent_var)

        k = {'SD': {}}

        model_state = [self.loss, self.accuracy, starting_image_count]
        model_state.extend(self.model_weights)
        model_state = np.array(model_state, dtype=np.float32)

        self.subject.compile_and_set_weights(self.model, self.compile_params,
                                             '/cpu:0', model_state.tostring(),
                                             self.model_shapes)

        state = [self.loss * starting_image_count,
                 self.accuracy * starting_image_count,
                 starting_image_count]
        k['SD']['segment_model'] = self.model

        new_state = self.subject.internal_keras_eval_transition(
            state, self.dependent_var, self.independent_var,
            self.model.to_json(), 'dummy_model_data', None,
            0, 3, self.all_seg_ids, self.total_images_per_seg, 0, **k)

        agg_loss, agg_accuracy, image_count = new_state

        self.assertEqual(ending_image_count, image_count)
        # set_session is only called in first buffer, not here
        self.assertEqual(0, self.subject.K.set_session.call_count)
        # loss and accuracy keep accumulating weighted by the image count
        self.assertAlmostEqual(self.loss * ending_image_count, agg_loss, 4)
        self.assertAlmostEqual(self.accuracy * ending_image_count,
                               agg_accuracy, 4)
        # Clear session and sess.close must not get called for the middle buffer
        self.assertEqual(0, self.subject.clear_keras_session.call_count)

    def _test_internal_keras_eval_transition_last_buffer(self, is_platform_pg):
        """Last buffer: accumulation completes and (on gpdb only) the keras
        session is torn down."""
        # TODO should we mock tensorflow's close_session and keras'
        # clear_session instead of mocking the function `clear_keras_session`
        self.subject.K.set_session = Mock()
        self.subject.clear_keras_session = Mock()
        self.subject.is_platform_pg = Mock(return_value=is_platform_pg)

        starting_image_count = 2 * len(self.dependent_var)
        ending_image_count = starting_image_count + len(self.dependent_var)
        k = {'SD': {}}

        model_state = [self.loss, self.accuracy, starting_image_count]
        model_state.extend(self.model_weights)
        model_state = np.array(model_state, dtype=np.float32)

        self.subject.compile_and_set_weights(self.model, self.compile_params,
                                             '/cpu:0', model_state.tostring(),
                                             self.model_shapes)

        state = [self.loss * starting_image_count,
                 self.accuracy * starting_image_count,
                 starting_image_count]
        k['SD']['segment_model'] = self.model

        new_state = self.subject.internal_keras_eval_transition(
            state, self.dependent_var, self.independent_var,
            self.model.to_json(), 'dummy_model_data', None,
            0, 3, self.all_seg_ids, self.total_images_per_seg, 0, **k)

        agg_loss, agg_accuracy, image_count = new_state

        self.assertEqual(ending_image_count, image_count)
        # set_session is only called in first buffer, not here
        self.assertEqual(0, self.subject.K.set_session.call_count)
        # loss and accuracy keep accumulating weighted by the image count
        self.assertAlmostEqual(self.loss * ending_image_count, agg_loss, 4)
        self.assertAlmostEqual(self.accuracy * ending_image_count,
                               agg_accuracy, 4)
        # Clear session and sess.close must get called for the last buffer in
        # gpdb, but not in postgres
        self.assertEqual(0 if is_platform_pg else 1,
                         self.subject.clear_keras_session.call_count)

    def test_internal_keras_eval_transition_first_buffer_pg(self):
        self._test_internal_keras_eval_transition_first_buffer(True)

    def test_internal_keras_eval_transition_first_buffer_gpdb(self):
        self._test_internal_keras_eval_transition_first_buffer(False)

    def test_internal_keras_eval_transition_middle_buffer_pg(self):
        self._test_internal_keras_eval_transition_middle_buffer(True)

    def test_internal_keras_eval_transition_middle_buffer_gpdb(self):
        self._test_internal_keras_eval_transition_middle_buffer(False)

    def test_internal_keras_eval_transition_last_buffer_pg(self):
        self._test_internal_keras_eval_transition_last_buffer(True)

    def test_internal_keras_eval_transition_last_buffer_gpdb(self):
        self._test_internal_keras_eval_transition_last_buffer(False)

    def test_internal_keras_eval_merge(self):
        """merge sums both components and the image counts of two states."""
        image_count = self.total_images_per_seg[0]
        state1 = [3.0 * self.loss, 3.0 * self.accuracy, image_count]
        state2 = [2.0 * self.loss, 2.0 * self.accuracy, image_count + 30]
        merged_state = self.subject.internal_keras_eval_merge(state1, state2)
        agg_loss, agg_accuracy, image_count_total = merged_state

        self.assertEqual(2 * image_count + 30, image_count_total)
        self.assertAlmostEqual(5.0 * self.loss, agg_loss, 2)
        self.assertAlmostEqual(5.0 * self.accuracy, agg_accuracy, 2)

    def test_internal_keras_eval_merge_none_first(self):
        """merge with a None first state passes the second state through."""
        image_count = self.total_images_per_seg[0]
        input_state = [self.loss, self.accuracy, image_count]
        merged_state = self.subject.internal_keras_eval_merge(None, input_state)
        agg_loss, agg_accuracy, image_count_total = merged_state

        self.assertEqual(image_count, image_count_total)
        self.assertAlmostEqual(self.loss, agg_loss, 2)
        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)

    def test_internal_keras_eval_merge_none_second(self):
        """merge with a None second state passes the first state through."""
        image_count = self.total_images_per_seg[0]
        input_state = [self.loss, self.accuracy, image_count]
        merged_state = self.subject.internal_keras_eval_merge(input_state, None)
        agg_loss, agg_accuracy, image_count_total = merged_state

        self.assertEqual(image_count, image_count_total)
        self.assertAlmostEqual(self.loss, agg_loss, 2)
        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)

    def test_internal_keras_eval_merge_both_none(self):
        result = self.subject.internal_keras_eval_merge(None, None)
        self.assertIsNone(result)

    def test_internal_keras_eval_final(self):
        """final divides the accumulated loss/accuracy by the image count."""
        image_count = self.total_images_per_seg[0]
        input_state = [image_count * self.loss,
                       image_count * self.accuracy,
                       image_count]

        output_state = self.subject.internal_keras_eval_final(input_state)
        agg_loss, agg_accuracy, image_count_output = output_state

        self.assertEqual(image_count, image_count_output)
        self.assertAlmostEqual(self.loss, agg_loss, 2)
        self.assertAlmostEqual(self.accuracy, agg_accuracy, 2)

    def test_internal_keras_eval_final_none(self):
        # NOTE: must carry the `test_` prefix, otherwise unittest discovery
        # silently skips this case.
        result = self.subject.internal_keras_eval_final(None)
        self.assertIsNone(result)

    def test_internal_keras_eval_transition_too_many_images(self):
        """transition must error out when more images arrive than
        images_per_seg promised for this segment."""
        self.subject.K.set_session = Mock()
        self.subject.clear_keras_session = Mock()

        starting_image_count = 5

        k = {'SD': {}}
        model_state = [self.loss, self.accuracy, starting_image_count]
        model_state.extend(self.model_weights)
        model_state = np.array(model_state, dtype=np.float32)

        self.subject.compile_and_set_weights(self.model, self.compile_params,
                                             '/cpu:0', model_state.tostring(),
                                             self.model_shapes)

        state = [self.loss * starting_image_count,
                 self.accuracy * starting_image_count,
                 starting_image_count]

        k['SD']['segment_model'] = self.model

        # 5 already seen + 10 in this buffer exceeds the declared 10 per seg.
        total_images_per_seg = [10, 10, 10]

        with self.assertRaises(plpy.PLPYException):
            self.subject.internal_keras_eval_transition(
                state, self.dependent_var, self.independent_var,
                self.model.to_json(), 'dummy_model_data', None,
                0, 3, self.all_seg_ids, total_images_per_seg, 0, **k)

    def test_internal_keras_eval_final_image_count_zero(self):
        """final must raise rather than divide by a zero image count."""
        input_state = [0, 0, 0]

        with self.assertRaises(plpy.PLPYException):
            self.subject.internal_keras_eval_final(input_state)
+
 if __name__ == '__main__':
     unittest.main()
 # ---------------------------------------------------------------------

Reply via email to