This is an automated email from the ASF dual-hosted git repository.
okislal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
The following commit(s) were added to refs/heads/master by this push:
new 640392d Correlation: Process deconstruction in chunks for grouping
640392d is described below
commit 640392dcb8bdd57f83fb1706959936730b3e4531
Author: Orhan Kislal <[email protected]>
AuthorDate: Thu Jul 18 14:24:08 2019 -0700
Correlation: Process deconstruction in chunks for grouping
JIRA: MADLIB-1301
While deconstructing the correlation matrix to create the output table,
a big UNION ALL query was created, with one sub-query for each distinct
grouping value. This was causing memory, stack, and performance
issues.
The fix is to run multiple queries, each of which deconstructs the
correlation matrix for a limited number of groups (10 by default).
Users can change this value with a newly introduced optional parameter
named `n_groups_per_run`, available for both correlation and covariance.
Closes #422
Co-authored-by: Nandish Jayaram <[email protected]>
---
src/ports/postgres/modules/stats/correlation.py_in | 89 ++++++++++++++++------
.../postgres/modules/stats/correlation.sql_in | 72 ++++++++++++++---
.../postgres/modules/stats/test/correlation.sql_in | 28 +++++++
3 files changed, 155 insertions(+), 34 deletions(-)
diff --git a/src/ports/postgres/modules/stats/correlation.py_in
b/src/ports/postgres/modules/stats/correlation.py_in
index 005de75..679d304 100644
--- a/src/ports/postgres/modules/stats/correlation.py_in
+++ b/src/ports/postgres/modules/stats/correlation.py_in
@@ -9,6 +9,7 @@ from time import time
import plpy
from utilities.control import MinWarning
+from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import get_table_qualified_col_str
from utilities.utilities import py_list_to_sql_string
@@ -20,10 +21,9 @@ from utilities.validate_args import get_cols_and_types
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import output_tbl_valid
-
def correlation(schema_madlib, source_table, output_table,
target_cols, grouping_cols, get_cov=False,
- verbose=False, **kwargs):
+ verbose=False, n_groups_per_run=10, **kwargs):
"""
Populates an output table with the coefficients of correlation between
the columns in a source table
@@ -45,10 +45,12 @@ def correlation(schema_madlib, source_table, output_table,
_numeric_column_names, _nonnumeric_column_names =
_get_numeric_columns(source_table)
_target_cols = _analyze_target_cols(source_table, target_cols,
function_name)
+ if n_groups_per_run is None:
+ n_groups_per_run = 10
# Validate grouping_cols param
if grouping_cols:
_validate_grouping_cols_param(source_table, grouping_cols,
- function_name)
+ function_name, n_groups_per_run)
if _target_cols:
# prune all non-numeric column types from target columns
_existing_target_cols = []
@@ -82,7 +84,8 @@ def correlation(schema_madlib, source_table, output_table,
run_time = _populate_output_table(schema_madlib, source_table,
output_table,
_existing_target_cols, grouping_cols,
- function_name, get_cov, verbose)
+ n_groups_per_run, function_name, get_cov,
+ verbose)
# ---- Output message ----
output_text_list = ["Summary for '{0}' function".format(function_name)]
output_text_list.append("Output table = " + str(output_table))
@@ -121,7 +124,8 @@ def _validate_corr_arg(source_table, output_table,
function_name):
#
------------------------------------------------------------------------------
-def _validate_grouping_cols_param(source_table, grouping_cols, function_name):
+def _validate_grouping_cols_param(source_table, grouping_cols, function_name,
+ n_groups_per_run):
grouping_cols_list = split_quoted_delimited_str(grouping_cols)
# Column names that are used in summary table.
reserved_cols_in_summary_table = set(['method',
@@ -132,6 +136,8 @@ def _validate_grouping_cols_param(source_table,
grouping_cols, function_name):
'total_rows_processed'])
cols_in_tbl_valid(source_table, grouping_cols_list, function_name)
does_exclude_reserved(grouping_cols_list, reserved_cols_in_summary_table)
+ _assert(n_groups_per_run>0, "{0}: n_groups_per_run has to be greater than
0.".
+ format(function_name))
def _get_numeric_columns(source_table):
"""
@@ -175,8 +181,8 @@ def _analyze_target_cols(source_table, target_cols,
function_name):
def _populate_output_table(schema_madlib, source_table, output_table,
- col_names, grouping_cols, function_name,
- get_cov=False, verbose=False):
+ col_names, grouping_cols, n_groups_per_run,
+ function_name, get_cov=False, verbose=False):
"""
Creates a relation with the appropriate number of columns given a list of
column names and populates with the correlation coefficients. If the table
@@ -267,18 +273,17 @@ def _populate_output_table(schema_madlib, source_table,
output_table,
plpy.execute(create_temp_output_table_query)
# Prepare the query for converting the matrix into the lower triangle
- deconstruction_query = _create_deconstruction_query(schema_madlib,
+ deconstruction_query_list = _create_deconstruction_query(schema_madlib,
col_names,
grouping_cols,
temp_output_table,
- cor_mat)
+ cor_mat,
+ n_groups_per_run)
variable_subquery = unique_string(desp='variable_subq')
matrix_subquery = unique_string(desp='matrix_subq')
# create output table
- create_output_table_query = """
-
- CREATE TABLE {output_table} AS
+ select_deconstruct_query = """
SELECT *
FROM
(
@@ -291,9 +296,32 @@ def _populate_output_table(schema_madlib, source_table,
output_table,
{deconstruction_query}
) {matrix_subquery}
USING (column_position)
- """.format(num_cols=len(col_names), **locals())
+ """
+
+ # Create the output table.
+ # If there are no grouping cols, the query list will have a single
element.
+ # Therefore, we always execute the 0'th element of the list.
+ # If there are grouping cols, we loop through the rest of the list and
+ # execute them one by one.
+ create_output_table_query = """
+ CREATE TABLE {output_table} AS
+ {select_deconstruct_query}
+ """.format(**locals()).format(
+ deconstruction_query=deconstruction_query_list[0],
+ num_cols=len(col_names), **locals())
plpy.execute(create_output_table_query)
+ if grouping_cols:
+ for i in range(1, len(deconstruction_query_list)):
+ insert_to_output_table_query = """
+ INSERT INTO {output_table}
+ {select_deconstruct_query}
+ """.format(**locals()).format(
+ deconstruction_query=deconstruction_query_list[i],
+ num_cols=len(col_names), **locals())
+ plpy.execute(insert_to_output_table_query)
+
+
# create summary table
summary_table = add_postfix(output_table, "_summary")
create_summary_table_query = """
@@ -318,7 +346,7 @@ def _populate_output_table(schema_madlib, source_table,
output_table,
#
------------------------------------------------------------------------------
def _create_deconstruction_query(schema_madlib, col_names, grouping_cols,
- temp_output_table, cor_mat):
+ temp_output_table, cor_mat, n_groups_per_run):
"""
Creates the query to convert the matrix into the lower-traingular format.
@@ -330,9 +358,13 @@ def _create_deconstruction_query(schema_madlib, col_names,
grouping_cols,
the matrix to deconstruct
@param cor_mat Name of column that containss the matrix
to deconstruct
+ @param n_groups_per_run Number of groups to deconstruct in a single
+ sub-query
Returns:
- String (SQL querry for deconstructing the matrix)
+ List of Strings where each string is a SQL sub-query for deconstructing
+ the matrix. Each sub-query covers n_groups_per_run number of union all
+ queries
"""
# The matrix that holds the PCC computation must be converted to a
# table capturing all pair wise PCC values. That is done using
@@ -341,8 +373,9 @@ def _create_deconstruction_query(schema_madlib, col_names,
grouping_cols,
# construct the query accordingly.
COL_WIDTH = 10
- # split the col_names to equal size sets with newline between to prevent a
long query
- # Build a 2d array of the col_names, each inner array with COL_WIDTH
number of names.
+ # split the col_names to equal size sets with newline between to prevent a
+ # long query. Build a 2d array of the col_names, each inner array with
+ # COL_WIDTH number of names.
col_names_split = [col_names[x : x + COL_WIDTH]
for x in range(0, len(col_names), COL_WIDTH)]
variable_list_str = ', \n'.join([', '.join(
@@ -351,25 +384,33 @@ def _create_deconstruction_query(schema_madlib,
col_names, grouping_cols,
]) for cols_blob in col_names_split
])
-
+ # Fix for MADLIB-1301.
+ # Creating a huge chain of union all sub-queries might create problems with
+ # memory/stack/execution time. We divide them into chunks of 10 (or
+ # whatever the user decides) and insert these chunks one by one. So create
+ # a list of these sub-query chunks. If no grouping cols are provided, the
+ # list will contain only one sub-query that will be executed.
+ deconstruction_query_list = []
if grouping_cols:
grp_dict_rows = plpy.execute("SELECT {0} FROM {1}".format(
grouping_cols,
temp_output_table))
- deconstruction_queries_list = list()
+ deconstruction_grp_queries_list = list()
for grp_dict in grp_dict_rows:
where_condition = 'WHERE ' + ' AND '.join("{0} = '{1}'".format(k,
v)
for k, v in grp_dict.items())
select_grouping_cols = ' , '.join("'{1}' AS {0}".format(k, v)
for k, v in grp_dict.items())
- deconstruction_queries_list.append("""
+ deconstruction_grp_queries_list.append("""
SELECT {select_grouping_cols}, *
FROM {schema_madlib}.__deconstruct_lower_triangle(
(SELECT {cor_mat} FROM {temp_output_table}
{where_condition})
) AS deconstructed(column_position integer,
{variable_list_str})
""".format(**locals()))
- deconstruction_query = ' UNION ALL '.join(deconstruction_queries_list)
+ for i in range(0, len(deconstruction_grp_queries_list),
n_groups_per_run):
+ sublist = deconstruction_grp_queries_list[i:i+n_groups_per_run]
+ deconstruction_query_list.append(' UNION ALL '.join(sublist))
else:
deconstruction_query = """
SELECT * FROM
@@ -377,7 +418,8 @@ def _create_deconstruction_query(schema_madlib, col_names,
grouping_cols,
(SELECT {cor_mat} FROM {temp_output_table})
) AS deconstructed(column_position integer, {variable_list_str})
""".format(**locals())
- return deconstruction_query
+ deconstruction_query_list = [deconstruction_query]
+ return deconstruction_query_list
def correlation_help_message(schema_madlib, message, cov=False, **kwargs):
"""
@@ -397,7 +439,8 @@ SELECT {schema_madlib}.{func}
target_cols TEXT, -- Comma separated columns for which summary is
desired
-- (Default: '*' - produces result for all
columns)
verbose BOOLEAN, -- Verbosity
- grouping_cols TEXT -- Comma separated columns for grouping
+ grouping_cols TEXT, -- Comma separated columns for grouping
+ n_groups_per_run INTEGER -- number of groups to process at a time
)
-----------------------------------------------------------------------
Output will be a table with N+2 columns and N rows, where N is the number
diff --git a/src/ports/postgres/modules/stats/correlation.sql_in
b/src/ports/postgres/modules/stats/correlation.sql_in
index 1a1bdc5..4932385 100644
--- a/src/ports/postgres/modules/stats/correlation.sql_in
+++ b/src/ports/postgres/modules/stats/correlation.sql_in
@@ -53,7 +53,8 @@ correlation( source_table,
output_table,
target_cols,
verbose,
- grouping_cols
+ grouping_cols,
+ n_groups_per_run
)
</pre>
@@ -63,7 +64,8 @@ covariance( source_table,
output_table,
target_cols,
verbose,
- grouping_cols
+ grouping_cols,
+ n_groups_per_run
)
</pre>
@@ -133,6 +135,27 @@ If NULL or <tt>'*'</tt>, results are produced for all
numeric columns.</dd>
<dt>grouping_cols (optional)</dt>
<dd>TEXT, default: NULL. A comma-separated list of the columns to group
by.</dd>
+
+<dt>n_groups_per_run (optional)</dt>
+<dd>INTEGER, default: 10. Number of groups to process at a time.
+This parameter is ignored if 'grouping_cols' is not specified.
+Generally the default value will work fine, but there may be cases
+(see below) where you will want to experiment with it
+to reduce execution time and memory usage.
+</dd>
+@note
+This is a lower level parameter that can potentially be used to
+improve performance, but should be used with caution.
+It is designed to handle the case where you have a large number
+of groups.
+In general, increasing 'n_groups_per_run' means we
+construct a larger 'UNION ALL' query which uses more memory and may slow down
execution
+if it gets too big.
+If you have a large number of groups and a smaller data size, there may
+be benefits to increasing this value.
+Conversely, decreasing 'n_groups_per_run' means we issue
+more 'plpy.execute' commands. This increases overhead and can modestly
+affect the execution time.
</dl>
@@ -442,7 +465,8 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
output_table varchar, -- output table name
target_cols varchar, -- comma separated list of output cols (default =
'*')
verbose boolean, -- flag to determine verbosity
- grouping_cols varchar -- comma separated column names to be used for
grouping
+ grouping_cols varchar, -- comma separated column names to be used for
grouping
+ n_groups_per_run integer -- number of groups to process at a time
) RETURNS TEXT AS $$
PythonFunction(stats, correlation, correlation)
$$ LANGUAGE plpythonu VOLATILE
@@ -455,10 +479,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
source_table varchar, -- input table name
output_table varchar, -- output table name
target_cols varchar, -- comma separated list of output cols (default =
'*')
+ verbose boolean, -- flag to determine verbosity
+ grouping_cols varchar -- comma separated column names to be used for
grouping
+)
+RETURNS TEXT AS $$
+ select MADLIB_SCHEMA.correlation($1, $2, $3, $4, $5, 10)
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
+ source_table varchar, -- input table name
+ output_table varchar, -- output table name
+ target_cols varchar, -- comma separated list of output cols (default =
'*')
verbose boolean -- flag to determine verbosity
)
RETURNS TEXT AS $$
- select MADLIB_SCHEMA.correlation($1, $2, $3, $4, NULL)
+ select MADLIB_SCHEMA.correlation($1, $2, $3, $4, NULL, 10)
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -468,7 +504,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
target_cols varchar -- comma separated list of output cols (default =
'*')
)
RETURNS TEXT AS $$
- select MADLIB_SCHEMA.correlation($1, $2, $3, FALSE, NULL)
+ select MADLIB_SCHEMA.correlation($1, $2, $3, FALSE, NULL, 10)
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -477,7 +513,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.correlation(
source_table varchar, -- input table name
output_table varchar -- output table name
) RETURNS TEXT AS $$
- select MADLIB_SCHEMA.correlation($1, $2, NULL, FALSE, NULL)
+ select MADLIB_SCHEMA.correlation($1, $2, NULL, FALSE, NULL, 10)
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -526,12 +562,14 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
output_table varchar, -- output table name
target_cols varchar, -- comma separated list of output cols (default =
'*')
verbose boolean, -- flag to determine verbosity
- grouping_cols varchar -- comma separated column names to be used for
grouping
+ grouping_cols varchar, -- comma separated column names to be used for
grouping
+ n_groups_per_run integer -- number of groups to process at a time
) RETURNS TEXT AS $$
PythonFunctionBodyOnly(`stats', `correlation')
with AOControl(False):
return correlation.correlation(schema_madlib, source_table,
output_table,
- target_cols, grouping_cols, True,
verbose)
+ target_cols, grouping_cols, True,
verbose,
+ n_groups_per_run)
$$ LANGUAGE plpythonu VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -542,10 +580,22 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
source_table varchar, -- input table name
output_table varchar, -- output table name
target_cols varchar, -- comma separated list of output cols (default =
'*')
+ verbose BOOLEAN, -- flag to determine verbosity
+ grouping_cols varchar -- comma separated column names to be used for
grouping
+
+)
+RETURNS TEXT AS $$
+ select MADLIB_SCHEMA.covariance($1, $2, $3, $4, $5, 10)
+$$ LANGUAGE sql VOLATILE
+m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
+ source_table varchar, -- input table name
+ output_table varchar, -- output table name
+ target_cols varchar, -- comma separated list of output cols (default =
'*')
verbose BOOLEAN -- flag to determine verbosity
)
RETURNS TEXT AS $$
- select MADLIB_SCHEMA.covariance($1, $2, $3, $4, NULL)
+ select MADLIB_SCHEMA.covariance($1, $2, $3, $4, NULL, 10)
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -555,7 +605,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
target_cols varchar -- comma separated list of output cols (default =
'*')
)
RETURNS TEXT AS $$
- select MADLIB_SCHEMA.covariance($1, $2, $3, FALSE, NULL)
+ select MADLIB_SCHEMA.covariance($1, $2, $3, FALSE, NULL, 10)
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
@@ -564,7 +614,7 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.covariance(
source_table varchar, -- input table name
output_table varchar -- output table name
) RETURNS TEXT AS $$
- select MADLIB_SCHEMA.covariance($1, $2, NULL, NULL)
+ select MADLIB_SCHEMA.covariance($1, $2, NULL, FALSE, NULL, 10)
$$ LANGUAGE sql VOLATILE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `');
diff --git a/src/ports/postgres/modules/stats/test/correlation.sql_in
b/src/ports/postgres/modules/stats/test/correlation.sql_in
index 8e17284..96ee2e2 100644
--- a/src/ports/postgres/modules/stats/test/correlation.sql_in
+++ b/src/ports/postgres/modules/stats/test/correlation.sql_in
@@ -122,6 +122,19 @@ SELECT assert(relative_error(humidity, -0.3502) < 0.0001,
'calculated correlatio
SELECT assert(relative_error(id, 0.4122) < 0.0001, 'calculated correlation
value ' || id || 'does not match expected: ' || 0.4122) FROM
example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1';
+DROP TABLE IF EXISTS example_data_gr2_output, example_data_gr2_output_summary;
+SELECT correlation( 'example_data_gr2',
+ 'example_data_gr2_output',
+ 'temperature, humidity, id, new_col',
+ FALSE,
+ 'gr,gr2', 2);
+
+SELECT assert(relative_error(temperature, 0.1606) < 0.0001, 'calculated
correlation value: '|| temperature ||' does not match expected: ' || 0.1606)
FROM example_data_gr2_output WHERE variable ='humidity' AND gr='1' AND gr2='1';
+
+SELECT assert(relative_error(humidity, -0.3502) < 0.0001, 'calculated
correlation value ' || humidity || 'does not match expected: ' || -0.3502) FROM
example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1';
+
+SELECT assert(relative_error(id, 0.4122) < 0.0001, 'calculated correlation
value ' || id || 'does not match expected: ' || 0.4122) FROM
example_data_gr2_output WHERE variable ='new_col' AND gr='1' AND gr2='1';
+
DROP TABLE IF EXISTS example_data_gr2_output_covariance,
example_data_gr2_output_covariance_summary;
SELECT covariance( 'example_data_gr2',
'example_data_gr2_output_covariance',
@@ -134,3 +147,18 @@ SELECT assert(relative_error(temperature, 17.1500) <
0.0001, 'calculated covaria
SELECT assert(relative_error(humidity, -28.7000) < 0.0001, 'calculated
covariance value ' || humidity || 'does not match expected: ' || -28.7000) FROM
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND
gr2='1';
SELECT assert(relative_error(id, 15.4688) < 0.0001, 'calculated covariance
value ' || id || 'does not match expected: ' || 15.4688) FROM
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND
gr2='1';
+
+DROP TABLE IF EXISTS example_data_gr2_output_covariance,
example_data_gr2_output_covariance_summary;
+SELECT covariance( 'example_data_gr2',
+ 'example_data_gr2_output_covariance',
+ 'temperature, humidity, id, new_col',
+ FALSE,
+ 'gr,gr2', 50);
+
+SELECT assert(count(*) = 12, 'the output should have 3 groups X 4 variables =
12 rows') FROM example_data_gr2_output_covariance;
+
+SELECT assert(relative_error(temperature, 17.1500) < 0.0001, 'calculated
covariance value: '|| temperature ||' does not match expected: ' || 17.1500)
FROM example_data_gr2_output_covariance WHERE variable ='humidity' AND gr='1'
AND gr2='1';
+
+SELECT assert(relative_error(humidity, -28.7000) < 0.0001, 'calculated
covariance value ' || humidity || 'does not match expected: ' || -28.7000) FROM
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND
gr2='1';
+
+SELECT assert(relative_error(id, 15.4688) < 0.0001, 'calculated covariance
value ' || id || 'does not match expected: ' || 15.4688) FROM
example_data_gr2_output_covariance WHERE variable ='new_col' AND gr='1' AND
gr2='1';