Repository: incubator-madlib
Updated Branches:
  refs/heads/master 4b0c37714 -> 0ff829a70


Bugfix: Elastic net gives inconsistent result

JIRA: MADLIB-1092

- Elastic net used to consider the number of rows as the total number
of rows in the table even when grouping was used. This fix changes
that to consider the number of rows in a group while computing IGD.
- Elastic net used to consider mean and standard deviation for both
independent and dependent variables based on the entire table even
when grouping was used. This is now computed based on a group,
which is used to compute the scaled data when standardize=TRUE
for Gaussian IGD.
- One approximation still remains. During gradient computation (C++),
the mean subtracted from every value in the independent variable (for
each dimension) is the mean computed over the entire table, not the
per-group mean. This approximation was adopted since it is messy to
pass group-specific mean values for every row in the table to the
C++ layer.

Closes #126


Project: http://git-wip-us.apache.org/repos/asf/incubator-madlib/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-madlib/commit/0ff829a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-madlib/tree/0ff829a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-madlib/diff/0ff829a7

Branch: refs/heads/master
Commit: 0ff829a7060d08f284e8468ebf35c31b6e231d58
Parents: 4b0c377
Author: Nandish Jayaram <njaya...@apache.org>
Authored: Mon Apr 24 09:46:03 2017 -0700
Committer: Nandish Jayaram <njaya...@apache.org>
Committed: Fri Apr 28 17:47:20 2017 -0700

----------------------------------------------------------------------
 .../modules/convex/utils_regularization.py_in   | 157 ++++++++++--
 .../elastic_net_generate_result.py_in           |  89 ++++---
 .../elastic_net_optimizer_fista.py_in           |  20 +-
 .../elastic_net/elastic_net_optimizer_igd.py_in | 106 ++++----
 .../modules/elastic_net/elastic_net_utils.py_in | 242 +++++++------------
 5 files changed, 341 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/0ff829a7/src/ports/postgres/modules/convex/utils_regularization.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/convex/utils_regularization.py_in 
b/src/ports/postgres/modules/convex/utils_regularization.py_in
index 64204aa..879c0d6 100644
--- a/src/ports/postgres/modules/convex/utils_regularization.py_in
+++ b/src/ports/postgres/modules/convex/utils_regularization.py_in
@@ -6,16 +6,17 @@ from validation.cv_utils import 
__cv_split_data_using_id_col_compute
 from validation.cv_utils import __cv_split_data_using_id_tbl_compute
 from validation.cv_utils import __cv_generate_random_id
 from utilities.utilities import __mad_version
+from utilities.utilities import split_quoted_delimited_str
 
 version_wrapper = __mad_version()
 mad_vec = version_wrapper.select_vecfunc()
 
 # ========================================================================
 
-
-def __utils_ind_var_scales(**kwargs):
+def __utils_ind_var_scales(tbl_data, col_ind_var, dimension, schema_madlib):
     """
-    The mean and standard deviation for each dimension of an array stored in a 
column.
+    The mean and standard deviation for each dimension of an array stored
+    in a column.
 
     Returns:
         Dictionary with keys 'mean' and 'std' each with a value of an array of
@@ -32,19 +33,41 @@ def __utils_ind_var_scales(**kwargs):
             FROM
                 {tbl_data}
         ) q2
-        """.format(**kwargs))[0]
+        """.format(**locals()))[0]
     x_scales["mean"] = mad_vec(x_scales["mean"], text=False)
     x_scales["std"] = mad_vec(x_scales["std"], text=False)
     return x_scales
 # ========================================================================
 
-def __utils_dep_var_scale(**kwargs):
+def __utils_ind_var_scales_grouping(tbl_data, col_ind_var, dimension,
+        schema_madlib, grouping_col, x_mean_table):
+    """
+    The mean and standard deviation for each dimension of an array stored in
+    a column. Creates a table containing the mean (array) and std of each
+    dimension of the independent variable, for each group.
+    """
+    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
+    x_scales = plpy.execute(
+        """
+        CREATE TEMP TABLE {x_mean_table} AS
+        SELECT (f).*, {group_col}
+        FROM (
+            SELECT {group_col},
+                {schema_madlib}.__utils_var_scales_result(
+                {schema_madlib}.utils_var_scales({col_ind_var}, {dimension})) 
as f
+            FROM
+                {tbl_data}
+            GROUP BY {group_col}
+        ) q2
+        """.format(**locals()))
+# ========================================================================
+
+def __utils_dep_var_scale(schema_madlib, tbl_data, col_ind_var,
+        col_dep_var):
     """
     The mean and standard deviation for each element of the dependent variable,
     which is a scalar in ridge and lasso.
 
-    The output will be stored in a temp table: a mean array and a std array
-
     This function is also used in lasso.
 
     Parameters:
@@ -53,18 +76,109 @@ def __utils_dep_var_scale(**kwargs):
     col_ind_var -- independent variables column
     col_dep_var -- dependent variable column
     """
-
     y_scale = plpy.execute(
         """
-        select
-            avg(case when not 
{schema_madlib}.array_contains_null({col_ind_var}) then {col_dep_var} end) as 
mean,
-            1 as std
-        from {tbl_data}
-        """.format(**kwargs))[0]
-
+        SELECT
+            avg(CASE WHEN NOT 
{schema_madlib}.array_contains_null({col_ind_var}) THEN {col_dep_var} END) AS 
mean,
+            1 AS std
+        FROM {tbl_data}
+        """.format(**locals()))[0]
     return y_scale
 # ========================================================================
 
