Github user njayaram2 commented on a diff in the pull request: https://github.com/apache/incubator-madlib/pull/149#discussion_r128101328 --- Diff: src/ports/postgres/modules/convex/mlp_igd.py_in --- @@ -0,0 +1,734 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
"""
@file mlp_igd.py_in

@brief Multilayer perceptron using IGD: Driver functions

@namespace mlp_igd
"""
import plpy

from utilities.control import MinWarning
from utilities.utilities import add_postfix
from utilities.utilities import py_list_to_sql_string
from utilities.utilities import extract_keyvalue_params
from utilities.utilities import _assert
from utilities.utilities import unique_string
from utilities.utilities import strip_end_quotes

from utilities.validate_args import cols_in_tbl_valid
from utilities.validate_args import input_tbl_valid
from utilities.validate_args import is_var_valid
from utilities.validate_args import output_tbl_valid
from utilities.validate_args import get_expr_type
from utilities.validate_args import array_col_has_same_dimension
from utilities.validate_args import array_col_dimension


def mlp(schema_madlib, source_table, output_table, independent_varname,
        dependent_varname, hidden_layer_sizes,
        optimizer_param_str, activation, is_classification, **kwargs):
    """Train a multilayer perceptron with incremental gradient descent (IGD).

    Args:
        @param schema_madlib        Schema in which MADlib is installed
        @param source_table         Table containing the training data
        @param output_table         Name of the table to hold the model
        @param independent_varname  Column expression with the (array) features
        @param dependent_varname    Column expression with the target variable
        @param hidden_layer_sizes   List of hidden-layer widths (integers)
        @param optimizer_param_str  Comma-separated optimizer settings
        @param activation           Activation function name (or unique prefix)
        @param is_classification    True for classification, else regression

    Returns:
        None
    """
    with MinWarning('info'):
        optimizer_params = _get_optimizer_params(optimizer_param_str or "")
        _validate_args(source_table, output_table, independent_varname,
                       dependent_varname, hidden_layer_sizes,
                       optimizer_params, is_classification)

        tolerance = optimizer_params["tolerance"]
        n_iterations = optimizer_params["n_iterations"]
        step_size = optimizer_params["step_size"]
        n_tries = optimizer_params["n_tries"]
        activation_name = _get_activation_function_name(activation)
        activation_index = _get_activation_index(activation_name)
        num_input_nodes = array_col_dimension(source_table,
                                              independent_varname)
        classes = []
        dependent_type = get_expr_type(dependent_varname, source_table)
        original_dependent_varname = dependent_varname

        if is_classification:
            # Map each distinct label to a one-hot integer array so the
            # network trains with one output node per class.
            labels = plpy.execute("""
                SELECT DISTINCT {dependent_varname}
                FROM {source_table}
                """.format(dependent_varname=dependent_varname,
                           source_table=source_table))
            num_output_nodes = len(labels)
            indicators = []
            for label_obj in labels:
                label = _format_label(label_obj[dependent_varname])
                classes.append(label)
                indicators.append(dependent_varname + "=" + str(label))
            # join() avoids the original concat-then-strip-trailing-comma
            dependent_varname = ("ARRAY[" + ",".join(indicators) +
                                 "]::integer[]")
        else:
            # Regression always trains against an array; wrap a scalar
            # target in a one-element array so both cases are uniform.
            if "[]" not in dependent_type:
                dependent_varname = "ARRAY[" + dependent_varname + "]"
            num_output_nodes = array_col_dimension(source_table,
                                                   dependent_varname)
        layer_sizes = ([num_input_nodes] + hidden_layer_sizes +
                       [num_output_nodes])

        current_iteration = 1
        prev_state = None
        while True:
            if prev_state:
                prev_state_str = py_list_to_sql_string(
                    prev_state, array_type="double precision")
            else:
                prev_state_str = "(NULL)::DOUBLE PRECISION[]"
            train_sql = """
                SELECT
                    {schema_madlib}.mlp_igd_step(
                        ({independent_varname})::DOUBLE PRECISION[],
                        ({dependent_varname})::DOUBLE PRECISION[],
                        {prev_state},
                        {layer_sizes},
                        ({step_size})::FLOAT8,
                        {activation},
                        {is_classification}) as curr_state
                FROM {source_table} AS _src
                """.format(schema_madlib=schema_madlib,
                           independent_varname=independent_varname,
                           dependent_varname=dependent_varname,
                           prev_state=prev_state_str,
                           # C++ uses double internally
                           layer_sizes=py_list_to_sql_string(
                               layer_sizes, array_type="double precision"),
                           step_size=step_size,
                           source_table=source_table,
                           activation=activation_index,
                           is_classification=int(is_classification))
            curr_state = plpy.execute(train_sql)[0]["curr_state"]
            dist_sql = """
                SELECT {schema_madlib}.internal_mlp_igd_distance(
                        {prev_state},
                        {curr_state}) as state_dist
                """.format(schema_madlib=schema_madlib,
                           prev_state=prev_state_str,
                           curr_state=py_list_to_sql_string(
                               curr_state, "double precision"))
            state_dist = plpy.execute(dist_sql)[0]["state_dist"]
            # BUG FIX: the original break condition was
            # `current_iteration > n_iterations`, which ran
            # n_iterations + 1 training passes; `>=` runs exactly
            # n_iterations.
            if ((state_dist and state_dist < tolerance) or
                    current_iteration >= n_iterations):
                break
            prev_state = curr_state
            current_iteration += 1
        # NOTE(review): n_iterations (the cap) is recorded in the model
        # table, not the actual number of iterations run — kept as-is to
        # preserve the existing output contract.
        _build_model_table(schema_madlib, output_table, curr_state,
                           n_iterations)
        layer_sizes_str = py_list_to_sql_string(layer_sizes,
                                                array_type="integer")
        classes_str = py_list_to_sql_string(
            [strip_end_quotes(cl, "'") for cl in classes],
            array_type=dependent_type)
        summary_table_creation_query = """
            CREATE TABLE {output_table}_summary(
                source_table TEXT,
                independent_varname TEXT,
                dependent_varname TEXT,
                tolerance FLOAT,
                step_size FLOAT,
                n_iterations INTEGER,
                n_tries INTEGER,
                layer_sizes INTEGER[],
                activation_function TEXT,
                is_classification BOOLEAN,
                classes {dependent_type}[]
            )""".format(output_table=output_table,
                        dependent_type=dependent_type)

        summary_table_update_query = """
            INSERT INTO {output_table}_summary VALUES(
                '{source_table}',
                '{independent_varname}',
                '{original_dependent_varname}',
                {tolerance},
                {step_size},
                {n_iterations},
                {n_tries},
                {layer_sizes_str},
                '{activation_name}',
                {is_classification},
                {classes_str}
            )
            """.format(**locals())
        plpy.execute(summary_table_creation_query)
        plpy.execute(summary_table_update_query)
