This is an automated email from the ASF dual-hosted git repository.

nkak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new 897b4ed  DL: Add GPU support for evaluate
897b4ed is described below

commit 897b4ed3c30c559a92501b64c6632ddab7aee1b9
Author: Nikhil Kak <[email protected]>
AuthorDate: Wed Apr 17 18:04:34 2019 -0700

    DL: Add GPU support for evaluate
    
    In our experiments on a multi-host gpdb cluster with places10 (a subset of
    places365) dataset, we noticed that using CPU and no SD caused the fit
    query to fail during the validation code with the error 'Dst tensor is not
    initialized'. Adding GPU support and caching the compiled model fixed this
    issue.
    
    1. Add GPU support to the evaluate code called by the fit function.
    2. The evaluate code previously compiled the model for each row in the
    validation table.
    This was unnecessary and took more resources in terms of time and memory.
    This commit avoids this by using SD to cache the compiled model.
    3. Also refactored the code by creating a function to get the rows per
    segment.
---
 .../modules/deep_learning/madlib_keras.py_in       | 222 ++++++++++++---------
 .../modules/deep_learning/madlib_keras.sql_in      |   4 +-
 .../test/unit_tests/test_madlib_keras.py_in        |  67 ++++++-
 3 files changed, 199 insertions(+), 94 deletions(-)

diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
index 542f489..1668a7b 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.py_in
@@ -59,10 +59,6 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
         dependent_varname, independent_varname, num_iterations)
 
     start_training_time = datetime.datetime.now()
-
-    # Disable GPU on master
-    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
-
     use_gpu = bool(use_gpu)
 
     # Get the serialized master model
@@ -82,6 +78,19 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
     fit_validator.validate_input_shapes(input_shape)
     model_weights_serialized = query_result[Format.MODEL_WEIGHTS]
 
+    #TODO: Refactor the pg related logic in a future PR when we think
+    # about making the fit function easier to read and maintain.
+    if is_platform_pg():
+        set_keras_session(use_gpu)
+        gp_segment_id_col =  -1
+    else:
+        gp_segment_id_col = 'gp_segment_id'
+        # Disable GPU on master for gpdb
+        os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
+    seg_ids_train, rows_per_seg_train = get_rows_per_seg_from_db(source_table)
+    if validation_table:
+        seg_ids_val, rows_per_seg_val = 
get_rows_per_seg_from_db(validation_table)
+
     # Convert model from json and initialize weights
     master_model = model_from_json(model_arch)
     model_weights = master_model.get_weights()
@@ -101,27 +110,6 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
     validation_set_provided = bool(validation_table)
     validation_aggregate_accuracy = []; validation_aggregate_loss = []
 
-    if is_platform_pg():
-        total_buffers_per_seg = plpy.execute(
-            """ SELECT count(*) AS total_buffers_per_seg
-                FROM {0}
-            """.format(source_table))
-        seg_nums = "[]::integer[]"
-        gp_segment_id_col = -1
-    else:
-        # Compute total buffers on each segment
-        total_buffers_per_seg = plpy.execute(
-            """ SELECT gp_segment_id, count(*) AS total_buffers_per_seg
-                FROM {0}
-                GROUP BY gp_segment_id
-            """.format(source_table))
-        seg_nums = [int(each_buffer["gp_segment_id"])
-            for each_buffer in total_buffers_per_seg]
-        # gp_segment_id is an implicit column in GPDB tables.
-        gp_segment_id_col = "gp_segment_id"
-
-    total_buffers_per_seg = [int(each_buffer["total_buffers_per_seg"])
-        for each_buffer in total_buffers_per_seg]
     # Prepare the SQL for running distributed training via UDA
     compile_params_to_pass = "$madlib$" + compile_params + "$madlib$"
     fit_params_to_pass = "$madlib$" + fit_params + "$madlib$"
@@ -131,8 +119,8 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
             {dependent_varname}::SMALLINT[],
             {gp_segment_id_col},
             {num_classes}::INTEGER,