+def __utils_dep_var_scale_grouping(y_mean_table, tbl_data, grouping_col,
+        family, schema_madlib=None, col_ind_var=None, col_dep_var=None):
+    """
+    The mean and standard deviation for each element of the dependent variable,
+    w.r.t a group, which is a scalar in ridge and lasso.
+
+    The output will be stored in a temp table: a mean array and a std array,
+    for each group.
+    If the family is Binomial, mean and std for each group is set to 0 and 1
+    respectively.
+
+    This function is also used in lasso.
+
+    Parameters:
+    y_mean_table -- name of the output table to write into
+    tbl_data -- input table
+    grouping_col -- the columns to group the data on
+    family -- if family is Gaussian, ALL following parameters must be defined
+    schema_madlib -- madlib schema
+    col_ind_var -- independent variables column
+    col_dep_var -- dependent variable column
+    """
+    group_col = _cast_if_null(grouping_col, unique_string('grp_col'))
+    if family == 'binomial':
+        mean_str = '0'
+    else:
+        # If the family is Gaussian, schema_madlib, col_ind_var and
+        # col_dep_var must be passed along.
+        if schema_madlib is None or col_ind_var is None or col_dep_var is None:
+            plpy.error("Schema name, indpendent column and dependent column 
names required.")
+        mean_str = ' avg(CASE WHEN NOT {0}.array_contains_null({1}) THEN {2} 
END) '.format(
+                schema_madlib, col_ind_var, col_dep_var)
+    plpy.execute(
+        """
+        CREATE TEMP TABLE {y_mean_table} AS
+        SELECT {group_col},
+            {mean_str} AS mean,
+            1 AS std
+        FROM {tbl_data}
+        GROUP BY {group_col}
+        """.format(**locals()))
+# ========================================================================
+
+def __utils_normalize_data_grouping(y_decenter=True, **kwargs):
+    """
+    Normalize the independent and dependent variables using the calculated
+    mean's and std's in __utils_ind_var_scales and __utils_dep_var_scale.
+
+    Compute the scaled variables by: scaled_value = (origin_value - mean) / 
std,
+    and special care is needed if std is zero.
+
+    The output is a table with scaled independent and dependent variables,
+    based on mean and std for each group. This function is also used in lasso.
+
+    Parameters:
+    tbl_data -- original data
+    col_ind_var -- independent variables column
+    dimension -- length of independent variable array
+    col_dep_var -- dependent variable column
+    tbl_ind_scales -- independent variables scales array
+    tbl_dep_scale -- dependent variable scale
+    tbl_data_scaled -- scaled data result
+    col_ind_var_norm_new -- create a new name for the scaled array
+                       to be compatible with array[...] expressions
+    x_mean_table -- name of the table containing mean of 'x' for each group
+    y_mean_table -- name of the table containing mean of 'y' for each group
+    grouping_col -- columns to group the data on
+    """
+    group_col = kwargs.get('grouping_col')
+    group_col_list = split_quoted_delimited_str(group_col)
+    group_where_x = ' AND 
'.join(['{tbl_data}.{grp}=__x__.{grp}'.format(grp=grp,
+        **kwargs) for grp in group_col_list])
+    group_where_y = ' AND 
'.join(['{tbl_data}.{grp}=__y__.{grp}'.format(grp=grp,
+        **kwargs) for grp in group_col_list])
+    ydecenter_str = "- __y__.mean".format(**kwargs) if y_decenter else ""
+    plpy.execute(
+        """
+        CREATE TEMP TABLE {tbl_data_scaled} AS
+            SELECT
+                ({schema_madlib}.utils_normalize_data({col_ind_var},
+                                            __x__.mean::double precision[],
+                                            __x__.std::double precision[]))
+                    AS {col_ind_var_norm_new},
+                ({col_dep_var} {ydecenter_str})  AS {col_dep_var_norm_new},
+                {tbl_data}.{group_col}
+            FROM {tbl_data}
+            INNER JOIN {x_mean_table} AS __x__ ON {group_where_x}
+            INNER JOIN {y_mean_table} AS __y__ ON {group_where_y}
+        """.format(ydecenter_str=ydecenter_str, group_col=group_col,
+            group_where_x=group_where_x, group_where_y=group_where_y, 
**kwargs))
+    return None
+# ========================================================================
+
 def __utils_normalize_data(y_decenter=True, **kwargs):
     """
     Normalize the independent and dependent variables using the calculated 
mean's and std's
@@ -88,25 +202,22 @@ def __utils_normalize_data(y_decenter=True, **kwargs):
     col_ind_var_norm_new -- create a new name for the scaled array
                        to be compatible with array[...] expressions
     """
