This is an automated email from the ASF dual-hosted git repository. njayaram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 0851fdf09824c93571aaa310a931f763defc2406 Author: Domino Valdano <[email protected]> AuthorDate: Thu May 9 12:15:03 2019 -0700 DL: Convert the keras_eval function from UDF to UDA JIRA:MADLIB-1332 This commit converts the internal_keras_evaluate from a UDF into a UDA. Input validation table is expected to be batched now instead of unbatched. This commit also does the following: 1. Rename variables for consistency & clean up. 2. total_images_per_seg is now images_per_seg, all_seg_ids is seg_ids. 3. Update dev check for evaluate. 4. Add unit tests for merge & final eval functions. 5. Update error-checking for fit and evaluate UDAs Closes #389 Co-authored-by: Jingyi Mei <[email protected]> --- .../modules/deep_learning/madlib_keras.py_in | 196 ++++++++------- .../modules/deep_learning/madlib_keras.sql_in | 91 +++++-- .../deep_learning/madlib_keras_validator.py_in | 2 +- .../modules/deep_learning/test/madlib_keras.sql_in | 10 +- .../test/unit_tests/test_madlib_keras.py_in | 277 ++++++++++++++++++++- 5 files changed, 450 insertions(+), 126 deletions(-) diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in index d08bad6..e9b3654 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in @@ -40,7 +40,6 @@ from input_data_preprocessor import MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL from input_data_preprocessor import MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL from madlib_keras_helper import CLASS_VALUES_COLNAME from madlib_keras_helper import DEPENDENT_VARTYPE_COLNAME -from madlib_keras_helper import expand_input_dims from madlib_keras_helper import NORMALIZING_CONST_COLNAME from madlib_keras_validator import FitInputValidator from madlib_keras_wrapper import * @@ -54,7 +53,6 @@ from utilities.utilities import madlib_version from utilities.validate_args import get_col_value_and_type from utilities.validate_args import quote_ident - def fit(schema_madlib, source_table, model,model_arch_table, model_arch_id, compile_params, fit_params, num_iterations, gpus_per_host = 0, validation_table=None, name="", @@ -113,10 +111,11 @@ def fit(schema_madlib, source_table, model,model_arch_table, # Compute total images on each segment gp_segment_id_col,\ seg_ids_train,\ - total_images_per_seg = get_images_per_seg(source_table, dependent_varname) + images_per_seg_train = get_images_per_seg(source_table, dependent_varname) if validation_table: - seg_ids_val, rows_per_seg_val = get_rows_per_seg_from_db(validation_table) + _, seg_ids_val,\ + images_per_seg_val = get_images_per_seg(validation_table, dependent_varname) # Convert model from json and initialize weights master_model = model_from_json(model_arch) @@ -137,9 +136,6 @@ def fit(schema_madlib, source_table, model,model_arch_table, validation_set_provided = bool(validation_table) validation_aggregate_accuracy = []; validation_aggregate_loss = [] - total_images_per_seg = [int(each_segment["total_images_per_seg"]) - for each_segment in total_images_per_seg] - # Prepare the SQL for running distributed training via UDA compile_params_to_pass = "$madlib$" + compile_params + "$madlib$" fit_params_to_pass = "$madlib$" + fit_params + "$madlib$" @@ -150,7 +146,7 @@ def fit(schema_madlib, source_table, model,model_arch_table, {gp_segment_id_col}, {num_classes}::INTEGER, ARRAY{seg_ids_train}, - ARRAY{total_images_per_seg}, + ARRAY{images_per_seg_train}, $MAD${model_arch}$MAD$::TEXT, {compile_params_to_pass}::TEXT, {fit_params_to_pass}::TEXT, @@ -197,7 +193,7 @@ def fit(schema_madlib, source_table, model,model_arch_table, gpus_per_host, segments_per_host, seg_ids_val, - rows_per_seg_val, + images_per_seg_val, gp_segment_id_col) end_val = time.time() plpy.info("Time for validation in iteration {0}: {1} sec". format(i + 1, end_val - start_val)) @@ -327,59 +323,30 @@ def get_images_per_seg(source_table, dependent_varname): segment array. """ if is_platform_pg(): - total_images_per_seg = plpy.execute( - """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg + res = plpy.execute( + """ SELECT SUM(ARRAY_LENGTH({0}, 1)) AS images_per_seg FROM {1} """.format(dependent_varname, source_table)) - seg_ids_train = "[]::integer[]" - gp_segment_id_col = -1 + images_per_seg = [int(res[0]['images_per_seg'])] + seg_ids = [0] + gp_segment_id_col = -1 else: - total_images_per_seg = plpy.execute( - """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS total_images_per_seg + images_per_seg = plpy.execute( + """ SELECT gp_segment_id, SUM(ARRAY_LENGTH({0}, 1)) AS images_per_seg FROM {1} GROUP BY gp_segment_id """.format(dependent_varname, source_table)) - seg_ids_train = [int(each_segment["gp_segment_id"]) - for each_segment in total_images_per_seg] + seg_ids = [int(each_segment["gp_segment_id"]) + for each_segment in images_per_seg] + images_per_seg = [int(each_segment["images_per_seg"]) + for each_segment in images_per_seg] gp_segment_id_col = 'gp_segment_id' - return gp_segment_id_col, seg_ids_train, total_images_per_seg - -def get_rows_per_seg_from_db(table_name): - """ - This function queries the given table and returns the total rows per segment. - Since we cannot pass a dictionary to the keras fit step function we create arrays - out of the segment numbers and the rows per segment values. - This function assumes that the table is not empty. - :param table_name: - :return: Returns two arrays - 1. An array containing all the segment numbers in ascending order - 1. An array containing the total rows for each of the segments in the - segment array - """ - if is_platform_pg(): - rows = plpy.execute( - """ SELECT count(*) AS rows_per_seg - FROM {0} - """.format(table_name)) - seg_ids = "[]::integer[]" - else: - # Compute total buffers on each segment - rows = plpy.execute( - """ SELECT gp_segment_id, count(*) AS rows_per_seg - FROM {0} - GROUP BY gp_segment_id - """.format(table_name)) - seg_ids = [int(row["gp_segment_id"]) for row in rows] - - rows = [int(row["rows_per_seg"]) for row in rows] - return seg_ids, rows - + return gp_segment_id_col, seg_ids, images_per_seg def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes, - all_seg_ids, total_images_per_seg, architecture, + seg_ids, images_per_seg, architecture, compile_params, fit_params, gpus_per_host, segments_per_host, previous_state, **kwargs): - """ :param state: @@ -387,8 +354,8 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes, :param dep_var: :param current_seg_id: :param num_classes: - :param all_seg_ids: - :param total_images_per_seg: + :param seg_ids: + :param images_per_seg: :param architecture: :param compile_params: :param fit_params: @@ -447,9 +414,9 @@ def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes, with K.tf.device(device_name): updated_weights = segment_model.get_weights() if is_platform_pg(): - total_images = total_images_per_seg[0] + total_images = images_per_seg[0] else: - total_images = total_images_per_seg[all_seg_ids.index(current_seg_id)] + total_images = images_per_seg[seg_ids.index(current_seg_id)] if total_images == 0: plpy.error('Got 0 rows. Expected at least 1.') @@ -495,9 +462,6 @@ def fit_merge(state1, state2, **kwargs): # Compute total image counts image_count = (image_count1 + image_count2) * 1.0 - plpy.info("FIT_MERGE: Mergeing {0} + {1} = {2} images".format(image_count1,image_count2,image_count)) - if image_count == 0: - plpy.error('total images in merge is 0') # Aggregate the losses total_loss = loss1 + loss2 @@ -518,6 +482,9 @@ def fit_final(state, **kwargs): return state loss, accuracy, image_count, weights = madlib_keras_serializer.deserialize_weights_merge(state) + if image_count == 0: + plpy.error("fit_final: Total images processed is 0") + # Averaging the accuracy, loss and weights loss /= image_count accuracy /= image_count @@ -558,42 +525,45 @@ def evaluate1(schema_madlib, model_table, test_table, id_col, model_arch_table, plpy.info('evaluate result acc is {}'.format(loss_acc[1])) def get_loss_acc_from_keras_eval(schema_madlib, table, dependent_varname, - independent_varname, compile_params, model_arch, - model_data, gpus_per_host, segments_per_host, - seg_ids_val, - rows_per_seg_val, gp_segment_id_col): + independent_varname, compile_params, + model_arch, model_data, gpus_per_host, + segments_per_host, seg_ids, images_per_seg, + gp_segment_id_col): """ This function will call the internal keras evaluate function to get the loss and accuracy of each tuple which then gets averaged to get the final result. """ evaluate_query = plpy.prepare(""" - select {schema_madlib}.array_avg(loss_acc, True) as final_loss_acc from - ( - select ({schema_madlib}.internal_keras_evaluate({dependent_varname}, - {independent_varname}, + -- TODO: really, we should not be casting integers and big integers to smallint's + -- The right solution is either to change the datatype of the agg function from + -- SMALLINT to INTEGER, or change the output of minibatch util to produce SMALLINT + -- For the first, we should change fit_step also + select ({schema_madlib}.internal_keras_evaluate({dependent_varname}::SMALLINT[], + {independent_varname}::REAL[], $MAD${model_arch}$MAD$, - $1, {compile_params}, + $1, + {compile_params}, {gpus_per_host}, {segments_per_host}, - ARRAY{seg_ids_val}, - ARRAY{rows_per_seg_val}, + ARRAY{seg_ids}, + ARRAY{images_per_seg}, {gp_segment_id_col})) as loss_acc from {table} - ) q""".format(**locals()), ["bytea"]) + """.format(**locals()), ["bytea"]) res = plpy.execute(evaluate_query, [model_data]) - loss_acc = res[0]['final_loss_acc'] + loss_acc = res[0]['loss_acc'] return loss_acc - -def internal_keras_evaluate(dependent_var, independent_var, model_architecture, - model_data, compile_params, gpus_per_host, - segments_per_host, seg_ids_val, - rows_per_seg_val, current_seg, **kwargs): +def internal_keras_eval_transition(state, dependent_var, independent_var, + model_architecture, model_data, compile_params, + gpus_per_host, segments_per_host, seg_ids, + images_per_seg, current_seg_id, **kwargs): SD = kwargs['SD'] - device_name = get_device_name_and_set_cuda_env(gpus_per_host, - current_seg) + device_name = get_device_name_and_set_cuda_env(gpus_per_host, current_seg_id) - if 'segment_model' not in SD: + agg_loss, agg_accuracy, agg_image_count = state + + if not agg_image_count: if not is_platform_pg(): set_keras_session(gpus_per_host, segments_per_host) model = model_from_json(model_architecture) @@ -604,32 +574,70 @@ def internal_keras_evaluate(dependent_var, independent_var, model_architecture, with K.tf.device(device_name): compile_model(model, compile_params) SD['segment_model'] = model - SD['row_count'] = 0 + # These should already be 0, but just in case make sure + agg_accuracy = 0 + agg_loss = 0 else: + # Same model every time, no need to re-compile or update weights model = SD['segment_model'] - SD['row_count'] += 1 - # Since the training data is batched but the validation data isn't, - # we have to make sure that the validation data np array has the same - # number of dimensions as training data. So we prepend a dimension to - # both x and y np arrays using expand_dims. - independent_var = expand_input_dims(independent_var, target_type='float32') - dependent_var = expand_input_dims(dependent_var) + x_val = np.array(independent_var) + y_val = np.array(dependent_var) with K.tf.device(device_name): - res = model.evaluate(independent_var, dependent_var) + res = model.evaluate(x_val, y_val) + + loss, accuracy = res + + image_count = len(dependent_var) + + agg_image_count += image_count + agg_loss += (image_count * loss) + agg_accuracy += (image_count * accuracy) + if is_platform_pg(): - total_rows = rows_per_seg_val[0] + total_images = images_per_seg[0] else: - total_rows = rows_per_seg_val[seg_ids_val.index(current_seg)] + total_images = images_per_seg[seg_ids.index(current_seg_id)] - if is_last_row_in_seg(SD['row_count'], total_rows): + if agg_image_count == total_images: SD.pop('segment_model', None) if not is_platform_pg(): clear_keras_session() + elif agg_image_count > total_images: + plpy.error("Evaluated too many images.") + + state[0] = agg_loss + state[1] = agg_accuracy + state[2] = agg_image_count + + return state - return res +def internal_keras_eval_merge(state1, state2, **kwargs): + # If either state is None, return the other one + if not state1 or not state2: + return state1 or state2 + loss1, accuracy1, image_count1 = state1 + loss2, accuracy2, image_count2 = state2 + + merged_loss = loss1 + loss2 + merged_accuracy = accuracy1 + accuracy2 + + total_image_count = image_count1 + image_count2 + + merged_state = [ merged_loss, merged_accuracy , total_image_count ] + + return merged_state + +def internal_keras_eval_final(state, **kwargs): + loss, accuracy, image_count = state + + if image_count == 0: + plpy.error("internal_keras_eval_final: Total images processed is 0") + + loss /= image_count + accuracy /= image_count -def is_last_row_in_seg(row_count, total_rows): - return row_count == total_rows + state = loss, accuracy, image_count + return state diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in index 1844a29..2d7170b 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in @@ -96,8 +96,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.fit_transition( dep_var SMALLINT[], current_seg_id INTEGER, num_classes INTEGER, - all_seg_ids INTEGER[], - total_images_per_seg INTEGER[], + seg_ids INTEGER[], + images_per_seg INTEGER[], architecture TEXT, compile_params TEXT, fit_params TEXT, @@ -147,8 +147,8 @@ CREATE AGGREGATE MADLIB_SCHEMA.fit_step( /* dep_var */ SMALLINT[], /* current_seg_id */ INTEGER, /* num_classes */ INTEGER, - /* all_seg_ids*/ INTEGER[], - /* total_buffers_per_seg*/ INTEGER[], + /* seg_ids*/ INTEGER[], + /* images_per_seg*/ INTEGER[], /* architecture */ TEXT, /* compile_params */ TEXT, /* fit_params */ TEXT, @@ -257,20 +257,69 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.madlib_keras_evaluate1( $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); -CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_evaluate( - dependent_var double precision [], - independent_var double precision [], - model_architecture TEXT, - model_data bytea, - compile_params TEXT, - gpus_per_host INTEGER, - segments_per_host INTEGER, - seg_ids_val INTEGER[], - rows_per_seg_val INTEGER[], - current_seg INTEGER -) RETURNS DOUBLE PRECISION[] AS $$ - PythonFunctionBodyOnly(`deep_learning', `madlib_keras') - with AOControl(False): - return madlib_keras.internal_keras_evaluate(**globals()) -$$ LANGUAGE plpythonu VOLATILE -m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_transition( + state REAL[3], + dependent_var SMALLINT[], + independent_var REAL[], + model_architecture TEXT, + model_data BYTEA, + compile_params TEXT, + gpus_per_host INTEGER, + segments_per_host INTEGER, + seg_ids INTEGER[], + images_per_seg INTEGER[], + current_seg_id INTEGER +) RETURNS REAL[3] AS $$ +PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras') + return madlib_keras.internal_keras_eval_transition(**globals()) +$$ LANGUAGE plpythonu STRICT +m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_merge( + state1 REAL[3], + state2 REAL[3] +) RETURNS REAL[3] AS $$ +PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras') + return madlib_keras.internal_keras_eval_merge(**globals()) +$$ LANGUAGE plpythonu +m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `'); + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_eval_final( + state REAL[3] +) RETURNS REAL[2] AS $$ +PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras') + return madlib_keras.internal_keras_eval_final(**globals()) +$$ LANGUAGE plpythonu +m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `NO SQL', `') STRICT; + +DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.internal_keras_evaluate( + SMALLINT[], + REAL[], + TEXT, + BYTEA, + TEXT, + BOOLEAN, + INTEGER[], + INTEGER[], + INTEGER +); + +CREATE AGGREGATE MADLIB_SCHEMA.internal_keras_evaluate( + /* dependent_var */ SMALLINT[], + /* independent_var */ REAL[], + /* model_architecture */ TEXT, + /* model_data */ BYTEA, + /* compile_params */ TEXT, + /* gpus_per_host */ INTEGER, + /* segments_per_host */ INTEGER, + /* seg_ids */ INTEGER[], + /* images_per_seg*/ INTEGER[], + /* current_seg_id */ INTEGER +)( + STYPE=REAL[3], + INITCOND='{0,0,0}', + SFUNC=MADLIB_SCHEMA.internal_keras_eval_transition, + m4_ifdef(`__POSTGRESQL__', `', `prefunc=MADLIB_SCHEMA.internal_keras_eval_merge,') + FINALFUNC=MADLIB_SCHEMA.internal_keras_eval_final +); + diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in index ee667d0..d66219c 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in @@ -235,4 +235,4 @@ class FitInputValidator: if self.validation_table: _validate_input_shapes( self.validation_table, self.independent_varname, - input_shape, 1) + input_shape, 2) diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in index 9d34303..404c39c 100644 --- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in +++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in @@ -26,10 +26,10 @@ copy cifar_10_sample from stdin delimiter '|'; \. drop table if exists cifar_10_sample_val; -create table cifar_10_sample_val(id INTEGER,dependent_var SMALLINT[], independent_var REAL[] ); +create table cifar_10_sample_val(independent_var REAL[], dependent_var INTEGER[], buffer_id SMALLINT); copy cifar_10_sample_val from stdin delimiter '|'; -1|{0,1}|{{{248,248,250},{245,245,246},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247},{245,245,247}},{{248,248,250},{245,245,247},{245 [...] -2|{1,0}|{{{103,115,158},{102,114,156},{102,114,156},{103,115,155},{103,115,155},{103,115,154},{103,115,155},{104,116,157},{104,116,158},{104,115,157},{103,115,157},{104,115,158},{103,114,157},{103,114,156},{106,117,157},{104,115,156},{103,114,156},{104,114,157},{103,114,157},{108,117,154},{104,110,147},{78,83,124},{98,108,150},{104,115,158},{107,115,155},{123,132,163},{138,145,172},{138,144,172},{141,145,173},{128,132,163},{106,114,152},{102,113,155}},{{104,116,158},{103,115,157},{103,11 [...] +{{{{0.494118,0.462745,0.431373},{0.478431,0.45098,0.423529},{0.494118,0.466667,0.435294},{0.498039,0.466667,0.427451},{0.509804,0.478431,0.435294},{0.509804,0.478431,0.435294},{0.517647,0.486275,0.443137},{0.521569,0.490196,0.447059},{0.509804,0.478431,0.435294},{0.517647,0.486275,0.443137},{0.52549,0.494118,0.45098},{0.513726,0.482353,0.439216},{0.513726,0.482353,0.439216},{0.52549,0.494118,0.45098},{0.521569,0.490196,0.447059},{0.533333,0.501961,0.458824},{0.537255,0.505882,0.462745},{ [...] +{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216},{0.85098,0.858 [...] \. -- normalize the indep variable -- TODO Calling this function makes keras.fit fail with the exception (investigate later) @@ -74,7 +74,7 @@ INSERT INTO cifar_10_sample_batched_summary values ( 'x', 'smallint', ARRAY[0,1], - 2, + 1, 255.0); DROP TABLE IF EXISTS model_arch; @@ -331,7 +331,7 @@ SELECT madlib_keras_fit( -- induce failure by passing a non numeric column create table cifar_10_sample_val_failure as select * from cifar_10_sample_val; alter table cifar_10_sample_val_failure rename dependent_var to dependent_var_original; -alter table cifar_10_sample_val_failure rename id to dependent_var; +alter table cifar_10_sample_val_failure rename buffer_id to dependent_var; DROP TABLE IF EXISTS keras_out, keras_out_summary; select assert(trap_error($TRAP$madlib_keras_fit( 'cifar_10_sample_batched', diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in index d6a8e2b..8e65662 100644 --- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in +++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in @@ -163,6 +163,7 @@ class MadlibKerasFitTestCase(unittest.TestCase): '/cpu:0', state.tostring(), self.model_shapes) k = {'SD': {'model_shapes': self.model_shapes}} k['SD']['segment_model'] = self.model + new_model_state = self.subject.fit_transition( state.tostring(), self.independent_var, self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg, self.model.to_json(), None, self.fit_params, 0, 4, 'dummy_previous_state', **k) @@ -192,8 +193,7 @@ class MadlibKerasFitTestCase(unittest.TestCase): state.extend(self.model_weights) state = np.array(state, dtype=np.float32) - multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights] - multiplied_weights = np.rint(multiplied_weights).astype(np.int) + multiplied_weights = mult(self.total_images_per_seg[0],self.model_weights) self.subject.compile_and_set_weights(self.model, self.compile_params, '/cpu:0', state.tostring(), self.model_shapes) @@ -229,8 +229,7 @@ class MadlibKerasFitTestCase(unittest.TestCase): state.extend(self.model_weights) state = np.array(state, dtype=np.float32) - multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights] - multiplied_weights = np.rint(multiplied_weights).astype(np.int) + multiplied_weights = mult(self.total_images_per_seg[0],self.model_weights) self.subject.compile_and_set_weights(self.model, self.compile_params, '/cpu:0', state.tostring(), self.model_shapes) @@ -384,10 +383,17 @@ class MadlibKerasFitTestCase(unittest.TestCase): self.assertAlmostEqual(self.accuracy, agg_accuracy,2) self.assertTrue((self.model_weights == weights).all()) - def fit_final_none(self): + def test_fit_final_none(self): result = self.subject.fit_final(None) self.assertEqual(result, None) + def test_fit_final_image_count_zero(self): + input_state = [0, 0, 0] + input_state.extend(self.model_weights) + input_state = np.array(input_state, dtype=np.float32) + + with self.assertRaises(plpy.PLPYException): + result = self.subject.fit_final(input_state.tostring()) class MadlibKerasWrapperTestCase(unittest.TestCase): def setUp(self): @@ -413,6 +419,7 @@ class MadlibKerasWrapperTestCase(unittest.TestCase): seg_id = -1 gpus_per_host = 3 + self.assertEqual('/gpu:0', self.subject.get_device_name_and_set_cuda_env( gpus_per_host, seg_id )) self.assertEqual('0,1,2', os.environ['CUDA_VISIBLE_DEVICES']) @@ -803,6 +810,266 @@ class MadlibKerasHelperTestCase(unittest.TestCase): self.subject.strip_trailing_nulls_from_class_values( [None, None])) + +class MadlibKerasEvaluationTestCase(unittest.TestCase): + def setUp(self): + self.plpy_mock = Mock(spec='error') + patches = { + 'plpy': plpy, + 'utilities.minibatch_preprocessing': Mock() + } + + self.plpy_mock_execute = MagicMock() + plpy.execute = self.plpy_mock_execute + + self.module_patcher = patch.dict('sys.modules', patches) + self.module_patcher.start() + import madlib_keras + self.subject = madlib_keras + + self.model = Sequential() + self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu', + input_shape=(1,1,1,), padding='same')) + self.model.add(Flatten()) + + self.compile_params = "optimizer=SGD(lr=0.01, decay=1e-6, nesterov=True), loss='categorical_crossentropy', metrics=['accuracy']" + self.model_weights = [3,4,5,6] + self.model_shapes = [] + for a in self.model.get_weights(): + self.model_shapes.append(a.shape) + + self.loss = 0.5947071313858032 + self.accuracy = 1.0 + self.all_seg_ids = [0,1,2] + + #self.model.evaluate = Mock(return_value = [self.loss, self.accuracy]) + + self.independent_var = [[[[0.5]]]] * 10 + self.dependent_var = [[0,1]] * 10 + # We test on segment 0, which has 3 buffers filled with 10 identical + # images each, or 30 images total + self.total_images_per_seg = [3*len(self.dependent_var),20,40] + + def tearDown(self): + self.module_patcher.stop() + + def _test_internal_keras_eval_transition_first_buffer(self, is_platform_pg): + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + self.subject.is_platform_pg = Mock(return_value = is_platform_pg) + starting_image_count = 0 + ending_image_count = len(self.dependent_var) + + k = {'SD' : {}} + state = [0,0,0] + + serialized_weights = [0, 0, 0] # not used + serialized_weights.extend(self.model_weights) + serialized_weights = np.array(serialized_weights, dtype=np.float32).tostring() + + new_state = self.subject.internal_keras_eval_transition( + state, self.dependent_var , self.independent_var, self.model.to_json(), serialized_weights, + self.compile_params, 0, 3, self.all_seg_ids, self.total_images_per_seg, + 0, **k) + + agg_loss, agg_accuracy, image_count = new_state + + self.assertEqual(ending_image_count, image_count) + # Call set_session once for gpdb (but not for postgres) + self.assertEqual(0 if is_platform_pg else 1, self.subject.K.set_session.call_count) + # loss and accuracy should be unchanged + self.assertAlmostEqual(self.loss * image_count, agg_loss, 4) + self.assertAlmostEqual(self.accuracy * image_count, agg_accuracy, 4) + # Clear session and sess.close must not get called for the first buffer + self.assertEqual(0, self.subject.clear_keras_session.call_count) + self.assertTrue(k['SD']['segment_model']) + + def _test_internal_keras_eval_transition_middle_buffer(self, is_platform_pg): + #TODO should we mock tensorflow's close_session and keras' + # clear_session instead of mocking the function `clear_keras_session` + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + self.subject.is_platform_pg = Mock(return_value = is_platform_pg) + + starting_image_count = len(self.dependent_var) + ending_image_count = starting_image_count + len(self.dependent_var) + + k = {'SD' : {}} + + model_state = [self.loss, self.accuracy, starting_image_count] + model_state.extend(self.model_weights) + model_state = np.array(model_state, dtype=np.float32) + + self.subject.compile_and_set_weights(self.model, self.compile_params, + '/cpu:0', model_state.tostring(), self.model_shapes) + + state = [self.loss * starting_image_count, self.accuracy * starting_image_count, starting_image_count] + k['SD']['segment_model'] = self.model + + new_state = self.subject.internal_keras_eval_transition( + state, self.dependent_var , self.independent_var, self.model.to_json(), 'dummy_model_data', + None, 0, 3, self.all_seg_ids, self.total_images_per_seg, + 0, **k) + + agg_loss, agg_accuracy, image_count = new_state + + self.assertEqual(ending_image_count, image_count) + # set_session is only called in first buffer, not here + self.assertEqual(0, self.subject.K.set_session.call_count) + # loss and accuracy should be unchanged + self.assertAlmostEqual(self.loss * ending_image_count, agg_loss, 4) + self.assertAlmostEqual(self.accuracy * ending_image_count, agg_accuracy, 4) + # Clear session and sess.close must not get called for the middle buffer + self.assertEqual(0, self.subject.clear_keras_session.call_count) + + def _test_internal_keras_eval_transition_last_buffer(self, is_platform_pg): + #TODO should we mock tensorflow's close_session and keras' + # clear_session instead of mocking the function `clear_keras_session` + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + self.subject.is_platform_pg = Mock(return_value = is_platform_pg) + + starting_image_count = 2*len(self.dependent_var) + ending_image_count = starting_image_count + len(self.dependent_var) + k = {'SD' : {}} + + model_state = [self.loss, self.accuracy, starting_image_count] + model_state.extend(self.model_weights) + model_state = np.array(model_state, dtype=np.float32) + + self.subject.compile_and_set_weights(self.model, self.compile_params, + '/cpu:0', model_state.tostring(), self.model_shapes) + + state = [self.loss * starting_image_count, self.accuracy * starting_image_count, starting_image_count] + + k['SD']['segment_model'] = self.model + new_state = self.subject.internal_keras_eval_transition( + state, self.dependent_var , self.independent_var, self.model.to_json(), 'dummy_model_data', + None, 0, 3, self.all_seg_ids, self.total_images_per_seg, + 0, **k) + + agg_loss, agg_accuracy, image_count = new_state + + self.assertEqual(ending_image_count, image_count) + # set_session is only called in first buffer, not here + self.assertEqual(0, self.subject.K.set_session.call_count) + # loss and accuracy should be unchanged + self.assertAlmostEqual(self.loss * ending_image_count, agg_loss, 4) + self.assertAlmostEqual(self.accuracy * ending_image_count, agg_accuracy, 4) + # Clear session and sess.close must get called for the last buffer in gpdb, + # but not in postgres + self.assertEqual(0 if is_platform_pg else 1, self.subject.clear_keras_session.call_count) + + def test_internal_keras_eval_transition_first_buffer_pg(self): + self._test_internal_keras_eval_transition_first_buffer(True) + + def test_internal_keras_eval_transition_first_buffer_gpdb(self): + self._test_internal_keras_eval_transition_first_buffer(False) + + def test_internal_keras_eval_transition_middle_buffer_pg(self): + self._test_internal_keras_eval_transition_middle_buffer(True) + + def test_internal_keras_eval_transition_middle_buffer_gpdb(self): + self._test_internal_keras_eval_transition_middle_buffer(False) + + def test_internal_keras_eval_transition_last_buffer_pg(self): + self._test_internal_keras_eval_transition_last_buffer(True) + + def test_internal_keras_eval_transition_last_buffer_gpdb(self): + self._test_internal_keras_eval_transition_last_buffer(False) + + def test_internal_keras_eval_merge(self): + image_count = self.total_images_per_seg[0] + state1 = [3.0*self.loss, 3.0*self.accuracy, image_count] + state1 = state1 + state2 = [2.0*self.loss, 2.0*self.accuracy, image_count+30] + state2 = state2 + merged_state = self.subject.internal_keras_eval_merge(state1,state2) + agg_loss = merged_state[0] + agg_accuracy = merged_state[1] + image_count_total = merged_state[2] + + self.assertEqual( 2*image_count+30 , image_count_total ) + self.assertAlmostEqual( 5.0*self.loss, agg_loss, 2) + self.assertAlmostEqual( 5.0*self.accuracy, agg_accuracy, 2) + + def test_internal_keras_eval_merge_none_first(self): + image_count = self.total_images_per_seg[0] + input_state = [self.loss, self.accuracy, image_count] + merged_state = self.subject.internal_keras_eval_merge(None, input_state) + agg_loss = merged_state[0] + agg_accuracy = merged_state[1] + image_count_total = merged_state[2] + + self.assertEqual(image_count, image_count_total) + self.assertAlmostEqual(self.loss, agg_loss, 2) + self.assertAlmostEqual(self.accuracy, agg_accuracy, 2) + + def test_internal_keras_eval_merge_none_second(self): + image_count = self.total_images_per_seg[0] + input_state = [self.loss, self.accuracy, image_count] + merged_state = self.subject.internal_keras_eval_merge(input_state, None) + agg_loss = merged_state[0] + agg_accuracy = merged_state[1] + image_count_total = merged_state[2] + + self.assertEqual(image_count, image_count_total) + self.assertAlmostEqual(self.loss, agg_loss, 2) + self.assertAlmostEqual(self.accuracy, agg_accuracy, 2) + + def test_internal_keras_eval_merge_both_none(self): + result = self.subject.internal_keras_eval_merge(None,None) + self.assertEqual(None, result) + + def test_internal_keras_eval_final(self): + image_count = self.total_images_per_seg[0] + input_state = [image_count*self.loss, image_count*self.accuracy, image_count] + + output_state = self.subject.internal_keras_eval_final(input_state) + agg_loss = output_state[0] + agg_accuracy = output_state[1] + image_count_output = output_state[2] + + self.assertEqual(image_count, image_count_output) + self.assertAlmostEqual(self.loss, agg_loss,2) + self.assertAlmostEqual(self.accuracy, agg_accuracy,2) + + def internal_keras_eval_final_none(self): + result = self.subject.internal_keras_eval_final(None) + self.assertEqual(result, None) + + def test_internal_keras_eval_transition_too_many_images(self): + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + + starting_image_count = 5 + + k = {'SD' : {}} + model_state = [self.loss, self.accuracy, starting_image_count] + model_state.extend(self.model_weights) + model_state = np.array(model_state, dtype=np.float32) + + self.subject.compile_and_set_weights(self.model, self.compile_params, + '/cpu:0', model_state.tostring(), self.model_shapes) + + state = [self.loss * starting_image_count, self.accuracy * starting_image_count, starting_image_count] + + k['SD']['segment_model'] = self.model + + total_images_per_seg = [10, 10, 10] + + with self.assertRaises(plpy.PLPYException): + self.subject.internal_keras_eval_transition( + state, self.dependent_var , self.independent_var, self.model.to_json(), 'dummy_model_data', + None, 0, 3, self.all_seg_ids, total_images_per_seg, + 0, **k) + + def test_internal_keras_eval_final_image_count_zero(self): + input_state = [0, 0, 0] + + with self.assertRaises(plpy.PLPYException): + result = self.subject.internal_keras_eval_final(input_state) + if __name__ == '__main__': unittest.main() # ---------------------------------------------------------------------