-            ARRAY{seg_nums},
-            ARRAY{total_buffers_per_seg},
+            ARRAY{seg_ids_train},
+            ARRAY{rows_per_seg_train},
             $MAD${model_arch}$MAD$::TEXT,
             {compile_params_to_pass}::TEXT,
             {fit_params_to_pass}::TEXT,
@@ -154,12 +142,7 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
     # Run distributed training for specified number of iterations
     for i in range(num_iterations):
         start_iteration = time.time()
-        try:
-            iteration_result = plpy.execute(
-                run_training_iteration, [model_state])[0]['iteration_result']
-        except plpy.SPIError as e:
-            plpy.error('A plpy error occurred in the step function: {0}'.
-                       format(str(e)))
+        iteration_result = plpy.execute(run_training_iteration, 
[model_state])[0]['iteration_result']
         end_iteration = time.time()
         plpy.info("Time for iteration {0}: {1} sec".
                   format(i + 1, end_iteration - start_iteration))
@@ -173,10 +156,18 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
             _, _, _, updated_weights = 
madlib_keras_serializer.deserialize_weights(
                 model_state, model_shapes)
             master_model.set_weights(updated_weights)
-            evaluate_result = get_loss_acc_from_keras_eval(
-                schema_madlib, validation_table, dependent_varname,
-                independent_varname, compile_params_to_pass, model_arch,
-                model_state, use_gpu, gp_segment_id_col)
+            start_val = time.time()
+            evaluate_result = get_loss_acc_from_keras_eval(schema_madlib,
+                                                           validation_table,
+                                                           dependent_varname,
+                                                           independent_varname,
+                                                           
compile_params_to_pass,
+                                                           model_arch, 
model_state,
+                                                           use_gpu, 
seg_ids_val,
+                                                           rows_per_seg_val,
+                                                           gp_segment_id_col)
+            end_val = time.time()
+            plpy.info("Time for validation in iteration {0}: {1} sec". 
format(i + 1, end_val - start_val))
             if len(evaluate_result) < 2:
                 plpy.error('Calling evaluate on validation data returned < 2 '
                            'metrics. Expected metrics are loss and accuracy')