-    group_col = _cast_if_null(kwargs.get('grouping_col', None), 
unique_string('grp_col'))
     ydecenter_str = "- {y_mean}".format(**kwargs) if y_decenter else ""
     plpy.execute(
         """
-        create temp table {tbl_data_scaled} as
-            select
+        CREATE TEMP TABLE {tbl_data_scaled} AS
+            SELECT
                 ({schema_madlib}.utils_normalize_data({col_ind_var},
                                             '{x_mean_str}'::double precision[],
                                             '{x_std_str}'::double precision[]))
-                    as {col_ind_var_norm_new},
-                ({col_dep_var} {ydecenter_str})  as {col_dep_var_norm_new},
-                {group_col}
-            from {tbl_data}
-        """.format(ydecenter_str=ydecenter_str, group_col=group_col, **kwargs))
+                    AS {col_ind_var_norm_new},
+                ({col_dep_var} {ydecenter_str})  AS {col_dep_var_norm_new}
+            FROM {tbl_data}
+        """.format(ydecenter_str=ydecenter_str, **kwargs))
 
     return None
 # ========================================================================
 
-
 def __utils_cv_preprocess(kwargs):
     """
     Some common processes used in both ridge and lasso cross validation 
functions:

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/0ff829a7/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
----------------------------------------------------------------------
diff --git 
a/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in 
b/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
index 6246ed9..df5489f 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_generate_result.py_in
@@ -2,7 +2,7 @@ import plpy
 from elastic_net_utils import _process_results
 from elastic_net_utils import _compute_log_likelihood
 from utilities.validate_args import get_cols_and_types
-
+from utilities.utilities import split_quoted_delimited_str
 
 def _elastic_net_generate_result(optimizer, iteration_run, **args):
     """
@@ -10,6 +10,10 @@ def _elastic_net_generate_result(optimizer, iteration_run, 
**args):
     """
     standardize_flag = "True" if args["normalization"] else "False"
     source_table = args["rel_source"]
+    data_scaled = False
+    if args["normalization"] or optimizer == "igd":
+        # x_mean_table and y_mean_table are created only in these conditions.
+        data_scaled = True
     if optimizer == "fista":
         result_func = 
"__gaussian_fista_result({0})".format(args["col_grp_state"])
     elif optimizer == "igd":
@@ -30,38 +34,54 @@ def _elastic_net_generate_result(optimizer, iteration_run, 
**args):
         grouping_str = args['grouping_str']
         cols_types = dict(get_cols_and_types(args["tbl_source"]))
         grouping_str1 = grouping_column + ","
-        select_grouping_info = ','.join([grp_col.strip() + "\t" + 
cols_types[grp_col.strip()]
-                                         for grp_col in 
grouping_column.split(',')]) + ","
+
+        select_mean_and_std = ''
+        inner_join_x = ''
+        inner_join_y = ''
+        if data_scaled:
+            grouping_cols_list = split_quoted_delimited_str(grouping_column)
+            select_grouping_info = ','.join([
+                grp_col.strip()+"\t"+cols_types[grp_col.strip()]
+                for grp_col in grouping_column.split(',')]) + ","
+            select_grp = ','.join(['n_tuples_including_nulls_subq.'+str(grp)
+                            for grp in grouping_cols_list]) + ','
+            x_grp_cols = ' AND '.join([
+                    'n_tuples_including_nulls_subq.{0}={1}.{2}'.format(grp,
+                    args["x_mean_table"], grp) for grp in grouping_cols_list])
+            y_grp_cols = ' AND '.join([
+                    'n_tuples_including_nulls_subq.{0}={1}.{2}'.format(grp,
+                    args["y_mean_table"], grp) for grp in grouping_cols_list])
+            select_mean_and_std = ' {0}.mean AS x_mean, 
'.format(args["x_mean_table"]) +\
+                ' {0}.mean AS y_mean, '.format(args["y_mean_table"]) +\
+                ' {0}.std AS x_std, '.format(args["x_mean_table"])
+            inner_join_x = ' INNER JOIN {0} ON {1} '.format(
+                args["x_mean_table"], x_grp_cols)
+            inner_join_y = ' INNER JOIN {0} ON {1} '.format(
+                args["y_mean_table"], y_grp_cols)
         out_table_qstr = """
             SELECT
-                {grouping_str1}
+                {select_grp}
+                {select_mean_and_std}
                 (result).coefficients AS coef,
                 (result).intercept AS intercept
             FROM
                 (
-                    SELECT {schema_madlib}.{result_func} AS result, 
{col_grp_key}
-                    FROM {tbl_state}
-                    WHERE {col_grp_iteration} = {iteration_run}
-                ) t
-                JOIN
-                (
                     SELECT
                         {grouping_str1}
                         array_to_string(ARRAY[{grouping_str}], ',') AS 
{col_grp_key}
                     FROM {source_table}
-                    GROUP BY {grouping_col}, {col_grp_key}
+                    GROUP BY {grouping_column}, {col_grp_key}
                 ) n_tuples_including_nulls_subq
-                USING ({col_grp_key})
-            """.format(result_func=result_func,
-                       tbl_state=tbl_state,
-                       grouping_col=grouping_column,
-                       col_grp_iteration=args["col_grp_iteration"],
-                       iteration_run=iteration_run,
-                       grouping_str1=grouping_str1,
-                       grouping_str=grouping_str,
-                       col_grp_key=col_grp_key,
-                       source_table=source_table,
-                       schema_madlib=args["schema_madlib"])
+                INNER JOIN
+                (
+                    SELECT {schema_madlib}.{result_func} AS result, 
{col_grp_key}
+                    FROM {tbl_state}
+                    WHERE {col_grp_iteration} = {iteration_run}
+                ) t USING ({col_grp_key})
+                {inner_join_x}
+                {inner_join_y}
+            """.format(schema_madlib=args["schema_madlib"],
+                       col_grp_iteration=args["col_grp_iteration"], **locals())
     else:
         # It's a much simpler query when there is no grouping.
         grouping_str1 = ""
@@ -139,7 +159,12 @@ def build_output_table(res, grouping_column, grouping_str1,
     r_coef = res["coef"]
     if r_coef:
         if args["normalization"]:
-            (coef, intercept) = _restore_scale(r_coef, res["intercept"], args)
+            if grouping_column:
+                (coef, intercept) = _restore_scale(r_coef, res["intercept"],
+                    args, res["x_mean"], res["x_std"], res["y_mean"])
+            else:
+                (coef, intercept) = _restore_scale(r_coef,
+                    res["intercept"], args)
         else:
             coef = r_coef
             intercept = res["intercept"]
@@ -167,20 +192,22 @@ def build_output_table(res, grouping_column, 
grouping_str1,
                        **args)
         plpy.execute(fquery)
 # ------------------------------------------------------------------------
-
-
-def _restore_scale(coef, intercept, args):
+def _restore_scale(coef, intercept, args,
+    x_mean=None, x_std=None, y_mean=None):
     """
     Restore the original scales
     """
+    if x_mean is None and x_std is None and y_mean is None:
+        x_mean = args["x_scales"]["mean"]
+        y_mean = args["y_scale"]["mean"]
+        x_std = args["x_scales"]["std"]
     rcoef = [0] * len(coef)
     if args["family"] == "gaussian":
-        rintercept = float(args["y_scale"]["mean"])
+        rintercept = float(y_mean)
     elif args["family"] == "binomial":
         rintercept = float(intercept)
     for i in range(len(coef)):
-        if args["x_scales"]["std"][i] != 0:
-            rcoef[i] = coef[i] / args["x_scales"]["std"][i]
-            rintercept -= (coef[i] * args["x_scales"]["mean"][i] /
-                           args["x_scales"]["std"][i])
+        if x_std[i] != 0:
+            rcoef[i] = coef[i] / x_std[i]
+            rintercept -= (coef[i] * x_mean[i] / x_std[i])
     return (rcoef, rintercept)

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/0ff829a7/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_fista.py_in
----------------------------------------------------------------------
diff --git 
a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_fista.py_in 
b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_fista.py_in
index f50a214..a6ef699 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_fista.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_fista.py_in
@@ -136,6 +136,8 @@ def _fista_cleanup_temp_tbls(**kwargs):
                 drop table if exists {tbl_data_scaled};
                 drop table if exists {tbl_fista_args};
                 drop table if exists pg_temp.{tbl_fista_state};
+                drop table if exists {x_mean_table};
+                drop table if exists {y_mean_table};
                 """.format(**kwargs))
 
     return None
