This is an automated email from the ASF dual-hosted git repository.

okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git


The following commit(s) were added to refs/heads/master by this push:
     new 640392d  Correlation: Process deconstruction in chunks for grouping
640392d is described below

commit 640392dcb8bdd57f83fb1706959936730b3e4531
Author: Orhan Kislal <okis...@pivotal.io>
AuthorDate: Thu Jul 18 14:24:08 2019 -0700

    Correlation: Process deconstruction in chunks for grouping
    
    JIRA: MADLIB-1301
    While deconstructing the correlation matrix to create the output table,
    a big UNION ALL query was created, with one sub-query for each distinct
    grouping value. This was causing memory, stack, and performance-related
    issues.
    The fix is to run multiple queries, with each query processing the
    deconstruction of the correlation matrix for a limited number of
    groups (we have defaulted the value to 10). This value can be
    parameterized by the user with a newly introduced optional parameter
    named `n_groups_per_run` for both correlation and covariance.
    
    Closes #422
    
    Co-authored-by: Nandish Jayaram <njaya...@apache.org>
---
 src/ports/postgres/modules/stats/correlation.py_in | 89 ++++++++++++++++------
 .../postgres/modules/stats/correlation.sql_in      | 72 ++++++++++++++---
 .../postgres/modules/stats/test/correlation.sql_in | 28 +++++++
 3 files changed, 155 insertions(+), 34 deletions(-)

diff --git a/src/ports/postgres/modules/stats/correlation.py_in 
b/src/ports/postgres/modules/stats/correlation.py_in
index 005de75..679d304 100644
--- a/src/ports/postgres/modules/stats/correlation.py_in
+++ b/src/ports/postgres/modules/stats/correlation.py_in
@@ -9,6 +9,7 @@ from time import time
 
 import plpy
 from utilities.control import MinWarning
+from utilities.utilities import _assert
 from utilities.utilities import add_postfix
 from utilities.utilities import get_table_qualified_col_str
 from utilities.utilities import py_list_to_sql_string
@@ -20,10 +21,9 @@ from utilities.validate_args import get_cols_and_types
 from utilities.validate_args import input_tbl_valid
 from utilities.validate_args import output_tbl_valid
 
-
 def correlation(schema_madlib, source_table, output_table,
                 target_cols, grouping_cols, get_cov=False,
-                verbose=False, **kwargs):
+                verbose=False, n_groups_per_run=10, **kwargs):
     """
     Populates an output table with the coefficients of correlation between
     the columns in a source table
@@ -45,10 +45,12 @@ def correlation(schema_madlib, source_table, output_table,
         _numeric_column_names, _nonnumeric_column_names = 
_get_numeric_columns(source_table)
         _target_cols = _analyze_target_cols(source_table, target_cols,
                                             function_name)
+        if n_groups_per_run is None:
+             n_groups_per_run = 10
         # Validate grouping_cols param
         if grouping_cols:
             _validate_grouping_cols_param(source_table, grouping_cols,
-                                          function_name)
+                                          function_name, n_groups_per_run)
         if _target_cols:
             # prune all non-numeric column types from target columns
             _existing_target_cols = []
@@ -82,7 +84,8 @@ def correlation(schema_madlib, source_table, output_table,
 
     run_time = _populate_output_table(schema_madlib, source_table, 
output_table,
                                       _existing_target_cols, grouping_cols,
-                                      function_name, get_cov, verbose)
+                                      n_groups_per_run, function_name, get_cov,
+                                      verbose)
     # ---- Output message ----
     output_text_list = ["Summary for '{0}' function".format(function_name)]
     output_text_list.append("Output table = " + str(output_table))
@@ -121,7 +124,8 @@ def _validate_corr_arg(source_table, output_table, 
function_name):
 # 
------------------------------------------------------------------------------
 
 
-def _validate_grouping_cols_param(source_table, grouping_cols, function_name):
+def _validate_grouping_cols_param(source_table, grouping_cols, function_name,
+                                  n_groups_per_run):
     grouping_cols_list = split_quoted_delimited_str(grouping_cols)
     # Column names that are used in summary table.
     reserved_cols_in_summary_table = set(['method',
@@ -132,6 +136,8 @@ def _validate_grouping_cols_param(source_table, 
grouping_cols, function_name):
                                           'total_rows_processed'])
     cols_in_tbl_valid(source_table, grouping_cols_list, function_name)
     does_exclude_reserved(grouping_cols_list, reserved_cols_in_summary_table)
+    _assert(n_groups_per_run>0, "{0}: n_groups_per_run has to be greater than 
0.".
+        format(function_name))
 
 def _get_numeric_columns(source_table):
     """
