This is an automated email from the ASF dual-hosted git repository. njayaram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 94e5a6ca8e7145d81111bb7b96ff2e6f241cc00a Author: Domino Valdano <[email protected]> AuthorDate: Thu Apr 25 18:14:28 2019 -0700 DL: Add new unit tests and update existing one's JIRA: MADLIB-1310 This commit also adds a commented SQL for creating validation data using minibatch_preprocessor_dl with batchsize as 1 - ensuring when we fix MADLIB-1326, we should have 2 rows as with 1 row, the fit_merge() function was never being called. This will be a better end-to-end test. Closes #378 Co-authored-by: Ekta Khanna <[email protected]> --- .../modules/deep_learning/test/madlib_keras.sql_in | 7 + .../test/unit_tests/test_madlib_keras.py_in | 287 +++++++++++++++------ 2 files changed, 214 insertions(+), 80 deletions(-) diff --git a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in index 81a088e..527d6e8 100644 --- a/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in +++ b/src/ports/postgres/modules/deep_learning/test/madlib_keras.sql_in @@ -50,6 +50,13 @@ copy cifar_10_sample_batched from stdin delimiter '|'; 0|{{0,1},{1,0}}|{{{{0.792157,0.8,0.780392},{0.792157,0.8,0.780392},{0.8,0.807843,0.788235},{0.807843,0.815686,0.796079},{0.815686,0.823529,0.803922},{0.819608,0.827451,0.807843},{0.823529,0.831373,0.811765},{0.831373,0.839216,0.823529},{0.835294,0.843137,0.831373},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.843137,0.85098,0.839216},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.843137},{0.847059,0.854902,0.839216},{0.85098,0.858824,0.839216 [...] \. +-- In order to test fit_merge, we need at least 2 rows in the batched table (1 on each segment). +-- As part of supporting Postgres, an issue was reported JIRA MADLIB-1326. +-- If we don't fix the bug, we should regenerate the batched table with this command +-- (and paste it into the file). (If we do fix the bug, we can just uncomment this line, +-- and remove the mocked output tables above.) +-- SELECT minibatch_preprocessor_dl('cifar_10_sample','cifar_10_sample_batched','y','x', 1, 255); + DROP TABLE IF EXISTS cifar_10_sample_batched_summary; CREATE TABLE cifar_10_sample_batched_summary( source_table text, diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in index 0c4072b..3d27e1b 100644 --- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in +++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in @@ -33,6 +33,10 @@ import plpy_mock as plpy m4_changequote(`<!', `!>') +# helper for multiplying array by int +def mult(k,arr): + return [ k*a for a in arr ] + class MadlibKerasFitTestCase(unittest.TestCase): def setUp(self): self.plpy_mock = Mock(spec='error') @@ -60,10 +64,15 @@ class MadlibKerasFitTestCase(unittest.TestCase): for a in self.model.get_weights(): self.model_shapes.append(a.shape) - self.loss = 1.3 - self.accuracy = 0.34 + self.loss = 13.0 + self.accuracy = 3.4 self.all_seg_ids = [0,1,2] - self.total_buffers_per_seg = [3,3,3] + + self.independent_var = [[[[0.5]]]] * 10 + self.dependent_var = [[0,1]] * 10 + # We test on segment 0, which has 3 buffers filled with 10 identical + # images each, or 30 images total + self.total_images_per_seg = [3*len(self.dependent_var),20,40] def tearDown(self): self.module_patcher.stop() @@ -76,23 +85,28 @@ class MadlibKerasFitTestCase(unittest.TestCase): self.subject.K.set_session = Mock() self.subject.clear_keras_session = Mock() self.subject.is_platform_pg = Mock(return_value = True) - buffer_count = 0 - previous_state = [self.loss, self.accuracy, buffer_count] + starting_image_count = 0 + ending_image_count = len(self.dependent_var) + previous_state = [self.loss, self.accuracy, starting_image_count] previous_state.extend(self.model_weights) previous_state = np.array(previous_state, dtype=np.float32) - k = {'SD': {'buffer_count': buffer_count}} + k = {'SD' : {}} + new_model_state = self.subject.fit_transition( - None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg, + None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg, self.model.to_json(), self.compile_params, self.fit_params, False, previous_state.tostring(), **k) - buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2] - self.assertEqual(1, buffer_count) - # set_session must get called ONLY once, when its the first buffer + state = np.fromstring(new_model_state, dtype=np.float32) + image_count = state[2] + weights = np.rint(state[3:]).astype(np.int) + self.assertEqual(ending_image_count, image_count) + # weights should not be modified yet + self.assertTrue((self.model_weights == weights).all()) + # set_session must be not be called in transition func for PG self.assertEqual(0, self.subject.K.set_session.call_count) # Clear session and sess.close must not get called for the first buffer self.assertEqual(0, self.subject.clear_keras_session.call_count) - self.assertEqual(1, k['SD']['buffer_count']) self.assertTrue(k['SD']['segment_model']) self.assertTrue(k['SD']['model_shapes']) @@ -104,115 +118,254 @@ class MadlibKerasFitTestCase(unittest.TestCase): self.subject.K.set_session = Mock() self.subject.clear_keras_session = Mock() self.subject.is_platform_pg = Mock(return_value = False) - buffer_count = 0 - previous_state = [self.loss, self.accuracy, buffer_count] + starting_image_count = 0 + ending_image_count = len(self.dependent_var) + previous_state = [self.loss, self.accuracy, starting_image_count] previous_state.extend(self.model_weights) previous_state = np.array(previous_state, dtype=np.float32) - k = {'SD': {'buffer_count': buffer_count}} + k = {'SD' : {}} + new_model_state = self.subject.fit_transition( - None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg, + None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg, self.model.to_json(), self.compile_params, self.fit_params, False, previous_state.tostring(), **k) - buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2] - self.assertEqual(1, buffer_count) + state = np.fromstring(new_model_state, dtype=np.float32) + image_count = state[2] + weights = np.rint(state[3:]).astype(np.int) + self.assertEqual(ending_image_count, image_count) + # weights should not be modified yet + self.assertTrue((self.model_weights == weights).all()) # set_session must get called ONLY once, when its the first buffer self.assertEqual(1, self.subject.K.set_session.call_count) # Clear session and sess.close must not get called for the first buffer self.assertEqual(0, self.subject.clear_keras_session.call_count) - self.assertEqual(1, k['SD']['buffer_count']) self.assertTrue(k['SD']['segment_model']) self.assertTrue(k['SD']['model_shapes']) - - def test_fit_transition_last_buffer_pass_pg(self): + def test_fit_transition_middle_buffer_pass(self): #TODO should we mock tensorflow's close_session and keras' # clear_session instead of mocking the function `clear_keras_session` self.subject.K.set_session = Mock() self.subject.clear_keras_session = Mock() - self.subject.is_platform_pg = Mock(return_value = True) + self.subject.is_platform_pg = Mock(return_value = False) - buffer_count = 2 + starting_image_count = len(self.dependent_var) + ending_image_count = starting_image_count + len(self.dependent_var) - state = [self.loss, self.accuracy, buffer_count] + state = [self.loss, self.accuracy, starting_image_count] state.extend(self.model_weights) state = np.array(state, dtype=np.float32) self.subject.compile_and_set_weights(self.model, self.compile_params, '/cpu:0', state.tostring(), self.model_shapes) - k = {'SD': {'buffer_count': buffer_count, - 'model_shapes': self.model_shapes}} + k = {'SD': {'model_shapes': self.model_shapes}} k['SD']['segment_model'] = self.model new_model_state = self.subject.fit_transition( - state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg, + state.tostring(), self.independent_var, self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg, self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k) - buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2] - self.assertEqual(3, buffer_count) + state = np.fromstring(new_model_state, dtype=np.float32) + image_count = state[2] + weights = np.rint(state[3:]).astype(np.int) + self.assertEqual(ending_image_count, image_count) + # weights should not be modified yet + self.assertTrue((self.model_weights == weights).all()) # set_session must get called ONLY once, when its the first buffer self.assertEqual(0, self.subject.K.set_session.call_count) - # Clear session and sess.close must not get called for the first buffer + # Clear session and sess.close must not get called for the middle buffer self.assertEqual(0, self.subject.clear_keras_session.call_count) - self.assertEqual(3, k['SD']['buffer_count']) - def test_fit_transition_last_buffer_pass_gpdb(self): + def test_fit_transition_last_buffer_pass_pg(self): #TODO should we mock tensorflow's close_session and keras' # clear_session instead of mocking the function `clear_keras_session` self.subject.K.set_session = Mock() self.subject.clear_keras_session = Mock() - self.subject.is_platform_pg = Mock(return_value = False) + self.subject.is_platform_pg = Mock(return_value = True) - buffer_count = 2 + starting_image_count = 2*len(self.dependent_var) + ending_image_count = starting_image_count + len(self.dependent_var) - state = [self.loss, self.accuracy, buffer_count] + state = [self.loss, self.accuracy, starting_image_count] state.extend(self.model_weights) state = np.array(state, dtype=np.float32) + multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights] + multiplied_weights = np.rint(multiplied_weights).astype(np.int) + self.subject.compile_and_set_weights(self.model, self.compile_params, '/cpu:0', state.tostring(), self.model_shapes) - k = {'SD': {'buffer_count': buffer_count, - 'model_shapes': self.model_shapes}} + k = {'SD': { 'model_shapes': self.model_shapes}} k['SD']['segment_model'] = self.model new_model_state = self.subject.fit_transition( - state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg, + state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg, self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k) - buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2] - self.assertEqual(3, buffer_count) - # set_session must get called ONLY once, when its the first buffer + state = np.fromstring(new_model_state, dtype=np.float32) + image_count = state[2] + weights = np.rint(state[3:]).astype(np.int) + self.assertEqual(ending_image_count, image_count) + # weights should be multiplied by final image count + self.assertTrue((multiplied_weights == weights).all()) + # set_session must be not be called in transition func for PG self.assertEqual(0, self.subject.K.set_session.call_count) - # Clear session and sess.close must not get called for the first buffer - self.assertEqual(1, self.subject.clear_keras_session.call_count) - self.assertEqual(3, k['SD']['buffer_count']) + # Clear session and sess.close must get called for the last buffer in gpdb, + # but not in postgres + self.assertEqual(0, self.subject.clear_keras_session.call_count) - def test_fit_transition_middle_buffer_pass(self): + def test_fit_transition_last_buffer_pass_gpdb(self): #TODO should we mock tensorflow's close_session and keras' # clear_session instead of mocking the function `clear_keras_session` self.subject.K.set_session = Mock() self.subject.clear_keras_session = Mock() + self.subject.is_platform_pg = Mock(return_value = False) - buffer_count = 1 + starting_image_count = 2*len(self.dependent_var) + ending_image_count = starting_image_count + len(self.dependent_var) - state = [self.loss, self.accuracy, buffer_count] + state = [self.loss, self.accuracy, starting_image_count] state.extend(self.model_weights) state = np.array(state, dtype=np.float32) + multiplied_weights = [ 1.0*self.total_images_per_seg[0]*w for w in self.model_weights] + multiplied_weights = np.rint(multiplied_weights).astype(np.int) + self.subject.compile_and_set_weights(self.model, self.compile_params, '/cpu:0', state.tostring(), self.model_shapes) - k = {'SD': {'buffer_count': buffer_count, - 'model_shapes': self.model_shapes}} + k = {'SD': { 'model_shapes': self.model_shapes}} k['SD']['segment_model'] = self.model new_model_state = self.subject.fit_transition( - state.tostring(), [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, self.total_buffers_per_seg, + state.tostring(), self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, self.total_images_per_seg, self.model.to_json(), None, self.fit_params, False, 'dummy_previous_state', **k) - buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2] - self.assertEqual(2, buffer_count) + state = np.fromstring(new_model_state, dtype=np.float32) + image_count = state[2] + weights = np.rint(state[3:]).astype(np.int) + self.assertEqual(ending_image_count, image_count) + # weights should be multiplied by final image count + self.assertTrue((multiplied_weights == weights).all()) # set_session must get called ONLY once, when its the first buffer self.assertEqual(0, self.subject.K.set_session.call_count) - # Clear session and sess.close must not get called for the first buffer - self.assertEqual(0, self.subject.clear_keras_session.call_count) - self.assertEqual(2, k['SD']['buffer_count']) + # Clear session and sess.close must get called for the last buffer in gpdb, + # but not in postgres + self.assertEqual(1, self.subject.clear_keras_session.call_count) + + def test_fit_transition_ending_image_count_zero(self): + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + starting_image_count = 0 + previous_state = [self.loss, self.accuracy, starting_image_count] + previous_state.extend(self.model_weights) + previous_state = np.array(previous_state, dtype=np.float32) + + k = {'SD' : {}} + + total_images_per_seg = [0,1,1] + + with self.assertRaises(plpy.PLPYException): + new_model_state = self.subject.fit_transition( + None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg, + self.model.to_json(), self.compile_params, self.fit_params, False, + previous_state.tostring(), **k) + + def test_fit_transition_too_many_images(self): + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + starting_image_count = 0 + previous_state = [self.loss, self.accuracy, starting_image_count] + previous_state.extend(self.model_weights) + previous_state = np.array(previous_state, dtype=np.float32) + + k = {'SD' : {}} + + total_images_per_seg = [1,1,1] + + with self.assertRaises(plpy.PLPYException): + new_model_state = self.subject.fit_transition( + None, self.independent_var , self.dependent_var, 0, 2, self.all_seg_ids, total_images_per_seg, + self.model.to_json(), self.compile_params, self.fit_params, False, + previous_state.tostring(), **k) + + + def test_fit_merge(self): + image_count = self.total_images_per_seg[0] + state1 = [3.0*self.loss, 3.0*self.accuracy, image_count] + state1.extend(mult(3,self.model_weights)) + state1 = np.array(state1, dtype=np.float32) + state2 = [2.0*self.loss, 2.0*self.accuracy, image_count+30] + state2.extend(mult(2,self.model_weights)) + state2 = np.array(state2, dtype=np.float32) + merged_state = self.subject.fit_merge(state1.tostring(),state2.tostring()) + state = np.fromstring(merged_state, dtype=np.float32) + agg_loss = state[0] + agg_accuracy = state[1] + image_count_total = state[2] + weights = np.rint(state[3:]).astype(np.int) + + self.assertEqual( 2*image_count+30 , image_count_total ) + self.assertAlmostEqual( 5.0*self.loss, agg_loss, 2) + self.assertAlmostEqual( 5.0*self.accuracy, agg_accuracy, 2) + self.assertTrue( (mult(5,self.model_weights) == weights).all()) + + def test_fit_merge_none_first(self): + image_count = self.total_images_per_seg[0] + input_state = [self.loss, self.accuracy, image_count] + input_state.extend(self.model_weights) + input_state = np.array(input_state, dtype=np.float32) + merged_state = self.subject.fit_merge(None, input_state.tostring()) + state = np.fromstring(merged_state, dtype=np.float32) + agg_loss = state[0] + agg_accuracy = state[1] + image_count_total = state[2] + weights = np.rint(state[3:]).astype(np.int) + + self.assertEqual(image_count, image_count_total) + self.assertAlmostEqual(self.loss, agg_loss, 2) + self.assertAlmostEqual(self.accuracy, agg_accuracy, 2) + self.assertTrue((self.model_weights == weights).all()) + + def test_fit_merge_none_second(self): + image_count = self.total_images_per_seg[0] + input_state = [self.loss, self.accuracy, image_count] + input_state.extend(self.model_weights) + input_state = np.array(input_state, dtype=np.float32) + merged_state = self.subject.fit_merge(input_state.tostring(), None) + state = np.fromstring(merged_state, dtype=np.float32) + agg_loss = state[0] + agg_accuracy = state[1] + image_count_total = state[2] + weights = np.rint(state[3:]).astype(np.int) + + self.assertEqual(image_count, image_count_total) + self.assertAlmostEqual(self.loss, agg_loss, 2) + self.assertAlmostEqual(self.accuracy, agg_accuracy, 2) + self.assertTrue((self.model_weights == weights).all()) + + def test_fit_merge_both_none(self): + result = self.subject.fit_merge(None,None) + self.assertEqual(None, result) + + def test_fit_final(self): + image_count = self.total_images_per_seg[0] + input_state = [image_count*self.loss, image_count*self.accuracy, image_count] + input_state.extend(mult(image_count,self.model_weights)) + input_state = np.array(input_state, dtype=np.float32) + + output_state = self.subject.fit_final(input_state.tostring()) + output_state = np.fromstring(output_state, dtype=np.float32) + agg_loss = output_state[0] + agg_accuracy = output_state[1] + image_count_output = output_state[2] + weights = np.rint(output_state[3:]).astype(np.int) + + self.assertEqual(image_count, image_count_output) + self.assertAlmostEqual(self.loss, agg_loss,2) + self.assertAlmostEqual(self.accuracy, agg_accuracy,2) + self.assertTrue((self.model_weights == weights).all()) + + def fit_final_none(self): + result = self.subject.fit_final(None) + self.assertEqual(result, None) def test_get_device_name_and_set_cuda_env(self): import os @@ -305,19 +458,6 @@ class MadlibKerasValidatorTestCase(unittest.TestCase): import madlib_keras_validator self.subject = madlib_keras_validator - self.model = Sequential() - self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu', - input_shape=(1,1,1,), padding='same')) - self.model.add(Flatten()) - - self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']" - self.fit_params = "'batch_size'=1, 'epochs'=1" - self.model_weights = [3,4,5,6] - self.loss = 1.3 - self.accuracy = 0.34 - self.all_seg_ids = [0,1,2] - self.total_buffers_per_seg = [3,3,3] - def tearDown(self): self.module_patcher.stop() @@ -359,19 +499,6 @@ class MadlibSerializerTestCase(unittest.TestCase): import madlib_keras_serializer self.subject = madlib_keras_serializer - self.model = Sequential() - self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu', - input_shape=(1,1,1,), padding='same')) - self.model.add(Flatten()) - - self.compile_params = "'optimizer'=SGD(lr=0.01, decay=1e-6, nesterov=True), 'loss'='categorical_crossentropy', 'metrics'=['accuracy']" - self.fit_params = "'batch_size'=1, 'epochs'=1" - self.model_weights = [3,4,5,6] - self.loss = 1.3 - self.accuracy = 0.34 - self.all_seg_ids = [0,1,2] - self.total_buffers_per_seg = [3,3,3] - def tearDown(self): self.module_patcher.stop()
