This is an automated email from the ASF dual-hosted git repository.

nkak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git

commit 642e5a501166ef44287eeee58041fd74f4e4d975
Author: Nikhil Kak <[email protected]>
AuthorDate: Mon May 13 17:02:02 2019 -0700

    DL: Improve performance for predict
    
    JIRA: MADLIB-1343
    
    Performance improvements
    1. Using SD to cache the model and set the weights only once for the
    first row for each segment. This also meant that we had to clear the SD
    for the last row for each segment.
    
    2. We replaced `PythonFunctionBodyOnly` with
    `PythonFunctionBodyOnlyNoSchema` in the internal keras predict sql.
    Using `PythonFunctionBodyOnly` made the query much slower because it
    added the overhead of executing the schema query for every row in the
    test table. We don't really need to know the schema name for the
    internal UDF so now we use `PythonFunctionBodyOnlyNoSchema` instead.
    
    Additionally:
    1. Replace the use of predict_classes and predict_proba with predict since non
    sequential models do not support predict_classes.
    2. Modify the internal keras predict query to not join the test table
    and the model table because it caused weird inconsistencies with
    the segment id due to which SD was not getting set/cleared properly.
    3. Add a try/except in the internal predict UDF so that we can clear out
    the SD in case of an error.
    
    Closes #392
    
    Co-authored-by: Ekta Khanna <[email protected]>
