Minibatch Preprocessor: Create temp table for standardization.

Our experiments showed that creating a temp table for standardization
is faster than using a subquery, so this commit creates a temp table
for the standardization.
Before this commit, we called the `utils_normalize_data` function
inside the main query; now we create a temp table from the output of
`utils_normalize_data` and use that table in the main query.

Closes #260


Project: http://git-wip-us.apache.org/repos/asf/madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/madlib/commit/259e0041
Tree: http://git-wip-us.apache.org/repos/asf/madlib/tree/259e0041
Diff: http://git-wip-us.apache.org/repos/asf/madlib/diff/259e0041

Branch: refs/heads/master
Commit: 259e00416a268512cee80513fb24bcb2ae9fb273
Parents: b886381
Author: Nikhil Kak <n...@pivotal.io>
Authored: Fri Apr 6 13:55:46 2018 -0700
Committer: Nandish Jayaram <njaya...@apache.org>
Committed: Fri Apr 13 17:16:50 2018 -0700

----------------------------------------------------------------------
 .../utilities/minibatch_preprocessing.py_in     | 70 ++++++++++++--------
 .../test_minibatch_preprocessing.py_in          |  2 +-
 2 files changed, 42 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/madlib/blob/259e0041/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in b/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
index 856c7e4..89eea6e 100644
--- a/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
+++ b/src/ports/postgres/modules/utilities/minibatch_preprocessing.py_in
@@ -169,6 +169,7 @@ class MiniBatchPreProcessor:
         plpy.execute(sql)
 
         standardizer.create_output_standardization_table()
+        standardizer.drop_standardized_table()
         MiniBatchSummarizer.create_output_summary_table(
             self.output_summary_table,
             self.source_table,
@@ -365,6 +366,7 @@ class MiniBatchStandardizer:
         self.x_mean_table = unique_string(desp='x_mean_table')
         self.x_mean_str = None
         self.x_std_dev_str = None
+        self.standardized_table = unique_string(desp='std_table')
         self._calculate_mean_and_std_dev_str()
 
     def _calculate_mean_and_std_dev_str(self):
@@ -395,42 +397,49 @@ class MiniBatchStandardizer:
 
     def get_query_for_standardizing(self):
         if self.grouping_cols:
-            return self._get_query_for_standardizing_with_grouping()
+            query = self._get_query_for_standardizing_with_grouping()
         else:
-            return self._get_query_for_standardizing_without_grouping()
+            query = self._get_query_for_standardizing_without_grouping()
+        plpy.execute(query)
+
+        return "select * from {0}".format(self.standardized_table)
 
     def _get_query_for_standardizing_without_grouping(self):
         return """
-            SELECT
-                {self.dep_var_array_str} AS {dep_colname},
-                {self.schema_madlib}.utils_normalize_data(
-                    {self.indep_var_array_str},
-                    '{self.x_mean_str}'::double precision[],
-                    '{self.x_std_dev_str}'::double precision[]
-                ) AS {ind_colname}
-            FROM {self.source_table}
-        """.format(dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
-                   ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
-                   self=self)
+          CREATE TEMP TABLE {self.standardized_table} AS
+          SELECT
+            {self.dep_var_array_str} AS {dep_colname},
+            {self.schema_madlib}.utils_normalize_data(
+              {self.indep_var_array_str},
+              '{self.x_mean_str}'::double precision[],
+              '{self.x_std_dev_str}'::double precision[]
+            ) AS {ind_colname}
+          FROM {self.source_table}
+          """.format(dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
+                     ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME,
+                     self=self)
+
 
     def _get_query_for_standardizing_with_grouping(self):
         return """
-            SELECT
-                {self.dep_var_array_str} as {dep_colname},
-                {self.schema_madlib}.utils_normalize_data(
-                    {self.indep_var_array_str},
-                    __x__.mean::double precision[],
-                    __x__.std::double precision[]
-                ) AS {ind_colname},
-                {self.source_table}.{self.grouping_cols}
-            FROM
-                {self.source_table}
-                INNER JOIN
-                {self.x_mean_table} AS __x__
-                ON  {self.source_table}.{self.grouping_cols} = __x__.{self.grouping_cols}
-        """.format(self=self,
-                   dep_colname = MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
-                   ind_colname = MINIBATCH_OUTPUT_INDEPENDENT_COLNAME)
+        CREATE TEMP TABLE {self.standardized_table} AS
+          SELECT
+            {self.dep_var_array_str} AS {dep_colname},
+            {self.schema_madlib}.utils_normalize_data(
+                {self.indep_var_array_str},
+                __x__.mean::double precision[],
+                __x__.std::double precision[]
+            ) AS {ind_colname},
+            {self.source_table}.{self.grouping_cols}
+        FROM
+          {self.source_table} 
+          INNER JOIN 
+          {self.x_mean_table} AS __x__ 
+          ON  {self.source_table}.{self.grouping_cols} = __x__.{self.grouping_cols}
+        """.format(
+            self=self,
+            dep_colname=MINIBATCH_OUTPUT_DEPENDENT_COLNAME,
+            ind_colname=MINIBATCH_OUTPUT_INDEPENDENT_COLNAME)
 
     def create_output_standardization_table(self):
         if self.grouping_cols:
@@ -446,6 +455,9 @@ class MiniBatchStandardizer:
             """.format(self=self)
         plpy.execute(query)
 
+    def drop_standardized_table(self):
+        plpy.execute("DROP TABLE IF EXISTS {0}".format(self.standardized_table))
+
 
 class MiniBatchSummarizer:
     @staticmethod

http://git-wip-us.apache.org/repos/asf/madlib/blob/259e0041/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in b/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
index f458303..75cc044 100644
--- a/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
+++ b/src/ports/postgres/modules/utilities/test/unit_tests/test_minibatch_preprocessing.py_in
@@ -135,7 +135,7 @@ m4_changequote(`<!', `!>')
 #                                                              self.grouping_cols,
 #                                                              1)
 #         preprocessor_obj.minibatch_preprocessor()
-#         self.assert_(True)
+#         self.assertEqual(1, drop_table_mock.call_count)
 
 
 class MiniBatchQueryFormatterTestCase(unittest.TestCase):

Reply via email to