Repository: incubator-madlib Updated Branches: refs/heads/master 6f6f804b2 -> ff1b0f883
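[Editorial preface] This patch overhauls the MLP module: it adds configurable learning-rate policies, a new 'lambda' regularization term, per-row weights, warm start from a previously trained model, input standardization (x_means/x_stds), and verbose loss reporting. As an editorial aid (not part of the patch), the sketch below restates the four learning-rate schedules that mlp() implements, using the defaults declared in _get_optimizer_params; the function name is mine.

    import math

    def learning_rate(policy, lr_init, iteration, gamma=0.1, power=0.5,
                      iterations_per_step=100):
        # 'iteration' is zero-indexed, matching zero_indexed_iteration in mlp().
        if policy == "exp":
            return lr_init * gamma ** iteration
        elif policy == "inv":
            return lr_init * (iteration + 1) ** (-power)
        elif policy == "step":
            return lr_init * gamma ** math.floor(iteration / iterations_per_step)
        return lr_init  # "constant": the same rate is used every iteration

For example, with the defaults, learning_rate('step', 0.001, 250) has decayed twice: 0.001 * 0.1**2 = 1e-05.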
http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/ff1b0f88/src/ports/postgres/modules/convex/mlp_igd.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/convex/mlp_igd.py_in b/src/ports/postgres/modules/convex/mlp_igd.py_in index 6cea7b0..550d630 100644 --- a/src/ports/postgres/modules/convex/mlp_igd.py_in +++ b/src/ports/postgres/modules/convex/mlp_igd.py_in @@ -16,7 +16,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - """ @file mlp_igd.py_in @@ -24,17 +23,18 @@ @namespace mlp_igd """ +import math import plpy -from utilities.control import MinWarning from utilities.utilities import add_postfix from utilities.utilities import py_list_to_sql_string from utilities.utilities import extract_keyvalue_params from utilities.utilities import _assert +from utilities.utilities import _assert_equal from utilities.utilities import unique_string from utilities.utilities import strip_end_quotes - from utilities.validate_args import cols_in_tbl_valid +from utilities.validate_args import table_exists from utilities.validate_args import input_tbl_valid from utilities.validate_args import is_var_valid from utilities.validate_args import output_tbl_valid @@ -42,10 +42,14 @@ from utilities.validate_args import get_expr_type from utilities.validate_args import array_col_has_same_dimension from utilities.validate_args import array_col_dimension +from convex.utils_regularization import __utils_ind_var_scales + +from elastic_net.elastic_net_utils import _tbl_dimension_rownum + def mlp(schema_madlib, source_table, output_table, independent_varname, - dependent_varname, hidden_layer_sizes, - optimizer_param_str, activation, is_classification, **kwargs): + dependent_varname, hidden_layer_sizes, optimizer_param_str, activation, + is_classification, weights, warm_start, verbose=False): """ Args: @param schema_madlib @@ -59,62 +63,128 @@ def mlp(schema_madlib, source_table, output_table, independent_varname, Returns: None """ - with MinWarning('warning'): - optimizer_params = _get_optimizer_params(optimizer_param_str or "") - summary_table = add_postfix(output_table, "_summary") - _validate_args(source_table, output_table, summary_table, independent_varname, - dependent_varname, hidden_layer_sizes, - optimizer_params, is_classification) - - current_iteration = 1 - prev_state = None - tolerance = optimizer_params["tolerance"] - n_iterations = optimizer_params["n_iterations"] - step_size = optimizer_params["step_size"] - n_tries = optimizer_params["n_tries"] - activation_name = _get_activation_function_name(activation) - activation_index = _get_activation_index(activation_name) - num_input_nodes = array_col_dimension( - source_table, independent_varname) - num_output_nodes = 0 - classes = [] - dependent_type = get_expr_type(dependent_varname, source_table) - original_dependent_varname = dependent_varname - - if is_classification: - dependent_variable_sql = """ - SELECT DISTINCT {dependent_varname} - FROM {source_table} - """.format(dependent_varname=dependent_varname, - source_table=source_table) - labels = plpy.execute(dependent_variable_sql) - one_hot_dependent_varname = 'ARRAY[' - num_output_nodes = len(labels) - for label_obj in labels: - label = _format_label(label_obj[dependent_varname]) - classes.append(label) - one_hot_dependent_varname += dependent_varname + \ - "=" + str(label) + "," - # Remove the last comma - one_hot_dependent_varname = 
one_hot_dependent_varname[:-1]
-            one_hot_dependent_varname += ']::integer[]'
-            dependent_varname = one_hot_dependent_varname
-        else:
-            if "[]" not in dependent_type:
-                dependent_varname = "ARRAY[" + dependent_varname + "]"
-            num_output_nodes = array_col_dimension(
-                source_table, dependent_varname)
-        layer_sizes = [num_input_nodes] + \
-            hidden_layer_sizes + [num_output_nodes]
+    warm_start = bool(warm_start)
+    optimizer_params = _get_optimizer_params(optimizer_param_str or "")
+    summary_table = add_postfix(output_table, "_summary")
+    weights = '1' if not weights or not weights.strip() else weights.strip()
+    hidden_layer_sizes = hidden_layer_sizes or []
+    activation = _get_activation_function_name(activation)
+    learning_rate_policy = _get_learning_rate_policy_name(
+        optimizer_params["learning_rate_policy"])
+    activation_index = _get_activation_index(activation)
+
+    _validate_args(source_table, output_table, summary_table, independent_varname,
+                   dependent_varname, hidden_layer_sizes,
+                   optimizer_params, is_classification, weights,
+                   warm_start, activation)
+
+    current_iteration = 1
+    prev_state = None
+    tolerance = optimizer_params["tolerance"]
+    n_iterations = optimizer_params["n_iterations"]
+    step_size_init = optimizer_params["learning_rate_init"]
+    iterations_per_step = optimizer_params["iterations_per_step"]
+    power = optimizer_params["power"]
+    gamma = optimizer_params["gamma"]
+    step_size = step_size_init
+    n_tries = optimizer_params["n_tries"]
+    # lambda is a reserved word in python
+    lmbda = optimizer_params["lambda"]
+    num_input_nodes = array_col_dimension(source_table,
+                                          independent_varname)
+    num_output_nodes = 0
+    classes = []
+    dependent_type = get_expr_type(dependent_varname, source_table)
+    original_dependent_varname = dependent_varname
+    dimension, n_tuples = _tbl_dimension_rownum(
+        schema_madlib, source_table, independent_varname)
+    x_scales = __utils_ind_var_scales(
+        source_table, independent_varname, dimension, schema_madlib)
+    x_means = py_list_to_sql_string(
+        x_scales["mean"], array_type="DOUBLE PRECISION")
+    filtered_stds = [x if x != 0 else 1 for x in x_scales["std"]]
+    x_stds = py_list_to_sql_string(
+        filtered_stds, array_type="DOUBLE PRECISION")
+    if is_classification:
+        dependent_variable_sql = """
+            SELECT DISTINCT {dependent_varname}
+            FROM {source_table}
+            """.format(
+            dependent_varname=dependent_varname, source_table=source_table)
+        labels = plpy.execute(dependent_variable_sql)
+        one_hot_dependent_varname = 'ARRAY['
+        num_output_nodes = len(labels)
+        for label_obj in labels:
+            label = _format_label(label_obj[dependent_varname])
+            classes.append(label)
+        classes.sort()
+        for c in classes:
+            one_hot_dependent_varname += dependent_varname + \
+                "=" + str(c) + ","
+        # Remove the last comma
+        one_hot_dependent_varname = one_hot_dependent_varname[:-1]
+        one_hot_dependent_varname += ']::integer[]'
+        dependent_varname = one_hot_dependent_varname
+    else:
+        if "[]" not in dependent_type:
+            dependent_varname = "ARRAY[" + dependent_varname + "]"
+        num_output_nodes = array_col_dimension(
+            source_table, dependent_varname)
+    layer_sizes = [num_input_nodes] + \
+        hidden_layer_sizes + [num_output_nodes]
+
+    # Need layer sizes before validating for warm_start
+    coeff = []
+    for i in range(len(layer_sizes) - 1):
+        fan_in = layer_sizes[i]
+        fan_out = layer_sizes[i + 1]
+        # Initialize according to Glorot and Bengio (2010)
+        # See design doc for more info
+        span = math.sqrt(6.0 / (fan_in + fan_out))
+        dim = 
(layer_sizes[i] + 1) * layer_sizes[i + 1] + rand = plpy.execute("""SELECT array_agg({span}*(random()-0.5)) + AS random + FROM generate_series(0,{dim}) + """.format(span=span, dim=dim))[0]["random"] + coeff += rand + + if warm_start: + coeff, x_means, x_stds = _validate_warm_start( + source_table, output_table, summary_table, independent_varname, + original_dependent_varname, layer_sizes, optimizer_params, + is_classification, weights, warm_start, activation) + plpy.execute("DROP TABLE IF EXISTS {0}".format(output_table)) + plpy.execute("DROP TABLE IF EXISTS {0}".format(summary_table)) + best_state = [] + best_loss = [float('inf')] + prev_loss = float('inf') + loss = None + for _ in range(n_tries): while True: if prev_state: prev_state_str = py_list_to_sql_string( prev_state, array_type="double precision") else: prev_state_str = "(NULL)::DOUBLE PRECISION[]" + # else block is for "constant", so don't do anything + zero_indexed_iteration = current_iteration - 1 + if learning_rate_policy == "exp": + step_size = step_size_init * gamma**zero_indexed_iteration + elif learning_rate_policy == "inv": + step_size = step_size_init * (current_iteration)**(-power) + elif learning_rate_policy == "step": + step_size = step_size_init * gamma**( + math.floor(zero_indexed_iteration / iterations_per_step)) + + train_sql = """ SELECT + (result).state as state, + (result).loss as loss + FROM ( + SELECT {schema_madlib}.mlp_igd_step( ({independent_varname})::DOUBLE PRECISION[], ({dependent_varname})::DOUBLE PRECISION[], @@ -122,105 +192,153 @@ def mlp(schema_madlib, source_table, output_table, independent_varname, {layer_sizes}, ({step_size})::FLOAT8, {activation}, - {is_classification}) as curr_state - FROM {source_table} AS _src - """.format(schema_madlib=schema_madlib, - independent_varname=independent_varname, - dependent_varname=dependent_varname, - prev_state=prev_state_str, - # C++ uses double internally - layer_sizes=py_list_to_sql_string(layer_sizes, - array_type="double precision"), - step_size=step_size, - source_table=source_table, - activation=activation_index, - is_classification=int(is_classification)) - curr_state = plpy.execute(train_sql)[0]["curr_state"] - dist_sql = """ - SELECT {schema_madlib}.internal_mlp_igd_distance( - {prev_state}, - {curr_state}) as state_dist - """.format(schema_madlib=schema_madlib, - prev_state=prev_state_str, - curr_state=py_list_to_sql_string(curr_state, "double precision")) - state_dist = plpy.execute(dist_sql)[0]["state_dist"] - if ((state_dist and state_dist < tolerance) or - current_iteration > n_iterations): + {is_classification}, + ({weights})::DOUBLE PRECISION, + {warm_start}, + ({warm_start_coeff})::DOUBLE PRECISION[], + {n_tuples}, + {lmbda}, + {x_means}, + {x_stds} + ) as result + FROM {source_table} as _src) _step_q + """.format( + schema_madlib=schema_madlib, + independent_varname=independent_varname, + dependent_varname=dependent_varname, + prev_state=prev_state_str, + # c++ uses double internally + layer_sizes=py_list_to_sql_string( + layer_sizes, array_type="DOUBLE PRECISION"), + step_size=step_size, + source_table=source_table, + activation=activation_index, + is_classification=int(is_classification), + weights=weights, + warm_start=warm_start, + warm_start_coeff=py_list_to_sql_string( + coeff, array_type="DOUBLE PRECISION"), + n_tuples=n_tuples, + lmbda=lmbda, + x_means=x_means, + x_stds=x_stds) + step_result = plpy.execute(train_sql)[0] + curr_state = step_result['state'] + loss = step_result['loss'] + if verbose and 1 < current_iteration <= 
n_iterations: + plpy.info("Iteration: " + str(current_iteration - + 1) + ", Loss: " + str(loss)) + state_dist = abs(loss-prev_loss) + if ((state_dist and state_dist < tolerance) + or current_iteration > n_iterations): break prev_state = curr_state + prev_loss = loss current_iteration += 1 - _build_model_table(schema_madlib, output_table, - curr_state, n_iterations) - layer_sizes_str = py_list_to_sql_string( - layer_sizes, array_type="integer") - classes_str = py_list_to_sql_string( - [strip_end_quotes(cl, "'") for cl in classes], - array_type=dependent_type) - summary_table_creation_query = """ - CREATE TABLE {summary_table}( - source_table TEXT, - independent_varname TEXT, - dependent_varname TEXT, - tolerance FLOAT, - step_size FLOAT, - n_iterations INTEGER, - n_tries INTEGER, - layer_sizes INTEGER[], - activation_function TEXT, - is_classification BOOLEAN, - classes {dependent_type}[] - )""".format(summary_table=summary_table, - dependent_type=dependent_type) - - summary_table_update_query = """ - INSERT INTO {summary_table} VALUES( - '{source_table}', - '{independent_varname}', - '{original_dependent_varname}', - {tolerance}, - {step_size}, - {n_iterations}, - {n_tries}, - {layer_sizes_str}, - '{activation_name}', - {is_classification}, - {classes_str} - ) - """.format(**locals()) - plpy.execute(summary_table_creation_query) - plpy.execute(summary_table_update_query) -# ---------------------------------------------------------------------- - - -def _build_model_table(schema_madlib, output_table, final_state, n_iterations): + # We use previous state because the last iteration + # just calculates loss + if loss < best_loss: + best_state = prev_state + best_loss = loss + current_iteration = 1 + prev_state = None + _build_model_table(schema_madlib, output_table, best_state, + best_loss, n_iterations) + layer_sizes_str = py_list_to_sql_string( + layer_sizes, array_type="integer") + classes_str = py_list_to_sql_string( + [strip_end_quotes(cl, "'") for cl in classes], + array_type=dependent_type) + summary_table_creation_query = """ + CREATE TABLE {summary_table}( + source_table TEXT, + independent_varname TEXT, + dependent_varname TEXT, + tolerance FLOAT, + learning_rate_init FLOAT, + learning_rate_policy TEXT, + n_iterations INTEGER, + n_tries INTEGER, + layer_sizes INTEGER[], + activation TEXT, + is_classification BOOLEAN, + classes {dependent_type}[], + weights VARCHAR, + x_means DOUBLE PRECISION[], + x_stds DOUBLE PRECISION[] + )""".format(summary_table=summary_table, + dependent_type=dependent_type) + + summary_table_update_query = """ + INSERT INTO {summary_table} VALUES( + '{source_table}', + '{independent_varname}', + '{original_dependent_varname}', + {tolerance}, + {step_size_init}, + '{learning_rate_policy}', + {n_iterations}, + {n_tries}, + {layer_sizes_str}, + '{activation}', + {is_classification}, + {classes_str}, + '{weights}', + {x_means}, + {x_stds} + ) + """.format(**locals()) + plpy.execute(summary_table_creation_query) + plpy.execute(summary_table_update_query) + return None + + +def _get_loss(schema_madlib, state): + return plpy.execute(""" + SELECT + (result).loss AS loss + FROM ( + SELECT + {schema_madlib}.internal_mlp_igd_result( + {final_state_str} + ) AS result + ) rel_state_subq + """.format( + schema_madlib=schema_madlib, + final_state_str=py_list_to_sql_string(state)))[0]["loss"] + + +def _build_model_table(schema_madlib, output_table, final_state, loss, n_iterations): final_state_str = py_list_to_sql_string( final_state, array_type="double precision") 
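    # Editor's note, not part of the original patch: internal_mlp_igd_result
    # unpacks the flat IGD state into a composite value. The query below keeps
    # only (result).coeff and reports the Python-side best loss together with
    # the configured n_iterations; the num_rows_processed column that the old
    # query mentioned is dropped.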
model_table_query = """ - CREATE TABLE {output_table} AS + CREATE TABLE {output_table} AS + SELECT + (result).coeff as coeff, + {loss} as loss, + {n_iterations} as num_iterations + FROM ( SELECT - (result).coeff AS coeff, - (result).loss AS loss, - {n_iterations} AS num_iterations - -- (result).num_rows_processed AS num_rows_processed, - -- n_tuples_including_nulls - (result).num_rows_processed - FROM ( - SELECT - {schema_madlib}.internal_mlp_igd_result( - {final_state_str} - ) AS result - ) rel_state_subq - """.format(**locals()) + {schema_madlib}.internal_mlp_igd_result( + {final_state_str} + ) AS result + ) rel_state_subq + """.format(**locals()) plpy.execute(model_table_query) -# ---------------------------------------------------------------------- def _get_optimizer_params(param_str): params_defaults = { - "step_size": (0.001, float), + "learning_rate_init": (0.001, float), "n_iterations": (100, int), "n_tries": (1, int), "tolerance": (0.001, float), + "learning_rate_policy": ("constant", str), + "gamma": (0.1, float), + "iterations_per_step": (100, int), + "power": (0.5, float), + "lambda": (0, float) } param_defaults = dict([(k, v[0]) for k, v in params_defaults.items()]) param_types = dict([(k, v[1]) for k, v in params_defaults.items()]) @@ -228,10 +346,9 @@ def _get_optimizer_params(param_str): if not param_str: return param_defaults - name_value = extract_keyvalue_params(param_str, param_types, param_defaults, - ignore_invalid=False) + name_value = extract_keyvalue_params( + param_str, param_types, param_defaults, ignore_invalid=False) return name_value -# ---------------------------------------------------------------------- def _validate_args_classification(source_table, dependent_varname): @@ -239,89 +356,174 @@ def _validate_args_classification(source_table, dependent_varname): int_types = ['integer', 'smallint', 'bigint'] text_types = ['text', 'varchar', 'character varying', 'char', 'character'] boolean_types = ['boolean'] - _assert("[]" in expr_type or expr_type in int_types + text_types + boolean_types, + _assert("[]" in expr_type + or expr_type in int_types + text_types + boolean_types, "Dependent variable column should refer to an " "integer, boolean, text, varchar, or character type.") -# ---------------------------------------------------------------------- def _validate_args_regression(source_table, dependent_varname): expr_type = get_expr_type(dependent_varname, source_table) int_types = ['integer', 'smallint', 'bigint'] float_types = ['double precision', 'real'] - _assert("[]" in expr_type or expr_type in int_types + float_types, - "Dependent variable column should refer to an array or numeric type") + _assert( + "[]" in expr_type or expr_type in int_types + float_types, + "Dependent variable column should refer to an array or numeric type") if "[]" in expr_type: - _assert(array_col_has_same_dimension(source_table, dependent_varname), - "Dependent variable column should refer to arrays of the same length") -# ---------------------------------------------------------------------- + _assert( + array_col_has_same_dimension(source_table, dependent_varname), + "Dependent variable column should refer to arrays of the same length" + ) + + +def _validate_summary_table(summary_table): + input_tbl_valid(summary_table, 'MLP') + cols_in_tbl_valid(summary_table, [ + 'dependent_varname', 'independent_varname', 'activation', + 'tolerance', 'learning_rate_init', 'n_iterations', 'n_tries', + 'classes', 'layer_sizes', 'source_table', 'x_means', 'x_stds' + ], 'MLP') + + +def 
_validate_warm_start(source_table, output_table, summary_table, independent_varname,
+                         dependent_varname, layer_sizes,
+                         optimizer_params, is_classification, weights,
+                         warm_start, activation):
+    _assert(table_exists(output_table),
+            "MLP error: Warm start failed due to missing model table: " + output_table)
+    _assert(table_exists(summary_table),
+            "MLP error: Warm start failed due to missing summary table: " + summary_table)
+
+    _assert(optimizer_params["n_tries"] == 1,
+            "MLP error: warm_start is only compatible with n_tries = 1")
+
+    summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0]
+    params = [
+        "independent_varname", "dependent_varname", "layer_sizes",
+        "is_classification", "weights", "activation"
+    ]
+    for param in params:
+        _assert_equal(eval(param), summary[param],
+                      "MLP error: warm start failed due to different parameter value: " +
+                      param)
+    output = plpy.execute("SELECT * FROM {0}".format(output_table))[0]
+    coeff = output['coeff']
+    num_coeffs = sum(
+        map(lambda i: (layer_sizes[i] + 1) * (layer_sizes[i + 1]),
+            range(len(layer_sizes) - 1)))
+    _assert_equal(num_coeffs,
+                  len(coeff),
+                  "MLP error: Warm start failed due to invalid output_table: " +
+                  output_table + ". Invalid number of coefficients in model.")
+    x_means = py_list_to_sql_string(
+        summary["x_means"], array_type="DOUBLE PRECISION")
+    x_stds = py_list_to_sql_string(
+        summary["x_stds"], array_type="DOUBLE PRECISION")
+
+    return coeff, x_means, x_stds


 def _validate_args(source_table, output_table, summary_table, independent_varname,
                    dependent_varname, hidden_layer_sizes,
-                   optimizer_params, is_classification):
+                   optimizer_params, is_classification, weights,
+                   warm_start, activation):
     input_tbl_valid(source_table, "MLP")
-    output_tbl_valid(output_table, "MLP")
-    output_tbl_valid(summary_table, "MLP")
-    _assert(is_var_valid(source_table, independent_varname),
-            "MLP error: invalid independent_varname "
-            "('{independent_varname}') for source_table "
-            "({source_table})!".format(independent_varname=independent_varname,
-                                       source_table=source_table))
-
-    _assert(is_var_valid(source_table, dependent_varname),
-            "MLP error: invalid dependent_varname "
-            "('{dependent_varname}') for source_table "
-            "({source_table})!".format(dependent_varname=dependent_varname,
-                                       source_table=source_table))
-    _assert(hidden_layer_sizes is not None,
-            "hidden_layer_sizes may not be null")
-    _assert(isinstance(hidden_layer_sizes, list),
-            "hidden_layer_sizes must be an array of integers")
-    _assert(all(isinstance(value, int) for value in hidden_layer_sizes),
-            "MLP error: Hidden layers sizes must be integers")
-    _assert(all(value >= 0 for value in hidden_layer_sizes),
-            "MLP error: Hidden layers sizes must be greater than 0.")
+    if not warm_start:
+        output_tbl_valid(output_table, "MLP")
+        output_tbl_valid(summary_table, "MLP")
+
+    _assert(
+        is_var_valid(source_table, independent_varname),
+        "MLP error: invalid independent_varname "
+        "('{independent_varname}') for source_table "
+        "({source_table})!".format(
+            independent_varname=independent_varname,
+            source_table=source_table))
+
+    _assert(
+        is_var_valid(source_table, dependent_varname),
+        "MLP error: invalid dependent_varname "
+        "('{dependent_varname}') for source_table "
+        "({source_table})!".format(
+            dependent_varname=dependent_varname, source_table=source_table))
+    _assert(
+        isinstance(hidden_layer_sizes, list),
+        "hidden_layer_sizes must be an array of integers")
+    # TODO put this check earlier
+    _assert(
+        all(isinstance(value, int) for value in hidden_layer_sizes),
+        "MLP error: Hidden layer sizes must be integers")
+    _assert(
+        all(value >= 0 for value in hidden_layer_sizes),
+        "MLP error: Hidden layer sizes must be greater than or equal to 0.")
+    _assert(optimizer_params["lambda"] >= 0,
+            "MLP error: lambda should be greater than or equal to 0.")
     _assert(optimizer_params["tolerance"] >= 0,
-            "MLP error: Tolerance should be greater than or equal to 0.")
+            "MLP error: tolerance should be greater than or equal to 0.")
     _assert(optimizer_params["n_tries"] >= 1,
-            "MLP error: Number of tries should be greater than or equal to 1")
-    _assert(optimizer_params["n_iterations"] >= 1,
-            "MLP error: Number of iterations should be greater than or equal to 1")
-    _assert(optimizer_params["step_size"] > 0,
-            "MLP error: Stepsize should be greater than 0.")
+            "MLP error: n_tries should be greater than or equal to 1")
+    _assert(
+        optimizer_params["n_iterations"] >= 1,
+        "MLP error: n_iterations should be greater than or equal to 1")
+    _assert(optimizer_params["power"] > 0,
+            "MLP error: power should be greater than 0.")
+    _assert(0 < optimizer_params["gamma"] <= 1,
+            "MLP error: gamma should be between 0 and 1.")
+    _assert(optimizer_params["iterations_per_step"] > 0,
+            "MLP error: iterations_per_step should be greater than 0.")
+    _assert(optimizer_params["learning_rate_init"] > 0,
+            "MLP error: learning_rate_init should be greater than 0.")
     _assert("[]" in get_expr_type(independent_varname, source_table),
             "Independent variable column should refer to an array")
-    _assert(array_col_has_same_dimension(source_table, independent_varname),
-            "Independent variable column should refer to arrays of the same length")
+    _assert(
+        array_col_has_same_dimension(source_table, independent_varname),
+        "Independent variable column should refer to arrays of the same length"
+    )
+
+    int_types = ['integer', 'smallint', 'bigint']
+    float_types = ['double precision', 'real']
+    _assert(
+        get_expr_type(weights, source_table) in int_types + float_types,
+        "MLP error: Weights should be a numeric type")

     if is_classification:
         _validate_args_classification(source_table, dependent_varname)
     else:
         _validate_args_regression(source_table, dependent_varname)
-# ----------------------------------------------------------------------


-def _get_activation_function_name(activation_function):
-    if not activation_function:
-        activation_function = 'sigmoid'
+def _get_learning_rate_policy_name(learning_rate_policy):
+    if not learning_rate_policy:
+        learning_rate_policy = 'constant'
+    else:
+        supported_learning_rate_policies = ['constant', 'exp', 'inv', 'step']
+        try:
+            learning_rate_policy = next(
+                x for x in supported_learning_rate_policies
+                if x.startswith(learning_rate_policy))
+        except StopIteration:
+            plpy.error(
+                "MLP Error: Invalid learning rate policy: "
+                "{0}. Supported learning rate policies are ({1})".format(
+                    learning_rate_policy,
+                    ','.join(sorted(supported_learning_rate_policies))))
+    return learning_rate_policy
+
+
+def _get_activation_function_name(activation):
+    if not activation:
+        activation = 'sigmoid'
     else:
-        # Add non-linear kernels below after implementing them.
         supported_activation_function = ['sigmoid', 'tanh', 'relu']
         try:
-            # allow user to specify a prefix substring of
-            # supported kernels. This works because the supported
-            # kernels have unique prefixes.
- activation_function = next(x for x in supported_activation_function - if x.startswith(activation_function)) + activation = next( + x for x in supported_activation_function + if x.startswith(activation)) except StopIteration: - # next() returns a StopIteration if no element found plpy.error("MLP Error: Invalid activation function: " - "{0}. Supported activation functions are ({1})" - .format(activation_function, ','.join( - sorted(supported_activation_function)))) - return activation_function -# ------------------------------------------------------------------------------ + "{0}. Supported activation functions are ({1})".format( + activation, + ','.join(sorted(supported_activation_function)))) + return activation def _get_activation_index(activation_name): @@ -333,12 +535,15 @@ def _format_label(label): if isinstance(label, str): return "'" + label + "'" return label -# ------------------------------------------------------------------------- -def mlp_predict(schema_madlib, model_table, data_table, - id_col_name, output_table, - pred_type='response', **kwargs): +def mlp_predict(schema_madlib, + model_table, + data_table, + id_col_name, + output_table, + pred_type='response', + **kwargs): """ Score new observations using a trained neural network @param schema_madlib Name of the schema where MADlib is installed @@ -356,13 +561,7 @@ def mlp_predict(schema_madlib, model_table, data_table, input_tbl_valid(model_table, 'MLP') cols_in_tbl_valid(model_table, ['coeff'], 'MLP') summary_table = add_postfix(model_table, "_summary") - input_tbl_valid(summary_table, 'MLP') - cols_in_tbl_valid(summary_table, - ['dependent_varname', 'independent_varname', - 'activation_function', - 'tolerance', 'step_size', 'n_iterations', - 'n_tries', 'classes', 'layer_sizes', 'source_table'], - 'MLP') + _validate_summary_table(summary_table) summary = plpy.execute("SELECT * FROM {0}".format(summary_table))[0] coeff = py_list_to_sql_string(plpy.execute( @@ -370,106 +569,116 @@ def mlp_predict(schema_madlib, model_table, data_table, dependent_varname = summary['dependent_varname'] independent_varname = summary['independent_varname'] source_table = summary['source_table'] - activation_function = _get_activation_index(summary['activation_function']) + activation = _get_activation_index(summary['activation']) layer_sizes = py_list_to_sql_string( summary['layer_sizes'], array_type="DOUBLE PRECISION") is_classification = int(summary["is_classification"]) is_response = int(pred_type == 'response') + x_means = py_list_to_sql_string( + summary["x_means"], array_type="DOUBLE PRECISION") + x_stds = py_list_to_sql_string( + summary["x_stds"], array_type="DOUBLE PRECISION") - pred_name = ('"prob_{0}"' if pred_type == "prob" else - '"estimated_{0}"').format(dependent_varname.replace('"', '').strip()) + pred_name = ( + '"prob_{0}"' if pred_type == "prob" else + '"estimated_{0}"').format(dependent_varname.replace('"', '').strip()) input_tbl_valid(data_table, 'MLP') - _assert(is_var_valid(data_table, independent_varname), - "MLP Error: independent_varname ('{0}') is invalid for data_table ({1})". - format(independent_varname, data_table)) + _assert( + is_var_valid(data_table, independent_varname), + "MLP Error: independent_varname ('{0}') is invalid for data_table ({1})". + format(independent_varname, data_table)) _assert(id_col_name is not None, "MLP Error: id_col_name is NULL") - _assert(is_var_valid(data_table, id_col_name), - "MLP Error: id_col_name ('{0}') is invalid for {1}". 
- format(id_col_name, data_table)) + _assert( + is_var_valid(data_table, id_col_name), + "MLP Error: id_col_name ('{0}') is invalid for {1}".format( + id_col_name, data_table)) output_tbl_valid(output_table, 'MLP') - with MinWarning("warning"): - header = "CREATE TABLE " + output_table + " AS " - # Regression - if not is_classification: - dependent_type = get_expr_type(dependent_varname, source_table) - unnest_if_not_array = "" - # Return the same type as the user provided. Internally we always use an array, but - # if they provided a scaler, unnest it for the user - if "[]" not in dependent_type: - unnest_if_not_array = "UNNEST" + header = "CREATE TABLE " + output_table + " AS " + # Regression + if not is_classification: + dependent_type = get_expr_type(dependent_varname, source_table) + unnest_if_not_array = "" + # Return the same type as the user provided. Internally we always + # use an array, but if they provided a scaler, unnest it for + # the user + if "[]" not in dependent_type: + unnest_if_not_array = "UNNEST" + sql = header + """ + SELECT {id_col_name}, + {unnest_if_not_array}({schema_madlib}.internal_predict_mlp( + {coeff}, + {independent_varname}::DOUBLE PRECISION[], + {is_classification}, + {activation}, + {layer_sizes}, + {is_response}, + {x_means}, + {x_stds} + )) as {pred_name} + FROM {data_table} + """ + else: + summary_query = """ + SELECT classes FROM {0} + """.format(summary_table) + classes = plpy.execute(summary_query)[0]['classes'] + if pred_type == "response": + classes_with_index_table = unique_string() + classes_table = unique_string() sql = header + """ - SELECT {id_col_name}, - {unnest_if_not_array}({schema_madlib}.internal_predict_mlp( - {coeff}, - {independent_varname}::DOUBLE PRECISION[], - {is_classification}, - {activation_function}, - {layer_sizes}, - {is_response} - )) as {pred_name} - FROM {data_table} + SELECT + q.{id_col_name} + ,(ARRAY{classes})[pred_idx[1]+1] as {pred_name} + FROM ( + SELECT + {id_col_name}, + {schema_madlib}.internal_predict_mlp( + {coeff}::DOUBLE PRECISION[], + {independent_varname}::DOUBLE PRECISION[], + {is_classification}, + {activation}, + {layer_sizes}, + {is_response}, + {x_means}, + {x_stds} + ) + as pred_idx + FROM {data_table} + ) q """ else: - summary_query = """ - SELECT classes FROM {0} - """.format(summary_table) - classes = plpy.execute(summary_query)[0]['classes'] - if pred_type == "response": - # This join is to recover the class name from the summary table, - # as prediction just returns an index - classes_with_index_table = unique_string() - classes_table = unique_string() - sql = header + """ - SELECT - q.{id_col_name} - ,(ARRAY{classes})[pred_idx[1]+1] as {pred_name} - FROM ( - SELECT - {id_col_name}, - {schema_madlib}.internal_predict_mlp( - {coeff}::DOUBLE PRECISION[], - {independent_varname}::DOUBLE PRECISION[], - {is_classification}, - {activation_function}, - {layer_sizes}, - {is_response} - ) - as pred_idx - FROM {data_table} - ) q - """ - else: - # Incomplete - intermediate_col = unique_string() - score_format = ',\n'.join([ - 'CAST({interim}[{j}] as DOUBLE PRECISION) as "estimated_prob_{c_str}"'. 
- format(j=i + 1, c_str=str(c).strip(' "'), - interim=intermediate_col) - for i, c in enumerate(classes)]) - sql = header + """ - SELECT - {id_col_name}, - {score_format} - FROM ( - SELECT {id_col_name}, - {schema_madlib}.internal_predict_mlp( - {coeff}::DOUBLE PRECISION[], - {independent_varname}::DOUBLE PRECISION[], - {is_classification}, - {activation_function}, - {layer_sizes}, - {is_response} - )::TEXT[] - AS {intermediate_col} - FROM {data_table} - ) q - """ + # Incomplete + intermediate_col = unique_string() + score_format = ',\n'.join([ + 'CAST({interim}[{j}] as DOUBLE PRECISION) as "estimated_prob_{c_str}"'. + format(j=i + 1, c_str=str(c).strip(' "'), + interim=intermediate_col) + for i, c in enumerate(classes)]) + sql = header + """ + SELECT + {id_col_name}, + {score_format} + FROM ( + SELECT {id_col_name}, + {schema_madlib}.internal_predict_mlp( + {coeff}::DOUBLE PRECISION[], + {independent_varname}::DOUBLE PRECISION[], + {is_classification}, + {activation}, + {layer_sizes}, + {is_response}, + {x_means}, + {x_stds} + )::TEXT[] + AS {intermediate_col} + FROM {data_table} + ) q + """ sql = sql.format(**locals()) plpy.execute(sql) -# ---------------------------------------------------------------------- def mlp_help(schema_madlib, message, is_classification): @@ -511,34 +720,44 @@ def mlp_help(schema_madlib, message, is_classification): USAGE --------------------------------------------------------------------------- SELECT {schema_madlib}.{method}( - source_table, -- name of input table - output_table, -- name of output model table - independent_varname, -- name of independent variable - dependent_varname, -- {label_description} - hidden_layer_sizes, -- Array of integers indicating the + source_table, -- TEXT. name of input table + output_table, -- TEXT. name of output model table + independent_varname, -- TEXT. name of independent variable + dependent_varname, -- TEXT. {label_description} + hidden_layer_sizes, -- INTEGER[]. Array of integers indicating the number of hidden units per layer. Length equal to the number of hidden layers. - optimizer_params, -- optional, default NULL + optimizer_params, -- TEXT. optional, default NULL parameters for optimization in a comma-separated string of key-value pairs. + To find out more: + + SELECT {schema_madlib}.{method}('optimizer_params') - step_size DOUBLE PRECISION, -- Default: 0.001 - Learning rate - n_iterations INTEGER, -- Default: 100 - Number of iterations per try - n_tries INTEGER, -- Default: 1 - Total number of training cycles, - with random initializations to avoid - local minima. - tolerance DOUBLE PRECISION, -- Default: 0.001 - If the distance in loss between - two iterations is less than the - tolerance training will stop, even if - n_iterations has not been reached - - activation -- optional, default: 'sigmoid'. + activation -- TEXT. optional, default: 'sigmoid'. supported activations: 'relu', 'sigmoid', and 'tanh' + + weights -- TEXT. optional, default: NULL. + Weights for input rows. Column name which + specifies the weight for each input row. + This weight will be incorporated into the + update during SGD, and will not be used + for loss calculations. If not specified, + weight for each row will default to 1. + Column should be a numeric type. + + warm_start -- BOOLEAN. optional, default: FALSE. + Initalize weights with the coefficients from + the last call. If true, weights will + be initialized from output_table. 
Note that
+                            all parameters other than optimizer_params
+                            and verbose must remain constant between
+                            calls when warm_start is used.
+
+    verbose              -- BOOLEAN. optional, default: FALSE
+                            Provides verbose output of the results of
+                            training.
     );


@@ -576,22 +795,29 @@ def mlp_help(schema_madlib, message, is_classification):
    {1,0.09378,12.50,7.870,0,0.5240,5.8890,39.00,5.4509,5,311.0,15.20,390.50,15.71} | 1 | 21.70
    \.

-    - Generate a multilayer perception with a two hidden layers of 5 units
+    - Generate a multilayer perceptron with two hidden layers of 25 units
       each. Use the x column as the independent variables, and use the class
-      column as the classification. Set the tolerance to 0 so that 300
+      column as the classification. Set the tolerance to 0 so that 500
       iterations will be run. Use a sigmoid activation function. The model
       will be written to mlp_regress_result.

-    SELECT mlp_regression(
-        'lin_housing_wi',           -- Source table
-        'mlp_regress_result',       -- Desination table
-        'x',                        -- Independent variable
-        'y',                        -- Dependent variable
-         ARRAY[5,5],                -- Number of hidden units per layer
-        'step_size=0.007,
-        n_iterations=300,
+    DROP TABLE IF EXISTS mlp_regress;
+    DROP TABLE IF EXISTS mlp_regress_summary;
+    SELECT madlib.mlp_regression(
+        'lin_housing',              -- Source table
+        'mlp_regress',              -- Destination table
+        'x',                        -- Input features
+        'y',                        -- Dependent variable
+        ARRAY[25,25],               -- Number of units per layer
+        'learning_rate_init=0.001,
+        n_iterations=500,
+        lambda=0.001,
         tolerance=0',
-        'sigmoid');                 -- Activation
+        'relu',
+        NULL,                       -- Default weight (1)
+        FALSE,                      -- No warm start
+        TRUE                        -- Verbose
+    );
    """


@@ -630,29 +856,78 @@ def mlp_help(schema_madlib, message, is_classification):

    -- Generate a multilayer perceptron with a single hidden layer of 5 units.
       Use the attributes column as the independent variables, and use the class
-      column as the classification. Set the tolerance to 0 so that 1000
+      column as the classification. Set the tolerance to 0 so that 500
       iterations will be run. Use a hyperbolic tangent activation function.
-      The model will be written to mlp_result.
+      The model will be written to mlp_model.
- SELECT {schema_madlib}.mlp_classification( + DROP TABLE IF EXISTS mlp_model; + DROP TABLE IF EXISTS mlp_model_summary; + SELECT madlib.mlp_classification( 'iris_data', -- Source table 'mlp_model', -- Destination table 'attributes', -- Input features 'class_text', -- Label ARRAY[5], -- Number of units per layer - 'step_size=0.003, - n_iterations=5000, + 'learning_rate_init=0.003, + n_iterations=500, tolerance=0', -- Optimizer params - 'tanh'); -- Activation function + 'tanh', -- Activation function + NULL, -- Default weight (1) + FALSE, -- No warm start + TRUE -- Verbose + ); + + SELECT * FROM mlp_model; """.format(**args) example = classification_example if is_classification else regression_example + optimizer_params = """ + ------------------------------------------------------------------------------------------------ + OPTIMIZER PARAMS + ------------------------------------------------------------------------------------------------ + learning_rate_init DOUBLE PRECISION, -- Default: 0.001 + Initial learning rate + learning_rate_policy VARCHAR, -- Default: 'constant' + One of 'constant','exp','inv','step' + 'constant': learning_rate = + learning_rate_init + 'exp': learning_rate = + learning_rate_init * gamma^(iter) + 'inv': learning_rate = + learning_rate_init * (iter+1)^(-power) + 'step': learning_rate = + learning_rate_init * gamma^(floor(iter/iterations_per_step)) + Where iter is the current iteration of SGD. + gamma DOUBLE PRECISION, -- Default: '0.1' + Decay rate for learning rate. + Valid for learning_rate_policy = 'exp', or 'step' + power DOUBLE PRECISION, -- Default: '0.5' + Exponent for learning_rate_policy = 'inv' + iterations_per_step INTEGER, -- Default: '100' + Number of iterations to run before decreasing the learning + rate by a factor of gamma. Valid for learning rate + policy = 'step' + n_iterations INTEGER, -- Default: 100 + Number of iterations per try + n_tries INTEGER, -- Default: 1 + Total number of training cycles, + with random initializations to avoid + local minima. + tolerance DOUBLE PRECISION, -- Default: 0.001 + If the distance in loss between + two iterations is less than the + tolerance training will stop, even if + n_iterations has not been reached. + """.format(**args) + if not message: return summary elif message.lower() in ('usage', 'help', '?'): return usage elif message.lower() == 'example': return example + elif message.lower() == 'optimizer_params': + return optimizer_params return """ No such option. Use "SELECT {schema_madlib}.{method}()" for help. 
""".format(**args) http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/ff1b0f88/src/ports/postgres/modules/convex/test/mlp.sql_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/convex/test/mlp.sql_in b/src/ports/postgres/modules/convex/test/mlp.sql_in index 97541a9..2302252 100644 --- a/src/ports/postgres/modules/convex/test/mlp.sql_in +++ b/src/ports/postgres/modules/convex/test/mlp.sql_in @@ -28,7 +28,7 @@ -- Classification -SELECT setseed(0.5); +SELECT setseed(0.6); DROP TABLE IF EXISTS iris_data, iris_test, mlp_class, mlp_class_summary CASCADE; CREATE TABLE iris_data( id integer, @@ -191,21 +191,27 @@ INSERT INTO iris_data VALUES SELECT mlp_classification( - 'iris_data', -- Source table + 'iris_data', -- Source table 'mlp_class', -- Desination table - 'attributes', -- Input features - 'class', -- Label - ARRAY[5], -- Number of units per layer - 'step_size=0.001, - n_iterations=1000, + 'attributes', -- Input features + 'class', -- Label + ARRAY[5], -- Number of units per layer + 'learning_rate_init=0.1, + learning_rate_policy=constant, + n_iterations=800, + n_tries=2, tolerance=0', - 'tanh'); + 'sigmoid', + '', + FALSE, + TRUE +); SELECT assert( -- Loss will improve much more if more iterations are run - loss < 30, - 'MLP: Loss is too high (> 30). Wrong result.' + loss < 0.1, + 'MLP: Loss is too high (> 0). Wrong result.' ) FROM mlp_class; DROP TABLE IF EXISTS mlp_prediction; @@ -239,9 +245,8 @@ SELECT mlp_predict( 'mlp_prediction', 'response'); -select * from mlp_prediction; +SELECT * FROM mlp_prediction; SELECT assert( - -- Accuracy greater than 90% COUNT(*)/150.0 > 0.95, 'MLP: Accuracy is too low (< 95%). Wrong result.' ) FROM @@ -766,65 +771,30 @@ COPY lin_housing_wi (x, grp_by_col, y) FROM STDIN NULL '?' DELIMITER '|'; {1,0.04741,0.00,11.930,0,0.5730,6.0300,80.80,2.5050,1,273.0,21.00,396.90,7.88} | 2 | 11.90 \. --- Normalize the columns -CREATE TEMPORARY TABLE maxs as( - SELECT - max(x[1]) m1, - max(x[2]) m2, - max(x[3]) m3, - max(x[4]) m4, - max(x[5]) m5, - max(x[6]) m6, - max(x[7]) m7, - max(x[8]) m8, - max(x[9]) m9, - max(x[10]) m10, - max(x[11]) m11, - max(x[12]) m12, - max(x[13]) m13, - max(x[14]) m14 - from lin_housing_wi -); -CREATE TABLE lin_housing_wi_scaled AS -SELECT ARRAY[ - x[1]/(SELECT m1 from maxs), - x[2]/(SELECT m2 from maxs), - x[3]/(SELECT m3 from maxs), - x[4]/(SELECT m4 from maxs), - x[5]/(SELECT m5 from maxs), - x[6]/(SELECT m6 from maxs), - x[7]/(SELECT m7 from maxs), - x[8]/(SELECT m8 from maxs), - x[9]/(SELECT m9 from maxs), - x[10]/(SELECT m10 from maxs), - x[11]/(SELECT m11 from maxs), - x[12]/(SELECT m12 from maxs), - x[13]/(SELECT m13 from maxs), - x[14]/(SELECT m14 from maxs)] as x, - id,y -FROM lin_housing_wi; - -DROP TABLE IF EXISTS maxs; DROP TABLE IF EXISTS mlp_regress; DROP TABLE IF EXISTS mlp_regress_summary; SELECT setseed(0); SELECT mlp_regression( - 'lin_housing_wi_scaled', -- Source table + 'lin_housing_wi', -- Source table 'mlp_regress', -- Desination table 'x', -- Input features 'y', -- Dependent variable - ARRAY[5,5], -- Number of units per layer - 'step_size=0.005, - n_iterations=800, + ARRAY[40], -- Number of units per layer + 'learning_rate_init=0.015, + learning_rate_policy=inv, + n_iterations=300, tolerance=0', - 'sigmoid'); + 'sigmoid', + '', + False, + TRUE); SELECT assert( - loss < 10, + loss < 2, 'MLP: Loss is too high (> 10). Wrong result.' 
) FROM mlp_regress; @@ -832,14 +802,14 @@ SELECT assert( DROP TABLE IF EXISTS mlp_prediction_regress; SELECT mlp_predict( 'mlp_regress', - 'lin_housing_wi_scaled', + 'lin_housing_wi', 'id', 'mlp_prediction_regress', 'output'); SELECT assert( - 0.5*SUM(pow(mlp_prediction_regress.estimated_y-lin_housing_wi_scaled.y,2.0))/506 < 10.0, + 0.5*SUM(pow(mlp_prediction_regress.estimated_y-lin_housing_wi.y,2.0))/506 < 2.0, 'MLP: Predict MSE is too high (> 10). Wrong result' ) -FROM mlp_prediction_regress JOIN lin_housing_wi_scaled -ON mlp_prediction_regress.id = lin_housing_wi_scaled.id; -DROP TABLE IF EXISTS lin_housing_wi_scaled; +FROM mlp_prediction_regress JOIN lin_housing_wi +ON mlp_prediction_regress.id = lin_housing_wi.id; +DROP TABLE IF EXISTS lin_housing_wi; http://git-wip-us.apache.org/repos/asf/incubator-madlib/blob/ff1b0f88/src/ports/postgres/modules/utilities/utilities.py_in ---------------------------------------------------------------------- diff --git a/src/ports/postgres/modules/utilities/utilities.py_in b/src/ports/postgres/modules/utilities/utilities.py_in index b28a5f3..c1670b5 100644 --- a/src/ports/postgres/modules/utilities/utilities.py_in +++ b/src/ports/postgres/modules/utilities/utilities.py_in @@ -54,6 +54,18 @@ def is_orca(): # ------------------------------------------------------------------------------ +def _assert_equal(o1, o2, msg): + """ + @brief if the given objects are not equal, then raise an error with the message + @param o1 the first object + @param o2 the second object + @param msg the error message to be reported + """ + if not o1 == o2: + plpy.error(msg) +# ------------------------------------------------------------------------------ + + def _assert(condition, msg): """ @brief if the given condition is false, then raise an error with the message
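[Editorial footnote] The coefficient initialization added in mlp() above draws each weight in SQL as span*(random()-0.5) with span = sqrt(6/(fan_in+fan_out)), over (fan_in+1)*fan_out entries per layer pair (the +1 covering the bias row). A minimal pure-Python restatement of that scheme, for illustration only (the function name below is mine, not the module's):

    import math
    import random

    def glorot_style_init(layer_sizes):
        # Flat coefficient vector in the same layout the patch builds via SQL.
        coeff = []
        for fan_in, fan_out in zip(layer_sizes, layer_sizes[1:]):
            span = math.sqrt(6.0 / (fan_in + fan_out))
            dim = (fan_in + 1) * fan_out  # one extra weight row for the bias
            # The patch computes span*(random()-0.5), i.e. uniform on
            # [-span/2, span/2] rather than Glorot's usual [-span, span].
            coeff += [span * (random.random() - 0.5) for _ in range(dim)]
        return coeff

One detail worth double-checking in the patch itself: generate_series(0, {dim}) yields dim + 1 values, so the SQL appears to produce one extra random coefficient per layer, whereas _validate_warm_start counts exactly (fan_in + 1) * fan_out coefficients per layer.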