@@ -175,8 +181,8 @@ def _analyze_target_cols(source_table, target_cols, 
function_name):
 
 
 def _populate_output_table(schema_madlib, source_table, output_table,
-                           col_names, grouping_cols, function_name,
-                           get_cov=False, verbose=False):
+                           col_names, grouping_cols, n_groups_per_run,
+                           function_name, get_cov=False, verbose=False):
     """
     Creates a relation with the appropriate number of columns given a list of
     column names and populates with the correlation coefficients. If the table
@@ -267,18 +273,17 @@ def _populate_output_table(schema_madlib, source_table, 
output_table,
         plpy.execute(create_temp_output_table_query)
 
         # Prepare the query for converting the matrix into the lower triangle
-        deconstruction_query = _create_deconstruction_query(schema_madlib,
+        deconstruction_query_list = _create_deconstruction_query(schema_madlib,
                                                             col_names,
                                                             grouping_cols,
                                                             temp_output_table,
-                                                            cor_mat)
+                                                            cor_mat,
+                                                            n_groups_per_run)
 
         variable_subquery = unique_string(desp='variable_subq')
         matrix_subquery = unique_string(desp='matrix_subq')
         # create output table
-        create_output_table_query = """
-
-            CREATE TABLE {output_table} AS
+        select_deconstruct_query = """
             SELECT *
             FROM
             (
@@ -291,9 +296,32 @@ def _populate_output_table(schema_madlib, source_table, 
output_table,
                 {deconstruction_query}
             ) {matrix_subquery}
             USING (column_position)
-            """.format(num_cols=len(col_names), **locals())
+        """
+
+        # Create the output table.
+        # If there are no grouping cols, the query list will have a single 
element.
+        # Therefore, we always execute the 0'th element of the list.
+        # If there are grouping cols, we loop through the rest of the list and
+        # execute them one by one.
+        create_output_table_query = """
+            CREATE TABLE {output_table} AS
+            {select_deconstruct_query}
+            """.format(**locals()).format(
+                deconstruction_query=deconstruction_query_list[0],
+                num_cols=len(col_names), **locals())
         plpy.execute(create_output_table_query)
 
+        if grouping_cols:
+            for i in range(1, len(deconstruction_query_list)):
+                insert_to_output_table_query = """
+                    INSERT INTO {output_table}
+                    {select_deconstruct_query}
+                """.format(**locals()).format(
+                    deconstruction_query=deconstruction_query_list[i],
+                    num_cols=len(col_names), **locals())
+                plpy.execute(insert_to_output_table_query)
+
+
          # create summary table
         summary_table = add_postfix(output_table, "_summary")
         create_summary_table_query = """
@@ -318,7 +346,7 @@ def _populate_output_table(schema_madlib, source_table, 
output_table,
 # 
------------------------------------------------------------------------------
 
 def _create_deconstruction_query(schema_madlib, col_names, grouping_cols,
-                                 temp_output_table, cor_mat):
+                                 temp_output_table, cor_mat, n_groups_per_run):
     """
     Creates the query to convert the matrix into the lower-triangular format.
 
@@ -330,9 +358,13 @@ def _create_deconstruction_query(schema_madlib, col_names, 
grouping_cols,
                                     the matrix to deconstruct
         @param cor_mat              Name of column that contains the matrix
                                     to deconstruct
+        @param n_groups_per_run     Number of groups to deconstruct in a single
+                                    sub-query
 
     Returns:
-        String (SQL querry for deconstructing the matrix)
+        List of strings where each string is a SQL sub-query for deconstructing
+        the matrix. Each sub-query covers at most n_groups_per_run of the
+        UNION ALL sub-queries.
     """
     # The matrix that holds the PCC computation must be converted to a
     # table capturing all pair wise PCC values. That is done using
