This is an automated email from the ASF dual-hosted git repository. okislal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
The following commit(s) were added to refs/heads/master by this push: new 640392d Correlation: Process deconstruction in chunks for grouping 640392d is described below commit 640392dcb8bdd57f83fb1706959936730b3e4531 Author: Orhan Kislal <okis...@pivotal.io> AuthorDate: Thu Jul 18 14:24:08 2019 -0700 Correlation: Process deconstruction in chunks for grouping JIRA: MADLIB-1301 While deconstructing the correlation matrix to create the output table, a big UNION ALL query was created, with one sub-query for each distinct grouping value. With many groups, this caused memory, stack, and performance issues. The fix is to run multiple queries, each of which deconstructs the correlation matrices for a limited number of groups (10 by default). Users can change this number through a new optional parameter, `n_groups_per_run`, available for both correlation and covariance. Closes #422 Co-authored-by: Nandish Jayaram <njaya...@apache.org> --- src/ports/postgres/modules/stats/correlation.py_in | 89 ++++++++++++++++------ .../postgres/modules/stats/correlation.sql_in | 72 ++++++++++++++--- .../postgres/modules/stats/test/correlation.sql_in | 28 +++++++ 3 files changed, 155 insertions(+), 34 deletions(-) diff --git a/src/ports/postgres/modules/stats/correlation.py_in b/src/ports/postgres/modules/stats/correlation.py_in index 005de75..679d304 100644 --- a/src/ports/postgres/modules/stats/correlation.py_in +++ b/src/ports/postgres/modules/stats/correlation.py_in @@ -9,6 +9,7 @@ from time import time import plpy from utilities.control import MinWarning +from utilities.utilities import _assert from utilities.utilities import add_postfix from utilities.utilities import get_table_qualified_col_str from utilities.utilities import py_list_to_sql_string @@ -20,10 +21,9 @@ from utilities.validate_args import get_cols_and_types from utilities.validate_args import input_tbl_valid from utilities.validate_args import output_tbl_valid - 
def correlation(schema_madlib, source_table, output_table, target_cols, grouping_cols, get_cov=False, - verbose=False, **kwargs): + verbose=False, n_groups_per_run=10, **kwargs): """ Populates an output table with the coefficients of correlation between the columns in a source table @@ -45,10 +45,12 @@ def correlation(schema_madlib, source_table, output_table, _numeric_column_names, _nonnumeric_column_names = _get_numeric_columns(source_table) _target_cols = _analyze_target_cols(source_table, target_cols, function_name) + if n_groups_per_run is None: + n_groups_per_run = 10 # Validate grouping_cols param if grouping_cols: _validate_grouping_cols_param(source_table, grouping_cols, - function_name) + function_name, n_groups_per_run) if _target_cols: # prune all non-numeric column types from target columns _existing_target_cols = [] @@ -82,7 +84,8 @@ def correlation(schema_madlib, source_table, output_table, run_time = _populate_output_table(schema_madlib, source_table, output_table, _existing_target_cols, grouping_cols, - function_name, get_cov, verbose) + n_groups_per_run, function_name, get_cov, + verbose) # ---- Output message ---- output_text_list = ["Summary for '{0}' function".format(function_name)] output_text_list.append("Output table = " + str(output_table)) @@ -121,7 +124,8 @@ def _validate_corr_arg(source_table, output_table, function_name): # ------------------------------------------------------------------------------ -def _validate_grouping_cols_param(source_table, grouping_cols, function_name): +def _validate_grouping_cols_param(source_table, grouping_cols, function_name, + n_groups_per_run): grouping_cols_list = split_quoted_delimited_str(grouping_cols) # Column names that are used in summary table. 
reserved_cols_in_summary_table = set(['method', @@ -132,6 +136,8 @@ def _validate_grouping_cols_param(source_table, grouping_cols, function_name): 'total_rows_processed']) cols_in_tbl_valid(source_table, grouping_cols_list, function_name) does_exclude_reserved(grouping_cols_list, reserved_cols_in_summary_table) + _assert(n_groups_per_run>0, "{0}: n_groups_per_run has to be greater than 0.". + format(function_name)) def _get_numeric_columns(source_table): """ @@ -175,8 +181,8 @@ def _analyze_target_cols(source_table, target_cols, function_name): def _populate_output_table(schema_madlib, source_table, output_table, - col_names, grouping_cols, function_name, - get_cov=False, verbose=False): + col_names, grouping_cols, n_groups_per_run, + function_name, get_cov=False, verbose=False): """ Creates a relation with the appropriate number of columns given a list of column names and populates with the correlation coefficients. If the table @@ -267,18 +273,17 @@ def _populate_output_table(schema_madlib, source_table, output_table, plpy.execute(create_temp_output_table_query) # Prepare the query for converting the matrix into the lower triangle - deconstruction_query = _create_deconstruction_query(schema_madlib, + deconstruction_query_list = _create_deconstruction_query(schema_madlib, col_names, grouping_cols, temp_output_table, - cor_mat) + cor_mat, + n_groups_per_run) variable_subquery = unique_string(desp='variable_subq') matrix_subquery = unique_string(desp='matrix_subq') # create output table - create_output_table_query = """ - - CREATE TABLE {output_table} AS + select_deconstruct_query = """ SELECT * FROM ( @@ -291,9 +296,32 @@ def _populate_output_table(schema_madlib, source_table, output_table, {deconstruction_query} ) {matrix_subquery} USING (column_position) - """.format(num_cols=len(col_names), **locals()) + """ + + # Create the output table. + # If there are no groupin cols, the query list will have a single element. 
+ # Therefore, we always execute the 0'th element of the list. + # If there are grouping cols, we loop through the rest of the list and + # execute them one by one. + create_output_table_query = """ + CREATE TABLE {output_table} AS + {select_deconstruct_query} + """.format(**locals()).format( + deconstruction_query=deconstruction_query_list[0], + num_cols=len(col_names), **locals()) plpy.execute(create_output_table_query) + if grouping_cols: + for i in range(1, len(deconstruction_query_list)): + insert_to_output_table_query = """ + INSERT INTO {output_table} + {select_deconstruct_query} + """.format(**locals()).format( + deconstruction_query=deconstruction_query_list[i], + num_cols=len(col_names), **locals()) + plpy.execute(insert_to_output_table_query) + + # create summary table summary_table = add_postfix(output_table, "_summary") create_summary_table_query = """ @@ -318,7 +346,7 @@ def _populate_output_table(schema_madlib, source_table, output_table, # ------------------------------------------------------------------------------ def _create_deconstruction_query(schema_madlib, col_names, grouping_cols, - temp_output_table, cor_mat): + temp_output_table, cor_mat, n_groups_per_run): """ Creates the query to convert the matrix into the lower-traingular format. @@ -330,9 +358,13 @@ def _create_deconstruction_query(schema_madlib, col_names, grouping_cols, the matrix to deconstruct @param cor_mat Name of column that containss the matrix to deconstruct + @param n_groups_per_run Number of groups to deconstruct in a single + sub-query Returns: - String (SQL querry for deconstructing the matrix) + List of Strings where each string is a SQL sub-query for deconstructing + the matrix. Each sub-query covers n_groups_per_run number of union all + queries """ # The matrix that holds the PCC computation must be converted to a # table capturing all pair wise PCC values. 
That is done using @@ -341,8 +373,9 @@ def _create_deconstruction_query(schema_madlib, col_names, grouping_cols, # construct the query accordingly. COL_WIDTH = 10 - # split the col_names to equal size sets with newline between to prevent a long query - # Build a 2d array of the col_names, each inner array with COL_WIDTH number of names. + # split the col_names to equal size sets with newline between to prevent a + # long query. Build a 2d array of the col_names, each inner array with + # COL_WIDTH number of names. col_names_split = [col_names[x : x + COL_WIDTH] for x in range(0, len(col_names), COL_WIDTH)] variable_list_str = ', \n'.join([', '.join( @@ -351,25 +384,33 @@ def _create_deconstruction_query(schema_madlib, col_names, grouping_cols, ]) for cols_blob in col_names_split ]) - + # Fix for MADLIB-1301. + # Creating a huge chain of union all sub-queries might create problems with + # memory/stack/execution time. We divide them into chunks of 10 (or + # whatever the user decides) and insert these chunks one by one. So create + # a list of these sub-query chunks. If no grouping cols are provided, the + # list will contain only one sub-query that will be executed. 
+ deconstruction_query_list = [] if grouping_cols: grp_dict_rows = plpy.execute("SELECT {0} FROM {1}".format( grouping_cols, temp_output_table)) - deconstruction_queries_list = list() + deconstruction_grp_queries_list = list() for grp_dict in grp_dict_rows: where_condition = 'WHERE ' + ' AND '.join("{0} = '{1}'".format(k, v) for k, v in grp_dict.items()) select_grouping_cols = ' , '.join("'{1}' AS {0}".format(k, v) for k, v in grp_dict.items()) - deconstruction_queries_list.append(""" + deconstruction_grp_queries_list.append(""" SELECT {select_grouping_cols}, * FROM {schema_madlib}.__deconstruct_lower_triangle( (SELECT {cor_mat} FROM {temp_output_table} {where_condition}) ) AS deconstructed(column_position integer, {variable_list_str}) """.format(**locals())) - deconstruction_query = ' UNION ALL '.join(deconstruction_queries_list) + for i in range(0, len(deconstruction_grp_queries_list), n_groups_per_run): + sublist = deconstruction_grp_queries_list[i:i+n_groups_per_run] + deconstruction_query_list.append(' UNION ALL '.join(sublist)) else: deconstruction_query = """ SELECT * FROM @@ -377,7 +418,8 @@ def _create_deconstruction_query(schema_madlib, col_names, grouping_cols, (SELECT {cor_mat} FROM {temp_output_table}) ) AS deconstructed(column_position integer, {variable_list_str}) """.format(**locals()) - return deconstruction_query + deconstruction_query_list = [deconstruction_query] + return deconstruction_query_list def correlation_help_message(schema_madlib, message, cov=False, **kwargs): """ @@ -397,7 +439,8 @@ SELECT {schema_madlib}.{func} target_cols TEXT, -- Comma separated columns for which summary is desired -- (Default: '*' - produces result for all columns) verbose BOOLEAN, -- Verbosity - grouping_cols TEXT -- Comma separated columns for grouping + grouping_cols TEXT, -- Comma separated columns for grouping + n_groups_per_run INTEGER -- number of groups to process at a time ) ----------------------------------------------------------------------- Output 
will be a table with N+2 columns and N rows, where N is the number diff --git a/src/ports/postgres/modules/stats/correlation.sql_in b/src/ports/postgres/modules/stats/correlation.sql_in index 1a1bdc5..4932385 100644 --- a/src/ports/postgres/modules/stats/correlation.sql_in +++ b/src/ports/postgres/modules/stats/correlation.sql_in @@ -53,7 +53,8 @@ correlation( source_table, output_table, target_cols, verbose, - grouping_cols + grouping_cols, + n_groups_per_run ) </pre> @@ -63,7 +64,8 @@ covariance( source_table, output_table, target_cols, verbose, - grouping_cols + grouping_cols, + n_groups_per_run ) </pre> @@ -133,6 +135,27 @@ If NULL or <tt>'*'</tt>, results are produced for all numeric columns.</dd> <dt>grouping_cols (optional)</dt> <dd>TEXT, default: NULL. A comma-separated list of the columns to group by.</dd> + +<dt>n_groups_per_run (optional)</dt> +<dd>INTEGER, default: 10. Number of groups to process at a time. +This parameter is ignored if 'grouping_cols' is not specified. +Generally the default value will work fine, but there may be cases +(see below) where you will want to experiment with it +to reduce execution time and memory usage. +</dd> +@note +This is a lower level parameter that can potentially be used to +improve performance, but should be used with caution. +It is designed to handle the case where you have a large number +of groups. +In general, increasing 'n_groups_per_run' means we +construct a larger 'UNION ALL' query which uses more memory and may slow down execution +if it gets too big. +If you have a large number of groups and a smaller data size, there may +be benefits to increasing this value. +Conversely, decreasing 'n_groups_per_run' means we issue +more 'plpy.execute' commands. This increases overhead and can modestly +affect the execution time. 
</dl> @@ -442,7 +465,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation( output_table varchar, -- output table name target_cols varchar, -- comma separated list of output cols (default = '*') verbose boolean, -- flag to determine verbosity - grouping_cols varchar -- comma separated column names to be used for grouping + grouping_cols varchar, -- comma separated column names to be used for grouping + n_groups_per_run integer -- number of groups to process at a time ) RETURNS TEXT AS $$ PythonFunction(stats, correlation, correlation) $$ LANGUAGE plpythonu VOLATILE @@ -455,10 +479,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation( source_table varchar, -- input table name output_table varchar, -- output table name target_cols varchar, -- comma separated list of output cols (default = '*') + verbose boolean, -- flag to determine verbosity + grouping_cols varchar -- comma separated column names to be used for grouping +) +RETURNS TEXT AS $$ + select MADLIB_SCHEMA.correlation($1, $2, $3, $4, $5, 10) +$$ LANGUAGE sql VOLATILE +m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); + +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation( + source_table varchar, -- input table name + output_table varchar, -- output table name + target_cols varchar, -- comma separated list of output cols (default = '*') verbose boolean -- flag to determine verbosity ) RETURNS TEXT AS $$ - select MADLIB_SCHEMA.correlation($1, $2, $3, $4, NULL) + select MADLIB_SCHEMA.correlation($1, $2, $3, $4, NULL, 10) $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); @@ -468,7 +504,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation( target_cols varchar -- comma separated list of output cols (default = '*') ) RETURNS TEXT AS $$ - select MADLIB_SCHEMA.correlation($1, $2, $3, FALSE, NULL) + select MADLIB_SCHEMA.correlation($1, $2, $3, FALSE, NULL, 10) $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', 
`'); @@ -477,7 +513,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation( source_table varchar, -- input table name output_table varchar -- output table name ) RETURNS TEXT AS $$ - select MADLIB_SCHEMA.correlation($1, $2, NULL, FALSE, NULL) + select MADLIB_SCHEMA.correlation($1, $2, NULL, FALSE, NULL, 10) $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); @@ -526,12 +562,14 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance( output_table varchar, -- output table name target_cols varchar, -- comma separated list of output cols (default = '*') verbose boolean, -- flag to determine verbosity - grouping_cols varchar -- comma separated column names to be used for grouping + grouping_cols varchar, -- comma separated column names to be used for grouping + n_groups_per_run integer -- number of groups to process at a time ) RETURNS TEXT AS $$ PythonFunctionBodyOnly(`stats', `correlation') with AOControl(False): return correlation.correlation(schema_madlib, source_table, output_table, - target_cols, grouping_cols, True, verbose) + target_cols, grouping_cols, True, verbose, + n_groups_per_run) $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); @@ -542,10 +580,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance( source_table varchar, -- input table name output_table varchar, -- output table name target_cols varchar, -- comma separated list of output cols (default = '*') + verbose BOOLEAN, -- flag to determine verbosity + grouping_cols varchar -- comma separated column names to be used for grouping + +) +RETURNS TEXT AS $$ + select MADLIB_SCHEMA.covariance($1, $2, $3, $4, $5, 10) +$$ LANGUAGE sql VOLATILE +m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); +CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance( + source_table varchar, -- input table name + output_table varchar, -- output table name + target_cols varchar, -- comma separated list of output cols 
(default = '*') verbose BOOLEAN -- flag to determine verbosity ) RETURNS TEXT AS $$ - select MADLIB_SCHEMA.covariance($1, $2, $3, $4, NULL) + select MADLIB_SCHEMA.covariance($1, $2, $3, $4, NULL, 10) $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); @@ -555,7 +605,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance( target_cols varchar -- comma separated list of output cols (default = '*') ) RETURNS TEXT AS $$ - select MADLIB_SCHEMA.covariance($1, $2, $3, FALSE, NULL) + select MADLIB_SCHEMA.covariance($1, $2, $3, FALSE, NULL, 10) $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); @@ -564,7 +614,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance( source_table varchar, -- input table name output_table varchar -- output table name ) RETURNS TEXT AS $$ - select MADLIB_SCHEMA.covariance($1, $2, NULL, NULL) + select MADLIB_SCHEMA.covariance($1, $2, NULL, FALSE, NULL, 10) $$ LANGUAGE sql VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); diff --git a/src/ports/postgres/modules/stats/test/correlation.sql_in b/src/ports/postgres/modules/stats/test/correlation.sql_in index 8e17284..96ee2e2 100644 --- a/src/ports/postgres/modules/stats/test/correlation.sql_in +++ b/src/ports/postgres/modules/stats/test/correlation.sql_in @@ -122,6 +122,19 @@ SELECT assert(relative_error(humidity, -0.3502) < 0.0001, 'calculated correlatio SELECT assert(relative_error(id, 0.4122) < 0.0001, 'calculated correlation value ' || id || 'does not match expected: ' || 0.4122) FROM example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1'; +DROP TABLE IF EXISTS example_data_gr2_output, example_data_gr2_output_summary; +SELECT correlation( 'example_data_gr2', + 'example_data_gr2_output', + 'temperature, humidity, id, new_col', + FALSE, + 'gr,gr2', 2); + +SELECT assert(relative_error(temperature, 0.1606) < 0.0001, 'calculated correlation value: '|| temperature ||' does not 
match expected: ' || 0.1606) FROM example_data_gr2_output WHERE variable ='humidity' AND gr='1' AND gr2='1'; + +SELECT assert(relative_error(humidity, -0.3502) < 0.0001, 'calculated correlation value ' || humidity || 'does not match expected: ' || -0.3502) FROM example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1'; + +SELECT assert(relative_error(id, 0.4122) < 0.0001, 'calculated correlation value ' || id || 'does not match expected: ' || 0.4122) FROM example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1'; + DROP TABLE IF EXISTS example_data_gr2_output_covariance, example_data_gr2_output_covariance_summary; SELECT covariance( 'example_data_gr2', 'example_data_gr2_output_covariance', @@ -134,3 +147,18 @@ SELECT assert(relative_error(temperature, 17.1500) < 0.0001, 'calculated covaria SELECT assert(relative_error(humidity, -28.7000) < 0.0001, 'calculated covariance value ' || humidity || 'does not match expected: ' || -28.7000) FROM example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND gr2='1'; SELECT assert(relative_error(id, 15.4688) < 0.0001, 'calculated covariance value ' || id || 'does not match expected: ' || 15.4688) FROM example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND gr2='1'; + +DROP TABLE IF EXISTS example_data_gr2_output_covariance, example_data_gr2_output_covariance_summary; +SELECT covariance( 'example_data_gr2', + 'example_data_gr2_output_covariance', + 'temperature, humidity, id, new_col', + FALSE, + 'gr,gr2', 50); + +SELECT assert(count(*) = 12, 'the output should have 3 groups X 4 variables = 12 rows') FROM example_data_gr2_output_covariance; + +SELECT assert(relative_error(temperature, 17.1500) < 0.0001, 'calculated covariance value: '|| temperature ||' does not match expected: ' || 17.1500) FROM example_data_gr2_output_covariance WHERE variable ='humidity' AND gr='1' AND gr2='1'; + +SELECT assert(relative_error(humidity, -28.7000) < 0.0001, 'calculated covariance 
value ' || humidity || 'does not match expected: ' || -28.7000) FROM example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND gr2='1'; + +SELECT assert(relative_error(id, 15.4688) < 0.0001, 'calculated covariance value ' || id || 'does not match expected: ' || 15.4688) FROM example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND gr2='1';