---
 .../modules/deep_learning/madlib_keras.py_in       |   1 -
 .../modules/deep_learning/madlib_keras.sql_in      |  31 ++---
 .../deep_learning/madlib_keras_predict.py_in       | 153 ++++++++++++++++-----
 .../test/unit_tests/test_madlib_keras.py_in        | 105 ++++++++++++++
 4 files changed, 236 insertions(+), 54 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index 40409f8..82384d4 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -440,7 +440,6 @@ def fit_transition(state, dependent_var, independent_var, 
model_architecture,
         compile_and_set_weights(segment_model, compile_params, device_name,
                                 previous_state, SD['model_shapes'])
         SD['segment_model'] = segment_model
-        image_count = 0
         agg_loss = 0
         agg_accuracy = 0
         agg_image_count = 0
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 5d21ef8..12bcb39 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -237,26 +237,19 @@ $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA');
 
 CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.internal_keras_predict(
-   independent_var    DOUBLE PRECISION [],
-   model_architecture TEXT,
-   model_data         BYTEA,
-   input_shape        INTEGER[],
-   is_response        BOOLEAN,
-   normalizing_const  DOUBLE PRECISION,
-   gpus_per_host      INTEGER,
-   seg                INTEGER
+    independent_var    REAL[],
+    model_architecture TEXT,
+    model_data         BYTEA,
+    is_response        BOOLEAN,
+    normalizing_const  DOUBLE PRECISION,
+    current_seg_id     INTEGER,
+    seg_ids            INTEGER[],
+    images_per_seg     INTEGER[],
+    gpus_per_host      INTEGER,
+    segments_per_host  INTEGER
 ) RETURNS DOUBLE PRECISION[] AS $$
-    PythonFunctionBodyOnly(`deep_learning', `madlib_keras_predict')
-    with AOControl(False):
-        return madlib_keras_predict.internal_keras_predict(
-               independent_var,
-               model_architecture,
-               model_data,
-               input_shape,
-               is_response,
-               normalizing_const,
-               gpus_per_host,
-               seg)
+    PythonFunctionBodyOnlyNoSchema(`deep_learning', `madlib_keras_predict')
+    return madlib_keras_predict.internal_keras_predict(**globals())
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
diff --git 
a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in
index 75295bf..eb42bf7 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_predict.py_in
@@ -37,10 +37,11 @@ from predict_input_params import PredictParamsProcessor
 from utilities.model_arch_info import get_input_shape
 from utilities.utilities import add_postfix
 from utilities.utilities import create_cols_from_array_sql_string
+from utilities.utilities import get_segments_per_host
 from utilities.utilities import is_platform_pg
 from utilities.utilities import unique_string
 
-import madlib_keras_serializer
+from madlib_keras_wrapper import *
 
 MODULE_NAME = 'madlib_keras_predict'
 
@@ -78,49 +79,133 @@ def predict(schema_madlib, model_table, test_table, id_col,
         class_values, intermediate_col, pred_col_name,
         pred_col_type, is_response, MODULE_NAME)
 
-    segment_id = -1 if is_platform_pg() else '{0}.gp_segment_id'.format(
-        test_table)
+    gp_segment_id_col, seg_ids_test, \
+    images_per_seg_test = 
get_images_per_seg_for_non_minibatched_data(test_table)
+    segments_per_host = get_segments_per_host()
 
-    plpy.execute("""
+    if is_platform_pg():
+        set_keras_session(gpus_per_host, segments_per_host)
+    else:
+        # we want to disable gpu on gpdb's master node because GPUs will only 
be used
+        # for segment nodes.
+        set_cuda_env('-1')
+
+    predict_query = plpy.prepare("""
         CREATE TABLE {output_table} AS
         SELECT {id_col}, {prediction_select_clause}
         FROM (
             SELECT {test_table}.{id_col},
                    ({schema_madlib}.internal_keras_predict
                        ({independent_varname},
-                        $MAD${model_arch}$MAD$,
-                        {0},
-                        ARRAY{input_shape},
+                        $1,
+                        $2,
                         {is_response},
                         {normalizing_const},
+                        {gp_segment_id_col},
+                        ARRAY{seg_ids_test},
+                        ARRAY{images_per_seg_test},
                         {gpus_per_host},
-                        {segment_id})
+                        {segments_per_host})
                    ) AS {intermediate_col}
-        FROM {test_table}, {model_table}
+        FROM {test_table}
         ) q
-        """.format(MODEL_DATA_COLNAME, **locals()))
-
-def internal_keras_predict(x_test, model_arch, model_data, input_shape,
-                           is_response, normalizing_const, gpus_per_host, seg):
-    model = model_from_json(model_arch)
-    device_name = get_device_name_and_set_cuda_env(gpus_per_host, seg)
-    model_shapes = madlib_keras_serializer.get_model_shapes(model)
-    set_model_weights(model, device_name, model_data, model_shapes)
-    # Since the test data isn't mini-batched,
-    # we have to make sure that the test data np array has the same
-    # number of dimensions as input_shape. So we add a dimension to x.
-    x_test = expand_input_dims(x_test, target_type='float32')
-    x_test /= normalizing_const
-    if is_response:
-        proba_argmax = model.predict_classes(x_test)
-        # proba_argmax is a list with exactly one element in it. That element
-        # refers to the index containing the largest probability value in the
-        # output of Keras' predict function.
-        return proba_argmax
+        """.format(**locals()), ["text", "bytea"])
+    plpy.execute(predict_query, [model_arch, model_data])
+
+    if is_platform_pg():
+        clear_keras_session()
+
+def get_images_per_seg_for_non_minibatched_data(table_name):
+    """
+    This function queries the given table and returns the total rows per 
segment.
+    Since we cannot pass a dictionary to the keras fit step function we create 
arrays
+    out of the segment numbers and the rows per segment values.
+    This function assumes that the table is not empty.
+    :param table_name:
+    :return: gp segment id col name and two arrays
+    1. An array containing all the segment numbers in ascending order
+    2. An array containing the total rows for each of the segments in the
+    segment array
+    """
+    if is_platform_pg():
+        images_per_seg = plpy.execute(
+            """ SELECT count(*) AS images_per_seg
+                FROM {0}
+            """.format(table_name))
+        seg_ids = [0]
+        gp_segment_id_col = -1
     else:
-        probs = model.predict_proba(x_test)
-        # probs is a list containing a list of probability values, of all
-        # class levels. Since we are assuming each input is a single image,
-        # and not mini-batched, this list contains exactly one list in it,
-        # so return back the first list in probs.
-        return probs[0]
+        # Compute total buffers on each segment
+        images_per_seg = plpy.execute(
+            """ SELECT gp_segment_id, count(*) AS images_per_seg
+                FROM {0}
+                GROUP BY gp_segment_id
+            """.format(table_name))
+        seg_ids = [int(image["gp_segment_id"]) for image in images_per_seg]
+        gp_segment_id_col = '{0}.gp_segment_id'.format(table_name)
+
+    images_per_seg = [int(image["images_per_seg"]) for image in images_per_seg]
+    return gp_segment_id_col, seg_ids, images_per_seg
+
+def internal_keras_predict(independent_var, model_architecture, model_data,
+                           is_response, normalizing_const, current_seg_id, 
seg_ids,
+                           images_per_seg, gpus_per_host, segments_per_host,
+                           **kwargs):
+    SD = kwargs['SD']
+    model_key = 'segment_model_predict'
+    row_count_key = 'row_count'
+    try:
+        device_name = get_device_name_and_set_cuda_env(gpus_per_host,
+                                                       current_seg_id)
+        if model_key not in SD:
+            if not is_platform_pg():
+                set_keras_session(gpus_per_host, segments_per_host)
+            model = model_from_json(model_architecture)
+            model_shapes = madlib_keras_serializer.get_model_shapes(model)
+            set_model_weights(model, device_name, model_data, model_shapes)
+            SD[model_key] = model
+            SD[row_count_key] = 0
+        else:
+            model = SD[model_key]
+        SD[row_count_key] += 1
+
+        # Since the test data isn't mini-batched,
+        # we have to make sure that the test data np array has the same
+        # number of dimensions as input_shape. So we add a dimension to x.
+        independent_var = expand_input_dims(independent_var, 
target_type='float32')
+        independent_var /= normalizing_const
+
+        if is_response:
+            with K.tf.device(device_name):
+                y_prob = model.predict(independent_var)
+                proba_argmax = y_prob.argmax(axis=-1)
+            # proba_argmax is a list with exactly one element in it. That 
element
+            # refers to the index containing the largest probability value in 
the
+            # output of Keras' predict function.
+            result = proba_argmax
+        else:
+            with K.tf.device(device_name):
+                probs = model.predict(independent_var)
+            # probs is a list containing a list of probability values, of all
+            # class levels. Since we are assuming each input is a single image,
+            # and not mini-batched, this list contains exactly one list in it,
+            # so return back the first list in probs.
+            result = probs[0]
+
+        if is_platform_pg():
+            total_images = images_per_seg[0]
+        else:
+            total_images = images_per_seg[seg_ids.index(current_seg_id)]
+
+        if SD[row_count_key] == total_images:
+            SD.pop(model_key, None)
+            SD.pop(row_count_key, None)
+            if not is_platform_pg():
+                clear_keras_session()
+        return result
+    except Exception as ex:
+        SD.pop(model_key, None)
+        SD.pop(row_count_key, None)
+        if not is_platform_pg():
+            clear_keras_session()
+        plpy.error(ex)
diff --git 
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
 
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 51c3afa..ea17c79 100644
--- 
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ 
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -417,6 +417,111 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         with self.assertRaises(plpy.PLPYException):
             result = self.subject.fit_final(input_state.tostring())
 
+class MadlibKerasPredictTestCase(unittest.TestCase):
+    def setUp(self):
+        self.plpy_mock = Mock(spec='error')
+        patches = {
+            'plpy': plpy
+        }
+
+        self.plpy_mock_execute = MagicMock()
+        plpy.execute = self.plpy_mock_execute
+
+        self.module_patcher = patch.dict('sys.modules', patches)
+        self.module_patcher.start()
+        import madlib_keras_predict
+        self.subject = madlib_keras_predict
+
+        self.model = Sequential()
+        self.model.add(Conv2D(2, kernel_size=(1, 1), activation='relu',
+                              input_shape=(1,1,1,), padding='same'))
+        self.model.add(Flatten())
+
+        self.all_seg_ids = [0,1,2]
+
+        self.independent_var = [[[240]]]
+        self.total_images_per_seg = [3,3,4]
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+
+
+    def tearDown(self):
+        self.module_patcher.stop()
+
+    def test_predict_first_image_pass_gpdb(self):
+        self.subject.is_platform_pg = Mock(return_value = False)
+        model_weights = [1,2,3,4,5,6]
+        serialized_weights = [0, 0, 0] # not used
+        serialized_weights.extend(model_weights)
+        serialized_weights = np.array(serialized_weights, 
dtype=np.float32).tostring()
+
+        k = {'SD': {}}
+        is_response = True
+        result = self.subject.internal_keras_predict(
+            self.independent_var, self.model.to_json(),
+            serialized_weights, is_response, 255, 0, self.all_seg_ids,
+            self.total_images_per_seg, 0, 4, **k)
+        self.assertEqual(1, len(result))
+        self.assertEqual(1,  k['SD']['row_count'])
+        self.assertEqual(True, 'segment_model_predict' in k['SD'])
+
+    def test_predict_middle_image_pass_gpdb(self):
+        self.subject.is_platform_pg = Mock(return_value = False)
+
+        k = {'SD': { 'row_count': 1}}
+        k['SD']['segment_model_predict'] = self.model
+        is_response = True
+        result = self.subject.internal_keras_predict(
+            self.independent_var, None, None, is_response, 255, 0,
+            self.all_seg_ids, self.total_images_per_seg, 0, 4, **k)
+        self.assertEqual(1, len(result))
+        self.assertEqual(2,  k['SD']['row_count'])
+        self.assertEqual(True, 'segment_model_predict' in k['SD'])
+
+    def test_predict_last_image_pass_gpdb(self):
+        self.subject.is_platform_pg = Mock(return_value = False)
+        self.model.add(Dense(3))
+
+        k = {'SD': { 'row_count': 2}}
+        k['SD']['segment_model_predict'] = self.model
+        is_response = True
+        result = self.subject.internal_keras_predict(
+            self.independent_var, None, None, is_response, 255, 0,
+            self.all_seg_ids, self.total_images_per_seg, 0, 4, **k)
+        self.assertEqual(1, len(result))
+        self.assertEqual(False, 'row_count' in k['SD'])
+        self.assertEqual(False, 'segment_model_predict' in k['SD'])
+
+        k = {'SD': { 'row_count': 2}}
+        k['SD']['segment_model_predict'] = self.model
+        is_response = False
+        result = self.subject.internal_keras_predict(
+            self.independent_var, None, None, is_response, 255, 0,
+            self.all_seg_ids, self.total_images_per_seg, 0, 4, **k)
+
+        # we expect len(result) to be 3 because the final Dense layer in the
+        # architecture has 3 units
+        self.assertEqual(3, len(result))
+        self.assertEqual(False, 'row_count' in k['SD'])
+        self.assertEqual(False, 'segment_model_predict' in k['SD'])
+
+
+    def test_predict_error_should_clear_sd(self):
+        self.subject.is_platform_pg = Mock(return_value = False)
+        self.model.add(Dense(3))
+
+        # inject error by passing 0 as the normalizing const so that we get a
+        # divide by zero error
+        normalizing_const = 0
+        k = {'SD':{}}
+        is_response = True
+        with self.assertRaises(plpy.PLPYException):
+            self.subject.internal_keras_predict(
+                self.independent_var, None, None, is_response, 
normalizing_const,
+                0, self.all_seg_ids, self.total_images_per_seg, 0, 4, **k)
+        self.assertEqual(False, 'row_count' in k['SD'])
+        self.assertEqual(False, 'segment_model_predict' in k['SD'])
+
 class MadlibKerasWrapperTestCase(unittest.TestCase):
     def setUp(self):
         self.plpy_mock = Mock(spec='error')

Reply via email to