@@ -341,8 +373,9 @@ def _create_deconstruction_query(schema_madlib, col_names, 
grouping_cols,
     # construct the query accordingly.
 
     COL_WIDTH = 10
-    # split the col_names to equal size sets with newline between to prevent a 
long query
-    # Build a 2d array of the col_names, each inner array with COL_WIDTH 
number of names.
+    # split the col_names to equal size sets with newline between to prevent a
+    # long query. Build a 2d array of the col_names, each inner array with
+    # COL_WIDTH number of names.
     col_names_split = [col_names[x : x + COL_WIDTH]
                                  for x in range(0, len(col_names), COL_WIDTH)]
     variable_list_str = ', \n'.join([', '.join(
@@ -351,25 +384,33 @@ def _create_deconstruction_query(schema_madlib, 
col_names, grouping_cols,
                                          ]) for cols_blob in col_names_split
                                      ])
 
-
+    # Fix for MADLIB-1301.
+    # Creating a huge chain of union all sub-queries might create problems with
+    # memory/stack/execution time. We divide them into chunks of 10 (or
+    # whatever the user decides) and insert these chunks one by one. So create
+    # a list of these sub-query chunks. If no grouping cols are provided, the
+    # list will contain only one sub-query that will be executed.
+    deconstruction_query_list = []
     if grouping_cols:
         grp_dict_rows = plpy.execute("SELECT {0} FROM {1}".format(
                                         grouping_cols,
                                         temp_output_table))
-        deconstruction_queries_list = list()
+        deconstruction_grp_queries_list = list()
         for grp_dict in grp_dict_rows:
             where_condition = 'WHERE ' + ' AND '.join("{0} = '{1}'".format(k, 
v)
                                              for k, v in grp_dict.items())
 
             select_grouping_cols = ' , '.join("'{1}' AS {0}".format(k, v)
                                          for k, v in grp_dict.items())
-            deconstruction_queries_list.append("""
+            deconstruction_grp_queries_list.append("""
                     SELECT {select_grouping_cols}, *
                     FROM {schema_madlib}.__deconstruct_lower_triangle(
                         (SELECT {cor_mat} FROM {temp_output_table} 
{where_condition})
                     ) AS deconstructed(column_position integer, 
{variable_list_str})
                 """.format(**locals()))
-        deconstruction_query = ' UNION ALL '.join(deconstruction_queries_list)
+        for i in range(0, len(deconstruction_grp_queries_list), 
n_groups_per_run):
+            sublist = deconstruction_grp_queries_list[i:i+n_groups_per_run]
+            deconstruction_query_list.append(' UNION ALL '.join(sublist))
     else:
         deconstruction_query = """
             SELECT * FROM
@@ -377,7 +418,8 @@ def _create_deconstruction_query(schema_madlib, col_names, 
grouping_cols,
                 (SELECT {cor_mat} FROM {temp_output_table})
             ) AS deconstructed(column_position integer, {variable_list_str})
         """.format(**locals())
-    return deconstruction_query
+        deconstruction_query_list = [deconstruction_query]
+    return deconstruction_query_list
 
 def correlation_help_message(schema_madlib, message, cov=False, **kwargs):
     """
@@ -397,7 +439,8 @@ SELECT {schema_madlib}.{func}
     target_cols    TEXT,    -- Comma separated columns for which summary is 
desired
                             --   (Default: '*' - produces result for all 
columns)
     verbose        BOOLEAN, -- Verbosity
-    grouping_cols  TEXT     -- Comma separated columns for grouping
+    grouping_cols  TEXT,    -- Comma separated columns for grouping
+    n_groups_per_run INTEGER -- number of groups to process at a time
 )
 -----------------------------------------------------------------------
 Output will be a table with N+2 columns and N rows, where N is the number
diff --git a/src/ports/postgres/modules/stats/correlation.sql_in 
b/src/ports/postgres/modules/stats/correlation.sql_in
index 1a1bdc5..4932385 100644
--- a/src/ports/postgres/modules/stats/correlation.sql_in
+++ b/src/ports/postgres/modules/stats/correlation.sql_in
@@ -53,7 +53,8 @@ correlation( source_table,
              output_table,
              target_cols,
              verbose,
-             grouping_cols
+             grouping_cols,
+             n_groups_per_run
            )
 </pre>
 
@@ -63,7 +64,8 @@ covariance( source_table,
             output_table,
             target_cols,
             verbose,
-            grouping_cols
+            grouping_cols,
+            n_groups_per_run
           )
 </pre>
 
@@ -133,6 +135,27 @@ If NULL or <tt>'*'</tt>, results are produced for all 
numeric columns.</dd>
 
 <dt>grouping_cols (optional)</dt>
 <dd>TEXT, default: NULL. A comma-separated list of the columns to group 
by.</dd>
+
+<dt>n_groups_per_run (optional)</dt>
+<dd>INTEGER, default: 10. Number of groups to process at a time.
+This parameter is ignored if 'grouping_cols' is not specified.
+Generally the default value will work fine, but there may be cases
+(see below) where you will want to experiment with it
+to reduce execution time and memory usage.
+</dd>
+@note
+This is a lower level parameter that can potentially be used to
+improve performance, but should be used with caution.
+It is designed to handle the case where you have a large number
+of groups.
+In general, increasing 'n_groups_per_run' means we
+construct a larger 'UNION ALL' query which uses more memory and may slow down 
execution
+if it gets too big.
+If you have a large number of groups and a smaller data size, there may
+be benefits to increasing this value.
+Conversely, decreasing 'n_groups_per_run' means we issue
+more 'plpy.execute' commands.  This increases overhead and can modestly
+affect the execution time.
 </dl>
 
 
@@ -442,7 +465,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
     output_table varchar, -- output table name
     target_cols  varchar, -- comma separated list of output cols (default = 
'*')
     verbose      boolean, -- flag to determine verbosity
-    grouping_cols varchar -- comma separated column names to be used for 
grouping
+    grouping_cols varchar, -- comma separated column names to be used for 
grouping
+    n_groups_per_run integer -- number of groups to process at a time
 ) RETURNS TEXT AS $$
     PythonFunction(stats, correlation, correlation)
 $$ LANGUAGE plpythonu VOLATILE
