This is an automated email from the ASF dual-hosted git repository.

khannaekta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new ac148f6  DL: Add support in preprocessor to evenly distribute data for 
GPDB
ac148f6 is described below

commit ac148f63080bc863fc23ed562a11797879d65a6f
Author: Yuhao Zhang <[email protected]>
AuthorDate: Thu Sep 5 17:11:19 2019 -0700

    DL: Add support in preprocessor to evenly distribute data for GPDB
    
    JIRA: MADLIB-1378
    
    Currently, `training_preprocessor_dl` and `validation_preprocessor_dl`
    doesn't guarantee even distribution of the data among segments,
    especially when the number of buffers is not much larger than the number
    of segments. This commit adds support to the preprocessor so that it
    always distributes the data as evenly as possible among the segments
    
    Co-authored-by: Ekta Khanna <[email protected]>
---
 .../deep_learning/input_data_preprocessor.py_in    | 40 +++++++++++++++++++---
 .../deep_learning/madlib_keras_helper.py_in        |  4 +--
 .../test/input_data_preprocessor.sql_in            | 20 +++++++++++
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git 
a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in 
b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
index 6a03eca..a6b6a83 100644
--- a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
+++ b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
@@ -40,7 +40,7 @@ from utilities.utilities import split_quoted_delimited_str
 from utilities.utilities import strip_end_quotes
 from utilities.utilities import unique_string
 from utilities.utilities import validate_module_input_params
-
+from utilities.utilities import get_seg_number
 from utilities.validate_args import input_tbl_valid
 from utilities.validate_args import get_expr_type
 
@@ -171,14 +171,45 @@ class InputDataPreprocessorDL(object):
             FROM {self.source_table} {order_by_clause}
             """.format(FLOAT32_SQL_TYPE=FLOAT32_SQL_TYPE, **locals())
         plpy.execute(scalar_mult_sql)
+
+        series_tbl = unique_string(desp='series')
+        dist_key_tbl = unique_string(desp='dist_key')
         # Create the mini-batched output table
         if is_platform_pg():
             distributed_by_clause = ''
+            dist_key_clause = ''
+            join_clause = ''
+            select_clause = 'b.*'
+
         else:
-            distributed_by_clause= ' DISTRIBUTED BY (buffer_id) '
+            dist_key = DISTRIBUTION_KEY_COLNAME
+            # Create large temp table such that there is atleast 1 row on each 
segment
+            # Using 999999 would distribute data(atleast 1 row on each 
segment) for
+            # a cluster as large as 20000
+            query = """
+                CREATE TEMP TABLE {series_tbl}
+                AS
+                SELECT generate_series(0, 999999) {dist_key}
+                DISTRIBUTED BY ({dist_key})
+            """.format(**locals())
+            plpy.execute(query)
+
+            # Create temp table to get unique distribution key values for each 
segment
+            query = """
+                    CREATE TEMP TABLE {dist_key_tbl} AS
+                    SELECT gp_segment_id AS id, min({dist_key}) AS {dist_key}
+                    FROM {series_tbl}
+                    GROUP BY gp_segment_id
+            """.format(**locals())
+            plpy.execute(query)
+
+            num_segments = get_seg_number()
+            join_clause = 'JOIN {dist_key_tbl} ON 
(b.buffer_id%{num_segments})= {dist_key_tbl}.id'.format(**locals())
+            distributed_by_clause= ' DISTRIBUTED BY ({dist_key}) 
'.format(**locals())
+            select_clause = '{dist_key}, b.*'.format(**locals())
         sql = """
             CREATE TABLE {self.output_table} AS
-            SELECT * FROM
+            SELECT  {select_clause}  FROM
             (
                 SELECT {self.schema_madlib}.agg_array_concat(
                             ARRAY[{norm_tbl}.x_norm::{FLOAT32_SQL_TYPE}[]]) AS 
{x},
@@ -188,13 +219,14 @@ class InputDataPreprocessorDL(object):
                 FROM {norm_tbl}
                 GROUP BY buffer_id
             ) b
+            {join_clause}
             {distributed_by_clause}
             """.format(x=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL,
                        y=MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL,
                        FLOAT32_SQL_TYPE=FLOAT32_SQL_TYPE,
                        **locals())
         plpy.execute(sql)
-        plpy.execute("DROP TABLE IF EXISTS {0}".format(norm_tbl))
+        plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(norm_tbl, 
series_tbl, dist_key_tbl))
         # Create summary table
         self._create_output_summary_table()
 
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in 
b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
index e8218a6..d8a01b6 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
@@ -35,12 +35,12 @@ MODEL_ARCH_ID_COLNAME = "model_arch_id"
 MODEL_DATA_COLNAME = "model_data"
 METRIC_TYPE_COLNAME = "metrics_type"
 
-# Name of independent and dependent colnames in batched table.
+# Name of independent, dependent and distribution key colnames in batched 
table.
 # These are readonly variables, do not modify.
 # MADLIB-1300 Adding these variables for DL only at this time.
 MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL = "dependent_var"
 MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL = "independent_var"
-
+DISTRIBUTION_KEY_COLNAME = "__dist_key__"
 ## sql variable types
 FLOAT32_SQL_TYPE = 'REAL'
 SMALLINT_SQL_TYPE = 'SMALLINT'
diff --git 
a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in 
b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
index 8626ecd..a3ff1d9 100644
--- 
a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
+++ 
b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
@@ -18,6 +18,7 @@
  * under the License.
  *
  *//* ----------------------------------------------------------------------- 
*/
+m4_include(`SQLCommon.m4')
 
 DROP TABLE IF EXISTS data_preprocessor_input;
 CREATE TABLE data_preprocessor_input(id serial, x double precision[], label 
TEXT);
@@ -88,6 +89,25 @@ SELECT training_preprocessor_dl(
   'label',
   'x');
 
+-- Test data is evenly distributed across all segments (GPDB only)
+m4_changequote(`<!', `!>')
+m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!
+DROP TABLE IF EXISTS data_preprocessor_input_batch, 
data_preprocessor_input_batch_summary;
+SELECT training_preprocessor_dl(
+  'data_preprocessor_input',
+  'data_preprocessor_input_batch',
+  'id',
+  'x',
+  1);
+
+-- This test expects that total number of images(17 for input table 
data_preprocessor_input)
+-- are equally distributed across all segments.
+-- Therefore, after preprocessing seg0 will have 17/(# of segs) buffers.
+SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from 
gp_segment_configuration WHERE role = 'p' and content != -1), 'Even 
distribution of buffers failed.')
+FROM data_preprocessor_input_batch
+WHERE gp_segment_id = 0;
+!>)
+
 DROP TABLE IF EXISTS data_preprocessor_input;
 CREATE TABLE data_preprocessor_input(id serial, x double precision[], y 
INTEGER, y1 BOOLEAN, y2 TEXT, y3 DOUBLE PRECISION, y4 DOUBLE PRECISION[], y5 
INTEGER[]);
 INSERT INTO data_preprocessor_input(x, y, y1, y2, y3, y4, y5) VALUES

Reply via email to