This is an automated email from the ASF dual-hosted git repository. nkak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
commit 642e5a501166ef44287eeee58041fd74f4e4d975 Author: Nikhil Kak <[email protected]> AuthorDate: Mon May 13 17:02:02 2019 -0700 DL: Improve performance for predict JIRA: MADLIB-1343 Performance improvements: 1. Use SD to cache the model so that it is built and its weights are set only once per segment, on the first row that segment processes. As a consequence, SD must also be cleared after the last row on each segment. 2. Replace `PythonFunctionBodyOnly` with `PythonFunctionBodyOnlyNoSchema` in the internal Keras predict SQL function. `PythonFunctionBodyOnly` made the query much slower because it added the overhead of executing the schema query for every row in the test table. The internal UDF does not need to know the schema name, so `PythonFunctionBodyOnlyNoSchema` is used instead. Additionally: 1. Replace the use of `predict_classes` and `predict_proba` with `predict`, since non-Sequential models do not support `predict_classes`. 2. Modify the internal Keras predict query so that it no longer joins the test table with the model table; the join caused inconsistencies in the segment id, which prevented SD from being set and cleared properly. 3. Add a try/except block to the internal predict UDF so that SD is cleared if an error occurs.
Closes #392 Co-authored-by: Ekta Khanna <[email protected]> --- .../modules/deep_learning/madlib_keras.py_in | 1 - .../modules/deep_learning/madlib_keras.sql_in | 31 ++--- .../deep_learning/madlib_keras_predict.py_in | 153 ++++++++++++++++----- .../test/unit_tests/test_madlib_keras.py_in | 105 ++++++++++++++ 4 files changed, 236 insertions(+), 54 deletions(-) diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in index 40409f8..82384d4 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in @@ -440,7 +440,6 @@ def fit_transition(state, dependent_var, independent_var, model_architecture, compile_and_set_weights(segment_model, compile_params, device_name, previous_state, SD['model_shapes']) SD['segment_model'] = segment_model - image_count = 0 agg_loss = 0 agg_accuracy = 0 agg_image_count = 0 diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in index 5d21ef8..12bcb39 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in @@ -237,26 +237,19 @@ $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA'); CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_predict( - independent_var DOUBLE PRECISION [], - model_architecture TEXT, - model_data BYTEA, - input_shape INTEGER[], - is_response BOOLEAN, - normalizing_const DOUBLE PRECISION, - gpus_per_host INTEGER, - seg INTEGER + independent_var REAL[], + model_architecture TEXT, + model_data BYTEA, + is_response BOOLEAN, + normalizing_const DOUBLE PRECISION, + current_seg_id INTEGER, + seg_ids INTEGER[], + images_per_seg INTEGER[], + gpus_per_host INTEGER, + segments_per_host INTEGER ) RETURNS DOUBLE PRECISION[] AS $$ - PythonFunctionBodyOnly(`deep_learning', 
`madlib_keras_predict') - with AOControl(False): - return madlib_keras_predict.internal_keras_predict( - independent_var, - model_architecture, - model_data, - input_shape, - is_response, - normalizing_const, - gpus_per_host, - seg) + PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras_predict') + return madlib_keras_predict.internal_keras_predict(**globals()) $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in index 75295bf..eb42bf7 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in @@ -37,10 +37,11 @@ from predict_input_params import PredictParamsProcessor from utilities.model_arch_info import get_input_shape from utilities.utilities import add_postfix from utilities.utilities import create_cols_from_array_sql_string +from utilities.utilities import get_segments_per_host from utilities.utilities import is_platform_pg from utilities.utilities import unique_string -import madlib_keras_serializer +from madlib_keras_wrapper import * MODULE_NAME = 'madlib_keras_predict' @@ -78,49 +79,133 @@ def predict(schema_madlib, model_table, test_table, id_col, class_values, intermediate_col, pred_col_name, pred_col_type, is_response, MODULE_NAME) - segment_id = -1 if is_platform_pg() else '{0}.gp_segment_id'.format( - test_table) + gp_segment_id_col, seg_ids_test, \ + images_per_seg_test = get_images_per_seg_for_non_minibatched_data(test_table) + segments_per_host = get_segments_per_host() - plpy.execute(""" + if is_platform_pg(): + set_keras_session(gpus_per_host, segments_per_host) + else: + # we want to disable gpu on gpdb's master node because GPUs will only be used + # for segment nodes. 
+ set_cuda_env('-1') + + predict_query = plpy.prepare(""" CREATE TABLE {output_table} AS SELECT {id_col}, {prediction_select_clause} FROM ( SELECT {test_table}.{id_col}, ({schema_madlib}.internal_keras_predict ({independent_varname}, - $MAD${model_arch}$MAD$, - {0}, - ARRAY{input_shape}, + $1, + $2, {is_response}, {normalizing_const}, + {gp_segment_id_col}, + ARRAY{seg_ids_test}, + ARRAY{images_per_seg_test}, {gpus_per_host}, - {segment_id}) + {segments_per_host}) ) AS {intermediate_col} - FROM {test_table}, {model_table} + FROM {test_table} ) q - """.format(MODEL_DATA_COLNAME, **locals())) - -def internal_keras_predict(x_test, model_arch, model_data, input_shape, - is_response, normalizing_const, gpus_per_host, seg): - model = model_from_json(model_arch) - device_name = get_device_name_and_set_cuda_env(gpus_per_host, seg) - model_shapes = madlib_keras_serializer.get_model_shapes(model) - set_model_weights(model, device_name, model_data, model_shapes) - # Since the test data isn't mini-batched, - # we have to make sure that the test data np array has the same - # number of dimensions as input_shape. So we add a dimension to x. - x_test = expand_input_dims(x_test, target_type='float32') - x_test /= normalizing_const - if is_response: - proba_argmax = model.predict_classes(x_test) - # proba_argmax is a list with exactly one element in it. That element - # refers to the index containing the largest probability value in the - # output of Keras' predict function. - return proba_argmax + """.format(**locals()), ["text", "bytea"]) + plpy.execute(predict_query, [model_arch, model_data]) + + if is_platform_pg(): + clear_keras_session() + +def get_images_per_seg_for_non_minibatched_data(table_name): + """ + This function queries the given table and returns the total rows per segment. + Since we cannot pass a dictionary to the keras fit step function we create arrays + out of the segment numbers and the rows per segment values. 
+ This function assumes that the table is not empty. + :param table_name: + :return: gp segment id col name and two arrays + 1. An array containing all the segment numbers in ascending order + 2. An array containing the total rows for each of the segments in the + segment array + """ + if is_platform_pg(): + images_per_seg = plpy.execute( + """ SELECT count(*) AS images_per_seg + FROM {0} + """.format(table_name)) + seg_ids = [0] + gp_segment_id_col = -1 else: - probs = model.predict_proba(x_test) - # probs is a list containing a list of probability values, of all - # class levels. Since we are assuming each input is a single image, - # and not mini-batched, this list contains exactly one list in it, - # so return back the first list in probs. - return probs[0] + # Compute total buffers on each segment + images_per_seg = plpy.execute( + """ SELECT gp_segment_id, count(*) AS images_per_seg + FROM {0} + GROUP BY gp_segment_id + """.format(table_name)) + seg_ids = [int(image["gp_segment_id"]) for image in images_per_seg] + gp_segment_id_col = '{0}.gp_segment_id'.format(table_name) + + images_per_seg = [int(image["images_per_seg"]) for image in images_per_seg] + return gp_segment_id_col, seg_ids, images_per_seg + +def internal_keras_predict(independent_var, model_architecture, model_data, + is_response, normalizing_const, current_seg_id, seg_ids, + images_per_seg, gpus_per_host, segments_per_host, + **kwargs): + SD = kwargs['SD'] + model_key = 'segment_model_predict' + row_count_key = 'row_count' + try: + device_name = get_device_name_and_set_cuda_env(gpus_per_host, + current_seg_id) + if model_key not in SD: + if not is_platform_pg(): + set_keras_session(gpus_per_host, segments_per_host) + model = model_from_json(model_architecture) + model_shapes = madlib_keras_serializer.get_model_shapes(model) + set_model_weights(model, device_name, model_data, model_shapes) + SD[model_key] = model + SD[row_count_key] = 0 + else: + model = SD[model_key] + SD[row_count_key] += 1 + + 
# Since the test data isn't mini-batched, + # we have to make sure that the test data np array has the same + # number of dimensions as input_shape. So we add a dimension to x. + independent_var = expand_input_dims(independent_var, target_type='float32') + independent_var /= normalizing_const + + if is_response: + with K.tf.device(device_name): + y_prob = model.predict(independent_var) + proba_argmax = y_prob.argmax(axis=-1) + # proba_argmax is a list with exactly one element in it. That element + # refers to the index containing the largest probability value in the + # output of Keras' predict function. + result = proba_argmax + else: + with K.tf.device(device_name): + probs = model.predict(independent_var) + # probs is a list containing a list of probability values, of all + # class levels. Since we are assuming each input is a single image, + # and not mini-batched, this list contains exactly one list in it, + # so return back the first list in probs. + result = probs[0] + + if is_platform_pg(): + total_images = images_per_seg[0] + else: + total_images = images_per_seg[seg_ids.index(current_seg_id)] + + if SD[row_count_key] == total_images: + SD.pop(model_key, None) + SD.pop(row_count_key, None) + if not is_platform_pg(): + clear_keras_session() + return result + except Exception as ex: + SD.pop(model_key, None) + SD.pop(row_count_key, None) + if not is_platform_pg(): + clear_keras_session() + plpy.error(ex) diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in index 51c3afa..ea17c79 100644 --- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in +++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in @@ -417,6 +417,111 @@ class MadlibKerasFitTestCase(unittest.TestCase): with self.assertRaises(plpy.PLPYException): result = self.subject.fit_final(input_state.tostring()) +class 
MadlibKerasPredictTestCase(unittest.TestCase): + def setUp(self): + self.plpy_mock = Mock(spec='error') + patches = { + 'plpy': plpy + } + + self.plpy_mock_execute = MagicMock() + plpy.execute = self.plpy_mock_execute + + self.module_patcher = patch.dict('sys.modules', patches) + self.module_patcher.start() + import madlib_keras_predict + self.subject = madlib_keras_predict + + self.model = Sequential() + self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu', + input_shape=(1,1,1,), padding='same')) + self.model.add(Flatten()) + + self.all_seg_ids = [0,1,2] + + self.independent_var = [[[240]]] + self.total_images_per_seg = [3,3,4] + self.subject.K.set_session = Mock() + self.subject.clear_keras_session = Mock() + + + def tearDown(self): + self.module_patcher.stop() + + def test_predict_first_image_pass_gpdb(self): + self.subject.is_platform_pg = Mock(return_value = False) + model_weights = [1,2,3,4,5,6] + serialized_weights = [0, 0, 0] # not used + serialized_weights.extend(model_weights) + serialized_weights = np.array(serialized_weights, dtype=np.float32).tostring() + + k = {'SD': {}} + is_response = True + result = self.subject.internal_keras_predict( + self.independent_var, self.model.to_json(), + serialized_weights, is_response, 255, 0, self.all_seg_ids, + self.total_images_per_seg, 0, 4, **k) + self.assertEqual(1, len(result)) + self.assertEqual(1, k['SD']['row_count']) + self.assertEqual(True, 'segment_model_predict' in k['SD']) + + def test_predict_middle_image_pass_gpdb(self): + self.subject.is_platform_pg = Mock(return_value = False) + + k = {'SD': { 'row_count': 1}} + k['SD']['segment_model_predict'] = self.model + is_response = True + result = self.subject.internal_keras_predict( + self.independent_var, None, None, is_response, 255, 0, + self.all_seg_ids, self.total_images_per_seg, 0, 4, **k) + self.assertEqual(1, len(result)) + self.assertEqual(2, k['SD']['row_count']) + self.assertEqual(True, 'segment_model_predict' in k['SD']) + + def 
test_predict_last_image_pass_gpdb(self): + self.subject.is_platform_pg = Mock(return_value = False) + self.model.add(Dense(3)) + + k = {'SD': { 'row_count': 2}} + k['SD']['segment_model_predict'] = self.model + is_response = True + result = self.subject.internal_keras_predict( + self.independent_var, None, None, is_response, 255, 0, + self.all_seg_ids, self.total_images_per_seg, 0, 4, **k) + self.assertEqual(1, len(result)) + self.assertEqual(False, 'row_count' in k['SD']) + self.assertEqual(False, 'segment_model_predict' in k['SD']) + + k = {'SD': { 'row_count': 2}} + k['SD']['segment_model_predict'] = self.model + is_response = False + result = self.subject.internal_keras_predict( + self.independent_var, None, None, is_response, 255, 0, + self.all_seg_ids, self.total_images_per_seg, 0, 4, **k) + + # we except len(result) to be 3 because we have 3 dense layers in the + # architecture + self.assertEqual(3, len(result)) + self.assertEqual(False, 'row_count' in k['SD']) + self.assertEqual(False, 'segment_model_predict' in k['SD']) + + + def test_predict_error_should_clear_sd(self): + self.subject.is_platform_pg = Mock(return_value = False) + self.model.add(Dense(3)) + + # inject error by passing 0 as the normalizing const so that we get a + # divide by zero error + normalizing_const = 0 + k = {'SD':{}} + is_response = True + with self.assertRaises(plpy.PLPYException): + self.subject.internal_keras_predict( + self.independent_var, None, None, is_response, normalizing_const, + 0, self.all_seg_ids, self.total_images_per_seg, 0, 4, **k) + self.assertEqual(False, 'row_count' in k['SD']) + self.assertEqual(False, 'segment_model_predict' in k['SD']) + class MadlibKerasWrapperTestCase(unittest.TestCase): def setUp(self): self.plpy_mock = Mock(spec='error')