# ----------------------------------------------------------------------


def _build_model_table(schema_madlib, output_table, final_state, n_iterations):
    """Materialize the model table (coeff, loss, num_iterations) from the
    final IGD state via the internal_mlp_igd_result UDF."""
    final_state_str = py_list_to_sql_string(final_state,
                                            array_type="double precision")
    model_table_query = """
        CREATE TABLE {output_table} AS
        SELECT
            (result).coeff AS coeff,
            (result).loss AS loss,
            {n_iterations} AS num_iterations
        FROM (
            SELECT
                {schema_madlib}.internal_mlp_igd_result(
                    {final_state_str}
                ) AS result
        ) rel_state_subq
        """.format(**locals())
    plpy.execute(model_table_query)
# ----------------------------------------------------------------------


def _get_optimizer_params(param_str):
    """Parse the optimizer parameter string, filling in defaults.

    Returns a dict with keys step_size, n_iterations, n_tries, tolerance.
    An empty/None string yields all defaults.
    """
    params_defaults = {
        "step_size": (0.001, float),
        "n_iterations": (100, int),
        "n_tries": (1, int),
        "tolerance": (0.001, float),
    }
    param_defaults = {k: v[0] for k, v in params_defaults.items()}
    param_types = {k: v[1] for k, v in params_defaults.items()}

    if not param_str:
        return param_defaults

    return extract_keyvalue_params(param_str, param_types, param_defaults,
                                   ignore_invalid=False)
# ----------------------------------------------------------------------


def _validate_args_classification(source_table, dependent_varname):
    """Ensure the dependent column type is usable for classification."""
    expr_type = get_expr_type(dependent_varname, source_table)
    int_types = ['integer', 'smallint', 'bigint']
    text_types = ['text', 'varchar', 'character varying', 'char', 'character']
    boolean_types = ['boolean']
    _assert("[]" in expr_type or
            expr_type in int_types + text_types + boolean_types,
            "Dependent variable column should refer to an "
            "integer, boolean, text, varchar, or character type.")
