This is an automated email from the ASF dual-hosted git repository. khannaekta pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/madlib.git
commit c849dd08a16ee03114dbe75ee65781dd7264c925 Author: Ekta Khanna <[email protected]> AuthorDate: Thu Oct 15 17:42:11 2020 -0700 DL: [AutoML] Split automl methods to their own files JIRA: MADLIB-1453 This commit also sets plan_cache_mode when calling fit multiple model from the automl methods. Co-authored-by: Nikhil Kak <[email protected]> --- .../deep_learning/madlib_keras_automl.py_in | 831 +-------------------- .../deep_learning/madlib_keras_automl.sql_in | 20 +- .../madlib_keras_automl_hyperband.py_in | 419 +++++++++++ .../madlib_keras_automl_hyperopt.py_in | 458 ++++++++++++ .../test/unit_tests/test_madlib_keras_automl.py_in | 16 +- 5 files changed, 907 insertions(+), 837 deletions(-) diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in index dc8c837..c795ee1 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.py_in @@ -17,24 +17,13 @@ # specific language governing permissions and limitations # under the License. 
-from ast import literal_eval -from datetime import datetime -from hyperopt import hp, rand, tpe, atpe, Trials, STATUS_OK, STATUS_RUNNING -from hyperopt.base import Domain -import math -import numpy as np import plpy -import time from madlib_keras_validator import MstLoaderInputValidator -# from utilities.admin import cleanup_madlib_temp_tables from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \ - unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table -from utilities.control import SetGUC -from madlib_keras_fit_multiple_model import FitMultipleModel -from madlib_keras_helper import generate_row_string -from madlib_keras_helper import DISTRIBUTION_RULES_COLNAME -from madlib_keras_model_selection import MstSearch, ModelSelectionSchema + unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table, \ + is_platform_pg +from madlib_keras_model_selection import ModelSelectionSchema from keras_model_arch_table import ModelArchSchema from utilities.validate_args import table_exists, drop_tables, input_tbl_valid from utilities.validate_args import quote_ident @@ -49,6 +38,7 @@ class AutoMLConstants: R = 'R' ETA = 'eta' SKIP_LAST = 'skip_last' + HYPERBAND_PARAMS = [R, ETA, SKIP_LAST] LOSS_METRIC = 'training_loss_final' TEMP_MST_TABLE = unique_string('temp_mst_table') TEMP_MST_SUMMARY_TABLE = add_postfix(TEMP_MST_TABLE, '_summary') @@ -57,119 +47,11 @@ class AutoMLConstants: NUM_CONFIGS = 'num_configs' NUM_ITERS = 'num_iterations' ALGORITHM = 'algorithm' + HYPEROPT_PARAMS = [NUM_CONFIGS, NUM_ITERS, ALGORITHM] TIME_FORMAT = '%Y-%m-%d %H:%M:%S' INT_MAX = 2 ** 31 - 1 TARGET_SCHEMA = 'public' -class HyperbandSchedule(): - """The utility class for loading a hyperband schedule table with algorithm inputs. - - Attributes: - schedule_table (string): Name of output table containing hyperband schedule. 
- R (int): Maximum number of resources (iterations) that can be allocated - to a single configuration. - eta (int): Controls the proportion of configurations discarded in - each round of successive halving. - skip_last (int): The number of last rounds to skip. - """ - def __init__(self, schedule_table, R, eta=3, skip_last=0): - self.schedule_table = schedule_table # table name to store hyperband schedule - self.R = R # maximum iterations/epochs allocated to a configuration - self.eta = eta # defines downsampling rate - self.skip_last = skip_last - self.module_name = 'hyperband_schedule' - self.validate_inputs() - - # number of unique executions of Successive Halving (minus one) - self.s_max = int(math.floor(math.log(self.R, self.eta))) - self.validate_s_max() - - self.schedule_vals = [] - - self.calculate_schedule() - - def load(self): - """ - The entry point for loading the hyperband schedule table. - """ - self.create_schedule_table() - self.insert_into_schedule_table() - - def validate_inputs(self): - """ - Validates user input values - """ - _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name)) - _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name)) - - def validate_s_max(self): - _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " + - "non-negative and less than {1}".format(self.module_name,self.s_max)) - - def calculate_schedule(self): - """ - Calculates the hyperband schedule (number of configs and allocated resources) - in each round of each bracket and skips the number of last rounds specified in 'skip_last' - """ - for s in reversed(range(self.s_max+1)): - n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations - r = self.R * math.pow(self.eta, -s) - - for i in range((s+1) - int(self.skip_last)): - # Computing each of the - n_i = n*math.pow(self.eta, -i) - r_i = r*math.pow(self.eta, i) - - 
self.schedule_vals.append({AutoMLConstants.BRACKET: s, - AutoMLConstants.ROUND: i, - AutoMLConstants.CONFIGURATIONS: int(n_i), - AutoMLConstants.RESOURCES: int(round(r_i))}) - - def create_schedule_table(self): - """Initializes the output schedule table""" - create_query = """ - CREATE TABLE {self.schedule_table} ( - {s} INTEGER, - {i} INTEGER, - {n_i} INTEGER, - {r_i} INTEGER, - unique ({s}, {i}) - ); - """.format(self=self, - s=AutoMLConstants.BRACKET, - i=AutoMLConstants.ROUND, - n_i=AutoMLConstants.CONFIGURATIONS, - r_i=AutoMLConstants.RESOURCES) - plpy.execute(create_query) - - def insert_into_schedule_table(self): - """Insert everything in self.schedule_vals into the output schedule table.""" - for sd in self.schedule_vals: - sd_s = sd[AutoMLConstants.BRACKET] - sd_i = sd[AutoMLConstants.ROUND] - sd_n_i = sd[AutoMLConstants.CONFIGURATIONS] - sd_r_i = sd[AutoMLConstants.RESOURCES] - insert_query = """ - INSERT INTO - {self.schedule_table}( - {s_col}, - {i_col}, - {n_i_col}, - {r_i_col} - ) - VALUES ( - {sd_s}, - {sd_i}, - {sd_n_i}, - {sd_r_i} - ) - """.format(s_col=AutoMLConstants.BRACKET, - i_col=AutoMLConstants.ROUND, - n_i_col=AutoMLConstants.CONFIGURATIONS, - r_i_col=AutoMLConstants.RESOURCES, - **locals()) - plpy.execute(insert_query) - class KerasAutoML(object): """ The core AutoML class for running AutoML algorithms such as Hyperband and Hyperopt. @@ -179,6 +61,9 @@ class KerasAutoML(object): automl_params=None, random_state=None, object_table=None, use_gpus=False, validation_table=None, metrics_compute_frequency=None, name=None, description=None, **kwargs): + if is_platform_pg(): + plpy.error( + "DL: AutoML is not supported on PostgreSQL.") self.schema_madlib = schema_madlib self.source_table = source_table self.model_output_table = model_output_table @@ -381,704 +266,8 @@ class KerasAutoML(object): Remove all intermediate tables created for AutoML runs/updates. :param model_training: Fit Multiple function call object. 
""" + if not model_training: + return drop_tables([model_training.original_model_output_table, model_training.model_info_table, model_training.model_summary_table, AutoMLConstants.TEMP_MST_TABLE, AutoMLConstants.TEMP_MST_SUMMARY_TABLE]) - -class AutoMLHyperband(KerasAutoML): - """ - This class implements Hyperband, an infinite-arm bandit based algorithm that speeds up random search - through adaptive resource allocation, successive halving (SHA), and early stopping. - - This class showcases a novel hyperband implementation by executing the hyperband rounds 'diagonally' - to evaluate multiple configurations together and leverage the compute power of MPP databases such as Greenplum. - - This automl method inherits qualities from the automl class. - """ - def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table, - model_id_list, compile_params_grid, fit_params_grid, automl_method, - automl_params, random_state=None, object_table=None, - use_gpus=False, validation_table=None, metrics_compute_frequency=None, - name=None, description=None, **kwargs): - automl_method = automl_method if automl_method else AutoMLConstants.HYPERBAND - automl_params = automl_params if automl_params else 'R=6, eta=3, skip_last=0' - KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table, - model_selection_table, model_id_list, compile_params_grid, fit_params_grid, - automl_method, automl_params, random_state, object_table, use_gpus, - validation_table, metrics_compute_frequency, name, description, **kwargs) - self.validate_and_define_inputs() - self.create_model_output_table() - self.create_model_output_info_table() - self.find_hyperband_config() - - def validate_and_define_inputs(self): - automl_params_dict = extract_keyvalue_params(self.automl_params, - lower_case_names=False) - # casting dict values to int - for i in automl_params_dict: - automl_params_dict[i] = int(automl_params_dict[i]) - 
_assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3, - "{0}: Only R, eta, and skip_last may be specified".format(self.module_name)) - for i in automl_params_dict: - if i == AutoMLConstants.R: - self.R = automl_params_dict[AutoMLConstants.R] - elif i == AutoMLConstants.ETA: - self.eta = automl_params_dict[AutoMLConstants.ETA] - elif i == AutoMLConstants.SKIP_LAST: - self.skip_last = automl_params_dict[AutoMLConstants.SKIP_LAST] - else: - plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i)) - _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name)) - _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name)) - self.s_max = int(math.floor(math.log(self.R, self.eta))) - _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " \ - "non-negative and less than {1}".format(self.module_name, self.s_max)) - - def find_hyperband_config(self): - """ - Executes the diagonal hyperband algorithm. 
- """ - initial_vals = {} - - # get hyper parameter configs for each s - for s in reversed(range(self.s_max+1)): - n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations - r = self.R * math.pow(self.eta, -s) # initial number of iterations to run configurations for - initial_vals[s] = (n, int(round(r))) - self.start_training_time = self.get_current_timestamp() - self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) - random_search = MstSearch(self.schema_madlib, - self.model_arch_table, - self.model_selection_table, - self.model_id_list, - self.compile_params_grid, - self.fit_params_grid, - 'random', - sum([initial_vals[k][0] for k in initial_vals][self.skip_last:]), - self.random_state, - self.object_table) - random_search.load() # for populating mst tables - - # for creating the summary table for usage in fit multiple - plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_SUMMARY_TABLE} AS " \ - "SELECT * FROM {random_search.model_selection_summary_table}".format(AutoMLSchema=AutoMLConstants, - random_search=random_search)) - ranges_dict = self.mst_key_ranges_dict(initial_vals) - # to store the bracket and round numbers - s_dict, i_dict = {}, {} - for key, val in ranges_dict.items(): - for mst_key in range(val[0], val[1]+1): - s_dict[mst_key] = key - i_dict[mst_key] = -1 - - # outer loop on diagonal - for i in range((self.s_max+1) - int(self.skip_last)): - # inner loop on s desc - temp_lst = [] - configs_prune_lookup = {} - for s in range(self.s_max, self.s_max-i-1, -1): - n = initial_vals[s][0] - n_i = n * math.pow(self.eta, -i+self.s_max-s) - configs_prune_lookup[s] = int(round(n_i)) - temp_lst.append("{0} configs under bracket={1} & round={2}".format(int(n_i), s, s-self.s_max+i)) - num_iterations = int(initial_vals[self.s_max-i][1]) - plpy.info('*** Diagonally evaluating ' + ', '.join(temp_lst) + ' with {0} iterations ***'.format( - num_iterations)) - - self.reconstruct_temp_mst_table(i, 
ranges_dict, configs_prune_lookup) # has keys to evaluate - active_keys = plpy.execute("SELECT {ModelSelectionSchema.MST_KEY} " \ - "FROM {AutoMLSchema.TEMP_MST_TABLE}".format(AutoMLSchema=AutoMLConstants, - ModelSelectionSchema=ModelSelectionSchema)) - for k in active_keys: - i_dict[k[ModelSelectionSchema.MST_KEY]] += 1 - self.warm_start = int(i != 0) - mcf = self.metrics_compute_frequency if self._is_valid_metrics_compute_frequency(num_iterations) else None - with SetGUC("plan_cache_mode", "force_generic_plan"): - model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLSchema.TEMP_OUTPUT_TABLE, - AutoMLSchema.TEMP_MST_TABLE, num_iterations, self.use_gpus, - self.validation_table, mcf, self.warm_start, self.name, self.description) - self.update_model_output_table(model_training) - self.update_model_output_info_table(i, model_training, initial_vals) - - self.print_best_mst_so_far() - - self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) - self.add_additional_info_cols(s_dict, i_dict) - self.update_model_selection_table() - self.generate_model_output_summary_table(model_training) - self.remove_temp_tables(model_training) - # cleanup_madlib_temp_tables(self.schema_madlib, AutoMLSchema.TARGET_SCHEMA) - - def mst_key_ranges_dict(self, initial_vals): - """ - Extracts the ranges of model configs (using mst_keys) belonging to / sampled as part of - executing a particular SHA bracket. - """ - d = {} - for s_val in sorted(initial_vals.keys(), reverse=True): # going from s_max to 0 - if s_val == self.s_max: - d[s_val] = (1, initial_vals[s_val][0]) - else: - d[s_val] = (d[s_val+1][1]+1, d[s_val+1][1]+initial_vals[s_val][0]) - return d - - def reconstruct_temp_mst_table(self, i, ranges_dict, configs_prune_lookup): - """ - Drops and Reconstructs a temp mst table for evaluation along particular diagonals of hyperband. - :param i: outer diagonal loop iteration. - :param ranges_dict: model config ranges to group by bracket number. 
- :param configs_prune_lookup: Lookup dictionary for configs to evaluate for a diagonal. - :return: - """ - if i == 0: - _assert_equal(len(configs_prune_lookup), 1, "invalid args") - lower_bound, upper_bound = ranges_dict[self.s_max] - plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} AS SELECT * FROM {self.model_selection_table} " - "WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \ - "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound}".format(self=self, - AutoMLSchema=AutoMLConstants, - lower_bound=lower_bound, - upper_bound=upper_bound, - ModelSelectionSchema=ModelSelectionSchema)) - return - # dropping and repopulating temp_mst_table - drop_tables([AutoMLConstants.TEMP_MST_TABLE]) - - # {mst_key} changed from SERIAL to INTEGER for safe insertions and preservation of mst_key values - create_query = """ - CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} ( - {mst_key} INTEGER, - {model_id} INTEGER, - {compile_params} VARCHAR, - {fit_params} VARCHAR, - unique ({model_id}, {compile_params}, {fit_params}) - ); - """.format(AutoMLSchema=AutoMLConstants, - mst_key=ModelSelectionSchema.MST_KEY, - model_id=ModelSelectionSchema.MODEL_ID, - compile_params=ModelSelectionSchema.COMPILE_PARAMS, - fit_params=ModelSelectionSchema.FIT_PARAMS) - plpy.execute(create_query) - - query = "" - new_configs = True - for s_val in configs_prune_lookup: - lower_bound, upper_bound = ranges_dict[s_val] - if new_configs: - query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \ - "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \ - "{ModelSelectionSchema.FIT_PARAMS} FROM {self.model_selection_table} WHERE " \ - "{ModelSelectionSchema.MST_KEY} >= {lower_bound} AND {ModelSelectionSchema.MST_KEY} <= " \ - "{upper_bound};".format(self=self, AutoMLSchema=AutoMLConstants, - ModelSelectionSchema=ModelSelectionSchema, - lower_bound=lower_bound, upper_bound=upper_bound) - new_configs = False - else: - query += "INSERT INTO 
{AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \ - "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \ - "{ModelSelectionSchema.FIT_PARAMS} " \ - "FROM {self.model_info_table} WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \ - "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound} ORDER BY {AutoMLSchema.LOSS_METRIC} " \ - "LIMIT {configs_prune_lookup_val};".format(self=self, AutoMLSchema=AutoMLConstants, - ModelSelectionSchema=ModelSelectionSchema, - lower_bound=lower_bound, upper_bound=upper_bound, - configs_prune_lookup_val=configs_prune_lookup[s_val]) - plpy.execute(query) - - def update_model_output_table(self, model_training): - """ - Updates gathered information of a hyperband diagonal run to the overall model output table. - :param model_training: Fit Multiple function call object. - """ - # updates model weights for any previously trained configs - plpy.execute("UPDATE {self.model_output_table} a SET model_weights=" \ - "t.model_weights FROM {model_training.original_model_output_table} t " \ - "WHERE a.mst_key=t.mst_key".format(self=self, model_training=model_training)) - - # truncate and re-creates table to avoid memory blow-ups - with SetGUC("dev_opt_unsafe_truncate_in_subtransaction", "on"): - temp_model_table = unique_string('updated_model') - plpy.execute("CREATE TABLE {temp_model_table} AS SELECT * FROM {self.model_output_table};" \ - "TRUNCATE {self.model_output_table}; " \ - "DROP TABLE {self.model_output_table};".format(temp_model_table=temp_model_table, self=self)) - rename_table(self.schema_madlib, temp_model_table, self.model_output_table) - - # inserts any newly trained configs - plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM {model_training.original_model_output_table} " \ - "WHERE {model_training.original_model_output_table}.mst_key NOT IN " \ - "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_output_table})".format(self=self, - model_training=model_training, - 
ModelSelectionSchema=ModelSelectionSchema)) - - def update_model_output_info_table(self, i, model_training, initial_vals): - """ - Updates gathered information of a hyperband diagonal run to the overall model output info table. - :param i: outer diagonal loop iteration. - :param model_training: Fit Multiple function call object. - :param initial_vals: Dictionary of initial configurations and resources as part of the initial hyperband - schedule. - """ - # normalizing factor for metrics_iters due to warm start - epochs_factor = sum([n[1] for n in initial_vals.values()][::-1][:i]) # i & initial_vals args needed - iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \ - "FROM {model_training.model_summary_table}".format(AutoMLSchema=AutoMLConstants, - model_training=model_training)) - metrics_iters_val = [epochs_factor+mi for mi in iters[0]['metrics_iters']] # global iteration counter - - validation_update_q = "validation_metrics_final=t.validation_metrics_final, " \ - "validation_loss_final=t.validation_loss_final, " \ - "validation_metrics=a.validation_metrics || t.validation_metrics, " \ - "validation_loss=a.validation_loss || t.validation_loss, " \ - if self.validation_table else "" - - # updates train/val info for any previously trained configs - plpy.execute("UPDATE {self.model_info_table} a SET " \ - "metrics_elapsed_time=a.metrics_elapsed_time || t.metrics_elapsed_time, " \ - "training_metrics_final=t.training_metrics_final, " \ - "training_loss_final=t.training_loss_final, " \ - "training_metrics=a.training_metrics || t.training_metrics, " \ - "training_loss=a.training_loss || t.training_loss, ".format(self=self) + validation_update_q + - "{AutoMLSchema.METRICS_ITERS}=a.metrics_iters || ARRAY{metrics_iters_val}::INTEGER[] " \ - "FROM {model_training.model_info_table} t " \ - "WHERE a.mst_key=t.mst_key".format(model_training=model_training, AutoMLSchema=AutoMLConstants, - metrics_iters_val=metrics_iters_val)) - - # inserts info about metrics and 
validation for newly trained model configs - plpy.execute("INSERT INTO {self.model_info_table} SELECT t.*, ARRAY{metrics_iters_val}::INTEGER[] AS metrics_iters " \ - "FROM {model_training.model_info_table} t WHERE t.mst_key NOT IN " \ - "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_info_table})".format(self=self, - model_training=model_training, - metrics_iters_val=metrics_iters_val, - ModelSelectionSchema=ModelSelectionSchema)) - - def add_additional_info_cols(self, s_dict, i_dict): - """Adds s and i columns to the info table""" - - plpy.execute("ALTER TABLE {self.model_info_table} ADD COLUMN s int, ADD COLUMN i int;".format(self=self)) - - l = [(k, s_dict[k], i_dict[k]) for k in s_dict] - query = "UPDATE {self.model_info_table} t SET s=b.s_val, i=b.i_val FROM unnest(ARRAY{l}) " \ - "b (key integer, s_val integer, i_val integer) WHERE t.mst_key=b.key".format(self=self, l=l) - plpy.execute(query) - -class AutoMLHyperopt(KerasAutoML): - """ - This class implements Hyperopt, another automl method that explores awkward search spaces using - Random Search, Tree-structured Parzen Estimator (TPE), or Adaptive TPE. - - This function executes hyperopt on top of our multiple model training infrastructure powered with - Model hOpper Parallelism (MOP), a hybrid of data and task parallelism. - - This automl method inherits qualities from the automl class. 
- """ - def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table, - model_id_list, compile_params_grid, fit_params_grid, automl_method, - automl_params, random_state=None, object_table=None, - use_gpus=False, validation_table=None, metrics_compute_frequency=None, - name=None, description=None, **kwargs): - automl_method = automl_method if automl_method else AutoMLConstants.HYPEROPT - automl_params = automl_params if automl_params else 'num_configs=20, num_iterations=5, algorithm=tpe' - KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table, - model_selection_table, model_id_list, compile_params_grid, fit_params_grid, - automl_method, automl_params, random_state, object_table, use_gpus, - validation_table, metrics_compute_frequency, name, description, **kwargs) - self.compile_params_grid = self.compile_params_grid.replace('\n', '').replace(' ', '') - self.fit_params_grid = self.fit_params_grid.replace('\n', '').replace(' ', '') - try: - self.compile_params_grid = literal_eval(self.compile_params_grid) - - except: - plpy.error("Invalid syntax in 'compile_params_dict'") - try: - self.fit_params_grid = literal_eval(self.fit_params_grid) - except: - plpy.error("Invalid syntax in 'fit_params_dict'") - self.validate_and_define_inputs() - self.num_segments = self.get_num_segments() - - self.create_model_output_table() - self.create_model_output_info_table() - self.find_hyperopt_config() - - def get_num_segments(self): - """ - # query dist rules from summary table to get the total no of segments - :return: - """ - source_summary_table = add_postfix(self.source_table, '_summary') - dist_rules = plpy.execute("SELECT {0} from {1}".format(DISTRIBUTION_RULES_COLNAME, source_summary_table))[0][DISTRIBUTION_RULES_COLNAME] - #TODO create constant for all_segments - if dist_rules == "all_segments": - return get_seg_number() - - return len(dist_rules) - - def validate_and_define_inputs(self): - 
automl_params_dict = extract_keyvalue_params(self.automl_params, - lower_case_names=True) - # casting relevant values to int - for i in automl_params_dict: - try: - automl_params_dict[i] = int(automl_params_dict[i]) - except ValueError: - pass - _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3, - "{0}: Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name)) - for i in automl_params_dict: - if i == AutoMLConstants.NUM_CONFIGS: - self.num_configs = automl_params_dict[AutoMLConstants.NUM_CONFIGS] - elif i == AutoMLConstants.NUM_ITERS: - self.num_iters = automl_params_dict[AutoMLConstants.NUM_ITERS] - elif i == AutoMLConstants.ALGORITHM: - if automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'rand': - self.algorithm = rand - elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'tpe': - self.algorithm = tpe - # TODO: Add support for atpe uncomment the below lines after atpe works - # elif automl_params_dict[AutoMLSchema.ALGORITHM].lower() == 'atpe': - # self.algorithm = atpe - else: - plpy.error("{0}: valid algorithm 'automl_params' for hyperopt: 'rand', 'tpe'".format(self.module_name)) # , or 'atpe' - else: - plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i)) - _assert(self.num_configs > 0 and self.num_iters > 0, "{0}: num_configs and num_iterations in 'automl_params' " - "must be > 0".format(self.module_name)) - _assert(self._is_valid_metrics_compute_frequency(self.num_iters), "{0}: 'metrics_compute_frequency' " - "out of iteration range".format(self.module_name)) - - def find_hyperopt_config(self): - """ - Executes hyperopt on top of MOP. 
- """ - make_mst_summary = True - trials = Trials() - domain = Domain(None, self.get_search_space()) - rand_state = np.random.RandomState(self.random_state) - configs_lst = self.get_configs_list(self.num_configs, self.num_segments) - - self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) - fit_multiple_runtime = 0 - for low, high in configs_lst: - i, n = low, high - low + 1 - - # Using HyperOpt TPE/ATPE to generate parameters - hyperopt_params = [] - sampled_params = [] - for j in range(i, i + n): - new_param = self.algorithm.suggest([j], domain, trials, rand_state.randint(0, AutoMLConstants.INT_MAX)) - new_param[0]['status'] = STATUS_RUNNING - - trials.insert_trial_docs(new_param) - trials.refresh() - hyperopt_params.append(new_param[0]) - sampled_params.append(new_param[0]['misc']['vals']) - - model_id_list, compile_params, fit_params = self.extract_param_vals(sampled_params) - msts_list = self.generate_msts(model_id_list, compile_params, fit_params) - # cleanup_madlib_temp_tables(self.schema_madlib, AutoMLSchema.TARGET_SCHEMA) - try: - self.remove_temp_tables(model_training) - except: - pass - self.populate_temp_mst_tables(i, msts_list) - - plpy.info("***Evaluating {n} newly suggested model configurations***".format(n=n)) - fit_multiple_start_time = time.time() - model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLConstants.TEMP_OUTPUT_TABLE, - AutoMLConstants.TEMP_MST_TABLE, self.num_iters, self.use_gpus, self.validation_table, - self.metrics_compute_frequency, False, self.name, self.description, fit_multiple_runtime) - fit_multiple_runtime += time.time() - fit_multiple_start_time - if make_mst_summary: - self.generate_mst_summary_table(self.model_selection_summary_table) - make_mst_summary = False - - # HyperOpt TPE update - for k, hyperopt_param in enumerate(hyperopt_params, i): - loss_val = plpy.execute("SELECT {AutoMLSchema.LOSS_METRIC} FROM {model_training.model_info_table} " \ - "WHERE 
{ModelSelectionSchema.MST_KEY}={k}".format(AutoMLSchema=AutoMLConstants, - ModelSelectionSchema=ModelSelectionSchema, - **locals()))[0][AutoMLConstants.LOSS_METRIC] - - # avoid removing the two lines below (part of Hyperopt updates) - hyperopt_param['status'] = STATUS_OK - hyperopt_param['result'] = {'loss': loss_val, 'status': STATUS_OK} - trials.refresh() - - # stacks info of all model configs together - self.update_model_output_and_info_tables(model_training) - - self.print_best_mst_so_far() - - self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) - self.update_model_selection_table() - self.generate_model_output_summary_table(model_training) - # cleanup_madlib_temp_tables(self.schema_madlib, AutoMLSchema.TARGET_SCHEMA) - self.remove_temp_tables(model_training) - - def get_configs_list(self, num_configs, num_segments): - """ - Gets schedule to evaluate model configs - :return: Model configs evaluation schedule - """ - num_buckets = int(round(float(num_configs) / num_segments)) - configs_list = [] - start_idx = 1 - models_populated = 0 - for _ in range(num_buckets - 1): - end_idx = start_idx + num_segments - models_populated += num_segments - configs_list.append((start_idx, end_idx - 1)) - start_idx = end_idx - - remaining_models = num_configs - models_populated - configs_list.append((start_idx, start_idx + remaining_models-1)) - - return configs_list - - def get_search_space(self): - """ - Converts user inputs to hyperopt search space. 
- :return: Hyperopt search space - """ - - # initial params (outside 'optimizer_params_list') - hyperopt_search_dict = {} - hyperopt_search_dict['model_id'] = self.get_hyperopt_exps('model_id', self.model_id_list) - - - for j in self.fit_params_grid: - hyperopt_search_dict[j] = self.get_hyperopt_exps(j, self.fit_params_grid[j]) - - for i in self.compile_params_grid: - if i != ModelSelectionSchema.OPTIMIZER_PARAMS_LIST: - hyperopt_search_dict[i] = self.get_hyperopt_exps(i, self.compile_params_grid[i]) - - hyperopt_search_space_lst = [] - - counter = 1 # for unique names to allow multiple distribution options for optimizer params - for optimizer_dict in self.compile_params_grid[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST]: - for o_param in optimizer_dict: - name = o_param + '_' + str(counter) - hyperopt_search_dict[name] = self.get_hyperopt_exps(name, optimizer_dict[o_param]) - # appending deep copy - hyperopt_search_space_lst.append({k:v for k, v in hyperopt_search_dict.items()}) - for o_param in optimizer_dict: - name = o_param + '_' + str(counter) - del hyperopt_search_dict[name] - counter += 1 - - return hp.choice('space', hyperopt_search_space_lst) - - def get_hyperopt_exps(self, cp, param_value_list): - """ - Samples a value from a given list of values, either randomly from a list of discrete elements, - or from a specified distribution. 
- :param cp: compile param - :param param_value_list: list of values (or specified distribution) for a param - :return: sampled value - """ - # check if need to sample from a distribution - if type(param_value_list[-1]) == str and all([type(i) != str and not callable(i) for i in param_value_list[:-1]]) \ - and len(param_value_list) > 1: - _assert_equal(len(param_value_list), 3, - "{0}: '{1}' should have exactly 3 elements if picking from a distribution".format(self.module_name, cp)) - _assert(param_value_list[1] > param_value_list[0], - "{0}: '{1}' should be of the format [lower_bound, upper_bound, distribution_type]".format(self.module_name, cp)) - if param_value_list[-1] == 'linear': - return hp.uniform(cp, param_value_list[0], param_value_list[1]) - elif param_value_list[-1] == 'log': - return hp.loguniform(cp, np.log(param_value_list[0]), np.log(param_value_list[1])) - else: - plpy.error("{0}: Please choose a valid distribution type for '{1}': {2}".format( - self.module_name, - self.original_param_details(cp)[0], - ['linear', 'log'])) - else: - # random sampling - return hp.choice(cp, param_value_list) - - def extract_param_vals(self, sampled_params): - """ - Extract parameter values from hyperopt search space. - :param sampled_params: params suggested by hyperopt. - :return: lists of model ids, compile and fit params. 
- """ - model_id_list, compile_params, fit_params = [], [], [] - for params_dict in sampled_params: - compile_dict, fit_dict, optimizer_params_dict = {}, {}, {} - for p in params_dict: - if len(params_dict[p]) == 0 or p == 'space': - continue - val = params_dict[p][0] - if p == 'model_id': - model_id_list.append(self.model_id_list[val]) - continue - elif p in self.fit_params_grid: - try: - # check if params_dict[p] is an index - fit_dict[p] = self.fit_params_grid[p][val] - except TypeError: - fit_dict[p] = params_dict[p] - elif p in self.compile_params_grid: - try: - # check if params_dict[p] is an index - compile_dict[p] = self.compile_params_grid[p][val] - except TypeError: - compile_dict[p] = val - else: - o_param, idx = self.original_param_details(p) # extracting unique attribute - try: - # check if params_dict[p] is an index (i.e. optimizer, for example) - optimizer_params_dict[o_param] = self.compile_params_grid[ - ModelSelectionSchema.OPTIMIZER_PARAMS_LIST][idx][o_param][val] - except TypeError: - optimizer_params_dict[o_param] = val - compile_dict[ModelSelectionSchema.OPTIMIZER_PARAMS_LIST] = optimizer_params_dict - - compile_params.append(compile_dict) - fit_params.append(fit_dict) - - return model_id_list, compile_params, fit_params - - def original_param_details(self, name): - """ - Returns the original param name and book-keeping detail. - :param name: name of the param (example - lr_1, epsilon_12) - :return: original param name and book-keeping position. - """ - parts = name.split('_') - return '_'.join(parts[:-1]), int(parts[-1]) - 1 - - - def generate_msts(self, model_id_list, compile_params, fit_params): - """ - Generates msts to insert in the mst table. - :param model_id_list: list of model ids - :param compile_params: list compile params - :param fit_params:list of fit params - :return: List of msts to insert in the mst table. 
- """ - assert len(model_id_list) == len(compile_params) == len(fit_params) - msts = [] - - for i in range(len(compile_params)): - combination = {} - combination[ModelSelectionSchema.MODEL_ID] = model_id_list[i] - combination[ModelSelectionSchema.COMPILE_PARAMS] = generate_row_string(compile_params[i]) - combination[ModelSelectionSchema.FIT_PARAMS] = generate_row_string(fit_params[i]) - msts.append(combination) - - return msts - - def populate_temp_mst_tables(self, i, msts_list): - """ - Creates and populates temp mst and summary tables with newly suggested model configs for evaluation. - :param i: mst key number - :param msts_list: list of generated msts. - """ - # extra sanity check - if table_exists(AutoMLConstants.TEMP_MST_TABLE): - drop_tables([AutoMLConstants.TEMP_MST_TABLE]) - - create_query = """ - CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} ( - {mst_key} INTEGER, - {model_id} INTEGER, - {compile_params} VARCHAR, - {fit_params} VARCHAR, - unique ({model_id}, {compile_params}, {fit_params}) - ); - """.format(AutoMLSchema=AutoMLConstants, - mst_key=ModelSelectionSchema.MST_KEY, - model_id=ModelSelectionSchema.MODEL_ID, - compile_params=ModelSelectionSchema.COMPILE_PARAMS, - fit_params=ModelSelectionSchema.FIT_PARAMS) - plpy.execute(create_query) - mst_key_val = i - for mst in msts_list: - model_id = mst[ModelSelectionSchema.MODEL_ID] - compile_params = mst[ModelSelectionSchema.COMPILE_PARAMS] - fit_params = mst[ModelSelectionSchema.FIT_PARAMS] - insert_query = """ - INSERT INTO - {AutoMLSchema.TEMP_MST_TABLE}( - {mst_key_col}, - {model_id_col}, - {compile_params_col}, - {fit_params_col} - ) - VALUES ( - {mst_key_val}, - {model_id}, - $${compile_params}$$, - $${fit_params}$$ - ) - """.format(mst_key_col=ModelSelectionSchema.MST_KEY, - model_id_col=ModelSelectionSchema.MODEL_ID, - compile_params_col=ModelSelectionSchema.COMPILE_PARAMS, - fit_params_col=ModelSelectionSchema.FIT_PARAMS, - AutoMLSchema=AutoMLConstants, - **locals()) - mst_key_val += 1 - 
plpy.execute(insert_query) - - self.generate_mst_summary_table(AutoMLConstants.TEMP_MST_SUMMARY_TABLE) - - def generate_mst_summary_table(self, tbl_name): - """ - generates mst summary table with the given name - :param tbl_name: name of summary table - """ - _assert(tbl_name.endswith('_summary'), 'invalid summary table name') - - # extra sanity check - if table_exists(tbl_name): - drop_tables([tbl_name]) - - create_query = """ - CREATE TABLE {tbl_name} ( - {model_arch_table} VARCHAR, - {object_table} VARCHAR - ); - """.format(tbl_name=tbl_name, - model_arch_table=ModelSelectionSchema.MODEL_ARCH_TABLE, - object_table=ModelSelectionSchema.OBJECT_TABLE) - plpy.execute(create_query) - - if self.object_table is None: - object_table = 'NULL::VARCHAR' - else: - object_table = '$${0}$$'.format(self.object_table) - insert_summary_query = """ - INSERT INTO - {tbl_name}( - {model_arch_table_name}, - {object_table_name} - ) - VALUES ( - $${self.model_arch_table}$$, - {object_table} - ) - """.format(model_arch_table_name=ModelSelectionSchema.MODEL_ARCH_TABLE, - object_table_name=ModelSelectionSchema.OBJECT_TABLE, - **locals()) - plpy.execute(insert_summary_query) - - def update_model_output_and_info_tables(self, model_training): - """ - Updates model output and info tables by stacking rows after each evaluation round. 
- :param model_training: Fit Multiple class object - """ - metrics_iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \ - "FROM {model_training.original_model_output_table}_summary".format(self=self, - model_training=model_training, - AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS] - if metrics_iters: - metrics_iters = "ARRAY{0}".format(metrics_iters) - # stacking new rows from training - plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM " \ - "{model_training.original_model_output_table}".format(self=self, model_training=model_training)) - plpy.execute("INSERT INTO {self.model_info_table} SELECT *, {metrics_iters} FROM " \ - "{model_training.model_info_table}".format(self=self, - model_training=model_training, - metrics_iters=metrics_iters)) diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in index a5c7507..113ec16 100644 --- a/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl.sql_in @@ -625,9 +625,9 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.hyperband_schedule( eta INTEGER DEFAULT 3, skip_last INTEGER DEFAULT 0 ) RETURNS VOID AS $$ - PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl') + PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl_hyperband') with AOControl(False) and MinWarning('warning'): - schedule_loader = madlib_keras_automl.HyperbandSchedule(schedule_table, r, eta, skip_last) + schedule_loader = madlib_keras_automl_hyperband.HyperbandSchedule(schedule_table, r, eta, skip_last) schedule_loader.load() $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); @@ -650,13 +650,15 @@ CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.madlib_keras_automl( name VARCHAR DEFAULT NULL, description VARCHAR DEFAULT NULL ) RETURNS VOID AS $$ - PythonFunctionBodyOnly(`deep_learning', 
`madlib_keras_automl') +if automl_method is None or automl_method.lower() == 'hyperband': + PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl_hyperband') with AOControl(False) and MinWarning('warning'): - if automl_method is None or automl_method.lower() == 'hyperband': - schedule_loader = madlib_keras_automl.AutoMLHyperband(**globals()) - elif automl_method.lower() == 'hyperopt': - schedule_loader = madlib_keras_automl.AutoMLHyperopt(**globals()) - else: - plpy.error("madlib_keras_automl: The chosen automl method must be 'hyperband' or 'hyperopt'") + schedule_loader = madlib_keras_automl_hyperband.AutoMLHyperband(**globals()) +elif automl_method.lower() == 'hyperopt': + PythonFunctionBodyOnly(`deep_learning', `madlib_keras_automl_hyperopt') + with AOControl(False) and MinWarning('warning'): + schedule_loader = madlib_keras_automl_hyperopt.AutoMLHyperopt(**globals()) +else: + plpy.error("madlib_keras_automl: The chosen automl method must be 'hyperband' or 'hyperopt'") $$ LANGUAGE plpythonu VOLATILE m4_ifdef(`__HAS_FUNCTION_PROPERTIES__', `MODIFIES SQL DATA', `'); diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperband.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperband.py_in new file mode 100644 index 0000000..2d10f8c --- /dev/null +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperband.py_in @@ -0,0 +1,419 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import math +import plpy + +from madlib_keras_automl import KerasAutoML, AutoMLConstants +from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \ + unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table, \ + is_platform_pg +from utilities.control import SetGUC +from madlib_keras_fit_multiple_model import FitMultipleModel +from madlib_keras_model_selection import MstSearch, ModelSelectionSchema +from utilities.validate_args import table_exists, drop_tables, input_tbl_valid + +class HyperbandSchedule(): + """The utility class for loading a hyperband schedule table with algorithm inputs. + + Attributes: + schedule_table (string): Name of output table containing hyperband schedule. + R (int): Maximum number of resources (iterations) that can be allocated + to a single configuration. + eta (int): Controls the proportion of configurations discarded in + each round of successive halving. + skip_last (int): The number of last rounds to skip. 
+ """ + def __init__(self, schedule_table, R, eta=3, skip_last=0): + if is_platform_pg(): + plpy.error( + "DL: Hyperband schedule is not supported on PostgreSQL.") + self.schedule_table = schedule_table # table name to store hyperband schedule + self.R = R # maximum iterations/epochs allocated to a configuration + self.eta = eta # defines downsampling rate + self.skip_last = skip_last + self.module_name = 'hyperband_schedule' + self.validate_inputs() + + # number of unique executions of Successive Halving (minus one) + self.s_max = int(math.floor(math.log(self.R, self.eta))) + self.validate_s_max() + + self.schedule_vals = [] + + self.calculate_schedule() + + def load(self): + """ + The entry point for loading the hyperband schedule table. + """ + self.create_schedule_table() + self.insert_into_schedule_table() + + def validate_inputs(self): + """ + Validates user input values + """ + _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name)) + _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name)) + + def validate_s_max(self): + _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " + + "non-negative and less than {1}".format(self.module_name,self.s_max)) + + def calculate_schedule(self): + """ + Calculates the hyperband schedule (number of configs and allocated resources) + in each round of each bracket and skips the number of last rounds specified in 'skip_last' + """ + for s in reversed(range(self.s_max+1)): + n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations + r = self.R * math.pow(self.eta, -s) + + for i in range((s+1) - int(self.skip_last)): + # Computing each of the + n_i = n*math.pow(self.eta, -i) + r_i = r*math.pow(self.eta, i) + + self.schedule_vals.append({AutoMLConstants.BRACKET: s, + AutoMLConstants.ROUND: i, + AutoMLConstants.CONFIGURATIONS: int(n_i), + AutoMLConstants.RESOURCES: int(round(r_i))}) + + def 
create_schedule_table(self): + """Initializes the output schedule table""" + create_query = """ + CREATE TABLE {self.schedule_table} ( + {s} INTEGER, + {i} INTEGER, + {n_i} INTEGER, + {r_i} INTEGER, + unique ({s}, {i}) + ); + """.format(self=self, + s=AutoMLConstants.BRACKET, + i=AutoMLConstants.ROUND, + n_i=AutoMLConstants.CONFIGURATIONS, + r_i=AutoMLConstants.RESOURCES) + plpy.execute(create_query) + + def insert_into_schedule_table(self): + """Insert everything in self.schedule_vals into the output schedule table.""" + for sd in self.schedule_vals: + sd_s = sd[AutoMLConstants.BRACKET] + sd_i = sd[AutoMLConstants.ROUND] + sd_n_i = sd[AutoMLConstants.CONFIGURATIONS] + sd_r_i = sd[AutoMLConstants.RESOURCES] + insert_query = """ + INSERT INTO + {self.schedule_table}( + {s_col}, + {i_col}, + {n_i_col}, + {r_i_col} + ) + VALUES ( + {sd_s}, + {sd_i}, + {sd_n_i}, + {sd_r_i} + ) + """.format(s_col=AutoMLConstants.BRACKET, + i_col=AutoMLConstants.ROUND, + n_i_col=AutoMLConstants.CONFIGURATIONS, + r_i_col=AutoMLConstants.RESOURCES, + **locals()) + plpy.execute(insert_query) + +class AutoMLHyperband(KerasAutoML): + """ + This class implements Hyperband, an infinite-arm bandit based algorithm that speeds up random search + through adaptive resource allocation, successive halving (SHA), and early stopping. + + This class showcases a novel hyperband implementation by executing the hyperband rounds 'diagonally' + to evaluate multiple configurations together and leverage the compute power of MPP databases such as Greenplum. + + This automl method inherits qualities from the automl class. 
+ """ + def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table, + model_id_list, compile_params_grid, fit_params_grid, automl_method, + automl_params, random_state=None, object_table=None, + use_gpus=False, validation_table=None, metrics_compute_frequency=None, + name=None, description=None, **kwargs): + automl_method = automl_method if automl_method else AutoMLConstants.HYPERBAND + automl_params = automl_params if automl_params else 'R=6, eta=3, skip_last=0' + KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table, + model_selection_table, model_id_list, compile_params_grid, fit_params_grid, + automl_method, automl_params, random_state, object_table, use_gpus, + validation_table, metrics_compute_frequency, name, description, **kwargs) + self.validate_and_define_inputs() + self.create_model_output_table() + self.create_model_output_info_table() + self.find_hyperband_config() + + def validate_and_define_inputs(self): + automl_params_dict = extract_keyvalue_params(self.automl_params, + lower_case_names=False) + # casting dict values to int + for i in automl_params_dict: + _assert(i in AutoMLConstants.HYPERBAND_PARAMS, + "{0}: Invalid param(s) passed in for hyperband. 
"\ + "Only R, eta, and skip_last may be specified".format(self.module_name)) + automl_params_dict[i] = int(automl_params_dict[i]) + _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3, + "{0}: Only R, eta, and skip_last may be specified".format(self.module_name)) + for i in automl_params_dict: + if i == AutoMLConstants.R: + self.R = automl_params_dict[AutoMLConstants.R] + elif i == AutoMLConstants.ETA: + self.eta = automl_params_dict[AutoMLConstants.ETA] + elif i == AutoMLConstants.SKIP_LAST: + self.skip_last = automl_params_dict[AutoMLConstants.SKIP_LAST] + else: + plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i)) + _assert(self.eta > 1, "{0}: eta must be greater than 1".format(self.module_name)) + _assert(self.R >= self.eta, "{0}: R should not be less than eta".format(self.module_name)) + self.s_max = int(math.floor(math.log(self.R, self.eta))) + _assert(self.skip_last >= 0 and self.skip_last < self.s_max+1, "{0}: skip_last must be " \ + "non-negative and less than {1}".format(self.module_name, self.s_max)) + + def find_hyperband_config(self): + """ + Executes the diagonal hyperband algorithm. 
+ """ + initial_vals = {} + + # get hyper parameter configs for each s + for s in reversed(range(self.s_max+1)): + n = int(math.ceil(int((self.s_max+1)/(s+1))*math.pow(self.eta, s))) # initial number of configurations + r = self.R * math.pow(self.eta, -s) # initial number of iterations to run configurations for + initial_vals[s] = (n, int(round(r))) + self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) + random_search = MstSearch(self.schema_madlib, + self.model_arch_table, + self.model_selection_table, + self.model_id_list, + self.compile_params_grid, + self.fit_params_grid, + 'random', + sum([initial_vals[k][0] for k in initial_vals][self.skip_last:]), + self.random_state, + self.object_table) + random_search.load() # for populating mst tables + + # for creating the summary table for usage in fit multiple + plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_SUMMARY_TABLE} AS " \ + "SELECT * FROM {random_search.model_selection_summary_table}".format(AutoMLSchema=AutoMLConstants, + random_search=random_search)) + ranges_dict = self.mst_key_ranges_dict(initial_vals) + # to store the bracket and round numbers + s_dict, i_dict = {}, {} + for key, val in ranges_dict.items(): + for mst_key in range(val[0], val[1]+1): + s_dict[mst_key] = key + i_dict[mst_key] = -1 + + # outer loop on diagonal + for i in range((self.s_max+1) - int(self.skip_last)): + # inner loop on s desc + temp_lst = [] + configs_prune_lookup = {} + for s in range(self.s_max, self.s_max-i-1, -1): + n = initial_vals[s][0] + n_i = n * math.pow(self.eta, -i+self.s_max-s) + configs_prune_lookup[s] = int(round(n_i)) + temp_lst.append("{0} configs under bracket={1} & round={2}".format(int(n_i), s, s-self.s_max+i)) + num_iterations = int(initial_vals[self.s_max-i][1]) + plpy.info('*** Diagonally evaluating ' + ', '.join(temp_lst) + ' with {0} iterations ***'.format( + num_iterations)) + + self.reconstruct_temp_mst_table(i, ranges_dict, configs_prune_lookup) # has keys to evaluate + 
active_keys = plpy.execute("SELECT {ModelSelectionSchema.MST_KEY} " \ + "FROM {AutoMLSchema.TEMP_MST_TABLE}".format(AutoMLSchema=AutoMLConstants, + ModelSelectionSchema=ModelSelectionSchema)) + for k in active_keys: + i_dict[k[ModelSelectionSchema.MST_KEY]] += 1 + self.warm_start = int(i != 0) + mcf = self.metrics_compute_frequency if self._is_valid_metrics_compute_frequency(num_iterations) else None + with SetGUC("plan_cache_mode", "force_generic_plan"): + model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLConstants.TEMP_OUTPUT_TABLE, + AutoMLConstants.TEMP_MST_TABLE, num_iterations, self.use_gpus, + self.validation_table, mcf, self.warm_start, self.name, self.description) + self.update_model_output_table(model_training) + self.update_model_output_info_table(i, model_training, initial_vals) + + self.print_best_mst_so_far() + + self.end_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) + self.add_additional_info_cols(s_dict, i_dict) + self.update_model_selection_table() + self.generate_model_output_summary_table(model_training) + self.remove_temp_tables(model_training) + + def mst_key_ranges_dict(self, initial_vals): + """ + Extracts the ranges of model configs (using mst_keys) belonging to / sampled as part of + executing a particular SHA bracket. + """ + d = {} + for s_val in sorted(initial_vals.keys(), reverse=True): # going from s_max to 0 + if s_val == self.s_max: + d[s_val] = (1, initial_vals[s_val][0]) + else: + d[s_val] = (d[s_val+1][1]+1, d[s_val+1][1]+initial_vals[s_val][0]) + return d + + def reconstruct_temp_mst_table(self, i, ranges_dict, configs_prune_lookup): + """ + Drops and Reconstructs a temp mst table for evaluation along particular diagonals of hyperband. + :param i: outer diagonal loop iteration. + :param ranges_dict: model config ranges to group by bracket number. + :param configs_prune_lookup: Lookup dictionary for configs to evaluate for a diagonal. 
+ :return: + """ + if i == 0: + _assert_equal(len(configs_prune_lookup), 1, "invalid args") + lower_bound, upper_bound = ranges_dict[self.s_max] + plpy.execute("CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} AS SELECT * FROM {self.model_selection_table} " + "WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \ + "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound}".format(self=self, + AutoMLSchema=AutoMLConstants, + lower_bound=lower_bound, + upper_bound=upper_bound, + ModelSelectionSchema=ModelSelectionSchema)) + return + # dropping and repopulating temp_mst_table + drop_tables([AutoMLConstants.TEMP_MST_TABLE]) + + # {mst_key} changed from SERIAL to INTEGER for safe insertions and preservation of mst_key values + create_query = """ + CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} ( + {mst_key} INTEGER, + {model_id} INTEGER, + {compile_params} VARCHAR, + {fit_params} VARCHAR, + unique ({model_id}, {compile_params}, {fit_params}) + ); + """.format(AutoMLSchema=AutoMLConstants, + mst_key=ModelSelectionSchema.MST_KEY, + model_id=ModelSelectionSchema.MODEL_ID, + compile_params=ModelSelectionSchema.COMPILE_PARAMS, + fit_params=ModelSelectionSchema.FIT_PARAMS) + plpy.execute(create_query) + + query = "" + new_configs = True + for s_val in configs_prune_lookup: + lower_bound, upper_bound = ranges_dict[s_val] + if new_configs: + query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \ + "{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \ + "{ModelSelectionSchema.FIT_PARAMS} FROM {self.model_selection_table} WHERE " \ + "{ModelSelectionSchema.MST_KEY} >= {lower_bound} AND {ModelSelectionSchema.MST_KEY} <= " \ + "{upper_bound};".format(self=self, AutoMLSchema=AutoMLConstants, + ModelSelectionSchema=ModelSelectionSchema, + lower_bound=lower_bound, upper_bound=upper_bound) + new_configs = False + else: + query += "INSERT INTO {AutoMLSchema.TEMP_MST_TABLE} SELECT {ModelSelectionSchema.MST_KEY}, " \ + 
"{ModelSelectionSchema.MODEL_ID}, {ModelSelectionSchema.COMPILE_PARAMS}, " \ + "{ModelSelectionSchema.FIT_PARAMS} " \ + "FROM {self.model_info_table} WHERE {ModelSelectionSchema.MST_KEY} >= {lower_bound} " \ + "AND {ModelSelectionSchema.MST_KEY} <= {upper_bound} ORDER BY {AutoMLSchema.LOSS_METRIC} " \ + "LIMIT {configs_prune_lookup_val};".format(self=self, AutoMLSchema=AutoMLConstants, + ModelSelectionSchema=ModelSelectionSchema, + lower_bound=lower_bound, upper_bound=upper_bound, + configs_prune_lookup_val=configs_prune_lookup[s_val]) + plpy.execute(query) + + def update_model_output_table(self, model_training): + """ + Updates gathered information of a hyperband diagonal run to the overall model output table. + :param model_training: Fit Multiple function call object. + """ + # updates model weights for any previously trained configs + plpy.execute("UPDATE {self.model_output_table} a SET model_weights=" \ + "t.model_weights FROM {model_training.original_model_output_table} t " \ + "WHERE a.mst_key=t.mst_key".format(self=self, model_training=model_training)) + + # truncate and re-creates table to avoid memory blow-ups + with SetGUC("dev_opt_unsafe_truncate_in_subtransaction", "on"): + temp_model_table = unique_string('updated_model') + plpy.execute("CREATE TABLE {temp_model_table} AS SELECT * FROM {self.model_output_table};" \ + "TRUNCATE {self.model_output_table}; " \ + "DROP TABLE {self.model_output_table};".format(temp_model_table=temp_model_table, self=self)) + rename_table(self.schema_madlib, temp_model_table, self.model_output_table) + + # inserts any newly trained configs + plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM {model_training.original_model_output_table} " \ + "WHERE {model_training.original_model_output_table}.mst_key NOT IN " \ + "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_output_table})".format(self=self, + model_training=model_training, + ModelSelectionSchema=ModelSelectionSchema)) + + def 
update_model_output_info_table(self, i, model_training, initial_vals): + """ + Updates gathered information of a hyperband diagonal run to the overall model output info table. + :param i: outer diagonal loop iteration. + :param model_training: Fit Multiple function call object. + :param initial_vals: Dictionary of initial configurations and resources as part of the initial hyperband + schedule. + """ + # normalizing factor for metrics_iters due to warm start + epochs_factor = sum([n[1] for n in initial_vals.values()][::-1][:i]) # i & initial_vals args needed + iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \ + "FROM {model_training.model_summary_table}".format(AutoMLSchema=AutoMLConstants, + model_training=model_training)) + metrics_iters_val = [epochs_factor+mi for mi in iters[0]['metrics_iters']] # global iteration counter + + validation_update_q = "validation_metrics_final=t.validation_metrics_final, " \ + "validation_loss_final=t.validation_loss_final, " \ + "validation_metrics=a.validation_metrics || t.validation_metrics, " \ + "validation_loss=a.validation_loss || t.validation_loss, " \ + if self.validation_table else "" + + # updates train/val info for any previously trained configs + plpy.execute("UPDATE {self.model_info_table} a SET " \ + "metrics_elapsed_time=a.metrics_elapsed_time || t.metrics_elapsed_time, " \ + "training_metrics_final=t.training_metrics_final, " \ + "training_loss_final=t.training_loss_final, " \ + "training_metrics=a.training_metrics || t.training_metrics, " \ + "training_loss=a.training_loss || t.training_loss, ".format(self=self) + validation_update_q + + "{AutoMLSchema.METRICS_ITERS}=a.metrics_iters || ARRAY{metrics_iters_val}::INTEGER[] " \ + "FROM {model_training.model_info_table} t " \ + "WHERE a.mst_key=t.mst_key".format(model_training=model_training, AutoMLSchema=AutoMLConstants, + metrics_iters_val=metrics_iters_val)) + + # inserts info about metrics and validation for newly trained model configs + 
plpy.execute("INSERT INTO {self.model_info_table} SELECT t.*, ARRAY{metrics_iters_val}::INTEGER[] AS metrics_iters " \ + "FROM {model_training.model_info_table} t WHERE t.mst_key NOT IN " \ + "(SELECT {ModelSelectionSchema.MST_KEY} FROM {self.model_info_table})".format(self=self, + model_training=model_training, + metrics_iters_val=metrics_iters_val, + ModelSelectionSchema=ModelSelectionSchema)) + + def add_additional_info_cols(self, s_dict, i_dict): + """Adds s and i columns to the info table""" + + plpy.execute("ALTER TABLE {self.model_info_table} ADD COLUMN s int, ADD COLUMN i int;".format(self=self)) + + l = [(k, s_dict[k], i_dict[k]) for k in s_dict] + query = "UPDATE {self.model_info_table} t SET s=b.s_val, i=b.i_val FROM unnest(ARRAY{l}) " \ + "b (key integer, s_val integer, i_val integer) WHERE t.mst_key=b.key".format(self=self, l=l) + plpy.execute(query) diff --git a/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperopt.py_in b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperopt.py_in new file mode 100644 index 0000000..34d2e97 --- /dev/null +++ b/src/ports/postgres/modules/deep_learning/madlib_keras_automl_hyperopt.py_in @@ -0,0 +1,458 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from ast import literal_eval +from hyperopt import hp, rand, tpe, atpe, Trials, STATUS_OK, STATUS_RUNNING +from hyperopt.base import Domain +import numpy as np +import plpy +import time + +from madlib_keras_automl import KerasAutoML, AutoMLConstants +from input_data_preprocessor import DistributionRulesOptions +from madlib_keras_fit_multiple_model import FitMultipleModel +from madlib_keras_helper import generate_row_string +from madlib_keras_helper import DISTRIBUTION_RULES_COLNAME +from madlib_keras_model_selection import ModelSelectionSchema +from utilities.control import SetGUC +from utilities.utilities import get_current_timestamp, get_seg_number, get_segments_per_host, \ + unique_string, add_postfix, extract_keyvalue_params, _assert, _assert_equal, rename_table +from utilities.validate_args import table_exists, drop_tables, input_tbl_valid + +class AutoMLHyperopt(KerasAutoML): + """ + This class implements Hyperopt, another automl method that explores awkward search spaces using + Random Search, Tree-structured Parzen Estimator (TPE), or Adaptive TPE. + + This function executes hyperopt on top of our multiple model training infrastructure powered with + Model hOpper Parallelism (MOP), a hybrid of data and task parallelism. + + This automl method inherits qualities from the automl class. 
+ """ + def __init__(self, schema_madlib, source_table, model_output_table, model_arch_table, model_selection_table, + model_id_list, compile_params_grid, fit_params_grid, automl_method, + automl_params, random_state=None, object_table=None, + use_gpus=False, validation_table=None, metrics_compute_frequency=None, + name=None, description=None, **kwargs): + automl_method = automl_method if automl_method else AutoMLConstants.HYPEROPT + automl_params = automl_params if automl_params else 'num_configs=20, num_iterations=5, algorithm=tpe' + KerasAutoML.__init__(self, schema_madlib, source_table, model_output_table, model_arch_table, + model_selection_table, model_id_list, compile_params_grid, fit_params_grid, + automl_method, automl_params, random_state, object_table, use_gpus, + validation_table, metrics_compute_frequency, name, description, **kwargs) + self.compile_params_grid = self.compile_params_grid.replace('\n', '').replace(' ', '') + self.fit_params_grid = self.fit_params_grid.replace('\n', '').replace(' ', '') + try: + self.compile_params_grid = literal_eval(self.compile_params_grid) + + except: + plpy.error("Invalid syntax in 'compile_params_dict'") + try: + self.fit_params_grid = literal_eval(self.fit_params_grid) + except: + plpy.error("Invalid syntax in 'fit_params_dict'") + self.validate_and_define_inputs() + self.num_segments = self.get_num_segments() + + self.create_model_output_table() + self.create_model_output_info_table() + self.find_hyperopt_config() + + def get_num_segments(self): + """ + # query dist rules from summary table to get the total no of segments + :return: + """ + source_summary_table = add_postfix(self.source_table, '_summary') + dist_rules = plpy.execute("SELECT {0} from {1}".format(DISTRIBUTION_RULES_COLNAME, source_summary_table))[0][DISTRIBUTION_RULES_COLNAME] + if dist_rules == DistributionRulesOptions.ALL_SEGMENTS: + return get_seg_number() + + return len(dist_rules) + + def validate_and_define_inputs(self): + automl_params_dict 
= extract_keyvalue_params(self.automl_params, + lower_case_names=True) + # casting relevant values to int + for i in automl_params_dict: + _assert(i in AutoMLConstants.HYPEROPT_PARAMS, + "{0}: Invalid param(s) passed in for hyperopt. "\ + "Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name)) + try: + automl_params_dict[i] = int(automl_params_dict[i]) + except ValueError: + pass + _assert(len(automl_params_dict) >= 1 and len(automl_params_dict) <= 3, + "{0}: Only num_configs, num_iterations, and algorithm may be specified".format(self.module_name)) + for i in automl_params_dict: + if i == AutoMLConstants.NUM_CONFIGS: + self.num_configs = automl_params_dict[AutoMLConstants.NUM_CONFIGS] + elif i == AutoMLConstants.NUM_ITERS: + self.num_iters = automl_params_dict[AutoMLConstants.NUM_ITERS] + elif i == AutoMLConstants.ALGORITHM: + if automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'rand': + self.algorithm = rand + elif automl_params_dict[AutoMLConstants.ALGORITHM].lower() == 'tpe': + self.algorithm = tpe + # TODO: Add support for atpe uncomment the below lines after atpe works + # elif automl_params_dict[AutoMLSchema.ALGORITHM].lower() == 'atpe': + # self.algorithm = atpe + else: + plpy.error("{0}: valid algorithm 'automl_params' for hyperopt: 'rand', 'tpe'".format(self.module_name)) # , or 'atpe' + else: + plpy.error("{0}: {1} is an invalid automl param".format(self.module_name, i)) + _assert(self.num_configs > 0 and self.num_iters > 0, "{0}: num_configs and num_iterations in 'automl_params' " + "must be > 0".format(self.module_name)) + _assert(self._is_valid_metrics_compute_frequency(self.num_iters), "{0}: 'metrics_compute_frequency' " + "out of iteration range".format(self.module_name)) + + def find_hyperopt_config(self): + """ + Executes hyperopt on top of MOP. 
+ """ + make_mst_summary = True + trials = Trials() + domain = Domain(None, self.get_search_space()) + rand_state = np.random.RandomState(self.random_state) + configs_lst = self.get_configs_list(self.num_configs, self.num_segments) + + self.start_training_time = get_current_timestamp(AutoMLConstants.TIME_FORMAT) + metrics_elapsed_time_offset = 0 + model_training = None + for low, high in configs_lst: + i, n = low, high - low + 1 + + # Using HyperOpt TPE/ATPE to generate parameters + hyperopt_params = [] + sampled_params = [] + for j in range(i, i + n): + new_param = self.algorithm.suggest([j], domain, trials, rand_state.randint(0, AutoMLConstants.INT_MAX)) + new_param[0]['status'] = STATUS_RUNNING + + trials.insert_trial_docs(new_param) + trials.refresh() + hyperopt_params.append(new_param[0]) + sampled_params.append(new_param[0]['misc']['vals']) + + model_id_list, compile_params, fit_params = self.extract_param_vals(sampled_params) + msts_list = self.generate_msts(model_id_list, compile_params, fit_params) + self.remove_temp_tables(model_training) + self.populate_temp_mst_tables(i, msts_list) + + plpy.info("***Evaluating {n} newly suggested model configurations***".format(n=n)) + start_time = time.time() + with SetGUC("plan_cache_mode", "force_generic_plan"): + model_training = FitMultipleModel(self.schema_madlib, self.source_table, AutoMLConstants.TEMP_OUTPUT_TABLE, + AutoMLConstants.TEMP_MST_TABLE, self.num_iters, self.use_gpus, self.validation_table, + self.metrics_compute_frequency, False, self.name, self.description, + metrics_elapsed_time_offset=metrics_elapsed_time_offset) + metrics_elapsed_time_offset += time.time() - start_time + if make_mst_summary: + self.generate_mst_summary_table(self.model_selection_summary_table) + make_mst_summary = False + + # HyperOpt TPE update + for k, hyperopt_param in enumerate(hyperopt_params, i): + loss_val = plpy.execute("SELECT {AutoMLSchema.LOSS_METRIC} FROM {model_training.model_info_table} " \ + "WHERE 
def get_configs_list(self, num_configs, num_segments):
    """
    Builds the schedule of mst-key ranges evaluated per training round.

    :param num_configs: total number of model configs to evaluate
    :param num_segments: number of configs placed in every full round
    :return: list of inclusive (first_key, last_key) tuples with keys
             starting at 1; the final tuple takes every config that is left
             after the full rounds
    """
    rounds = int(round(float(num_configs) / num_segments))
    # every round except the last one holds exactly num_segments configs
    full_rounds = max(rounds - 1, 0)
    schedule = [(r * num_segments + 1, (r + 1) * num_segments)
                for r in range(full_rounds)]
    schedule.append((full_rounds * num_segments + 1, num_configs))
    return schedule


def get_search_space(self):
    """
    Translates the user supplied grids into a hyperopt search space.

    Model id, fit params and non-optimizer compile params are shared by
    every candidate space. Each entry of the optimizer params list adds its
    own params under the name '<param>_<position>' (position starts at 1),
    so different optimizers can carry different distributions.

    :return: hp.choice over one search dict per optimizer params entry
    """
    optimizer_key = ModelSelectionSchema.OPTIMIZER_PARAMS_LIST

    # params that live outside the optimizer params list
    space = {}
    space['model_id'] = self.get_hyperopt_exps('model_id', self.model_id_list)
    for fit_name in self.fit_params_grid:
        space[fit_name] = self.get_hyperopt_exps(fit_name, self.fit_params_grid[fit_name])
    for compile_name in self.compile_params_grid:
        if compile_name == optimizer_key:
            continue
        space[compile_name] = self.get_hyperopt_exps(compile_name,
                                                     self.compile_params_grid[compile_name])

    candidate_spaces = []
    for position, optimizer_dict in enumerate(self.compile_params_grid[optimizer_key], 1):
        scoped_names = []
        for o_param in optimizer_dict:
            scoped = o_param + '_' + str(position)
            space[scoped] = self.get_hyperopt_exps(scoped, optimizer_dict[o_param])
            scoped_names.append(scoped)
        # snapshot (shallow copy) of shared params plus this optimizer's params
        candidate_spaces.append(dict(space.items()))
        # drop this optimizer's params again before handling the next entry
        for scoped in scoped_names:
            del space[scoped]

    return hp.choice('space', candidate_spaces)
def get_hyperopt_exps(self, cp, param_value_list):
    """
    Builds the hyperopt expression used to sample one parameter.

    A list of the form [lower_bound, upper_bound, distribution_type], i.e.
    a trailing string preceded only by non-string, non-callable values, is
    treated as a distribution ('linear' or 'log'). Any other list is
    treated as a set of discrete options to choose from.

    :param cp: label of the param inside the hyperopt search space
    :param param_value_list: list of values (or specified distribution) for a param
    :return: hyperopt expression (hp.uniform, hp.loguniform or hp.choice)
    """
    # Checked first so an empty list is reported through the module's own
    # error path instead of an IndexError from param_value_list[-1].
    _assert(len(param_value_list) > 0,
            "{0}: '{1}' should have at least one value to sample from".format(self.module_name, cp))

    is_distribution = (len(param_value_list) > 1
                       and isinstance(param_value_list[-1], str)
                       and all(not isinstance(v, str) and not callable(v)
                               for v in param_value_list[:-1]))
    if not is_distribution:
        # random sampling
        return hp.choice(cp, param_value_list)

    _assert_equal(len(param_value_list), 3,
                  "{0}: '{1}' should have exactly 3 elements if picking from a distribution".format(self.module_name, cp))
    _assert(param_value_list[1] > param_value_list[0],
            "{0}: '{1}' should be of the format [lower_bound, upper_bound, distribution_type]".format(self.module_name, cp))

    distribution_type = param_value_list[-1]
    if distribution_type == 'linear':
        return hp.uniform(cp, param_value_list[0], param_value_list[1])
    if distribution_type == 'log':
        return hp.loguniform(cp, np.log(param_value_list[0]), np.log(param_value_list[1]))

    # Only optimizer params carry the '_<position>' book-keeping suffix.
    # For any other label (e.g. 'batch_size') original_param_details() fails
    # in int(); fall back to the label itself so the user still gets the
    # intended message.
    try:
        display_name = self.original_param_details(cp)[0]
    except ValueError:
        display_name = cp
    plpy.error("{0}: Please choose a valid distribution type for '{1}': {2}".format(
        self.module_name,
        display_name,
        ['linear', 'log']))
def extract_param_vals(self, sampled_params):
    """
    Extract parameter values from hyperopt search space.

    Every entry of sampled_params maps a search-space label to a list that
    is either empty (label not part of the chosen branch) or holds one
    sampled value. The value is used as an index into the user supplied
    grid; when indexing raises TypeError the sampled value itself is used.

    :param sampled_params: params suggested by hyperopt.
    :return: lists of model ids, compile and fit params.
    """
    # NOTE(review): nesting reconstructed from a whitespace-collapsed copy;
    # the optimizer params dict is attached once per config, after all labels
    # were processed - confirm against the repository.
    optimizer_key = ModelSelectionSchema.OPTIMIZER_PARAMS_LIST
    model_id_list, compile_params, fit_params = [], [], []
    for params_dict in sampled_params:
        compile_dict, fit_dict, optimizer_params_dict = {}, {}, {}
        for p in params_dict:
            # 'space' only records which optimizer branch was chosen
            if len(params_dict[p]) == 0 or p == 'space':
                continue
            val = params_dict[p][0]
            if p == 'model_id':
                model_id_list.append(self.model_id_list[val])
            elif p in self.fit_params_grid:
                try:
                    # check if val is an index
                    fit_dict[p] = self.fit_params_grid[p][val]
                except TypeError:
                    # Store the sampled value, not the one-element list that
                    # wraps it, consistent with the compile and optimizer
                    # branches below.
                    fit_dict[p] = val
            elif p in self.compile_params_grid:
                try:
                    # check if val is an index
                    compile_dict[p] = self.compile_params_grid[p][val]
                except TypeError:
                    compile_dict[p] = val
            else:
                # optimizer param: strip the '_<position>' suffix
                o_param, idx = self.original_param_details(p)
                try:
                    # check if val is an index (i.e. optimizer, for example)
                    optimizer_params_dict[o_param] = self.compile_params_grid[
                        optimizer_key][idx][o_param][val]
                except TypeError:
                    optimizer_params_dict[o_param] = val
        compile_dict[optimizer_key] = optimizer_params_dict

        compile_params.append(compile_dict)
        fit_params.append(fit_dict)

    return model_id_list, compile_params, fit_params
def original_param_details(self, name):
    """
    Splits a search-space label into the param name and its position.

    :param name: name of the param (example - lr_1, epsilon_12)
    :return: tuple of (text before the last '_', zero-based position taken
             from the number after the last '_')
    """
    base_name, _, position = name.rpartition('_')
    return base_name, int(position) - 1


def generate_msts(self, model_id_list, compile_params, fit_params):
    """
    Builds the rows that are inserted into the mst table.

    :param model_id_list: list of model ids
    :param compile_params: list of compile param dicts
    :param fit_params: list of fit param dicts
    :return: list of dicts, one per config, holding the model id and the
             compile / fit params converted by generate_row_string
    """
    assert len(model_id_list) == len(compile_params) == len(fit_params)
    return [
        {
            ModelSelectionSchema.MODEL_ID: model_id,
            ModelSelectionSchema.COMPILE_PARAMS: generate_row_string(compile_cfg),
            ModelSelectionSchema.FIT_PARAMS: generate_row_string(fit_cfg),
        }
        for model_id, compile_cfg, fit_cfg in zip(model_id_list, compile_params, fit_params)
    ]
def populate_temp_mst_tables(self, i, msts_list):
    """
    (Re)creates the temp mst table, inserts one row per newly suggested
    model config and regenerates the temp mst summary table.

    :param i: mst key given to the first config; following configs get
              consecutive keys
    :param msts_list: list of generated msts.
    """
    # NOTE(review): whitespace inside the SQL text was reconstructed from a
    # whitespace-collapsed copy of this method.
    # extra sanity check
    if table_exists(AutoMLConstants.TEMP_MST_TABLE):
        drop_tables([AutoMLConstants.TEMP_MST_TABLE])

    plpy.execute("""
        CREATE TABLE {AutoMLSchema.TEMP_MST_TABLE} (
            {mst_key} INTEGER,
            {model_id} INTEGER,
            {compile_params} VARCHAR,
            {fit_params} VARCHAR,
            unique ({model_id}, {compile_params}, {fit_params})
        );
    """.format(AutoMLSchema=AutoMLConstants,
               mst_key=ModelSelectionSchema.MST_KEY,
               model_id=ModelSelectionSchema.MODEL_ID,
               compile_params=ModelSelectionSchema.COMPILE_PARAMS,
               fit_params=ModelSelectionSchema.FIT_PARAMS))

    insert_template = """
        INSERT INTO
            {AutoMLSchema.TEMP_MST_TABLE}(
                {mst_key_col},
                {model_id_col},
                {compile_params_col},
                {fit_params_col}
            )
        VALUES (
            {mst_key_val},
            {model_id},
            $${compile_params}$$,
            $${fit_params}$$
        )
    """
    for mst_key_val, mst in enumerate(msts_list, i):
        plpy.execute(insert_template.format(
            AutoMLSchema=AutoMLConstants,
            mst_key_col=ModelSelectionSchema.MST_KEY,
            model_id_col=ModelSelectionSchema.MODEL_ID,
            compile_params_col=ModelSelectionSchema.COMPILE_PARAMS,
            fit_params_col=ModelSelectionSchema.FIT_PARAMS,
            mst_key_val=mst_key_val,
            model_id=mst[ModelSelectionSchema.MODEL_ID],
            compile_params=mst[ModelSelectionSchema.COMPILE_PARAMS],
            fit_params=mst[ModelSelectionSchema.FIT_PARAMS]))

    self.generate_mst_summary_table(AutoMLConstants.TEMP_MST_SUMMARY_TABLE)
def generate_mst_summary_table(self, tbl_name):
    """
    Generates the mst summary table with the given name.

    The table is dropped first when it already exists and then filled with
    a single row holding self.model_arch_table and self.object_table (SQL
    NULL when self.object_table is None).

    :param tbl_name: name of summary table, must end with '_summary'
    """
    # NOTE(review): whitespace inside the SQL text was reconstructed from a
    # whitespace-collapsed copy of this method.
    _assert(tbl_name.endswith('_summary'), 'invalid summary table name')

    # extra sanity check
    if table_exists(tbl_name):
        drop_tables([tbl_name])

    create_query = """
        CREATE TABLE {tbl_name} (
            {model_arch_table} VARCHAR,
            {object_table} VARCHAR
        );
    """.format(tbl_name=tbl_name,
               model_arch_table=ModelSelectionSchema.MODEL_ARCH_TABLE,
               object_table=ModelSelectionSchema.OBJECT_TABLE)
    plpy.execute(create_query)

    if self.object_table is None:
        object_table = 'NULL::VARCHAR'
    else:
        object_table = '$${0}$$'.format(self.object_table)
    insert_summary_query = """
        INSERT INTO
            {tbl_name}(
                {model_arch_table_name},
                {object_table_name}
            )
        VALUES (
            $${self.model_arch_table}$$,
            {object_table}
        )
    """.format(model_arch_table_name=ModelSelectionSchema.MODEL_ARCH_TABLE,
               object_table_name=ModelSelectionSchema.OBJECT_TABLE,
               self=self,
               tbl_name=tbl_name,
               object_table=object_table)
    plpy.execute(insert_summary_query)


def update_model_output_and_info_tables(self, model_training):
    """
    Updates model output and info tables by stacking rows after each evaluation round.

    Appends every row of the round's output table to self.model_output_table
    and every row of the round's info table, extended by the round's
    metrics_iters value, to self.model_info_table.

    :param model_training: Fit Multiple class object
    """
    metrics_iters = plpy.execute(
        "SELECT {AutoMLSchema.METRICS_ITERS} " \
        "FROM {model_training.original_model_output_table}_summary".format(
            model_training=model_training,
            AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS]
    if metrics_iters:
        metrics_iters_sql = "ARRAY{0}".format(metrics_iters)
    else:
        # A missing or empty value used to be interpolated as the Python
        # text 'None' / '[]', which is not valid SQL; emit a NULL instead.
        # NOTE(review): assumes the metrics_iters column of
        # self.model_info_table is an integer array - confirm.
        metrics_iters_sql = "NULL::INTEGER[]"

    # stacking new rows from training
    plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM " \
                 "{model_training.original_model_output_table}".format(self=self, model_training=model_training))
    plpy.execute("INSERT INTO {self.model_info_table} SELECT *, {metrics_iters} FROM " \
                 "{model_training.model_info_table}".format(self=self,
                                                            model_training=model_training,
                                                            metrics_iters=metrics_iters_sql))
+ :param model_training: Fit Multiple class object + """ + metrics_iters = plpy.execute("SELECT {AutoMLSchema.METRICS_ITERS} " \ + "FROM {model_training.original_model_output_table}_summary".format(self=self, + model_training=model_training, + AutoMLSchema=AutoMLConstants))[0][AutoMLConstants.METRICS_ITERS] + if metrics_iters: + metrics_iters = "ARRAY{0}".format(metrics_iters) + # stacking new rows from training + plpy.execute("INSERT INTO {self.model_output_table} SELECT * FROM " \ + "{model_training.original_model_output_table}".format(self=self, model_training=model_training)) + plpy.execute("INSERT INTO {self.model_info_table} SELECT *, {metrics_iters} FROM " \ + "{model_training.model_info_table}".format(self=self, + model_training=model_training, + metrics_iters=metrics_iters)) diff --git a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in index 946dde3..9db4ea1 100644 --- a/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in +++ b/src/ports/postgres/modules/deep_learning/test/unit_tests/test_madlib_keras_automl.py_in @@ -18,11 +18,11 @@ # under the License. m4_changequote(`<!', `!>') +m4_ifdef(<!__POSTGRESQL__!>, <!print 'skipping automl for postgres'!>, <! import sys from os import path import math -# Add convex module to the pythonpath. # TODO: ? 
sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) sys.path.append(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))) @@ -46,8 +46,8 @@ class HyperbandScheduleTestCase(unittest.TestCase): self.module_patcher = patch.dict('sys.modules', patches) self.module_patcher.start() - import deep_learning.madlib_keras_automl - self.module = deep_learning.madlib_keras_automl + import deep_learning.madlib_keras_automl_hyperband + self.module = deep_learning.madlib_keras_automl_hyperband # self.module.MstLoaderInputValidator._validate_input_args = \ # MagicMock() @@ -222,13 +222,13 @@ class AutoMLHyperoptTestCase(unittest.TestCase): self.module_patcher = patch.dict('sys.modules', patches) self.module_patcher.start() - import deep_learning.madlib_keras_automl - self.module = deep_learning.madlib_keras_automl + import deep_learning.madlib_keras_automl_hyperopt + self.module = deep_learning.madlib_keras_automl_hyperopt - from deep_learning.madlib_keras_automl import AutoMLHyperopt + # from deep_learning.madlib_keras_automl_hyperopt import AutoMLHyperopt self.seg_num_mock = Mock() - class FakeAutoMLHyperopt(AutoMLHyperopt): + class FakeAutoMLHyperopt(self.module.AutoMLHyperopt): def __init__(self, *args): pass self.module.get_seg_number = self.seg_num_mock @@ -282,3 +282,5 @@ class AutoMLHyperoptTestCase(unittest.TestCase): if __name__ == '__main__': unittest.main() + +!>)