@@ -455,10 +479,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
     source_table varchar, --  input table name
     output_table varchar, -- output table name
     target_cols  varchar,  -- comma separated list of output cols (default = 
'*')
+    verbose      boolean,  -- flag to determine verbosity
+    grouping_cols varchar -- comma separated column names to be used for 
grouping
+)
+RETURNS TEXT AS $$
+    select MADLIB_SCHEMA.correlation($1, $2, $3, $4, $5, 10)
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
+    source_table varchar, --  input table name
+    output_table varchar, -- output table name
+    target_cols  varchar,  -- comma separated list of output cols (default = 
'*')
     verbose      boolean  -- flag to determine verbosity
 )
 RETURNS TEXT AS $$
-    select MADLIB_SCHEMA.correlation($1, $2, $3, $4, NULL)
+    select MADLIB_SCHEMA.correlation($1, $2, $3, $4, NULL, 10)
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -468,7 +504,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
     target_cols  varchar  -- comma separated list of output cols (default = 
'*')
 )
 RETURNS TEXT AS $$
-    select MADLIB_SCHEMA.correlation($1, $2, $3, FALSE, NULL)
+    select MADLIB_SCHEMA.correlation($1, $2, $3, FALSE, NULL, 10)
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -477,7 +513,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
     source_table varchar, --  input table name
     output_table varchar  -- output table name
 ) RETURNS TEXT AS $$
-    select MADLIB_SCHEMA.correlation($1, $2, NULL, FALSE, NULL)
+    select MADLIB_SCHEMA.correlation($1, $2, NULL, FALSE, NULL, 10)
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -526,12 +562,14 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
     output_table varchar, -- output table name
     target_cols  varchar, -- comma separated list of output cols (default = 
'*')
     verbose      boolean, -- flag to determine verbosity
-    grouping_cols varchar -- comma separated column names to be used for 
grouping
+    grouping_cols varchar, -- comma separated column names to be used for 
grouping
+    n_groups_per_run integer -- number of groups to process at a time
 ) RETURNS TEXT AS $$
 PythonFunctionBodyOnly(`stats', `correlation')
     with AOControl(False):
         return correlation.correlation(schema_madlib, source_table, 
output_table,
-                                       target_cols, grouping_cols, True, 
verbose)
+                                       target_cols, grouping_cols, True, 
verbose,
+                                       n_groups_per_run)
 $$ LANGUAGE plpythonu VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -542,10 +580,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
     source_table varchar, --  input table name
     output_table varchar, -- output table name
     target_cols  varchar, -- comma separated list of output cols (default = 
'*')
+    verbose BOOLEAN,       -- flag to determine verbosity
+    grouping_cols varchar -- comma separated column names to be used for 
grouping
+
+)
+RETURNS TEXT AS $$
+    select MADLIB_SCHEMA.covariance($1, $2, $3, $4, $5, 10)
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
+    source_table varchar, --  input table name
+    output_table varchar, -- output table name
+    target_cols  varchar, -- comma separated list of output cols (default = 
'*')
     verbose BOOLEAN       -- flag to determine verbosity
 )
 RETURNS TEXT AS $$