@@ -279,35 +270,39 @@ def fit(schema_madlib, source_table, model, 
dependent_varname,
         SELECT $1 as model_data""".format(model), ["bytea"])
     plpy.execute(create_output_table, [model_state])
 
+    if is_platform_pg():
+        clear_keras_session()
 
-def get_loss_acc_from_keras_eval(schema_madlib, table, dependent_varname,
-                                 independent_varname, compile_params, 
model_arch,
-                                 model_data, use_gpu, gp_segment_id_col):
+def get_rows_per_seg_from_db(table_name):
     """
-    This function will call the internal keras evaluate function to get the 
loss
-    and accuracy of each tuple which then gets averaged to get the final 
result.
-    :param schema_madlib:
-    :param table:
-    :param dependent_varname:
-    :param independent_varname:
-    :param compile_params:
-    :param model_arch:
-    :param model_data:
-    :return:
+    This function queries the given table and returns the total rows per 
segment.
+    Since we cannot pass a dictionary to the keras fit step function we create 
arrays
+    out of the segment numbers and the rows per segment values.
+    This function assumes that the table is not empty.
+    :param table_name:
+    :return: Returns two arrays
+    1. An array containing all the segment numbers in ascending order
+    1. An array containing the total rows for each of the segments in the
+    segment array
     """
-    evaluate_query = plpy.prepare("""
-    select {schema_madlib}.array_avg(loss_acc, True) as final_loss_acc from
-    (
-        select ({schema_madlib}.internal_keras_evaluate({dependent_varname},
-                                            {independent_varname},
-                                            $MAD${model_arch}$MAD$,
-                                            $1, {compile_params},
-                                            {use_gpu}, {gp_segment_id_col})) 
as loss_acc
-        from {table}
-    ) q""".format(**locals()), ["bytea"])
-    res = plpy.execute(evaluate_query, [model_data])
-    loss_acc = res[0]['final_loss_acc']
-    return loss_acc
+    if is_platform_pg():
+        rows = plpy.execute(
+            """ SELECT count(*) AS rows_per_seg
+                FROM {0}
+            """.format(table_name))
+        seg_ids = "[]::integer[]"
+    else:
+        # Compute total buffers on each segment
+        rows = plpy.execute(
+            """ SELECT gp_segment_id, count(*) AS rows_per_seg
+                FROM {0}
+                GROUP BY gp_segment_id
+            """.format(table_name))
+        seg_ids = [int(row["gp_segment_id"]) for row in rows]
+
+    rows = [int(row["rows_per_seg"]) for row in rows]
+    return seg_ids, rows
+
 
 def fit_transition(state, ind_var, dep_var, current_seg_id, num_classes,
                    all_seg_ids, total_buffers_per_seg, architecture,
@@ -336,23 +331,14 @@ def fit_transition(state, ind_var, dep_var, 
current_seg_id, num_classes,
 
     start_transition = time.time()
     SD = kwargs['SD']
-    is_pg = False
-    if current_seg_id == -1:
-        is_pg = True
-    if is_pg:
-        # This is postgres
-        total_buffers = total_buffers_per_seg[0]
-    else:
-        # This is GPDB
-        total_buffers = total_buffers_per_seg[all_seg_ids.index(
-            current_seg_id)]
     # Configure GPUs/CPUs
     device_name = get_device_name_and_set_cuda_env(use_gpu, current_seg_id)
 
     # Set up system if this is the first buffer on segment'
 
     if not state:
-        set_keras_session(use_gpu)
+        if not is_platform_pg():
+            set_keras_session(use_gpu)
         segment_model = model_from_json(architecture)
         SD['model_shapes'] = 
madlib_keras_serializer.get_model_shapes(segment_model)
         compile_and_set_weights(segment_model, compile_params, device_name,
@@ -389,14 +375,17 @@ def fit_transition(state, ind_var, dep_var, 
current_seg_id, num_classes,
 
     with K.tf.device(device_name):
         updated_weights = segment_model.get_weights()
-
-    if SD['buffer_count'] == total_buffers:
-        if total_buffers == 0:
-            plpy.error('total buffers is 0')
-
-        agg_loss /= total_buffers
-        agg_accuracy /= total_buffers
-        if not is_pg:
+    if is_platform_pg():
+        total_buffers_per_seg = total_buffers_per_seg[0]
+    else:
+        total_buffers_per_seg = 
total_buffers_per_seg[all_seg_ids.index(current_seg_id)]
+    if total_buffers_per_seg == 0:
+        plpy.error('Got 0 rows. Expected at least 1.')
+
+    if SD['buffer_count'] == total_buffers_per_seg:
+        agg_loss /= total_buffers_per_seg
+        agg_accuracy /= total_buffers_per_seg
+        if not is_platform_pg():
             # In GPDB, each segment would have a keras session, so clear
             # them after the last buffer is processed.
             clear_keras_session()
@@ -483,16 +472,53 @@ def evaluate1(schema_madlib, model_table, test_table, 
id_col, model_arch_table,
     plpy.info('evaluate result loss is {}'.format(loss_acc[0]))
     plpy.info('evaluate result acc is {}'.format(loss_acc[1]))
 
+def get_loss_acc_from_keras_eval(schema_madlib, table, dependent_varname,
+                                 independent_varname, compile_params, 
model_arch,
+                                 model_data, use_gpu, seg_ids_val,
+                                 rows_per_seg_val, gp_segment_id_col):
+    """
+    This function will call the internal keras evaluate function to get the 
loss
+    and accuracy of each tuple which then gets averaged to get the final 
result.
+    """
+    evaluate_query = plpy.prepare("""
+    select {schema_madlib}.array_avg(loss_acc, True) as final_loss_acc from
+    (
+        select ({schema_madlib}.internal_keras_evaluate({dependent_varname},
+                                            {independent_varname},
+                                            $MAD${model_arch}$MAD$,
+                                            $1, {compile_params},
+                                            {use_gpu}, 
+                                            ARRAY{seg_ids_val}, 
+                                            ARRAY{rows_per_seg_val},
+                                            {gp_segment_id_col})) as loss_acc 
+        from {table}
+    ) q""".format(**locals()), ["bytea"])
+    res = plpy.execute(evaluate_query, [model_data])
+    loss_acc = res[0]['final_loss_acc']
+    return loss_acc
+
+
 def internal_keras_evaluate(dependent_var, independent_var, model_architecture,
-                            model_data, compile_params, use_gpu, seg, 
**kwargs):
-    device_name = get_device_name_and_set_cuda_env(use_gpu, seg)
-    model = model_from_json(model_architecture)
-    model_shapes = madlib_keras_serializer.get_model_shapes(model)
-    _, _, _, model_weights = madlib_keras_serializer.deserialize_weights(
-        model_data, model_shapes)
-    model.set_weights(model_weights)
-    with K.tf.device(device_name):
-        compile_model(model, compile_params)
+                            model_data, compile_params, use_gpu, seg_ids_val,
+                            rows_per_seg_val, current_seg, **kwargs):
+    SD = kwargs['SD']
+    device_name = get_device_name_and_set_cuda_env(use_gpu, current_seg)
+
+    if 'segment_model' not in SD:
+        if not is_platform_pg():
+            set_keras_session(use_gpu)
+        model = model_from_json(model_architecture)
+        model_shapes = madlib_keras_serializer.get_model_shapes(model)
+        _, _, _, model_weights = madlib_keras_serializer.deserialize_weights(
+            model_data, model_shapes)
+        model.set_weights(model_weights)
+        with K.tf.device(device_name):
+            compile_model(model, compile_params)
+        SD['segment_model'] = model
+        SD['row_count'] = 0
+    else:
+        model = SD['segment_model']
+    SD['row_count'] += 1
 
     # Since the training data is batched but the validation data isn't,
     # we have to make sure that the validation data np array has the same
@@ -503,4 +529,18 @@ def internal_keras_evaluate(dependent_var, 
independent_var, model_architecture,
 
     with K.tf.device(device_name):
         res = model.evaluate(independent_var, dependent_var)
+    if is_platform_pg():
+        total_rows = rows_per_seg_val[0]
+    else:
+        total_rows = rows_per_seg_val[seg_ids_val.index(current_seg)]
+
+    if is_last_row_in_seg(SD['row_count'], total_rows):
+        SD.pop('segment_model', None)
+        if not is_platform_pg():
+            clear_keras_session()
+
     return res
+
+
+def is_last_row_in_seg(row_count, total_rows):
+    return row_count == total_rows
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
index 295276b..8e13933 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras.sql_in
@@ -253,7 +253,9 @@ CREATE OR REPLACE FUNCTION 
MADLIB_SCHEMA.internal_keras_evaluate(
    model_data bytea,
    compile_params TEXT,
    use_gpu BOOLEAN,
-   seg INTEGER
+   seg_ids_val INTEGER[],
+   rows_per_seg_val INTEGER[],
+   current_seg INTEGER
 ) RETURNS DOUBLE PRECISION[] AS $$
     PythonFunctionBodyOnly(`deep_learning', `madlib_keras')
     with AOControl(False):
diff --git 
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
 
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
index 84e4ce7..2a9a427 100644
--- 
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
+++ 
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras.py_in
@@ -68,11 +68,42 @@ class MadlibKerasFitTestCase(unittest.TestCase):
     def tearDown(self):
         self.module_patcher.stop()
 
-    def test_fit_transition_first_buffer_pass(self):
+    def test_fit_transition_first_buffer_pass_pg(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
+
+        #postgres
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+        self.subject.is_platform_pg = Mock(return_value = True)
+        buffer_count = 0
+        previous_state = [self.loss, self.accuracy, buffer_count]
+        previous_state.extend(self.model_weights)
+        previous_state = np.array(previous_state, dtype=np.float32)
+
+        k = {'SD': {'buffer_count': buffer_count}}
+        new_model_state = self.subject.fit_transition(
+            None, [[[[0.5]]]] , [[0,1]], 1, 2, self.all_seg_ids, 
self.total_buffers_per_seg,
+            self.model.to_json(), self.compile_params, self.fit_params, False,
+            previous_state.tostring(), **k)
+        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
+        self.assertEqual(1, buffer_count)
+        # set_session must get called ONLY once, when its the first buffer
+        self.assertEqual(0, self.subject.K.set_session.call_count)
+        # Clear session and sess.close must not get called for the first buffer
+        self.assertEqual(0, self.subject.clear_keras_session.call_count)
+        self.assertEqual(1, k['SD']['buffer_count'])
+        self.assertTrue(k['SD']['segment_model'])
+        self.assertTrue(k['SD']['model_shapes'])
+
+    def test_fit_transition_first_buffer_pass_gpdb(self):
+        #TODO should we mock tensorflow's close_session and keras'
+        # clear_session instead of mocking the function `clear_keras_session`
+
+        #postgres
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
+        self.subject.is_platform_pg = Mock(return_value = False)
         buffer_count = 0
         previous_state = [self.loss, self.accuracy, buffer_count]
         previous_state.extend(self.model_weights)
@@ -93,11 +124,43 @@ class MadlibKerasFitTestCase(unittest.TestCase):
         self.assertTrue(k['SD']['segment_model'])
         self.assertTrue(k['SD']['model_shapes'])
 
-    def test_fit_transition_last_buffer_pass(self):
+
+    def test_fit_transition_last_buffer_pass_pg(self):
+        #TODO should we mock tensorflow's close_session and keras'
+        # clear_session instead of mocking the function `clear_keras_session`
+        self.subject.K.set_session = Mock()
+        self.subject.clear_keras_session = Mock()
+        self.subject.is_platform_pg = Mock(return_value = True)
+
+        buffer_count = 2
+
+        state = [self.loss, self.accuracy, buffer_count]
+        state.extend(self.model_weights)
+        state = np.array(state, dtype=np.float32)
+
+        self.subject.compile_and_set_weights(self.model, self.compile_params,
+                                             '/cpu:0', state.tostring(), 
self.model_shapes)
+        k = {'SD': {'buffer_count': buffer_count,
+                   'model_shapes': self.model_shapes}}
+        k['SD']['segment_model'] = self.model
+        new_model_state = self.subject.fit_transition(
+            state.tostring(), [[[[0.5]]]] , [[1,0]], 1, 2, self.all_seg_ids, 
self.total_buffers_per_seg,
+            self.model.to_json(), None, self.fit_params, False, 
'dummy_previous_state', **k)
+
+        buffer_count = np.fromstring(new_model_state, dtype=np.float32)[2]
+        self.assertEqual(3, buffer_count)
+        # set_session must get called ONLY once, when its the first buffer
+        self.assertEqual(0, self.subject.K.set_session.call_count)
+        # Clear session and sess.close must not get called for the first buffer
+        self.assertEqual(0, self.subject.clear_keras_session.call_count)
+        self.assertEqual(3, k['SD']['buffer_count'])
+
+    def test_fit_transition_last_buffer_pass_gpdb(self):
         #TODO should we mock tensorflow's close_session and keras'
         # clear_session instead of mocking the function `clear_keras_session`
         self.subject.K.set_session = Mock()
         self.subject.clear_keras_session = Mock()
+        self.subject.is_platform_pg = Mock(return_value = False)
 
         buffer_count = 2
 

Reply via email to