This is an automated email from the ASF dual-hosted git repository.
domino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/madlib.git
The following commit(s) were added to refs/heads/master by this push:
new 85eba29 DL: Improve performance of mini-batch preprocessor (#467)
85eba29 is described below
commit 85eba2968b5651b6242a398df569b1f3f6412703
Author: Domino Valdano <[email protected]>
AuthorDate: Thu Jan 9 18:06:56 2020 -0800
DL: Improve performance of mini-batch preprocessor (#467)
JIRA MADLIB-1334
This commit adds the following optimizations to the minibatch preprocessor:
- Skip normalization if the normalizing constant is 1.0
- Split the batching query so that buffer_id's (based on row_id) are
generated without moving any data around. Previously, calling
`ROW_NUMBER() OVER()` to add row_id's to the table gathered all of the
data on the master node before numbering the rows, which for large
datasets took most of the time.
- Separate the JOIN (needed for even distribution) and the conversion
to bytea out of the batching query. This avoids VMEM limit issues.
- Round num_buffers up to the nearest multiple of num_segments so that
buffers are distributed evenly across segments. For example, with 17
rows, a buffer size of 5, and 3 segments, num_buffers is rounded up
from 4 to 6.
- Add a new C function `array_to_bytea()` that converts an array to
bytea, along with tests for it. It is much faster than the python
version we were using, speeding up the query significantly (see the
sketch after this list).
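As a rough illustration (not part of the patch), the new function can
be exercised from PL/Python as below; the `madlib` schema name is an
assumption, and convert_bytea_to_real_array() is the existing helper
that the regression tests use to decode the bytes:

    # Hypothetical round-trip check for the new array_to_bytea()
    import plpy

    row = plpy.execute("""
        SELECT madlib.convert_bytea_to_real_array(
                   madlib.array_to_bytea(ARRAY[1.5, 2.5, 3.5]::REAL[])
               ) AS round_trip
    """)[0]
    plpy.info(row['round_trip'])   # expected: [1.5, 2.5, 3.5]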
Additionally, this commit adds a new function `plpy_execute_debug()`
to the utilities module that prints the EXPLAIN plan and execution
time of a specific query, for debugging. It is meant as a drop-in
replacement for plpy.execute(), as sketched below.
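A hypothetical usage sketch (not part of the patch); the table name is
made up, and the import path follows the other utilities imports used
by the deep learning modules:

    # Temporarily swap plpy.execute() for plpy_execute_debug() while
    # profiling a query inside a MADlib PL/Python module.
    from utilities.utilities import plpy_execute_debug

    sql = "SELECT count(*) FROM my_batched_table"
    plpy_execute_debug(sql)   # prints the SQL, its EXPLAIN plan, and timing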
Co-authored-by: Ekta Khanna <[email protected]>
---
methods/array_ops/src/pg_gp/array_ops.c | 30 ++
methods/array_ops/src/pg_gp/array_ops.sql_in | 12 +
methods/array_ops/src/pg_gp/test/array_ops.sql_in | 71 +++
.../deep_learning/input_data_preprocessor.py_in | 520 +++++++++++++++------
.../deep_learning/madlib_keras_helper.py_in | 1 -
.../deep_learning/madlib_keras_validator.py_in | 3 +-
.../test/input_data_preprocessor.sql_in | 136 ++++--
.../test/madlib_keras_cifar.setup.sql_in | 4 +-
.../unit_tests/test_input_data_preprocessor.py_in | 8 +
.../utilities/minibatch_preprocessing.py_in | 7 +-
.../postgres/modules/utilities/utilities.py_in | 21 +
11 files changed, 617 insertions(+), 196 deletions(-)
diff --git a/methods/array_ops/src/pg_gp/array_ops.c
b/methods/array_ops/src/pg_gp/array_ops.c
index 48880a6..a842a60 100644
--- a/methods/array_ops/src/pg_gp/array_ops.c
+++ b/methods/array_ops/src/pg_gp/array_ops.c
@@ -2107,3 +2107,33 @@ General_Array_to_Cumulative_Array(
return pgarray;
}
+
+PG_FUNCTION_INFO_V1(array_to_bytea);
+Datum
+array_to_bytea(PG_FUNCTION_ARGS)
+{
+ ArrayType *a = PG_GETARG_ARRAYTYPE_P(0);
+ Oid element_type = ARR_ELEMTYPE(a);
+ TypeCacheEntry * TI;
+ int data_length, nitems, items_avail;
+
+ data_length = VARSIZE(a) - ARR_DATA_OFFSET(a);
+ nitems = ArrayGetNItems(ARR_NDIM(a), ARR_DIMS(a));
+ TI = lookup_type_cache(element_type, TYPECACHE_CMP_PROC_FINFO);
+ items_avail = (data_length / TI->typlen);
+
+ if (nitems > items_avail) {
+ elog(ERROR, "Unexpected end of array: expected %d elements but received only %d", nitems, data_length);
+ } else if (nitems < items_avail) {
+ elog(WARNING, "to_bytea(): Ignoring %d extra elements after end of %d-element array!", items_avail - nitems, nitems);
+ data_length = (nitems * TI->typlen);
+ }
+
+ bytea *ba = palloc(VARHDRSZ + data_length);
+
+ SET_VARSIZE(ba, VARHDRSZ + data_length);
+
+ memcpy(((char *)ba) + VARHDRSZ, ARR_DATA_PTR(a), data_length);
+
+ PG_RETURN_BYTEA_P(ba);
+}
diff --git a/methods/array_ops/src/pg_gp/array_ops.sql_in
b/methods/array_ops/src/pg_gp/array_ops.sql_in
index e1aa368..c1ec853 100644
--- a/methods/array_ops/src/pg_gp/array_ops.sql_in
+++ b/methods/array_ops/src/pg_gp/array_ops.sql_in
@@ -733,3 +733,15 @@ ORDER BY 1,2;
""".format(schema_madlib='MADLIB_SCHEMA')
$$ LANGUAGE PLPYTHONU IMMUTABLE
m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `CONTAINS SQL', `');
+
+m4_changequote(<!, !>)
+m4_ifelse(__PORT__ __DBMS_VERSION_MAJOR__, <!GREENPLUM 4!>,,
+<!
+CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.array_to_bytea(ANYARRAY)
+RETURNS BYTEA
+AS
+'MODULE_PATHNAME', 'array_to_bytea'
+LANGUAGE C IMMUTABLE
+!>)
+m4_changequote(`,')
+m4_ifdef(__HAS_FUNCTION_PROPERTIES__', `NO SQL', `');
diff --git a/methods/array_ops/src/pg_gp/test/array_ops.sql_in
b/methods/array_ops/src/pg_gp/test/array_ops.sql_in
index b05d0b7..511564f 100644
--- a/methods/array_ops/src/pg_gp/test/array_ops.sql_in
+++ b/methods/array_ops/src/pg_gp/test/array_ops.sql_in
@@ -7,6 +7,9 @@
-- all objects created in the default schema will be cleaned-up outside.
---------------------------------------------------------------------------
+m4_include(`SQLCommon.m4')
+m4_changequote(`<!', `!>')
+
---------------------------------------------------------------------------
-- Setup:
---------------------------------------------------------------------------
@@ -307,3 +310,71 @@ FROM (
unnest_2d_tbl05_groundtruth t2
USING (id,unnest_row_id)
) t3;
+
+-- TESTING array_to_bytea() function - skip for gpdb 4.3
+m4_ifelse(__PORT__ __DBMS_VERSION_MAJOR__, <!GREENPLUM 4!>,,
+<!
+
+-- create input table ( n = 3 x 5 x 7 dim SMALLINT[], r = 2 x 3 x 5 dim
REAL[] )
+DROP TABLE IF EXISTS array_input_tbl;
+CREATE TABLE array_input_tbl (id SMALLINT, n SMALLINT[], r REAL[]);
+INSERT INTO array_input_tbl SELECT generate_series(1, 10);
+SELECT id, count(*), array_agg(n) from (select id, unnest(n) as n from
array_input_tbl) u group by id order by id;
+UPDATE array_input_tbl SET
+ n=array_fill(2*id, ARRAY[3, 5, 7]),
+ r=array_fill(id + 0.4, array[2, 3, 5]);
+
+-- create flattened input table
+DROP TABLE IF EXISTS flat_array_input_tbl;
+CREATE TABLE flat_array_input_tbl (id SMALLINT, n SMALLINT[], n_length
SMALLINT, r REAL[], r_length SMALLINT);
+INSERT INTO flat_array_input_tbl
+ SELECT n.id, n, n_length, r, r_length
+ FROM
+ (
+ SELECT id, array_agg(n) AS n, 2*COUNT(*) AS n_length
+ FROM
+ (
+ SELECT id, unnest(n) AS n FROM array_input_tbl
+ ) n_values
+ GROUP BY id
+ ) n
+ JOIN
+ (
+ SELECT id, array_agg(r) AS r, 4*COUNT(*) AS r_length
+ FROM
+ (
+ SELECT id, unnest(r) AS r FROM array_input_tbl
+ ) r_values
+ GROUP BY id
+ ) r
+ USING (id);
+
+CREATE TABLE bytea_tbl AS SELECT id, array_to_bytea(n) AS n, array_to_bytea(r)
AS r FROM array_input_tbl;
+
+ -- verify lengths of BYTEA output is correct for SMALLINT & REAL arrays
+ SELECT assert(
+ length(o.n) = i.n_length AND length(o.r) = i.r_length,
+ 'array_to_bytea() returned incorrect lengths:\n' ||
+ ' Expected length(n) = ' || n_length::TEXT || ', got ' ||
length(o.n) ||
+ ' Expected ' || r_length::TEXT || ', got ' || length(o.r)
+ )
+ FROM flat_array_input_tbl i JOIN bytea_tbl o USING (id);
+
+ -- convert BYTEA back to flat arrays of SMALLINT's & REAL's
+
+ CREATE TABLE array_output_tbl AS
+ SELECT
+ id,
+ convert_bytea_to_smallint_array(n) AS n,
+ convert_bytea_to_real_array(r) AS r
+ FROM bytea_tbl;
+
+ -- verify that data in above table matches flattened input table exactly
+ SELECT assert(
+ i.n = o.n AND i.r = o.r,
+ 'output of array_to_bytea() does not convert back to flattened input'
+ )
+ FROM flat_array_input_tbl i JOIN array_output_tbl o USING (id);
+!>)
+
+m4_changequote(,)
diff --git
a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
index 757a5bc..351e6a5 100644
--- a/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
+++ b/src/ports/postgres/modules/deep_learning/input_data_preprocessor.py_in
@@ -29,6 +29,8 @@ from internal.db_utils import get_distinct_col_levels
from internal.db_utils import quote_literal
from internal.db_utils import get_product_of_dimensions
from utilities.minibatch_preprocessing import MiniBatchBufferSizeCalculator
+from utilities.control import OptimizerControl
+from utilities.control import HashaggControl
from utilities.utilities import _assert
from utilities.utilities import add_postfix
from utilities.utilities import is_platform_pg
@@ -46,6 +48,7 @@ from utilities.validate_args import input_tbl_valid
from utilities.validate_args import get_expr_type
from madlib_keras_helper import *
+import time
NUM_CLASSES_COLNAME = "num_classes"
@@ -59,9 +62,9 @@ class InputDataPreprocessorDL(object):
self.dependent_varname = dependent_varname
self.independent_varname = independent_varname
self.buffer_size = buffer_size
- self.normalizing_const = normalizing_const if normalizing_const is not
None else DEFAULT_NORMALIZING_CONST
+ self.normalizing_const = normalizing_const
self.num_classes = num_classes
- self.distribution_rules = distribution_rules if distribution_rules
else DEFAULT_GPU_CONFIG
+ self.distribution_rules = distribution_rules if distribution_rules
else 'all_segments'
self.module_name = module_name
self.output_summary_table = None
self.dependent_vartype = None
@@ -73,7 +76,6 @@ class InputDataPreprocessorDL(object):
## Validating input args prior to using them in
_set_validate_vartypes()
self._validate_args()
self._set_validate_vartypes()
- self.num_of_buffers = self._get_num_buffers()
self.dependent_levels = None
# The number of padded zeros to include in 1-hot vector
self.padding_size = 0
@@ -199,160 +201,382 @@ class InputDataPreprocessorDL(object):
3) One-hot encodes the dependent variable.
4) Minibatches the one-hot encoded dependent variable.
"""
+ # setup for 1-hot encoding
self._set_one_hot_encoding_variables()
- # Create a temp table that has independent var normalized.
- norm_tbl = unique_string(desp='normalized')
-
- # Always one-hot encode the dependent var. For now, we are assuming
- # that input_preprocessor_dl will be used only for deep
- # learning and mostly for classification. So make a strong
- # assumption that it is only for classification, so one-hot
- # encode the dep var, unless it's already a numeric array in
- # which case we assume it's already one-hot encoded.
- one_hot_dep_var_array_expr = \
- self.get_one_hot_encoded_dep_var_expr()
- order_by_clause = " ORDER BY RANDOM() " if order_by_random else ""
- scalar_mult_sql = """
- CREATE TEMP TABLE {norm_tbl} AS
- SELECT {self.schema_madlib}.array_scalar_mult(
- {self.independent_varname}::{FLOAT32_SQL_TYPE}[],
- (1/{self.normalizing_const})::{FLOAT32_SQL_TYPE}) AS x_norm,
- {one_hot_dep_var_array_expr} AS y,
- row_number() over() AS row_id
- FROM {self.source_table} {order_by_clause}
- """.format(FLOAT32_SQL_TYPE=FLOAT32_SQL_TYPE, **locals())
- plpy.execute(scalar_mult_sql)
+ # Generate random strings for TEMP tables
series_tbl = unique_string(desp='series')
dist_key_tbl = unique_string(desp='dist_key')
- dep_shape_col = add_postfix(
- MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL, "_shape")
- ind_shape_col = add_postfix(
- MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL, "_shape")
+ normalized_tbl = unique_string(desp='normalized_table')
+ batched_table = unique_string(desp='batched_table')
+
+ # Used later in locals() for formatting queries
+ x=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL
+ y=MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL
+ float32=FLOAT32_SQL_TYPE
+ dep_shape_col = add_postfix(y, "_shape")
+ ind_shape_col = add_postfix(x, "_shape")
ind_shape = self._get_independent_var_shape()
ind_shape = ','.join([str(i) for i in ind_shape])
dep_shape = self._get_dependent_var_shape()
dep_shape = ','.join([str(i) for i in dep_shape])
+ one_hot_dep_var_array_expr = self.get_one_hot_encoded_dep_var_expr()
+
+ # skip normalization step if normalizing_const = 1.0
+ if self.normalizing_const and (self.normalizing_const < 0.999999 or self.normalizing_const > 1.000001):
+ rescale_independent_var = """{self.schema_madlib}.array_scalar_mult(
+ {self.independent_varname}::{float32}[],
+ (1/{self.normalizing_const})::{float32})
+ """.format(**locals())
+ else:
+ self.normalizing_const = DEFAULT_NORMALIZING_CONST
+ rescale_independent_var = "{self.independent_varname}::{float32}[]".format(**locals())
+
+ # It's important that we shuffle all rows before batching for fit(),
but
+ # we can skip that for predict()
+ order_by_clause = " ORDER BY RANDOM()" if order_by_random else ""
+
+ # This query template will be used later in pg & gp specific code
paths,
+ # where {make_buffer_id} and {dist_by_buffer_id} are filled in
+ batching_query = """
+ CREATE TEMP TABLE {batched_table} AS SELECT
+ {{make_buffer_id}} buffer_id,
+ {self.schema_madlib}.agg_array_concat(
+ ARRAY[x_norm::{float32}[]]) AS {x},
+ {self.schema_madlib}.agg_array_concat(
+ ARRAY[y]) AS {y},
+ COUNT(*) AS count
+ FROM {normalized_tbl}
+ GROUP BY buffer_id
+ {{dist_by_buffer_id}}
+ """.format(**locals())
+
+ # This query template will be used later in pg & gp specific code
paths,
+ # where {dist_key_col_comma} and {dist_by_dist_key} will be filled in
+ bytea_query = """
+ CREATE TABLE {self.output_table} AS SELECT
+ {{dist_key_col_comma}}
+ {self.schema_madlib}.array_to_bytea({x}) AS {x},
+ {self.schema_madlib}.array_to_bytea({y}) AS {y},
+ ARRAY[count,{ind_shape}]::SMALLINT[] AS {ind_shape_col},
+ ARRAY[count,{dep_shape}]::SMALLINT[] AS {dep_shape_col},
+ buffer_id
+ FROM {batched_table}
+ {{dist_by_dist_key}}
+ """.format(**locals())
+
if is_platform_pg():
+ # used later for writing summary table
+ self.distribution_rules = '$__madlib__$all_segments$__madlib__$'
+
+ #
+ # For postgres, we just need 3 simple queries:
+ # 1-hot-encode/normalize + batching + bytea conversion
+ #
+
+ # see note in gpdb code branch (lower down) on
+ # 1-hot-encoding of dependent var
+ one_hot_sql = """
+ CREATE TEMP TABLE {normalized_tbl} AS SELECT
+ (ROW_NUMBER() OVER({order_by_clause}) - 1)::INTEGER as
row_id,
+ {rescale_independent_var} AS x_norm,
+ {one_hot_dep_var_array_expr} AS y
+ FROM {self.source_table}
+ """.format(**locals())
+
+ plpy.execute(one_hot_sql)
+
+ self.buffer_size = self._get_buffer_size(1)
+
+ # Used to format query templates with locals()
+ make_buffer_id = 'row_id / {0} AS '.format(self.buffer_size)
+ dist_by_dist_key = ''
+ dist_by_buffer_id = ''
+ dist_key_col_comma = ''
+
+ # Disable hashagg since large number of arrays being concatenated
+ # could result in excessive memory usage.
+ with HashaggControl(False):
+ # Batch rows with GROUP BY
+ plpy.execute(batching_query.format(**locals()))
+
+ plpy.execute("DROP TABLE {0}".format(normalized_tbl))
+
+ # Convert to BYTEA and output final (permanent table)
+ plpy.execute(bytea_query.format(**locals()))
+
+ plpy.execute("DROP TABLE {0}".format(batched_table))
+
+ self._create_output_summary_table()
+
+ return
+
+ # Done with postgres, rest is all for gpdb
+ #
+ # This gpdb code path is far more complex, and depends on
+ # how the user wishes to distribute the data. Even if
+ # it's to be spread evenly across all segments, we still
+ # need to do some extra work to ensure that happens.
+
+ if self.distribution_rules == 'all_segments':
+ all_segments = True
self.distribution_rules = '$__madlib__$all_segments$__madlib__$'
- distributed_by_clause = ''
- dist_key_clause = ''
- join_clause = ''
- dist_key_comma = ''
+ num_segments = get_seg_number()
else:
- dist_key = DISTRIBUTION_KEY_COLNAME
- # Create large temp table such that there is atleast 1 row on each
segment
- # Using 999999 would distribute data(atleast 1 row on each
segment) for
- # a cluster as large as 20000
- query = """
- CREATE TEMP TABLE {series_tbl}
- AS
- SELECT generate_series(0, 999999) {dist_key}
- DISTRIBUTED BY ({dist_key})
- """.format(**locals())
- plpy.execute(query)
- distributed_by_clause= ' DISTRIBUTED BY ({dist_key})
'.format(**locals())
- dist_key_comma = dist_key + ' ,'
- gpu_join_clause = """JOIN {dist_key_tbl} ON
- ({self.gpu_config})[b.buffer_id%{num_segments}+1] =
{dist_key_tbl}.id
- """
-
- if self.distribution_rules == 'gpu_segments':
- gpu_info_table = unique_string(desp='gpu_info')
- plpy.execute("""
- SELECT
{self.schema_madlib}.gpu_configuration('{gpu_info_table}')
- """.format(**locals()))
- gpu_query = """
- SELECT array_agg(DISTINCT(hostname)) as gpu_config
- FROM {gpu_info_table}
- """.format(**locals())
- gpu_query_result = plpy.execute(gpu_query)[0]['gpu_config']
- if not gpu_query_result:
- plpy.error("{self.module_name}: No GPUs configured on
hosts.".format(self=self))
-
- gpu_config_hostnames = "ARRAY{0}".format(gpu_query_result)
- # find hosts with gpus
- get_segment_query = """
- SELECT array_agg(content) as segment_ids,
- array_agg(dbid) as dbid,
- count(*) as count
- FROM gp_segment_configuration
- WHERE content != -1 AND role = 'p'
- AND hostname=ANY({gpu_config_hostnames})
- """.format(**locals())
- segment_ids_result = plpy.execute(get_segment_query)[0]
- plpy.execute("DROP TABLE IF EXISTS {0}".format(gpu_info_table))
-
- self.gpu_config =
"ARRAY{0}".format(sorted(segment_ids_result['segment_ids']))
- self.distribution_rules =
"ARRAY{0}".format(sorted(segment_ids_result['dbid']))
-
- num_segments = segment_ids_result['count']
- where_clause = "WHERE
gp_segment_id=ANY({self.gpu_config})".format(**locals())
- join_clause = gpu_join_clause.format(**locals())
-
- elif self.distribution_rules == DEFAULT_GPU_CONFIG:
-
- self.distribution_rules =
'$__madlib__$all_segments$__madlib__$'
- where_clause = ''
- num_segments = get_seg_number()
- join_clause = 'JOIN {dist_key_tbl} ON
(b.buffer_id%{num_segments})= {dist_key_tbl}.id'.format(**locals())
-
- else: # Read from a table with dbids to distribute the data
-
- self._validate_distribution_table()
- gpu_query = """
- SELECT array_agg(content) as gpu_config,
- array_agg(gp_segment_configuration.dbid) as dbid
- FROM {self.distribution_rules} JOIN
gp_segment_configuration
- ON {self.distribution_rules}.dbid =
gp_segment_configuration.dbid
- """.format(**locals())
- gpu_query_result = plpy.execute(gpu_query)[0]
- self.gpu_config =
"ARRAY{0}".format(sorted(gpu_query_result['gpu_config']))
- where_clause = "WHERE
gp_segment_id=ANY({self.gpu_config})".format(**locals())
- num_segments = plpy.execute("SELECT count(*) as count FROM
{self.distribution_rules}".format(**locals()))[0]['count']
- join_clause = gpu_join_clause.format(**locals())
- self.distribution_rules =
"ARRAY{0}".format(sorted(gpu_query_result['dbid']))
-
- dist_key_query = """
- CREATE TEMP TABLE {dist_key_tbl} AS
- SELECT gp_segment_id AS id, min({dist_key}) AS {dist_key}
- FROM {series_tbl}
- {where_clause}
- GROUP BY gp_segment_id
- """
- plpy.execute(dist_key_query.format(**locals()))
-
- # Create the mini-batched output table
+ all_segments = False
+
+ if self.distribution_rules == 'gpu_segments':
+ gpu_info_table = unique_string(desp='gpu_info')
+ plpy.execute("""
+ SELECT
{self.schema_madlib}.gpu_configuration('{gpu_info_table}')
+ """.format(**locals()))
+ gpu_query = """
+ SELECT array_agg(DISTINCT(hostname)) as gpu_config
+ FROM {gpu_info_table}
+ """.format(**locals())
+ gpu_query_result = plpy.execute(gpu_query)[0]['gpu_config']
+ if not gpu_query_result:
+ plpy.error("{self.module_name}: No GPUs configured on
hosts.".format(self=self))
+
+ gpu_config_hostnames = "ARRAY{0}".format(gpu_query_result)
+ # find hosts with gpus
+ get_segment_query = """
+ SELECT array_agg(content) as segment_ids,
+ array_agg(dbid) as dbid,
+ count(*) as count
+ FROM gp_segment_configuration
+ WHERE content != -1 AND role = 'p'
+ AND hostname=ANY({gpu_config_hostnames})
+ """.format(**locals())
+ segment_ids_result = plpy.execute(get_segment_query)[0]
+ plpy.execute("DROP TABLE IF EXISTS {0}".format(gpu_info_table))
+
+ self.gpu_config =
"ARRAY{0}".format(sorted(segment_ids_result['segment_ids']))
+ self.distribution_rules =
"ARRAY{0}".format(sorted(segment_ids_result['dbid']))
+
+ num_segments = segment_ids_result['count']
+
+ elif not all_segments: # Read from a table with dbids to distribute
the data
+ self._validate_distribution_table()
+ gpu_query = """
+ SELECT array_agg(content) as gpu_config,
+ array_agg(gp_segment_configuration.dbid) as dbid
+ FROM {self.distribution_rules} JOIN gp_segment_configuration
+ ON {self.distribution_rules}.dbid =
gp_segment_configuration.dbid
+ """.format(**locals())
+ gpu_query_result = plpy.execute(gpu_query)[0]
+ self.gpu_config =
"ARRAY{0}".format(sorted(gpu_query_result['gpu_config']))
+ num_segments = plpy.execute("SELECT count(*) as count FROM
{self.distribution_rules}".format(**locals()))[0]['count']
+ self.distribution_rules =
"ARRAY{0}".format(sorted(gpu_query_result['dbid']))
+
+ join_key = 't.buffer_id % {num_segments}'.format(**locals())
+
+ if not all_segments:
+ join_key = '({self.gpu_config})[{join_key} + 1]'.format(**locals())
+
+ # Create large temp table such that there is atleast 1 row on each
segment
+ # Using 999999 would distribute data(atleast 1 row on each segment) for
+ # a cluster as large as 20000
+ dist_key_col = DISTRIBUTION_KEY_COLNAME
+ query = """
+ CREATE TEMP TABLE {series_tbl} AS
+ SELECT generate_series(0, 999999) {dist_key_col}
+ DISTRIBUTED BY ({dist_key_col})
+ """.format(**locals())
+
+ plpy.execute(query)
+
+ # Used in locals() to format queries, including template queries
+ # bytea_query & batching_query defined in section common to
+ # pg & gp (very beginning of this function)
+ dist_by_dist_key = 'DISTRIBUTED BY ({dist_key_col})'.format(**locals())
+ dist_by_buffer_id = 'DISTRIBUTED BY (buffer_id)'
+ dist_key_col_comma = dist_key_col + ' ,'
+ make_buffer_id = ''
+
+ dist_key_query = """
+ CREATE TEMP TABLE {dist_key_tbl} AS
+ SELECT min({dist_key_col}) AS {dist_key_col}
+ FROM {series_tbl}
+ GROUP BY gp_segment_id
+ DISTRIBUTED BY ({dist_key_col})
+ """.format(**locals())
+
+ plpy.execute(dist_key_query)
+
+ plpy.execute("DROP TABLE {0}".format(series_tbl))
+
+ # Always one-hot encode the dependent var. For now, we are assuming
+ # that input_preprocessor_dl will be used only for deep
+ # learning and mostly for classification. So make a strong
+ # assumption that it is only for classification, so one-hot
+ # encode the dep var, unless it's already a numeric array in
+ # which case we assume it's already one-hot encoded.
+
+ # While 1-hot-encoding is done, we also normalize the independent
+ # var and randomly shuffle the rows on each segment. (The dist key
+ # we're adding avoids any rows moving between segments. This may
+ # make things slightly less random, but helps with speed--probably
+ # a safe tradeoff to make.)
+
+ norm_tbl = unique_string(desp='norm_table')
+
+ one_hot_sql = """
+ CREATE TEMP TABLE {norm_tbl} AS
+ SELECT {dist_key_col},
+ {rescale_independent_var} AS x_norm,
+ {one_hot_dep_var_array_expr} AS y
+ FROM {self.source_table} s JOIN {dist_key_tbl} AS d
+ ON (s.gp_segment_id = d.gp_segment_id)
+ {order_by_clause}
+ DISTRIBUTED BY ({dist_key_col})
+ """.format(**locals())
+ plpy.execute(one_hot_sql)
+
+ rows_per_seg_tbl = unique_string(desp='rows_per_seg')
+ start_rows_tbl = unique_string(desp='start_rows')
+
+ # Generate rows_per_segment table; this small table will
+ # just have one row on each segment containing the number
+ # of rows on that segment in the norm_tbl
+ sql = """
+ CREATE TEMP TABLE {rows_per_seg_tbl} AS SELECT
+ COUNT(*) as rows_per_seg,
+ {dist_key_col}
+ FROM {norm_tbl}
+ GROUP BY {dist_key_col}
+ DISTRIBUTED BY ({dist_key_col})
+ """.format(**locals())
+
+ plpy.execute(sql)
+
+ # Generate start_rows_tbl from rows_per_segment table.
+ # This assigns a start_row number for each segment based on
+ # the sum of all rows in previous segments. These will be
+ # added to the row numbers within each segment to get an
+ # absolute index into the table. All of this is to accomplish
+ # the equivalent of ROW_NUMBER() OVER() on the whole table,
+ # but this way is much faster because we don't have to do an
+ # N:1 Gather Motion (moving entire table to a single segment
+ # and scanning through it).
+ #
sql = """
- CREATE TABLE {self.output_table} AS
- SELECT {dist_key_comma}
- {self.schema_madlib}.convert_array_to_bytea({x}) AS {x},
- {self.schema_madlib}.convert_array_to_bytea({y}) AS {y},
- ARRAY[count,{ind_shape}]::SMALLINT[] AS {ind_shape_col},
- ARRAY[count,{dep_shape}]::SMALLINT[] AS {dep_shape_col},
- buffer_id
- FROM
- (
- SELECT
- {self.schema_madlib}.agg_array_concat(
- ARRAY[{norm_tbl}.x_norm::{FLOAT32_SQL_TYPE}[]]) AS {x},
- {self.schema_madlib}.agg_array_concat(
- ARRAY[{norm_tbl}.y]) AS {y},
- ({norm_tbl}.row_id%{self.num_of_buffers})::smallint AS
buffer_id,
- count(*) AS count
- FROM {norm_tbl}
- GROUP BY buffer_id
- ) b
- {join_clause}
- {distributed_by_clause}
- """.format(x=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME_DL,
- y=MINIBATCH_OUTPUT_DEPENDENT_COLNAME_DL,
- FLOAT32_SQL_TYPE=FLOAT32_SQL_TYPE,
- **locals())
+ CREATE TEMP TABLE {start_rows_tbl} AS SELECT
+ {dist_key_col},
+ SUM(rows_per_seg) OVER (ORDER BY gp_segment_id) - rows_per_seg
AS start_row
+ FROM {rows_per_seg_tbl}
+ DISTRIBUTED BY ({dist_key_col})
+ """.format(**locals())
+
plpy.execute(sql)
- plpy.execute("DROP TABLE IF EXISTS {0}, {1}, {2}".format(norm_tbl,
series_tbl, dist_key_tbl))
+
+ plpy.execute("DROP TABLE {0}".format(rows_per_seg_tbl))
+
+ self.buffer_size = self._get_buffer_size(num_segments)
+
+ # The query below assigns slot_id's to each row within
+ # a segment, computes a row_id by adding start_row for
+ # that segment to it, then divides by buffer_size to make
+ # this into a buffer_id
+ # ie:
+ # buffer_id = row_id / buffer_size
+ # row_id = start_row + slot_id
+ # slot_id = ROW_NUMBER() OVER(PARTITION BY <dist key>)::INTEGER
+ #
+ # Instead of partitioning by gp_segment_id itself, we
+ # use __dist_key__ col instead. This is the same partition,
+ # since there's a 1-to-1 mapping between the columns; but
+ # using __dist_key__ avoids an extra Redistribute Motion.
+ #
+ # Note: even though the ordering of these two columns is
+ # different, this doesn't matter as each segment is being
+ # numbered separately (only the start_row is different,
+ # and those are fixed to the correct segments by the JOIN
+ # condition.
+
+ sql = """
+ CREATE TEMP TABLE {normalized_tbl} AS SELECT
+ {dist_key_col},
+ x_norm,
+ y,
+ (ROW_NUMBER() OVER( PARTITION BY {dist_key_col} ))::INTEGER as
slot_id,
+ ((start_row +
+ (ROW_NUMBER() OVER( PARTITION BY {dist_key_col} ) - 1)
+ )::INTEGER / {self.buffer_size}
+ ) AS buffer_id
+ FROM {norm_tbl} JOIN {start_rows_tbl}
+ USING ({dist_key_col})
+ ORDER BY buffer_id
+ DISTRIBUTED BY (slot_id)
+ """.format(**locals())
+
+ plpy.execute(sql) # label buffer_id's
+
+ # A note on DISTRIBUTED BY (slot_id) in above query:
+ #
+ # In the next query, we'll be doing the actual batching. Due
+ # to the GROUP BY, gpdb will Redistribute on buffer_id. We could
+ # avoid this by using DISTRIBUTED BY (buffer_id) in the above
+ # (buffer-labelling) query. But this also causes the GROUP BY
+ # to use single-stage GroupAgg instead of multistage GroupAgg,
+ # which for unknown reasons is *much* slower and often runs out
+ # of VMEM unless it's set very high!
+
+ plpy.execute("DROP TABLE {norm_tbl},
{start_rows_tbl}".format(**locals()))
+
+ # Disable optimizer (ORCA) for platforms that use it
+ # since we want to use a groupagg instead of hashagg
+ with OptimizerControl(False) and HashaggControl(False):
+ # Run actual batching query
+ plpy.execute(batching_query.format(**locals()))
+
+ plpy.execute("DROP TABLE {0}".format(normalized_tbl))
+
+ if not all_segments: # remove any segments we don't plan to use
+ sql = """
+ DELETE FROM {dist_key_tbl}
+ WHERE NOT gp_segment_id = ANY({self.gpu_config})
+ """.format(**locals())
+
+ plpy.execute("ANALYZE {dist_key_tbl}".format(**locals()))
+ plpy.execute("ANALYZE {batched_table}".format(**locals()))
+
+ # Redistribute from buffer_id to dist_key
+ #
+ # This has to be separate from the batching query, because
+ # we found that adding DISTRIBUTED BY (dist_key) to that
+ # query causes it to run out of VMEM on large datasets such
+ # as places100. Possibly this is because the memory available
+ # for GroupAgg has to be shared with an extra slice if they
+ # are part of the same query.
+ #
+ # We also tried adding this to the BYTEA conversion query, but
+ # that resulted in slower performance than just keeping it
+ # separate.
+ #
+ sql = """CREATE TEMP TABLE {batched_table}_dist_key AS
+ SELECT {dist_key_col}, t.*
+ FROM {batched_table} t
+ JOIN {dist_key_tbl} d
+ ON {join_key} = d.gp_segment_id
+ DISTRIBUTED BY ({dist_key_col})
+ """.format(**locals())
+
+ # match buffer_id's with dist_keys
+ plpy.execute(sql)
+
+ sql = """DROP TABLE {batched_table}, {dist_key_tbl};
+ ALTER TABLE {batched_table}_dist_key RENAME TO {batched_table}
+ """.format(**locals())
+ plpy.execute(sql)
+
+ # Convert batched table to BYTEA and output as final (permanent) table
+ plpy.execute(bytea_query.format(**locals()))
+
+ plpy.execute("DROP TABLE {0}".format(batched_table))
+
# Create summary table
self._create_output_summary_table()
@@ -405,7 +629,8 @@ class InputDataPreprocessorDL(object):
_assert(self.buffer_size > 0,
"{0}: The buffer size has to be a "
"positive integer or NULL.".format(self.module_name))
- _assert(self.normalizing_const > 0,
+ if self.normalizing_const is not None:
+ _assert(self.normalizing_const > 0,
"{0}: The normalizing constant has to be a "
"positive integer or NULL.".format(self.module_name))
@@ -442,16 +667,17 @@ class InputDataPreprocessorDL(object):
return get_distinct_col_levels(table, dependent_varname,
dependent_vartype, include_nulls=True)
- def _get_num_buffers(self):
+ def _get_buffer_size(self, num_segments):
num_rows_in_tbl = plpy.execute("""
SELECT count(*) AS cnt FROM {0}
""".format(self.source_table))[0]['cnt']
buffer_size_calculator = MiniBatchBufferSizeCalculator()
indepdent_var_dim = get_product_of_dimensions(self.source_table,
self.independent_varname)
- self.buffer_size =
buffer_size_calculator.calculate_default_buffer_size(
- self.buffer_size, num_rows_in_tbl, indepdent_var_dim)
- return ceil((1.0 * num_rows_in_tbl) / self.buffer_size)
+ buffer_size = buffer_size_calculator.calculate_default_buffer_size(
+ self.buffer_size, num_rows_in_tbl, indepdent_var_dim, num_segments)
+ num_buffers = num_segments * ceil((1.0 * num_rows_in_tbl) /
buffer_size / num_segments)
+ return int(ceil(num_rows_in_tbl / num_buffers))
class ValidationDataPreprocessorDL(InputDataPreprocessorDL):
def __init__(self, schema_madlib, source_table, output_table,
diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
index 40ae56e..6e006d5 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_helper.py_in
@@ -51,7 +51,6 @@ FLOAT32_SQL_TYPE = 'REAL'
SMALLINT_SQL_TYPE = 'SMALLINT'
DEFAULT_NORMALIZING_CONST = 1.0
-DEFAULT_GPU_CONFIG = 'all_segments'
GP_SEGMENT_ID_COLNAME = "gp_segment_id"
INTERNAL_GPU_CONFIG = '__internal_gpu_config__'
diff --git
a/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
b/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
index 37a2e25..11730cf 100644
--- a/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
+++ b/src/ports/postgres/modules/deep_learning/madlib_keras_validator.py_in
@@ -31,7 +31,6 @@ from madlib_keras_helper import NORMALIZING_CONST_COLNAME
from madlib_keras_helper import DISTRIBUTION_KEY_COLNAME
from madlib_keras_helper import METRIC_TYPE_COLNAME
from madlib_keras_helper import INTERNAL_GPU_CONFIG
-from madlib_keras_helper import DEFAULT_GPU_CONFIG
from madlib_keras_helper import query_model_configs
from utilities.minibatch_validation import validate_bytea_var_for_minibatch
@@ -234,7 +233,7 @@ class InputValidator:
gpu_config = plpy.execute(
"SELECT {0} FROM {1}".format(INTERNAL_GPU_CONFIG, summary_table)
)[0][INTERNAL_GPU_CONFIG]
- if gpu_config == DEFAULT_GPU_CONFIG:
+ if gpu_config == 'all_segments':
_assert(0 not in accessible_gpus_for_seg,
"{0} error: Host(s) are missing gpus.".format(module_name))
else:
diff --git
a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
index d8c6798..7c6c5c3 100644
---
a/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
+++
b/src/ports/postgres/modules/deep_learning/test/input_data_preprocessor.sql_in
@@ -19,6 +19,7 @@
*
*//* -----------------------------------------------------------------------
*/
m4_include(`SQLCommon.m4')
+m4_changequote(`<!', `!>')
DROP TABLE IF EXISTS data_preprocessor_input;
CREATE TABLE data_preprocessor_input(id serial, x double precision[], label
TEXT);
@@ -49,20 +50,72 @@ SELECT training_preprocessor_dl(
'x',
5);
-SELECT assert(count(*)=4, 'Incorrect number of buffers in
data_preprocessor_input_batch.')
+-- Divide two numbers and round up to the nearest integer
+CREATE FUNCTION divide_roundup(numerator NUMERIC, denominator NUMERIC)
+RETURNS INTEGER AS
+$$
+ SELECT (ceil($1 / $2)::INTEGER);
+$$ LANGUAGE SQL;
+
+-- num_buffers_calc() represents the num_buffers value that should be
+-- calculated by the preprocessor.
+-- For postgres, just need total rows / buffer_size rounded up.
+-- For greenplum, we take that result, and round up to the nearest multiple
+-- of num_segments.
+CREATE FUNCTION num_buffers_calc(rows_in_tbl INTEGER, buffer_size INTEGER)
+RETURNS INTEGER AS
+$$
+m4_ifdef(<!__POSTGRESQL__!>,
+ <! SELECT divide_roundup($1, $2); !>,
+ <! SELECT (COUNT(*)::INTEGER) * divide_roundup(divide_roundup($1, $2),
COUNT(*)) FROM gp_segment_configuration
+ WHERE role = 'p' AND content
!= -1; !>
+)
+$$ LANGUAGE SQL;
+
+-- num_buffers() represents the actual number of buffers expected to
+-- be returned in the output table.
+-- For postgres, this should always be the same as num_buffers_calc()
+-- (as long as rows_in_tbl > 0, which should be validated elsewhere)
+-- For greenplum, this can be less than num_buffers_calc() in
+-- the special case where there is only one row per buffer. In
+-- that case, the buffers in the output table will be equal to
+-- the number of rows in the input table. This can only happen
+-- if rows_in_tbl < num_segments and is the only case where the
+-- number of buffers on each segment will not be exactly equal
+CREATE FUNCTION num_buffers(rows_in_tbl INTEGER, buffer_size INTEGER)
+RETURNS INTEGER AS
+$$
+ SELECT LEAST(num_buffers_calc($1, $2), $1);
+$$ LANGUAGE SQL;
+
+CREATE FUNCTION buffer_size(rows_in_tbl INTEGER, requested_buffer_size INTEGER)
+RETURNS INTEGER AS
+$$
+ SELECT divide_roundup($1, num_buffers($1, $2));
+$$ LANGUAGE SQL;
+
+SELECT assert(COUNT(*) = num_buffers(17, 5),
+ 'Incorrect number of buffers in data_preprocessor_input_batch.')
FROM data_preprocessor_input_batch;
-SELECT assert(independent_var_shape[2]=6, 'Incorrect buffer size.')
+SELECT assert(independent_var_shape[2]=6, 'Incorrect image shape ' ||
independent_var_shape[2])
FROM data_preprocessor_input_batch WHERE buffer_id=0;
-SELECT assert(independent_var_shape[1]=5, 'Incorrect buffer size.')
-FROM data_preprocessor_input_batch WHERE buffer_id=1;
+SELECT assert(independent_var_shape[1]=buffer_size, 'Incorrect buffer size '
|| independent_var_shape[1])
+FROM (SELECT buffer_size(17, 5) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
-SELECT assert(independent_var_shape[1]=4, 'Incorrect buffer size.')
-FROM data_preprocessor_input_batch WHERE buffer_id=3;
+SELECT assert(independent_var_shape[1]=buffer_size, 'Incorrect buffer size '
|| independent_var_shape[1])
+FROM (SELECT buffer_size(17, 5) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=1;
+
+SELECT assert(independent_var_shape[1]=buffer_size, 'Incorrect buffer size '
|| independent_var_shape[1])
+FROM (SELECT buffer_size(17, 5) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=2;
+
+SELECT assert(total_images = 17, 'Incorrect total number of images! Last
buffer has incorrect size?')
+FROM (SELECT SUM(independent_var_shape[1]) AS total_images FROM
data_preprocessor_input_batch) a;
+
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length ' || octet_length(independent_var)::TEXT)
+FROM (SELECT buffer_size(17, 5) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
-SELECT assert(octet_length(independent_var) = 96, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
DROP TABLE IF EXISTS validation_out, validation_out_summary;
SELECT validation_preprocessor_dl(
@@ -73,20 +126,21 @@ SELECT validation_preprocessor_dl(
'data_preprocessor_input_batch',
5);
-SELECT assert(count(*)=4, 'Incorrect number of buffers in validation_out.')
+SELECT assert(COUNT(*) = num_buffers(17, 5),
+ 'Incorrect number of buffers in validation_out.')
FROM validation_out;
-SELECT assert(independent_var_shape[2]=6, 'Incorrect buffer size.')
+SELECT assert(independent_var_shape[2]=6, 'Incorrect image shape.')
FROM data_preprocessor_input_batch WHERE buffer_id=0;
-SELECT assert(independent_var_shape[1]=5, 'Incorrect buffer size.')
-FROM data_preprocessor_input_batch WHERE buffer_id=1;
+SELECT assert(independent_var_shape[1]=buffer_size, 'Incorrect buffer size.')
+FROM (SELECT buffer_size(17, 5) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=1;
-SELECT assert(independent_var_shape[1]=4, 'Incorrect buffer size.')
-FROM data_preprocessor_input_batch WHERE buffer_id=3;
+SELECT assert(total_images = 17, 'Incorrect total number of images! Last
buffer has incorrect size?')
+FROM (SELECT SUM(independent_var_shape[1]) AS total_images FROM
data_preprocessor_input_batch) a;
-SELECT assert(octet_length(independent_var) = 96, 'Incorrect buffer size')
-FROM validation_out WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 5) buffer_size) a, validation_out WHERE
buffer_id=0;
DROP TABLE IF EXISTS data_preprocessor_input_batch,
data_preprocessor_input_batch_summary;
SELECT training_preprocessor_dl(
@@ -96,7 +150,6 @@ SELECT training_preprocessor_dl(
'x');
-- Test data is evenly distributed across all segments (GPDB only)
-m4_changequote(`<!', `!>')
m4_ifdef(<!__POSTGRESQL__!>, <!!>, <!
DROP TABLE IF EXISTS data_preprocessor_input_batch,
data_preprocessor_input_batch_summary;
SELECT training_preprocessor_dl(
@@ -109,11 +162,10 @@ SELECT training_preprocessor_dl(
-- This test expects that total number of images(17 for input table
data_preprocessor_input)
-- are equally distributed across all segments.
-- Therefore, after preprocessing seg0 will have 17/(# of segs) buffers.
-SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from
gp_segment_configuration WHERE role = 'p' and content != -1), 'Even
distribution of buffers failed.')
-FROM data_preprocessor_input_batch
-WHERE gp_segment_id = 0;
+SELECT gp_segment_id, assert((SELECT divide_roundup(17, count(*)) from
gp_segment_configuration WHERE role = 'p' and content != -1) - COUNT(*) <= 1,
'Even distribution of buffers failed. Seeing ' || count(*) || ' buffers.')
+ FROM data_preprocessor_input_batch GROUP BY 1;
SELECT assert(__internal_gpu_config__ = 'all_segments', 'Missing column in
summary table')
-FROM data_preprocessor_input_batch_summary;
+ FROM data_preprocessor_input_batch_summary;
-- Test validation data is evenly distributed across all segments (GPDB only)
DROP TABLE IF EXISTS validation_out, validation_out_summary;
@@ -124,9 +176,8 @@ SELECT validation_preprocessor_dl(
'x',
'data_preprocessor_input_batch',
1);
-SELECT assert(count(*)=(SELECT ceil(17.0/count(*)) from
gp_segment_configuration WHERE role = 'p' and content != -1), 'Even
distribution of validation buffers failed.')
-FROM validation_out
-WHERE gp_segment_id = 0;
+SELECT gp_segment_id, assert((SELECT divide_roundup(17, count(*)) from
gp_segment_configuration WHERE role = 'p' and content != -1) - COUNT(*) <= 1,
'Even distribution of buffers failed. Seeing ' || count(*) || ' buffers.')
+ FROM validation_out GROUP BY 1;
SELECT assert(__internal_gpu_config__ = 'all_segments', 'Missing column in
validation summary table')
FROM validation_out_summary;
@@ -208,8 +259,8 @@ SELECT assert(relative_error(MAX(x),46.6) < 0.00001,
'Independent var not normal
SELECT assert(dependent_var_shape[2] = 16, 'Incorrect one-hot encode dimension
with num_classes') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
-- Test summary table
SELECT assert
@@ -220,13 +271,14 @@ SELECT assert
independent_varname = 'x' AND
dependent_vartype = 'integer' AND
class_values = '{-6,-3,-1,0,2,3,4,5,6,7,8,9,10,12,NULL,NULL}'
AND
- buffer_size = 4 AND -- we sort the class values in python
+ summary.buffer_size = a.buffer_size AND -- we sort the class values
in python
normalizing_const = 5 AND
pg_typeof(normalizing_const) = 'real'::regtype AND
num_classes = 16 AND
distribution_rules = 'all_segments',
'Summary Validation failed. Actual:' || __to_char(summary)
- ) from (select * from data_preprocessor_input_batch_summary) summary;
+ ) FROM (SELECT buffer_size(17, 4) buffer_size) a,
+ (SELECT * FROM data_preprocessor_input_batch_summary) summary;
--- Test output data type
SELECT assert(pg_typeof(independent_var) = 'bytea'::regtype, 'Wrong
independent_var type') FROM data_preprocessor_input_batch WHERE buffer_id = 0;
@@ -286,8 +338,8 @@ SELECT assert(pg_typeof(dependent_var) = 'bytea'::regtype,
'One-hot encode doesn
SELECT assert(dependent_var_shape[2] = 2, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
SELECT assert(SUM(y) = 1, 'Incorrect one-hot encode format') FROM (SELECT
buffer_id, UNNEST((convert_bytea_to_smallint_array(dependent_var))[1:2]) as y
FROM data_preprocessor_input_batch) a WHERE buffer_id = 0;
SELECT assert (dependent_vartype = 'boolean' AND
@@ -328,8 +380,8 @@ SELECT assert(pg_typeof(dependent_var) = 'bytea'::regtype,
'One-hot encode doesn
SELECT assert(dependent_var_shape[2] = 3, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
SELECT assert(SUM(y) = 1, 'Incorrect one-hot encode format') FROM (SELECT
buffer_id, UNNEST((convert_bytea_to_smallint_array(dependent_var))[1:3]) as y
FROM data_preprocessor_input_batch) a WHERE buffer_id = 0;
SELECT assert (dependent_vartype = 'text' AND
@@ -363,8 +415,8 @@ SELECT training_preprocessor_dl(
SELECT assert(pg_typeof(dependent_var) = 'bytea'::regtype, 'One-hot encode
doesn''t convert into integer array format') FROM data_preprocessor_input_batch
WHERE buffer_id = 0;
SELECT assert(dependent_var_shape[2] = 3, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
SELECT assert(SUM(y) = 1, 'Incorrect one-hot encode format') FROM (SELECT
buffer_id, UNNEST((convert_bytea_to_smallint_array(dependent_var))[1:3]) as y
FROM data_preprocessor_input_batch) a WHERE buffer_id = 0;
SELECT assert (dependent_vartype = 'double precision' AND
class_values = '{4.0,4.2,5.0}' AND
@@ -385,8 +437,8 @@ SELECT assert(pg_typeof(dependent_var) = 'bytea'::regtype,
'One-hot encode doesn
SELECT assert(dependent_var_shape[2] = 2, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
SELECT assert(relative_error(SUM(y), SUM(y4)) < 0.000001, 'Incorrect one-hot
encode value') FROM (SELECT
UNNEST(convert_bytea_to_smallint_array(dependent_var)) AS y FROM
data_preprocessor_input_batch) a, (SELECT UNNEST(y4) as y4 FROM
data_preprocessor_input) b;
SELECT assert (dependent_vartype = 'double precision[]' AND
@@ -419,8 +471,8 @@ SELECT assert(pg_typeof(dependent_var) = 'bytea'::regtype,
'One-hot encode doesn
SELECT assert(dependent_var_shape[2] = 2, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
SELECT assert(relative_error(SUM(y), SUM(y5)) < 0.000001, 'Incorrect one-hot
encode value') FROM (SELECT
UNNEST(convert_bytea_to_smallint_array(dependent_var)) AS y FROM
data_preprocessor_input_batch) a, (SELECT UNNEST(y5) as y5 FROM
data_preprocessor_input) b;
SELECT assert (dependent_vartype = 'integer[]' AND
@@ -473,8 +525,8 @@ SELECT assert
SELECT assert(dependent_var_shape[2] = 5, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 72, 'Incorrect buffer size')
-FROM data_preprocessor_input_batch WHERE buffer_id=0;
+SELECT assert(octet_length(independent_var) = buffer_size*6*4, 'Incorrect
buffer length')
+FROM (SELECT buffer_size(17, 4) buffer_size) a, data_preprocessor_input_batch
WHERE buffer_id=0;
-- The same tests, but for validation.
DROP TABLE IF EXISTS data_preprocessor_input_validation_null;
@@ -541,7 +593,7 @@ SELECT assert
SELECT assert(dependent_var_shape[2] = 3, 'Incorrect one-hot encode
dimension') FROM
data_preprocessor_input_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 24, 'Incorrect buffer size')
+SELECT assert(octet_length(independent_var) = 24, 'Incorrect buffer length')
FROM data_preprocessor_input_batch WHERE buffer_id=0;
-- NULL is treated as a class label, so it should show '1' for the
-- first index
@@ -570,7 +622,7 @@ SELECT assert
SELECT assert(dependent_var_shape[2] = 3, 'Incorrect one-hot encode
dimension') FROM
validation_out_batch WHERE buffer_id = 0;
-SELECT assert(octet_length(independent_var) = 24, 'Incorrect buffer size')
+SELECT assert(octet_length(independent_var) = 24, 'Incorrect buffer length')
FROM data_preprocessor_input_batch WHERE buffer_id=0;
-- NULL is treated as a class label, so it should show '1' for the
-- first index
diff --git
a/src/ports/postgres/modules/deep_learning/test/madlib_keras_cifar.setup.sql_in
b/src/ports/postgres/modules/deep_learning/test/madlib_keras_cifar.setup.sql_in
index 1f3a24f..7c9ad5e 100644
---
a/src/ports/postgres/modules/deep_learning/test/madlib_keras_cifar.setup.sql_in
+++
b/src/ports/postgres/modules/deep_learning/test/madlib_keras_cifar.setup.sql_in
@@ -24,8 +24,8 @@
DROP TABLE IF EXISTS cifar_10_sample;
CREATE TABLE cifar_10_sample(id INTEGER, y SMALLINT, y_text TEXT, imgpath
TEXT, x REAL[]);
COPY cifar_10_sample FROM STDIN DELIMITER '|';
-1|0|'cat'|'0/img0.jpg'|{{{202,204,199},{202,204,199},{204,206,201},{206,208,203},{208,210,205},{209,211,206},{210,212,207},{212,214,210},{213,215,212},{215,217,214},{216,218,215},{216,218,215},{215,217,214},{216,218,215},{216,218,215},{216,218,214},{217,219,214},{217,219,214},{218,220,215},{218,219,214},{216,217,212},{217,218,213},{218,219,214},{214,215,209},{213,214,207},{212,213,206},{211,212,205},{209,210,203},{208,209,202},{207,208,200},{205,206,199},{203,204,198}},{{206,208,203},{20
[...]
-2|1|'dog'|'0/img2.jpg'|{{{126,118,110},{122,115,108},{126,119,111},{127,119,109},{130,122,111},{130,122,111},{132,124,113},{133,125,114},{130,122,111},{132,124,113},{134,126,115},{131,123,112},{131,123,112},{134,126,115},{133,125,114},{136,128,117},{137,129,118},{137,129,118},{136,128,117},{131,123,112},{130,122,111},{132,124,113},{132,124,113},{132,124,113},{129,122,110},{127,121,109},{127,121,109},{125,119,107},{124,118,106},{124,118,106},{120,114,102},{117,111,99}},{{122,115,107},{119
[...]
+0|0|'cat'|'0/img0.jpg'|{{{202,204,199},{202,204,199},{204,206,201},{206,208,203},{208,210,205},{209,211,206},{210,212,207},{212,214,210},{213,215,212},{215,217,214},{216,218,215},{216,218,215},{215,217,214},{216,218,215},{216,218,215},{216,218,214},{217,219,214},{217,219,214},{218,220,215},{218,219,214},{216,217,212},{217,218,213},{218,219,214},{214,215,209},{213,214,207},{212,213,206},{211,212,205},{209,210,203},{208,209,202},{207,208,200},{205,206,199},{203,204,198}},{{206,208,203},{20
[...]
+1|1|'dog'|'0/img2.jpg'|{{{126,118,110},{122,115,108},{126,119,111},{127,119,109},{130,122,111},{130,122,111},{132,124,113},{133,125,114},{130,122,111},{132,124,113},{134,126,115},{131,123,112},{131,123,112},{134,126,115},{133,125,114},{136,128,117},{137,129,118},{137,129,118},{136,128,117},{131,123,112},{130,122,111},{132,124,113},{132,124,113},{132,124,113},{129,122,110},{127,121,109},{127,121,109},{125,119,107},{124,118,106},{124,118,106},{120,114,102},{117,111,99}},{{122,115,107},{119
[...]
\.
DROP TABLE IF EXISTS cifar_10_sample_batched;
diff --git
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
index f21176c..d2e14cd 100644
---
a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
+++
b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_input_data_preprocessor.py_in
@@ -61,6 +61,8 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
self.module = deep_learning.input_data_preprocessor
import utilities.minibatch_preprocessing
self.util_module = utilities.minibatch_preprocessing
+ import utilities.control
+ self.control_module = utilities.control
self.module.get_expr_type = Mock(side_effect = ['integer[]',
'integer[]'])
self.module.validate_module_input_params = Mock()
self.module.get_distinct_col_levels = Mock(return_value = [0,22,100])
@@ -70,6 +72,9 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
def test_input_preprocessor_dl_executes_query(self):
self.module.get_expr_type = Mock(side_effect = ['integer[]',
'integer[]'])
+ self.control_module.OptimizerControl.__enter__ = Mock()
+ self.control_module.OptimizerControl.optimizer_control = True
+ self.control_module.OptimizerControl.optimizer_enabled = True
preprocessor_obj = self.module.InputDataPreprocessorDL(
self.default_schema_madlib,
"input",
@@ -85,6 +90,9 @@ class InputPreProcessorDLTestCase(unittest.TestCase):
def test_input_preprocessor_null_buffer_size_executes_query(self):
self.module.get_expr_type = Mock(side_effect = ['integer[]',
'integer[]'])
+ self.control_module.OptimizerControl.__enter__ = Mock()
+ self.control_module.OptimizerControl.optimizer_control = True
+ self.control_module.OptimizerControl.optimizer_enabled = True
preprocessor_obj = self.module.InputDataPreprocessorDL(
self.default_schema_madlib,
"input",
diff --git a/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
b/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
index e03bf44..c25463c 100644
--- a/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
+++ b/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
@@ -457,10 +457,13 @@ class MiniBatchBufferSizeCalculator:
@staticmethod
def calculate_default_buffer_size(buffer_size,
avg_num_rows_processed,
- independent_var_dimension):
+ independent_var_dimension,
+ num_of_segments=None):
if buffer_size is not None:
return buffer_size
- num_of_segments = get_seg_number()
+
+ if num_of_segments is None:
+ num_of_segments = get_seg_number()
default_buffer_size = min(75000000.0/independent_var_dimension,
float(avg_num_rows_processed)/num_of_segments)
diff --git a/src/ports/postgres/modules/utilities/utilities.py_in
b/src/ports/postgres/modules/utilities/utilities.py_in
index 687566a..12b5205 100644
--- a/src/ports/postgres/modules/utilities/utilities.py_in
+++ b/src/ports/postgres/modules/utilities/utilities.py_in
@@ -20,6 +20,27 @@ import plpy
m4_changequote(`<!', `!>')
+def plpy_execute_debug(sql, *args, **kwargs):
+ """ Replace plpy.execute(sql, ...) with
+ plpy_execute_debug(sql, ...) to debug
+ a query. Shows the query itself, the
+ EXPLAIN of it, and how long the query
+ takes to execute.
+ """
+ plpy.info(sql) # Print sql command
+
+ # Print EXPLAIN of sql command
+ res = plpy.execute("EXPLAIN " + sql, *args)
+ for r in res:
+ plpy.info(r['QUERY PLAN'])
+
+ # Run actual sql command, with timing
+ start = time.time()
+ plpy.execute(sql, *args)
+
+ # Print how long execution of query took
+ plpy.info("Query took {0}s".format(time.time() - start))
+
def has_function_properties():
""" __HAS_FUNCTION_PROPERTIES__ variable defined during configure """
return m4_ifdef(<!__HAS_FUNCTION_PROPERTIES__!>, <!True!>, <!False!>)