-    select MADLIB_SCHEMA.covariance($1, $2, $3, $4, NULL)
+    select MADLIB_SCHEMA.covariance($1, $2, $3, $4, NULL, 10)
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -555,7 +605,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
     target_cols  varchar  -- comma separated list of output cols (default = 
'*')
 )
 RETURNS TEXT AS $$
-    select MADLIB_SCHEMA.covariance($1, $2, $3, FALSE, NULL)
+    select MADLIB_SCHEMA.covariance($1, $2, $3, FALSE, NULL, 10)
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
@@ -564,7 +614,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
     source_table varchar, --  input table name
     output_table varchar  -- output table name
 ) RETURNS TEXT AS $$
-    select MADLIB_SCHEMA.covariance($1, $2, NULL, NULL)
+    select MADLIB_SCHEMA.covariance($1, $2, NULL, FALSE, NULL, 10)
 $$ LANGUAGE sql VOLATILE
 m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
 
diff --git a/src/ports/postgres/modules/stats/test/correlation.sql_in 
b/src/ports/postgres/modules/stats/test/correlation.sql_in
index 8e17284..96ee2e2 100644
--- a/src/ports/postgres/modules/stats/test/correlation.sql_in
+++ b/src/ports/postgres/modules/stats/test/correlation.sql_in
@@ -122,6 +122,19 @@ SELECT assert(relative_error(humidity, -0.3502) < 0.0001, 
'calculated correlatio
 
 SELECT assert(relative_error(id, 0.4122) < 0.0001, 'calculated correlation 
value ' || id || 'does not match expected: ' || 0.4122) FROM 
example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1';
 
+DROP TABLE IF EXISTS example_data_gr2_output, example_data_gr2_output_summary;
+SELECT correlation( 'example_data_gr2',
+                    'example_data_gr2_output',
+                    'temperature, humidity, id, new_col',
+                    FALSE,
+                    'gr,gr2', 2);
+
+SELECT assert(relative_error(temperature, 0.1606) < 0.0001, 'calculated 
correlation value: '|| temperature ||' does not match expected: ' || 0.1606) 
FROM example_data_gr2_output WHERE variable ='humidity' AND gr='1' AND gr2='1';
+
+SELECT assert(relative_error(humidity, -0.3502) < 0.0001, 'calculated 
correlation value ' || humidity || 'does not match expected: ' || -0.3502) FROM 
example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1';
+
+SELECT assert(relative_error(id, 0.4122) < 0.0001, 'calculated correlation 
value ' || id || 'does not match expected: ' || 0.4122) FROM 
example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1';
+
 DROP TABLE IF EXISTS example_data_gr2_output_covariance, 
example_data_gr2_output_covariance_summary;
 SELECT covariance( 'example_data_gr2',
                     'example_data_gr2_output_covariance',
@@ -134,3 +147,18 @@ SELECT assert(relative_error(temperature, 17.1500) < 
0.0001, 'calculated covaria
 SELECT assert(relative_error(humidity, -28.7000) < 0.0001, 'calculated 
covariance value ' || humidity || 'does not match expected: ' || -28.7000) FROM 
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND 
gr2='1';
 
 SELECT assert(relative_error(id, 15.4688) < 0.0001, 'calculated covariance 
value ' || id || 'does not match expected: ' || 15.4688) FROM 
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND 
gr2='1';
+
+DROP TABLE IF EXISTS example_data_gr2_output_covariance, 
example_data_gr2_output_covariance_summary;
+SELECT covariance( 'example_data_gr2',
+                    'example_data_gr2_output_covariance',
+                    'temperature, humidity, id, new_col',
+                    FALSE,
+                    'gr,gr2', 50);
+
+SELECT assert(count(*) = 12, 'the output should have 3 groups X 4 variables = 
12 rows') FROM example_data_gr2_output_covariance;
+
+SELECT assert(relative_error(temperature, 17.1500) < 0.0001, 'calculated 
covariance value: '|| temperature ||' does not match expected: ' || 17.1500) 
FROM example_data_gr2_output_covariance WHERE variable ='humidity' AND gr='1' 
AND gr2='1';
+
+SELECT assert(relative_error(humidity, -28.7000) < 0.0001, 'calculated 
covariance value ' || humidity || 'does not match expected: ' || -28.7000) FROM 
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND 
gr2='1';
+
+SELECT assert(relative_error(id, 15.4688) < 0.0001, 'calculated covariance 
value ' || id || 'does not match expected: ' || 15.4688) FROM 
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND 
gr2='1';

Reply via email to