@@ -209,7 +211,8 @@ def _elastic_net_fista_train_compute(schema_madlib, 
func_step_aggregate,
                                                           lambda_value,
                                                           tolerance,
                                                           schema_madlib))
-
+        args.update({'x_mean_table':unique_string(desp='x_mean_table')})
+        args.update({'y_mean_table':unique_string(desp='y_mean_table')})
         args.update({'grouping_col': grouping_col})
         # use normalized data or not
         if normalization:
@@ -226,19 +229,15 @@ def _elastic_net_fista_train_compute(schema_madlib, 
func_step_aggregate,
 
         if args["warmup_lambdas"] is not None:
             args["warm_no"] = len(args["warmup_lambdas"])
-            args["warmup_lambdas"] = args["warmup_lambdas"]
 
         if args["warmup"] and args["warmup_lambdas"] is None:
             # average squares of each feature
             # used to estimate the largest lambda value
             args["sq"] = _compute_average_sq(**args)
             args["warmup_lambdas"] = \
-                _generate_warmup_lambda_sequence(
-                    tbl_used, args["col_ind_var_new"], args["col_dep_var_new"],
-                    dimension, row_num, lambda_value, alpha,
-                    args["warmup_lambda_no"], args["sq"])
+                _generate_warmup_lambda_sequence(lambda_value,
+                args["warmup_lambda_no"])
             args["warm_no"] = len(args["warmup_lambdas"])
-            args["warmup_lambdas"] = args["warmup_lambdas"]
         elif args["warmup"] is False:
             args["warm_no"] = 1
             args["warmup_lambdas"] = [lambda_value]  # only one value
@@ -340,6 +339,11 @@ def _compute_fista(schema_madlib, func_step_aggregate, 
func_state_diff,
             if (it.kwargs["lambda_count"] > len(args.get('lambda_name'))):
                 break
             it.kwargs["warmup_lambda_value"] = 
args.get('lambda_name')[it.kwargs["lambda_count"] - 1]
+            # Fix for JIRA MADLIB-1092
+            # 'col_n_tuples' is supposed to refer to the number of rows in the
+            # table, or the number of rows in a group. col_n_tuples gets
+            # the right value in in_mem_group_control, so using this instead
+            # of row_num (which was used hitherto).
             it.update("""
                     {schema_madlib}.{func_step_aggregate}(
                         ({col_ind_var})::double precision[],
@@ -348,7 +352,7 @@ def _compute_fista(schema_madlib, func_step_aggregate, 
func_state_diff,
                         ({warmup_lambda_value})::double precision,
                         ({alpha})::double precision,
                         ({dimension})::integer,
-                        ({row_num})::integer,
+                        ({col_n_tuples})::integer,
                         ({max_stepsize})::double precision,
                         ({eta})::double precision,
                         ({use_active_set})::integer,

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/0ff829a7/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
----------------------------------------------------------------------
diff --git 
a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in 
b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
index 091aefb..d73a754 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_optimizer_igd.py_in
@@ -144,6 +144,8 @@ def _igd_cleanup_temp_tbls(**args):
                  drop table if exists {tbl_data_scaled};
                  drop table if exists {tbl_igd_args};
                  drop table if exists pg_temp.{tbl_igd_state};
+                 drop table if exists {x_mean_table};
+                 drop table if exists {y_mean_table};
                  """.format(**args))
     return None
 # ------------------------------------------------------------------------
@@ -193,17 +195,19 @@ def _elastic_net_igd_train_compute(schema_madlib, 
func_step_aggregate,
                              is '{arg = value, ...}'::varchar[]
     """
     with MinWarning('error'):
-        (dimension, row_num) = _tbl_dimension_rownum(schema_madlib, 
tbl_source, col_ind_var)
+        (dimension, row_num) = _tbl_dimension_rownum(schema_madlib,
+            tbl_source, col_ind_var)
 
         # generate a full dict to ease the following string format
         # including several temporary table names
-        args = _igd_construct_dict(schema_madlib, family, tbl_source, 
col_ind_var,
-                                   col_dep_var, tbl_result,
-                                   dimension, row_num, lambda_value, alpha, 
normalization,
-                                   max_iter, tolerance, outstr_array,
-                                   _igd_params_parser(optimizer_params, 
lambda_value,
-                                                      tolerance, 
schema_madlib))
-
+        args = _igd_construct_dict(schema_madlib, family, tbl_source,
+            col_ind_var, col_dep_var, tbl_result, dimension, row_num,
+            lambda_value, alpha, normalization, max_iter, tolerance,
+            outstr_array, _igd_params_parser(optimizer_params, lambda_value,
+            tolerance, schema_madlib))
+
+        args.update({'x_mean_table':unique_string(desp='x_mean_table')})
+        args.update({'y_mean_table':unique_string(desp='y_mean_table')})
         args.update({'grouping_col': grouping_col})
         # use normalized data or not
         if normalization:
@@ -216,9 +220,10 @@ def _elastic_net_igd_train_compute(schema_madlib, 
func_step_aggregate,
             tbl_used = tbl_source
             args["col_ind_var_new"] = col_ind_var
             args["col_dep_var_new"] = col_dep_var
-
         args["tbl_used"] = tbl_used
 
+        # parameter values required by the IGD optimizer
+        (xmean, ymean) = _compute_means(args)
         # average squares of each feature
         # used to estimate the largest lambda value
         # also used to screen out tiny values, so order is needed
@@ -227,23 +232,16 @@ def _elastic_net_igd_train_compute(schema_madlib, 
func_step_aggregate,
 
         if args["warmup_lambdas"] is not None:
             args["warm_no"] = len(args["warmup_lambdas"])
-            args["warmup_lambdas"] = args["warmup_lambdas"]
 
         if args["warmup"] and args["warmup_lambdas"] is None:
             args["warmup_lambdas"] = \
-                _generate_warmup_lambda_sequence(
-                args["tbl_used"], args["col_ind_var_new"], 
args["col_dep_var_new"],
-                dimension, row_num, lambda_value, alpha,
-                args["warmup_lambda_no"], args["sq"])
+                _generate_warmup_lambda_sequence(lambda_value,
+                args["warmup_lambda_no"])
             args["warm_no"] = len(args["warmup_lambdas"])
-            args["warmup_lambdas"] = args["warmup_lambdas"]
         elif args["warmup"] is False:
             args["warm_no"] = 1
             args["warmup_lambdas"] = [lambda_value]  # only one value
 
-        # parameter values required by the IGD optimizer
-        (xmean, ymean) = _compute_means(**args)
-
         args.update({
             'rel_args': args["tbl_igd_args"],
             'rel_state': args["tbl_igd_state"],
@@ -263,38 +261,39 @@ def _elastic_net_igd_train_compute(schema_madlib, 
func_step_aggregate,
         if not args.get('parallel'):
             func_step_aggregate += "_single_seg"
         # perform the actual calculation
-        iteration_run = _compute_igd(schema_madlib,
-                                     func_step_aggregate,
-                                     func_state_diff,
-                                     args["tbl_igd_args"],
-                                     args["tbl_igd_state"],
-                                     tbl_used,
-                                     args["col_ind_var_new"],
-                                     args["col_dep_var_new"],
-                                     grouping_str,
-                                     grouping_col,
-                                     start_iter=0,
-                                     max_iter=args["max_iter"],
-                                     tolerance=args["tolerance"],
-                                     warmup_tolerance=args["warmup_tolerance"],
-                                     warm_no=args["warm_no"],
-                                     step_decay=args["step_decay"],
-                                     dimension=args["dimension"],
-                                     stepsize=args["stepsize"],
-                                     lambda_name=args["warmup_lambdas"],
-                                     
warmup_lambda_value=args.get('warmup_lambdas')[args["lambda_count"]-1],
-                                     alpha=args["alpha"],
-                                     row_num=args["row_num"],
-                                     xmean_val=args["xmean_val"],
-                                     ymean_val=args["ymean_val"],
-                                     lambda_count=args["lambda_count"],
-                                     rel_state=args["tbl_igd_state"],
-                                     
col_grp_iteration=args["col_grp_iteration"],
-                                     col_grp_state=args["col_grp_state"],
-                                     col_grp_key=args["col_grp_key"],
-                                     col_n_tuples=args["col_n_tuples"],
-                                     rel_source=args["rel_source"],
-                                     state_type=args["state_type"],)
+        iteration_run = _compute_igd(
+             schema_madlib,
+             func_step_aggregate,
+             func_state_diff,
+             args["tbl_igd_args"],
+             args["tbl_igd_state"],
+             tbl_used,
+             args["col_ind_var_new"],
+             args["col_dep_var_new"],
+             grouping_str,
+             grouping_col,
+             start_iter=0,
+             max_iter=args["max_iter"],
+             tolerance=args["tolerance"],
+             warmup_tolerance=args["warmup_tolerance"],
+             warm_no=args["warm_no"],
+             step_decay=args["step_decay"],
+             dimension=args["dimension"],
+             stepsize=args["stepsize"],
+             lambda_name=args["warmup_lambdas"],
+             
warmup_lambda_value=args.get('warmup_lambdas')[args["lambda_count"]-1],
+             alpha=args["alpha"],
+             row_num=args["row_num"],
+             xmean_val=args["xmean_val"],
+             ymean_val=args["ymean_val"],
+             lambda_count=args["lambda_count"],
+             rel_state=args["tbl_igd_state"],
+             col_grp_iteration=args["col_grp_iteration"],
+             col_grp_state=args["col_grp_state"],
+             col_grp_key=args["col_grp_key"],
+             col_n_tuples=args["col_n_tuples"],
+             rel_source=args["rel_source"],
+             state_type=args["state_type"])
 
         _elastic_net_generate_result("igd", iteration_run, **args)
 
@@ -341,6 +340,11 @@ def _compute_igd(schema_madlib, func_step_aggregate, 
func_state_diff,
             if (it.kwargs["lambda_count"] > len(args.get('lambda_name'))):
                 break
             it.kwargs["warmup_lambda_value"] = 
args.get('lambda_name')[it.kwargs["lambda_count"] - 1]
+            # Fix for JIRA MADLIB-1092
+            # 'col_n_tuples' is supposed to refer to the number of rows in the
+            # table, or the number of rows in a group. col_n_tuples gets
+            # the right value in in_mem_group_control, so using this instead
+            # of row_num (which was used hitherto).
             it.update("""
                     {schema_madlib}.{func_step_aggregate}(
                         ({col_ind_var})::double precision[],
@@ -350,7 +354,7 @@ def _compute_igd(schema_madlib, func_step_aggregate, func_state_diff,
                         ({alpha})::double precision,
                         ({dimension})::integer,
                         ({stepsize})::double precision,
-                        ({row_num})::integer,
+                        ({col_n_tuples})::integer,
                         ('{xmean_val}')::double precision[],
                         ({ymean_val})::double precision,
                         ({step_decay})::double precision

http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/0ff829a7/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
----------------------------------------------------------------------
diff --git a/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in b/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
index ce6b280..b2f2505 100644
--- a/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
+++ b/src/ports/postgres/modules/elastic_net/elastic_net_utils.py_in
@@ -6,8 +6,10 @@ from utilities.utilities import _array_to_string
 from convex.utils_regularization import __utils_ind_var_scales
 from convex.utils_regularization import __utils_dep_var_scale
 from convex.utils_regularization import __utils_normalize_data
+from convex.utils_regularization import __utils_ind_var_scales_grouping
+from convex.utils_regularization import __utils_dep_var_scale_grouping
+from convex.utils_regularization import __utils_normalize_data_grouping
 from utilities.validate_args import table_exists
-from utilities.control import IterationController2S
 
 from collections import namedtuple
 
@@ -108,7 +110,6 @@ def _generate_warmup_lambda_sequence(lambda_value, n_steps):
     return seq
 # ------------------------------------------------------------------------
 
-
 def _compute_average_sq(**args):
     """
     Compute the average squares of all features, used to estimtae the largest lambda
@@ -192,16 +193,27 @@ def _elastic_net_validate_args(tbl_source, col_ind_var, col_dep_var,
     return None
 # ------------------------------------------------------------------------
 
+def _compute_data_scales_grouping(args):
+    __utils_ind_var_scales_grouping(args["tbl_source"], args["col_ind_var"],
+        args["dimension"], args["schema_madlib"], args["grouping_col"],
+        args["x_mean_table"])
+    if args["family"] == "binomial":
+        # set mean and std to 0 and 1 respectively, for each group.
+        __utils_dep_var_scale_grouping(args["y_mean_table"],
+            args["tbl_source"], args["grouping_col"], args["family"])
+    else:
+        __utils_dep_var_scale_grouping(args["y_mean_table"],
+            args["tbl_source"], args["grouping_col"], args["family"],
+            args["schema_madlib"], args["col_ind_var"], args["col_dep_var"])
 
 def _compute_data_scales(args):
-    args["x_scales"] = __utils_ind_var_scales(tbl_data=args["tbl_source"], col_ind_var=args["col_ind_var"],
-                                              dimension=args["dimension"], schema_madlib=args["schema_madlib"])
-
+    args["x_scales"] = __utils_ind_var_scales(args["tbl_source"],
+        args["col_ind_var"], args["dimension"], args["schema_madlib"])
     if args["family"] == "binomial":
         args["y_scale"] = dict(mean=0, std=1)
     else:
-        args["y_scale"] = __utils_dep_var_scale(schema_madlib=args["schema_madlib"], tbl_data=args["tbl_source"],
-                                                col_ind_var=args["col_ind_var"], col_dep_var=args["col_dep_var"])
+        args["y_scale"] = __utils_dep_var_scale(args["schema_madlib"],
+            args["tbl_source"], args["col_ind_var"], args["col_dep_var"])
 
     args["xmean_str"] = _array_to_string(args["x_scales"]["mean"])
 # ------------------------------------------------------------------------
@@ -214,23 +226,45 @@ def _normalize_data(args):
 
     The output is stored in tbl_data_scaled
     """
-    _compute_data_scales(args)
-
     y_decenter = True if args["family"] == "gaussian" else False
-
-    __utils_normalize_data(y_decenter=y_decenter,
-                           tbl_data=args["tbl_source"],
-                           col_ind_var=args["col_ind_var"],
-                           col_dep_var=args["col_dep_var"],
-                           tbl_data_scaled=args["tbl_data_scaled"],
-                           col_ind_var_norm_new=args["col_ind_var_norm_new"],
-                           col_dep_var_norm_new=args["col_dep_var_norm_new"],
-                           schema_madlib=args["schema_madlib"],
-                           x_mean_str=args["xmean_str"],
-                           x_std_str=_array_to_string(args["x_scales"]["std"]),
-                           y_mean=args["y_scale"]["mean"],
-                           y_std=args["y_scale"]["std"],
-                           grouping_col=args["grouping_col"])
+    if args["grouping_col"]:
+        # When grouping_col is defined, we must find an array containing
+        # the mean of every dimension in the independent variable (x), the
+        # mean of dependent variable (y) and the standard deviation for them
+        # specific to groups. Store these results in temp tables x_mean_table
+        # and y_mean_table.
+        _compute_data_scales_grouping(args)
+        # __utils_normalize_data_grouping reads the various means and stds
+        # from the tables.
+        __utils_normalize_data_grouping(y_decenter=y_decenter,
+                               tbl_data=args["tbl_source"],
+                               col_ind_var=args["col_ind_var"],
+                               col_dep_var=args["col_dep_var"],
+                               tbl_data_scaled=args["tbl_data_scaled"],
+                               col_ind_var_norm_new=args["col_ind_var_norm_new"],
+                               col_dep_var_norm_new=args["col_dep_var_norm_new"],
+                               schema_madlib=args["schema_madlib"],
+                               x_mean_table=args["x_mean_table"],
+                               y_mean_table=args["y_mean_table"],
+                               grouping_col=args["grouping_col"])
+    else:
+        # When no grouping_col is defined, the mean and std for both 'x' and
+        # 'y' can be defined using strings, stored in x_mean_str, x_std_str
+        # etc. We don't need a table like how we needed for grouping.
+        _compute_data_scales(args)
+        __utils_normalize_data(y_decenter=y_decenter,
+                               tbl_data=args["tbl_source"],
+                               col_ind_var=args["col_ind_var"],
+                               col_dep_var=args["col_dep_var"],
+                               tbl_data_scaled=args["tbl_data_scaled"],
+                               col_ind_var_norm_new=args["col_ind_var_norm_new"],
+                               col_dep_var_norm_new=args["col_dep_var_norm_new"],
+                               schema_madlib=args["schema_madlib"],
+                               x_mean_str=args["xmean_str"],
+                               x_std_str=_array_to_string(args["x_scales"]["std"]),
+                               y_mean=args["y_scale"]["mean"],
+                               y_std=args["y_scale"]["std"],
+                               grouping_col=args["grouping_col"])
 
     return None
 # ------------------------------------------------------------------------
@@ -242,27 +276,27 @@ def _tbl_dimension_rownum(schema_madlib, tbl_source, col_ind_var):
     """
     # independent variable array length
     dimension = plpy.execute("""
-                             select array_upper({col_ind_var},1) as dimension
-                             from {tbl_source} limit 1
-                             """.format(tbl_source=tbl_source,
-                                        col_ind_var=col_ind_var))[0]["dimension"]
+                     SELECT array_upper({col_ind_var},1) AS dimension
+                     FROM {tbl_source} LIMIT 1
+                 """.format(tbl_source=tbl_source,
+                        col_ind_var=col_ind_var))[0]["dimension"]
     # total row number of data source table
-    # The WHERE clause here ignores rows in the table that contain one or more NULLs in the
-    # independent variable (x). There is no NULL check made for the dependent variable (y),
-    # since one of the hard requirements/assumptions of the input data to elastic_net is that the
-    # dependent variable cannot be NULL.
+    # The WHERE clause here ignores rows in the table that contain one or more
+    # NULLs in the independent variable (x). There is no NULL check made for
+    # the dependent variable (y), since one of the hard assumptions of the
+    # input data to elastic_net is that the dependent variable cannot be NULL.
     row_num = plpy.execute("""
-                           select count(*) from {tbl_source}
-                           WHERE not {schema_madlib}.array_contains_null({col_ind_var})
-                           """.format(tbl_source=tbl_source,
-                                      schema_madlib=schema_madlib,
-                                      col_ind_var=col_ind_var))[0]["count"]
+                   SELECT COUNT(*) FROM {tbl_source}
+                   WHERE NOT {schema_madlib}.array_contains_null({col_ind_var})
+               """.format(tbl_source=tbl_source,
+                          schema_madlib=schema_madlib,
+                          col_ind_var=col_ind_var))[0]["count"]
 
     return (dimension, row_num)
 # ------------------------------------------------------------------------
 
 
-def _compute_means(**args):
+def _compute_means(args):
     """
     Compute the averages of dependent (y) and independent (x) variables
     """
@@ -270,127 +304,15 @@ def _compute_means(**args):
         xmean_str = _array_to_string([0] * args["dimension"])
         ymean = 0
         return (xmean_str, ymean)
-    else:
-        return (args["xmean_str"], args["y_scale"]["mean"])
+    if args["grouping_col"]:
+        # We can use the mean of the entire table instead of groups here.
+        # The absolute correct thing to do is to use group specific
+        # mean, but we will need to add a new column and change the input
+        # table contents to do that (it has to be accessed by the group
+        # iteration controller, C++ code). That is a lot messier,
+        # so living with this approximation for now.
+        _compute_data_scales(args)
+    # If there is no grouping_col, note that _compute_data_scales() was
+    # already called, so we don't have to call it again.
+    return (args["xmean_str"], args["y_scale"]["mean"])
 # ------------------------------------------------------------------------
-
-
-class IterationControllerNoTableDrop (IterationController2S):
-
-    """
-    IterationController but without table dropping
-
-    Useful if one wants to use it in cross validation
-    where dropping tables in a loop would use up all the locks
-    and get "out of memory" error
-    """
-    # ------------------------------------------------------------------------
-
-    def __init__(self, rel_args, rel_state, stateType,
-                 temporaryTables=True,
-                 truncAfterIteration=False,
-                 schema_madlib="MADLIB_SCHEMA_MISSING",
-                 verbose=False,
-                 **kwargs):
-        # Need to call super class's init method to initialize
-        # member fields
-        super(IterationControllerNoTableDrop, self).__init__(
-            self, rel_args, rel_state, stateType, temporaryTables,
-            truncAfterIteration, schema_madlib, verbose, **kwargs)
-        # self.kwargs["rel_state"] = "pg_temp" + rel_state, but for testing
-        # the existence of a table, schema name should be used together
-        self.state_exists = plpy.execute(
-            "select count(*) from information_schema.tables "
-            "where table_name = '{0}' and table_schema = 'pg_temp'".
-            format(rel_state))[0]['count'] == 1
-        # The current total row number of rel_state table
-        if self.state_exists:
-            self.state_row_num = plpy.execute("select count(*) from {rel_state}".
-                                              format(**self.kwargs))[0]["count"]
-
-    # ------------------------------------------------------------------------
-
-    def update(self, newState):
-        """
-        Update state of calculation.
-        """
-        newState = newState.format(iteration=self.iteration, **self.kwargs)
-        self.iteration += 1
-        if self.state_exists and self.iteration <= self.state_row_num:
-            # If the rel_state table already exists, and
-            # iteration number is smaller than total row number,
-            # use UPDATE instead of append. UPDATE does not use
-            # extra locks.
-            self.runSQL("""
-                update {rel_state} set _state = ({newState})
-                where _iteration = {iteration}
-            """.format(iteration=self.iteration,
-                       newState=newState,
-                       **self.kwargs))
-        else:
-            # rel_state table is newly created, and
-            # append data to this table
-            self.runSQL("""
-                INSERT INTO {rel_state}
-                    SELECT
-                        {iteration},
-                        ({newState})
-            """.format(iteration=self.iteration,
-                       newState=newState,
-                       **self.kwargs))
-    # ------------------------------------------------------------------------
-
-    def __enter__(self):
-        """
-        __enter__ and __exit__ methods are special. They are automatically called
-        when using "with" block.
-        """
-        if self.state_exists is False:
-            # create rel_state table when it does not already exist
-            super(IterationControllerNoTableDrop, self).__enter__(self)
-        self.inWith = True
-        return self
-# ------------------------------------------------------------------------
-
-
-class IterationControllerTableAppend (IterationControllerNoTableDrop):
-
-    def __init__(self, rel_args, rel_state, stateType,
-                 temporaryTables=True,
-                 truncAfterIteration=False,
-                 schema_madlib="MADLIB_SCHEMA_MISSING",
-                 verbose=False,
-                 **kwargs):
-        self.kwargs = kwargs
-        self.kwargs.update(
-            rel_args=rel_args,
-            rel_state=rel_state,
-            stateType=stateType.format(schema_madlib=schema_madlib),
-            schema_madlib=schema_madlib)
-        self.temporaryTables = temporaryTables
-        self.truncAfterIteration = truncAfterIteration
-        self.verbose = verbose
-        self.inWith = False
-        self.iteration = -1
-
-        self.state_exists = plpy.execute("""
-                                         select count(*)
-                                         from information_schema.tables
-                                         where table_name = '{rel_state}'
-                                         """.format(**self.kwargs))[0]['count'] == 1
-    # ------------------------------------------------------------------------
-
-    def update(self, newState):
-        """
-        Update state of calculation.
-        """
-        newState = newState.format(iteration=self.iteration, **self.kwargs)
-        self.iteration += 1
-        self.runSQL("""
-                    INSERT INTO {rel_state}
-                    SELECT
-                        {iteration},
-                        ({newState})
-                    """.format(iteration=self.iteration,
-                               newState=newState,
-                               **self.kwargs))


Reply via email to