# ----------------------------------------------------------------------


def _validate_args_regression(source_table, dependent_varname):
    """Ensure the dependent column type is usable for regression."""
    expr_type = get_expr_type(dependent_varname, source_table)
    int_types = ['integer', 'smallint', 'bigint']
    float_types = ['double precision', 'real']
    _assert("[]" in expr_type or expr_type in int_types + float_types,
            "Dependent variable column should refer to an array or numeric type")
    if "[]" in expr_type:
        _assert(array_col_has_same_dimension(source_table, dependent_varname),
                "Dependent variable column should refer to arrays of the same length")
---------------------------------------------------------------------- + + +def _validate_args(source_table, output_table, independent_varname, + dependent_varname, hidden_layer_sizes, + optimizer_params, is_classification): + input_tbl_valid(source_table, "MLP") + output_tbl_valid(output_table, "MLP") + output_tbl_valid(output_table+"_summary", "MLP") + _assert(is_var_valid(source_table, independent_varname), + "MLP error: invalid independent_varname " + "('{independent_varname}') for source_table " + "({source_table})!".format(independent_varname=independent_varname, + source_table=source_table)) + + _assert(is_var_valid(source_table, dependent_varname), + "MLP error: invalid dependent_varname " + "('{dependent_varname}') for source_table " + "({source_table})!".format(dependent_varname=dependent_varname, + source_table=source_table)) + _assert(hidden_layer_sizes is not None, + "hidden_layer_sizes may not be null") + _assert(isinstance(hidden_layer_sizes, list), + "hidden_layer_sizes must be an array of integers") + _assert(all(isinstance(value, int) for value in hidden_layer_sizes), + "MLP error: Hidden layers sizes must be integers") + _assert(all(value >= 0 for value in hidden_layer_sizes), + "MLP error: Hidden layers sizes must be greater than 0.") + _assert(optimizer_params["tolerance"] >= 0, + "MLP error: Tolerance should be greater than or equal to 0.") + _assert(optimizer_params["n_tries"] >= 1, + "MLP error: Number of tries should be greater than or equal to 1") + _assert(optimizer_params["n_iterations"] >= 1, + "MLP error: Number of iterations should be greater than or equal to 1") + _assert(optimizer_params["step_size"] > 0, + "MLP error: Stepsize should be greater than 0.") + _assert("[]" in get_expr_type(independent_varname, source_table), + "Independent variable column should refer to an array") + _assert(array_col_has_same_dimension(source_table, independent_varname), + "Independent variable column should refer to arrays of the same length") + + if 
is_classification: + _validate_args_classification(source_table, dependent_varname) + else: + _validate_args_regression(source_table, dependent_varname) +# ---------------------------------------------------------------------- + + +def _get_activation_function_name(activation_function): + if not activation_function: + activation_function = 'sigmoid' + else: + # Add non-linear kernels below after implementing them. + supported_activation_function = ['sigmoid', 'tanh', 'relu'] + try: + # allow user to specify a prefix substring of + # supported kernels. This works because the supported + # kernels have unique prefixes. + activation_function = next(x for x in supported_activation_function + if x.startswith(activation_function)) + except StopIteration: + # next() returns a StopIteration if no element found + plpy.error("MLP Error: Invalid activation function: " + "{0}. Supported activation functions are ({1})" + .format(activation_function, ','.join( + sorted(supported_activation_function)))) + return activation_function +# ------------------------------------------------------------------------------ + + +def _get_activation_index(activation_name): + table = {"relu": 0, "sigmoid": 1, "tanh": 2} + return table[activation_name] + + +def _format_label(label): + if isinstance(label, str): + return "'" + label + "'" + return label +# ------------------------------------------------------------------------- + + +def mlp_predict(schema_madlib, model_table, data_table, + id_col_name, output_table, + pred_type='response', **kwargs): + """ Score new observations using a trained neural network + + @param schema_madlib Name of the schema where MADlib is installed + @param model_table Name of learned model + @param data_table Name of table/view containing the data + points to be scored + @param id_col_name Name of column in source_table containing + (integer) identifier for data point + @param output_table Name of table to store the results + @param pred_type: str, The type of 
output required: + 'response' gives the actual response values, + 'prob' gives the probability of the classes in a + For regression, only type='response' is defined. + """ + # model table + input_tbl_valid(model_table, 'MLP') + cols_in_tbl_valid(model_table, ['coeff'], 'MLP') + # summary table + summary_table = add_postfix(model_table, "_summary") + input_tbl_valid(summary_table, 'MLP') + cols_in_tbl_valid(summary_table, + ['dependent_varname', 'independent_varname', + 'activation_function', + 'tolerance', 'step_size', 'n_iterations', + 'n_tries', 'classes', 'layer_sizes', 'source_table'], + 'MLP') + + # read necessary info from summary + summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0] + coeff = py_list_to_sql_string(plpy.execute("SELECT * FROM {0}".format(model_table))[0]["coeff"]) + dependent_varname = summary['dependent_varname'] + independent_varname = summary['independent_varname'] + source_table = summary['source_table'] + activation_function = _get_activation_index(summary['activation_function']) + layer_sizes = py_list_to_sql_string(summary['layer_sizes'], array_type="DOUBLE PRECISION") + is_classification = int(summary["is_classification"]) + is_response = int(pred_type == 'response') + + pred_name = ('"prob_{0}"' if pred_type == "prob" else + '"estimated_{0}"').format(dependent_varname.replace('"', '').strip()) + + input_tbl_valid(data_table, 'MLP') + + _assert(is_var_valid(data_table, independent_varname), + "MLP Error: independent_varname ('{0}') is invalid for data_table ({1})". + format(independent_varname, data_table)) + _assert(id_col_name is not None, "MLP Error: id_col_name is NULL") + _assert(is_var_valid(data_table, id_col_name), + "MLP Error: id_col_name ('{0}') is invalid for {1}". 
+ format(id_col_name, data_table)) + output_tbl_valid(output_table, 'MLP') + # optimizer_param_dict = _get_optimizer_params(optimizer_params) + + with MinWarning("warning"): + header = "CREATE TABLE " + output_table + " AS " + # Regression + if not is_classification: + dependent_type = get_expr_type(dependent_varname, source_table) + unnest_if_not_array = "" + # Return the same type as the user provided. Internally we always use an array, but + # if they provided a scaler, unnest it for the user + if "[]" not in dependent_type: + unnest_if_not_array = "UNNEST" + sql = header + """ + SELECT {id_col_name}, + {unnest_if_not_array}({schema_madlib}.internal_predict_mlp( + {coeff}, + {independent_varname}::DOUBLE PRECISION[], + {is_classification}, + {activation_function}, + {layer_sizes}, + {is_response} + )) as {pred_name} + FROM {data_table} + """ + else: + summary_query = """ + SELECT classes FROM {0}_summary + """.format(model_table) + classes = plpy.execute(summary_query)[0]['classes'] + if pred_type == "response": + # This join is to recover the class name from the summary table, + # as prediction just returns an index + classes_with_index_table = unique_string() + classes_table = unique_string() + sql = header + """ + SELECT + q.{id_col_name} + ,(ARRAY{classes})[pred_idx[1]+1] as {pred_name} + FROM ( + SELECT + {id_col_name}, + {schema_madlib}.internal_predict_mlp( + {coeff}::DOUBLE PRECISION[], + {independent_varname}::DOUBLE PRECISION[], + {is_classification}, + {activation_function}, + {layer_sizes}, + {is_response} + ) + as pred_idx + FROM {data_table} + ) q + """ + else: + # Incomplete + intermediate_col = unique_string() + score_format = ',\n'.join([ + 'CAST({interim}[{j}] as DOUBLE PRECISION) as "estimated_prob_{c_str}"'. 
+ format(j=i + 1, c_str=str(c).strip(' "'), interim=intermediate_col) + for i, c in enumerate(classes)]) + sql = header + """ + SELECT + {id_col_name}, + {score_format} + FROM ( + SELECT {id_col_name}, + {schema_madlib}.internal_predict_mlp( + {coeff}::DOUBLE PRECISION[], + {independent_varname}::DOUBLE PRECISION[], + {is_classification}, + {activation_function}, + {layer_sizes}, + {is_response} + )::TEXT[] + AS {intermediate_col} + FROM {data_table} + ) q + """ + sql = sql.format(**locals()) + with MinWarning('warning'): --- End diff -- This might not be necessary, use the `MinWarning` that was defined before the `if` condition instead.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---