Repository: systemml
Updated Branches:
  refs/heads/master a2db1ad89 -> e94374afb
http://git-wip-us.apache.org/repos/asf/systemml/blob/e94374af/scripts/perftest/python/train.py
----------------------------------------------------------------------
diff --git a/scripts/perftest/python/train.py b/scripts/perftest/python/train.py
index 627ba03..ec784d7 100755
--- a/scripts/perftest/python/train.py
+++ b/scripts/perftest/python/train.py
@@ -22,14 +22,18 @@
 import sys
 from os.path import join
-from utils import config_writer, relevant_folders, mat_type_check
+from utils_misc import config_writer, mat_type_check
 from functools import reduce
+from utils_fs import relevant_folders
 
 # Contains configuration setting for training
 DATA_FORMAT = 'csv'
 
 
-def binomial_m_svm_train(save_folder_name, datagen_dir, train_dir):
+def binomial_m_svm_train(save_folder_name, datagen_dir, train_dir, config_dir):
+
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1]:
@@ -39,21 +43,19 @@ def binomial_m_svm_train(save_folder_name, datagen_dir, train_dir):
         maxiter = 20
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        data_folders.append(full_path_train)
-
-        model = join(full_path_train, 'model.data')
-        Log = join(full_path_train, 'Log.data')
-
+        model = join(train_write + '.' + str(i), 'model.data')
+        Log = join(train_write + '.' + str(i), 'Log.data')
         config = dict(X=X, Y=Y, icpt=icpt, classes=2, reg=reg, tol=tol, maxiter=maxiter,
                       model=model, Log=Log, fmt=DATA_FORMAT)
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def binomial_l2_svm_train(save_folder_name, datagen_dir, train_dir):
+def binomial_l2_svm_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1]:
@@ -63,23 +65,21 @@ def binomial_l2_svm_train(save_folder_name, datagen_dir, train_dir):
         maxiter = '100'
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        data_folders.append(full_path_train)
-
-        model = join(full_path_train, 'model.data')
-        Log = join(full_path_train, 'Log.data')
-
+        model = join(train_write + '.' + str(i), 'model.data')
+        Log = join(train_write + '.' + str(i), 'Log.data')
         config = dict(X=X, Y=Y, icpt=icpt, reg=reg, tol=tol, maxiter=maxiter,
                       model=model, Log=Log, fmt=DATA_FORMAT)
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def binomial_multilogreg_train(save_folder_name, datagen_dir, train_dir):
-    data_folders = []
+def binomial_multilogreg_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
+
+    data_folders = []
     for i in [0, 1, 2]:
         icpt = str(i)
         reg = '0.01'
@@ -88,125 +88,117 @@ def binomial_multilogreg_train(save_folder_name, datagen_dir, train_dir):
         mii = '5'
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        data_folders.append(full_path_train)
-
-        B = join(full_path_train, 'B.data')
-
+        B = join(train_write + '.' + str(i), 'B.data')
         config = dict(X=X, Y=Y, icpt=icpt, reg=reg, tol=tol, moi=moi, mii=mii, B=B)
-        config_writer(full_path_train + '.json', config)
-    return data_folders
-
-
-def multinomial_m_svm_train(save_folder_name, datagen_dir, train_dir):
-
-    data_folders = []
-    for i in [0, 1]:
-        icpt = str(i)
-        reg = '0.01'
-        tol = '0.0001'
-        maxiter = '20'
-        X = join(datagen_dir, 'X.data')
-        Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        model = join(full_path_train, 'model.data')
-        Log = join(full_path_train, 'Log.data')
-
-        config = dict(X=X, Y=Y, icpt=icpt, classes=150, reg=reg, tol=tol, maxiter=maxiter,
-                      model=model, Log=Log, fmt=DATA_FORMAT)
-        config_writer(full_path_train + '.json', config)
-        data_folders.append(full_path_train)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def clustering_kmeans_train(save_folder_name, datagen_dir, train_dir):
+def clustering_kmeans_train(save_folder_name, datagen_dir, train_dir, config_dir):
 
-    X = join(datagen_dir, 'X.data')
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
-    full_path_train = join(train_dir, save_folder_name)
-    C = join(full_path_train, 'C.data')
+    X = join(datagen_dir, 'X.data')
+    C = join(train_write, 'C.data')
 
     k = '50'
     maxi = '50'
     tol = '0.0001'
 
     config = dict(X=X, k=k, maxi=maxi, tol=tol, C=C)
+    config_writer(save_path + '.json', config)
 
-    config_writer(full_path_train + '.json', config)
+    return [save_path]
 
-    return [full_path_train]
 
-
-def stats1_univar_stats_train(save_folder_name, datagen_dir, train_dir):
+def stats1_univar_stats_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     X = join(datagen_dir, 'X.data')
     TYPES = join(datagen_dir, 'types')
-
-    full_path_train = join(train_dir, save_folder_name)
-    STATS = join(full_path_train, 'STATS.data')
+    STATS = join(train_write, 'STATS.data')
 
     config = dict(X=X, TYPES=TYPES, STATS=STATS)
-    config_writer(full_path_train + '.json', config)
+    config_writer(save_path + '.json', config)
 
-    return [full_path_train]
+    return [save_path]
 
 
-def stats1_bivar_stats_train(save_folder_name, datagen_dir, train_dir):
+def stats1_bivar_stats_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     X = join(datagen_dir, 'X.data')
     index1 = join(datagen_dir, 'set1.indices')
    index2 = join(datagen_dir, 'set2.indices')
     types1 = join(datagen_dir, 'set1.types')
     types2 = join(datagen_dir, 'set2.types')
+    config = dict(X=X, index1=index1, index2=index2, types1=types1, types2=types2,
+                  OUTDIR=train_write)
+    config_writer(save_path + '.json', config)
 
-    full_path_train = join(train_dir, save_folder_name)
-    OUTDIR = full_path_train
+    return [save_path]
 
-    config = dict(X=X, index1=index1, index2=index2, types1=types1, types2=types2, OUTDIR=OUTDIR)
-    config_writer(full_path_train + '.json', config)
 
-    return [full_path_train]
-
-
-def stats2_stratstats_train(save_folder_name, datagen_dir, train_dir):
+def stats2_stratstats_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     X = join(datagen_dir, 'X.data')
     Xcid = join(datagen_dir, 'Xcid.data')
     Ycid = join(datagen_dir, 'Ycid.data')
+    O = join(train_write, 'O.data')
+    config = dict(X=X, Xcid=Xcid, Ycid=Ycid, O=O, fmt=DATA_FORMAT)
+    config_writer(save_path + '.json', config)
 
-    full_path_train = join(train_dir, save_folder_name)
-    O = join(full_path_train, 'O.data')
+    return [save_path]
 
-    config = dict(X=X, Xcid=Xcid, Ycid=Ycid, O=O, fmt=DATA_FORMAT)
-    config_writer(full_path_train + '.json', config)
 
+def multinomial_m_svm_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
-    return [full_path_train]
+    data_folders = []
+    for i in [0, 1]:
+        icpt = str(i)
+        reg = '0.01'
+        tol = '0.0001'
+        maxiter = '20'
+        X = join(datagen_dir, 'X.data')
+        Y = join(datagen_dir, 'Y.data')
+        model = join(train_write + '.' + str(i), 'model.data')
+        Log = join(train_write + '.' + str(i), 'Log.data')
+        config = dict(X=X, Y=Y, icpt=icpt, classes=150, reg=reg, tol=tol, maxiter=maxiter,
+                      model=model, Log=Log, fmt=DATA_FORMAT)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
+
+    return data_folders
 
 
-def multinomial_naive_bayes_train(save_folder_name, datagen_dir, train_dir):
+def multinomial_naive_bayes_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     X = join(datagen_dir, 'X.data')
     Y = join(datagen_dir, 'Y.data')
     classes = '150'
-
-    full_path_train = join(train_dir, save_folder_name)
-    prior = join(full_path_train, 'prior')
-    conditionals = join(full_path_train, 'conditionals')
-    accuracy = join(full_path_train, 'accuracy')
-    fmt = DATA_FORMAT
-    probabilities = join(full_path_train, 'probabilities')
-
+    prior = join(train_write, 'prior')
+    conditionals = join(train_write, 'conditionals')
+    accuracy = join(train_write, 'accuracy')
+    probabilities = join(train_write, 'probabilities')
     config = dict(X=X, Y=Y, classes=classes, prior=prior, conditionals=conditionals,
-                  accuracy=accuracy, fmt=fmt, probabilities=probabilities)
+                  accuracy=accuracy, fmt=DATA_FORMAT, probabilities=probabilities)
+    config_writer(save_path + '.json', config)
 
-    config_writer(full_path_train + '.json', config)
+    return [save_path]
 
-    return [full_path_train]
 
-
-def multinomial_multilogreg_train(save_folder_name, datagen_dir, train_dir):
+def multinomial_multilogreg_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1, 2]:
@@ -217,18 +209,17 @@ def multinomial_multilogreg_train(save_folder_name, datagen_dir, train_dir):
         mii = '0'
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        data_folders.append(full_path_train)
-        B = join(full_path_train, 'B.data')
-
+        B = join(train_write + '.' + str(i), 'B.data')
         config = dict(X=X, Y=Y, B=B, icpt=icpt, reg=reg, tol=tol, moi=moi, mii=mii,
                       fmt=DATA_FORMAT)
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def regression1_linearregds_train(save_folder_name, datagen_dir, train_dir):
+def regression1_linearregds_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1, 2]:
@@ -236,18 +227,17 @@ def regression1_linearregds_train(save_folder_name, datagen_dir, train_dir):
         reg = '0.01'
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        data_folders.append(full_path_train)
-        B = join(full_path_train, 'B.data')
-
+        B = join(train_write + '.' + str(i), 'B.data')
         config = dict(X=X, Y=Y, B=B, icpt=icpt, fmt=DATA_FORMAT, reg=reg)
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def regression1_linearregcg_train(save_folder_name, datagen_dir, train_dir):
+def regression1_linearregcg_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1, 2]:
@@ -257,29 +247,24 @@ def regression1_linearregcg_train(save_folder_name, datagen_dir, train_dir):
         maxi = '20'
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name + '.' + str(i))
-        data_folders.append(full_path_train)
-        B = join(full_path_train, 'B.data')
-
+        B = join(train_write + '.' + str(i), 'B.data')
         config = dict(X=X, Y=Y, B=B, icpt=icpt, fmt=DATA_FORMAT, maxi=maxi, tol=tol, reg=reg)
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def regression2_glm_gamma_train(save_folder_name, datagen_dir, train_dir):
+def regression2_glm_gamma_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1, 2]:
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name)
-        data_folders.append(full_path_train)
-
-        B = join(full_path_train, 'B.data')
+        B = join(train_write + '.' + str(i), 'B.data')
         icpt = str(i)
         fmt = DATA_FORMAT
         moi = '200'
@@ -292,24 +277,22 @@ def regression2_glm_gamma_train(save_folder_name, datagen_dir, train_dir):
         reg = '0.01'
         config = dict(X=X, Y=Y, B=B, icpt=icpt, fmt=fmt, moi=moi, mii=mii, dfam=dfam,
                       vpov=vpow, link=link, lpow=lpow, tol=tol, reg=reg)
-
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def regression2_glm_binomial_train(save_folder_name, datagen_dir, train_dir):
+def regression2_glm_binomial_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1, 2]:
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name)
-        data_folders.append(full_path_train)
-
-        B = join(full_path_train, 'B.data')
+        B = join(train_write + '.' + str(i), 'B.data')
         icpt = str(i)
         fmt = DATA_FORMAT
         moi = '200'
@@ -321,24 +304,22 @@ def regression2_glm_binomial_train(save_folder_name, datagen_dir, train_dir):
         reg = '0.01'
         config = dict(X=X, Y=Y, B=B, icpt=icpt, fmt=fmt, moi=moi, mii=mii, dfam=dfam,
                       link=link, yneg=yneg, tol=tol, reg=reg)
-
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def regression2_glm_poisson_train(save_folder_name, datagen_dir, train_dir):
+def regression2_glm_poisson_train(save_folder_name, datagen_dir, train_dir, config_dir):
+    save_path = join(config_dir, save_folder_name)
+    train_write = join(train_dir, save_folder_name)
 
     data_folders = []
     for i in [0, 1, 2]:
         X = join(datagen_dir, 'X.data')
         Y = join(datagen_dir, 'Y.data')
-
-        full_path_train = join(train_dir, save_folder_name)
-        data_folders.append(full_path_train)
-
-        B = join(full_path_train, 'B.data')
+        B = join(train_write + '.' + str(i), 'B.data')
         icpt = str(i)
         fmt = DATA_FORMAT
         moi = '200'
@@ -351,12 +332,13 @@ def regression2_glm_poisson_train(save_folder_name, datagen_dir, train_dir):
         reg = '0.01'
         config = dict(X=X, Y=Y, B=B, icpt=icpt, fmt=fmt, moi=moi, mii=mii, dfam=dfam,
                       vpov=vpov, link=link, lpow=lpow, tol=tol, reg=reg)
-        config_writer(full_path_train + '.json', config)
+        config_writer(save_path + '.' + str(i) + '.json', config)
+        data_folders.append(save_path + '.' + str(i))
 
     return data_folders
 
 
-def config_packets_train(algo_payload, matrix_type, matrix_shape, datagen_dir, train_dir, dense_algos):
+def config_packets_train(algo_payload, matrix_type, matrix_shape, datagen_dir, train_dir, dense_algos, config_dir):
     """
     This function has two responsibilities. Generate the configuration files for
     input training algorithms and return a dictionary that will be used for execution.
@@ -380,11 +362,13 @@ def config_packets_train(algo_payload, matrix_type, matrix_shape, datagen_dir, t
     dense_algos: List
         Algorithms that support only dense matrix type
 
+    config_dir: String
+        Location to store to configuration json file
+
     return: {string: list}
         This dictionary contains algorithms to be executed as keys and the path of configuration json
         files to be executed list of values.
     """
-
     config_bundle = {}
 
     for k, _ in algo_payload:
@@ -394,6 +378,7 @@ def config_packets_train(algo_payload, matrix_type, matrix_shape, datagen_dir, t
         current_matrix_type = mat_type_check(current_family, matrix_type, dense_algos)
         data_gen_folders = relevant_folders(datagen_dir, current_algo, current_family,
                                             current_matrix_type, matrix_shape, 'data-gen')
+
        if len(data_gen_folders) == 0:
             print('datagen folders not present for {}'.format(current_family))
             sys.exit()
@@ -403,7 +388,7 @@ def config_packets_train(algo_payload, matrix_type, matrix_shape, datagen_dir, t
             save_name = '.'.join([current_algo] + [file_path_last])
             algo_func = '_'.join([current_family] + [current_algo.lower().replace('-', '_')]
                                  + ['train'])
-            conf_path = globals()[algo_func](save_name, current_datagen_dir, train_dir)
+            conf_path = globals()[algo_func](save_name, current_datagen_dir, train_dir, config_dir)
             config_bundle[current_algo].append(conf_path)
 
     config_packets = {}
http://git-wip-us.apache.org/repos/asf/systemml/blob/e94374af/scripts/perftest/python/utils.py
----------------------------------------------------------------------
diff --git a/scripts/perftest/python/utils.py b/scripts/perftest/python/utils.py
deleted file mode 100755
index 4bba34f..0000000
--- a/scripts/perftest/python/utils.py
+++ /dev/null
@@ -1,390 +0,0 @@
-#!/usr/bin/env python3
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-from os.path import join
-import os
-import json
-import subprocess
-import shlex
-import re
-import logging
-import sys
-import glob
-from functools import reduce
-
-# This file contains all the utility functions required for performance test module
-
-
-def get_families(current_algo, ml_algo):
-    """
-    Given current algorithm we get its families.
-
-    current_algo : String
-        Input algorithm specified
-
-    ml_algo : Dictionary
-        key, value dictionary with family as key and algorithms as list of values
-
-    return: List
-        List of families returned
-    """
-
-    family_list = []
-    for family, algos in ml_algo.items():
-        if current_algo in algos:
-            family_list.append(family)
-    return family_list
-
-
-def split_rowcol(matrix_dim):
-    """
-    Split the input matrix dimensions into row and columns
-
-    matrix_dim: String
-        Input concatenated string with row and column
-
-    return: Tuple
-        Row and column split based on suffix
-    """
-
-    k = str(0) * 3
-    M = str(0) * 6
-    replace_M = matrix_dim.replace('M', str(M))
-    replace_k = replace_M.replace('k', str(k))
-    row, col = replace_k.split('_')
-    return row, col
-
-
-def config_writer(write_path, config_obj):
-    """
-    Writes the dictionary as an configuration json file to the give path
-
-    write_path: String
-        Absolute path of file name to be written
-
-    config_obj: List or Dictionary
-        Can be a dictionary or a list based on the object passed
-    """
-
-    with open(write_path, 'w') as input_file:
-        json.dump(config_obj, input_file, indent=4)
-
-
-def config_reader(read_path):
-    """
-    Read json file given path
-
-    return: List or Dictionary
-        Reading the json file can give us a list if we have positional args or
-        key value for a dictionary
-    """
-
-    with open(read_path, 'r') as input_file:
-        conf_file = json.load(input_file)
-
-    return conf_file
-
-
-def create_dir(directory):
-    """
-    Create directory given path if the directory does not exist already
-
-    directory: String
-        Input folder path
-    """
-
-    if not os.path.exists(directory):
-        os.makedirs(directory)
-
-
-def get_existence(path, action_mode):
-    """
-    Check SUCCESS file is present in the input path
-
-    path: String
-        Input folder path
-
-    action_mode : String
-        Type of action data-gen, train ...
-
-    return: Boolean check if the file _SUCCESS exists
-    """
-
-    if action_mode == 'data-gen':
-        full_path = join(path, '_SUCCESS')
-        exist = os.path.isfile(full_path)
-    else:
-        # Files does not exist for other modes return False to continue
-        # For e.g some predict algorithms do not generate an output folder
-        # hence checking for SUCCESS would fail
-        exist = False
-
-    return exist
-
-
-def exec_dml_and_parse_time(exec_type, dml_file_name, execution_output_file, args, time=True):
-    """
-    This function is responsible of execution of input arguments via python sub process,
-    We also extract time obtained from the output of this subprocess
-
-    exec_type: String
-        Contains the execution type singlenode / hybrid_spark
-
-    dml_file_name: String
-        DML file name to be used while processing the arguments give
-
-    execution_output_file: String
-        Name of the file where the output of the DML run is written out
-
-    args: Dictionary
-        Key values pairs depending on the arg type
-
-    time: Boolean (default=True)
-        Boolean argument used to extract time from raw output logs.
-    """
-
-    algorithm = dml_file_name + '.dml'
-    if exec_type == 'singlenode':
-        exec_script = join(os.environ.get('SYSTEMML_HOME'), 'bin', 'systemml-standalone.py')
-
-        args = ''.join(['{} {}'.format(k, v) for k, v in args.items()])
-        cmd = [exec_script, algorithm, args]
-        cmd_string = ' '.join(cmd)
-
-    if exec_type == 'hybrid_spark':
-        exec_script = join(os.environ.get('SYSTEMML_HOME'), 'bin', 'systemml-spark-submit.py')
-        args = ''.join(['{} {}'.format(k, v) for k, v in args.items()])
-        cmd = [exec_script, '-f', algorithm, args]
-        cmd_string = ' '.join(cmd)
-
-    # Debug
-    # print(cmd_string)
-
-    # Subprocess to execute input arguments
-    # proc1_log contains the shell output which is used for time parsing
-    proc1 = subprocess.Popen(shlex.split(cmd_string), stdout=subprocess.PIPE,
-                             stderr=subprocess.PIPE)
-
-    if time:
-        proc1_log = []
-        while proc1.poll() is None:
-            raw_std_out = proc1.stdout.readline()
-            decode_raw = raw_std_out.decode('ascii').strip()
-            proc1_log.append(decode_raw)
-            logging.log(10, decode_raw)
-
-        _, err1 = proc1.communicate()
-
-        if "Error" in str(err1):
-            print('Error Found in {}'.format(dml_file_name))
-            total_time = 'failure'
-        else:
-            total_time = parse_time(proc1_log)
-
-        with open(execution_output_file, 'w') as file:
-            for row in proc1_log:
-                file.write("%s\n" % str(row))
-
-    else:
-        total_time = 'not_specified'
-
-    return total_time
-
-
-def parse_time(raw_logs):
-    """
-    Parses raw input list and extracts time
-
-    raw_logs : List
-        Each line obtained from the standard output is in the list
-
-    return: String
-        Extracted time in seconds or time_not_found
-    """
-    # Debug
-    # print(raw_logs)
-
-    for line in raw_logs:
-        if line.startswith('Total execution time'):
-            extract_time = re.findall(r'\d+', line)
-            total_time = '.'.join(extract_time)
-
-            return total_time
-
-    return 'time_not_found'
-
-
-def exec_test_data(exec_type, path):
-    """
-    Creates the test data split from the given input path
-
-    exec_type : String
-        Contains the execution type singlenode / hybrid_spark
-
-    path : String
-        Location of the input folder to pick X and Y
-    """
-    systemml_home = os.environ.get('SYSTEMML_HOME')
-    test_split_script = join(systemml_home, 'scripts', 'perftest', 'extractTestData')
-    X = join(path, 'X.data')
-    Y = join(path, 'Y.data')
-    X_test = join(path, 'X_test.data')
-    Y_test = join(path, 'Y_test.data')
-    args = {'-args': ' '.join([X, Y, X_test, Y_test, 'csv'])}
-
-    # Call the exec script without time
-    config_file_name = path.split('/')[-1]
-    exec_dml_and_parse_time(exec_type, test_split_script, config_file_name, args, False)
-
-
-def check_predict(current_algo, ml_predict):
-    """
-    To check if the current algorithm requires to run the predict
-
-    current_algo: String
-        Algorithm being processed
-
-    ml_predict: Dictionary
-        Key value pairs of algorithm and predict file to process
-    """
-    if current_algo in ml_predict.keys():
-        return True
-
-
-def get_folder_metrics(folder_name, action_mode):
-    """
-    Gets metrics from folder name
-
-    folder_name: String
-        Folder from which we want to grab details
-
-    return: List(3)
-        A list with mat_type, mat_shape, intercept
-    """
-
-    if action_mode == 'data-gen':
-        split_name = folder_name.split('.')
-        mat_type = split_name[1]
-        mat_shape = split_name[2]
-        intercept = 'none'
-
-    try:
-        if action_mode == 'train':
-            split_name = folder_name.split('.')
-            mat_type = split_name[3]
-            mat_shape = split_name[2]
-            intercept = split_name[4]
-
-        if action_mode == 'predict':
-            split_name = folder_name.split('.')
-            mat_type = split_name[3]
-            mat_shape = split_name[2]
-            intercept = split_name[4]
-    except IndexError:
-        intercept = 'none'
-
-    return mat_type, mat_shape, intercept
-
-
-def mat_type_check(current_family, matrix_types, dense_algos):
-    """
-    Some Algorithms support different matrix_type. This function give us the right matrix_type given
-    an algorithm
-
-    current_family: String
-        Current family being porcessed in this function
-
-    matrix_type: List
-        Type of matrix to generate dense, sparse, all
-
-    dense_algos: List
-        Algorithms that support only dense matrix type
-
-    return: List
-        Return the list of right matrix types supported by the family
-    """
-    current_type = []
-    for current_matrix_type in matrix_types:
-        if current_matrix_type == 'all':
-            if current_family in dense_algos:
-                current_type.append('dense')
-            else:
-                current_type.append('dense')
-                current_type.append('sparse')
-
-        if current_matrix_type == 'sparse':
-            if current_family in dense_algos:
-                sys.exit('{} does not support {} matrix type'.format(current_family,
-                                                                     current_matrix_type))
-            else:
-                current_type.append(current_matrix_type)
-
-        if current_matrix_type == 'dense':
-            current_type.append(current_matrix_type)
-
-    return current_type
-
-
-def relevant_folders(path, algo, family, matrix_type, matrix_shape, mode):
-    """
-    Finds the right folder to read the data based on given parameters
-
-    path: String
-        Location of data-gen and training folders
-
-    algo: String
-        Current algorithm being processed by this function
-
-    family: String
-        Current family being processed by this function
-
-    matrix_type: List
-        Type of matrix to generate dense, sparse, all
-
-    matrix_shape: List
-        Dimensions of the input matrix with rows and columns
-
-    mode: String
-        Based on mode and arguments we read the specific folders e.g data-gen folder or train folder
-
-    return: List
-        List of folder locations to read data from
-    """
-    folders = []
-    for current_matrix_type in matrix_type:
-        for current_matrix_shape in matrix_shape:
-            if mode == 'data-gen':
-                data_gen_path = join(path, family)
-                sub_folder_name = '.'.join([current_matrix_type, current_matrix_shape])
-                path_subdir = glob.glob(data_gen_path + '.' + sub_folder_name + "*")
-
-            if mode == 'train':
-                train_path = join(path, algo)
-                sub_folder_name = '.'.join([family, current_matrix_type, current_matrix_shape])
-                path_subdir = glob.glob(train_path + '.' + sub_folder_name + "*")
-
-            path_folders = list(filter(lambda x: os.path.isdir(x), path_subdir))
-            folders.append(path_folders)
-
-    folders_flat = reduce(lambda x, y: x + y, folders)
-
-    return folders_flat
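The monolithic utils.py retired above is split into the three focused modules added below (utils_exec, utils_fs, utils_misc). Callers migrate by splitting one import into three, exactly as the train.py hunk at the top shows; the subprocess_exec line is an illustrative extra, not part of train.py itself:

    # before
    # from utils import config_writer, relevant_folders, mat_type_check

    # after
    from utils_misc import config_writer, mat_type_check   # config/json and misc helpers
    from utils_fs import relevant_folders                  # local-fs and HDFS folder lookups
    from utils_exec import subprocess_exec                 # subprocess and log parsing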
http://git-wip-us.apache.org/repos/asf/systemml/blob/e94374af/scripts/perftest/python/utils_exec.py
----------------------------------------------------------------------
diff --git a/scripts/perftest/python/utils_exec.py b/scripts/perftest/python/utils_exec.py
new file mode 100755
index 0000000..0eb2873
--- /dev/null
+++ b/scripts/perftest/python/utils_exec.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python3
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import subprocess
+import shlex
+import re
+
+# Subprocess and log parsing related functions
+
+
+def subprocess_exec(cmd_string, extract=None):
+    """
+    Execute the input string as subprocess
+
+    cmd_string: String
+        Input string to be executed as a sub process
+
+    extract: String
+        Based on extract as time/dir we extract this information from
+        the logs accordingly
+
+    return: String
+        Based on extract we return the relevant string
+    """
+    # Debug
+    # print(cmd_string)
+    proc1 = subprocess.Popen(shlex.split(cmd_string), stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+
+    error_arr, out_arr = get_all_logs(proc1)
+    std_outs = out_arr + error_arr
+    return_code = proc1.returncode
+
+    if return_code == 0:
+        if extract == 'time':
+            return_data = parse_time(std_outs)
+        if extract == 'dir':
+            return_data = parse_hdfs_paths(std_outs)
+        if extract is None:
+            return_data = 0
+
+    if return_code != 0:
+        return_data = 'proc_fail'
+        print('sub-process failed, return code {}'.format(return_code))
+
+    return return_data
+
+
+def get_all_logs(process):
+    """
+    Based on the subprocess capture logs
+
+    process: Process
+        Process object
+
+    return: List, List
+        Std out and Error as logs as list
+    """
+    out_arr = []
+    while True:
+        nextline = process.stdout.readline().decode('utf8').strip()
+        out_arr.append(nextline)
+        if nextline == '' and process.poll() is not None:
+            break
+
+    error_arr = []
+    while True:
+        nextline = process.stderr.readline().decode('utf8').strip()
+        error_arr.append(nextline)
+        if nextline == '' and process.poll() is not None:
+            break
+
+    return out_arr, error_arr
+
+
+def parse_hdfs_paths(std_outs):
+    """
+    Extract the hdfs paths from the input
+
+    std_outs: List
+        Std outs obtained from the subprocess
+
+    return: List
+        Obtain a list of hdfs paths
+    """
+
+    hdfs_dir = []
+    for i in std_outs:
+        if 'No such file or directory' in i:
+            break
+        elif 'hdfs' in i:
+            current_dir = i.split(' ')[-1]
+            hdfs_dir.append(current_dir)
+
+    return hdfs_dir
+
+
+def parse_time(raw_logs):
+    """
+    Parses raw input list and extracts time
+
+    raw_logs : List
+        Each line obtained from the standard output is in the list
+
+    return: String
+        Extracted time in seconds or time_not_found
+    """
+    # Debug
+    # print(raw_logs)
+
+    for line in raw_logs:
+        if 'ERROR' in line:
+            return 'error'
+        if line.startswith('Total execution time'):
+            extract_time = re.findall(r'\d+', line)
+            total_time = '.'.join(extract_time)
+            return total_time
+    return 'time_not_found'
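Callers select the parser through the extract flag. A usage sketch (the commands are placeholders; any command whose stdout carries a 'Total execution time ...' line behaves the same way):

    from utils_exec import subprocess_exec, parse_time

    hdfs_dirs = subprocess_exec('hdfs dfs -ls /user/perftest', 'dir')   # list of hdfs paths
    elapsed = subprocess_exec('./run_perftest.sh', 'time')              # e.g. '12.345'

    # parse_time joins all digit groups in the line with '.', so:
    assert parse_time(['Total execution time:\t12.345 sec.']) == '12.345'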
http://git-wip-us.apache.org/repos/asf/systemml/blob/e94374af/scripts/perftest/python/utils_fs.py
----------------------------------------------------------------------
diff --git a/scripts/perftest/python/utils_fs.py b/scripts/perftest/python/utils_fs.py
new file mode 100755
index 0000000..977c4f4
--- /dev/null
+++ b/scripts/perftest/python/utils_fs.py
@@ -0,0 +1,162 @@
+#!/usr/bin/env python3
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+import os
+from os.path import join
+import glob
+from functools import reduce
+from utils_exec import subprocess_exec
+
+# Utility support for all file system related operations
+
+
+def create_dir_local(directory):
+    """
+    Create a directory in the local fs
+
+    directory: String
+        Location to create a directory
+    """
+    if not os.path.exists(directory):
+        os.makedirs(directory)
+
+
+def write_success(time, path):
+    """
+    Write SUCCESS file in the given directory
+
+    time: String
+        Time taken to execute the dml script
+
+    path: String
+        Location to write the SUCCESS file
+    """
+    if 'data-gen' in path:
+        if path.startswith('hdfs') and len(time.split('.')) == 2:
+            full_path = join(path, '_SUCCESS')
+            cmd = ['hdfs', 'dfs', '-touchz', full_path]
+            subprocess_exec(' '.join(cmd))
+        else:
+            if len(time.split('.')) == 2:
+                full_path = join(path, '_SUCCESS')
+                open(full_path, 'w').close()
+
+
+def check_SUCCESS_file_exists(path):
+    """
+    Check SUCCESS file is present in the input path
+
+    path: String
+        Input folder path
+
+    action_mode : String
+        Type of action data-gen, train ...
+
+    return: Boolean
+        Checks if the file _SUCCESS exists
+    """
+    if 'data-gen' in path:
+        if path.startswith('hdfs'):
+            full_path = join(path, '_SUCCESS')
+            cmd = ['hdfs', 'dfs', '-test', '-e', full_path]
+            return_code = os.system(' '.join(cmd))
+            if return_code == 0:
+                return True
+        else:
+            full_path = join(path, '_SUCCESS')
+            exist = os.path.isfile(full_path)
+            return exist
+    return False
+
+
+def contains_dir(hdfs_dirs, sub_folder):
+    """
+    Support for Lambda Function to check if a HDFS subfolder is contained by the HDFS directory
+    """
+    if sub_folder in hdfs_dirs:
+        return True
+    else:
+        # Debug
+        # print('{}, {}'.format(sub_folder, hdfs_dirs))
+        pass
+    return False
+
+
+def relevant_folders(path, algo, family, matrix_type, matrix_shape, mode):
+    """
+    Finds the right folder to read the data based on given parameters
+
+    path: String
+        Location of data-gen and training folders
+
+    algo: String
+        Current algorithm being processed by this function
+
+    family: String
+        Current family being processed by this function
+
+    matrix_type: List
+        Type of matrix to generate dense, sparse, all
+
+    matrix_shape: List
+        Dimensions of the input matrix with rows and columns
+
+    mode: String
+        Based on mode and arguments we read the specific folders e.g data-gen folder or train folder
+
+    return: List
+        List of folder locations to read data from
+    """
+    folders = []
+
+    for current_matrix_type in matrix_type:
+        for current_matrix_shape in matrix_shape:
+            if path.startswith('hdfs'):
+                if mode == 'data-gen':
+                    sub_folder_name = '.'.join([family, current_matrix_type, current_matrix_shape])
+                    cmd = ['hdfs', 'dfs', '-ls', path]
+                    path_subdir = subprocess_exec(' '.join(cmd), 'dir')
+
+                if mode == 'train':
+                    sub_folder_name = '.'.join([algo, family, current_matrix_type, current_matrix_shape])
+                    cmd = ['hdfs', 'dfs', '-ls', path]
+                    path_subdir = subprocess_exec(' '.join(cmd), 'dir')
+
+                path_folders = list(filter(lambda x: contains_dir(x, sub_folder_name), path_subdir))
+
+            else:
+                if mode == 'data-gen':
+                    data_gen_path = join(path, family)
+                    sub_folder_name = '.'.join([current_matrix_type, current_matrix_shape])
+                    path_subdir = glob.glob(data_gen_path + '.' + sub_folder_name + "*")
+
+                if mode == 'train':
+                    train_path = join(path, algo)
+                    sub_folder_name = '.'.join([family, current_matrix_type, current_matrix_shape])
+                    path_subdir = glob.glob(train_path + '.' + sub_folder_name + "*")
+
+                path_folders = list(filter(lambda x: os.path.isdir(x), path_subdir))
+
+            folders.append(path_folders)
+
+    folders_flat = reduce(lambda x, y: x + y, folders)
+    return folders_flat
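relevant_folders encodes the perftest folder naming scheme in its glob and ls patterns: data-gen folders are keyed by family, train folders by algorithm. A local-fs sketch under an assumed layout (the paths are hypothetical):

    from utils_fs import relevant_folders

    # assumed on disk:
    #   /tmp/perftest/datagen/clustering.dense.10k_100
    #   /tmp/perftest/train/Kmeans.clustering.dense.10k_100
    datagen = relevant_folders('/tmp/perftest/datagen', 'Kmeans', 'clustering',
                               ['dense'], ['10k_100'], 'data-gen')
    train = relevant_folders('/tmp/perftest/train', 'Kmeans', 'clustering',
                             ['dense'], ['10k_100'], 'train')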
http://git-wip-us.apache.org/repos/asf/systemml/blob/e94374af/scripts/perftest/python/utils_misc.py
----------------------------------------------------------------------
diff --git a/scripts/perftest/python/utils_misc.py b/scripts/perftest/python/utils_misc.py
new file mode 100755
index 0000000..0a765f6
--- /dev/null
+++ b/scripts/perftest/python/utils_misc.py
@@ -0,0 +1,347 @@
+#!/usr/bin/env python3
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+from os.path import join
+import os
+import json
+import re
+import sys
+from utils_exec import subprocess_exec
+
+# This file contains all misc utility functions required by performance test module
+
+
+def get_config_args(config_dict, spark_dict, exec_type):
+    """
+    Based on configuration parameters passed build configuration dictionary used by subprocess
+
+    config_dict: Dictionary
+        General configuration options
+
+    spark_dict: Dictionary
+        Spark configuration options
+
+    exec_type: String
+        Contains the execution type singlenode / hybrid_spark
+
+    return: Dictionary, Dictionary
+        Based on the parameters passed we build to dictionary that need to be passed either at the
+        beginning or at the end
+    """
+
+    sup_args_dict = {}
+
+    if config_dict['stats'] is not None:
+        sup_args_dict['-stats'] = config_dict['stats']
+
+    if config_dict['explain'] is not None:
+        sup_args_dict['-explain'] = config_dict['explain']
+
+    if config_dict['config'] is not None:
+        sup_args_dict['-config'] = config_dict['config']
+
+    spark_args_dict = {}
+    if exec_type == 'hybrid_spark':
+        if spark_dict['master'] is not None:
+            spark_args_dict['--master'] = spark_dict['master']
+
+        if spark_dict['num_executors'] is not None:
+            spark_args_dict['--num-executors'] = spark_dict['num_executors']
+
+        if spark_dict['driver_memory'] is not None:
+            spark_args_dict['--driver-memory'] = spark_dict['driver_memory']
+
+        if spark_dict['executor_cores'] is not None:
+            spark_args_dict['--executor-cores'] = spark_dict['executor_cores']
+
+        if spark_dict['conf'] is not None:
+            spark_args_dict['--conf'] = ' '.join(spark_dict['conf'])
+
+    return sup_args_dict, spark_args_dict
+
+
+def args_dict_split(all_arguments):
+    """
+    This functions split the super set of arguments to smaller dictionaries
+
+    all_arguments: Dictionary
+        All input arguments parsed
+
+    return: Dictionary, Dictionary, Dictionary
+        We return three dictionaries for init, script, spark arguments
+    """
+    args_dict = dict(list(all_arguments.items())[0:9])
+    config_dict = dict(list(all_arguments.items())[9:12])
+    spark_dict = dict(list(all_arguments.items())[12:])
+
+    return args_dict, config_dict, spark_dict
+
+
+def get_families(current_algo, ml_algo):
+    """
+    Given current algorithm we get its families.
+
+    current_algo : String
+        Input algorithm specified
+
+    ml_algo : Dictionary
+        key, value dictionary with family as key and algorithms as list of values
+
+    return: List
+        List of families returned
+    """
+
+    family_list = []
+    for family, algos in ml_algo.items():
+        if current_algo in algos:
+            family_list.append(family)
+    return family_list
+
+
+def split_rowcol(matrix_dim):
+    """
+    Split the input matrix dimensions into row and columns
+
+    matrix_dim: String
+        Input concatenated string with row and column
+
+    return: Tuple
+        Row and column split based on suffix
+    """
+
+    k = str(0) * 3
+    M = str(0) * 6
+    replace_M = matrix_dim.replace('M', str(M))
+    replace_k = replace_M.replace('k', str(k))
+    row, col = replace_k.split('_')
+    return row, col
+
+
+def config_writer(write_path, config_obj):
+    """
+    Writes the dictionary as an configuration json file to the give path
+
+    write_path: String
+        Absolute path of file name to be written
+
+    config_obj: List or Dictionary
+        Can be a dictionary or a list based on the object passed
+    """
+
+    with open(write_path, 'w') as input_file:
+        json.dump(config_obj, input_file, indent=4)
+
+
+def config_reader(read_path):
+    """
+    Read json file given path
+
+    return: List or Dictionary
+        Reading the json file can give us a list if we have positional args or
+        key value args for a dictionary
+    """
+
+    with open(read_path, 'r') as input_file:
+        conf_file = json.load(input_file)
+
+    return conf_file
+
+
+def exec_dml_and_parse_time(exec_type, dml_file_name, args, spark_args_dict, sup_args_dict):
+    """
+    This function is responsible of execution of input arguments via python sub process,
+    We also extract time obtained from the output of this subprocess
+
+    exec_type: String
+        Contains the execution type singlenode / hybrid_spark
+
+    dml_file_name: String
+        DML file name to be used while processing the arguments give
+
+    args: Dictionary
+        Key values pairs depending on the arg type
+
+    spark_args_dict: Dictionary
+        Spark configuration arguments
+
+    sup_args_dict: Dictionary
+        Supplementary arguments required by the script
+
+    return: String
+        The value of time parsed from the logs / error
+    """
+
+    algorithm = dml_file_name + '.dml'
+
+    sup_args = ''.join(['{} {}'.format(k, v) for k, v in sup_args_dict.items()])
+    if exec_type == 'singlenode':
+        exec_script = join(os.environ.get('SYSTEMML_HOME'), 'bin', 'systemml-standalone.py')
+
+        args = ''.join(['{} {}'.format(k, v) for k, v in args.items()])
+        cmd = [exec_script, '-f', algorithm, args, sup_args]
+        cmd_string = ' '.join(cmd)
+
+    if exec_type == 'hybrid_spark':
+        exec_script = join(os.environ.get('SYSTEMML_HOME'), 'bin', 'systemml-spark-submit.py')
+        spark_pre_args = ''.join([' {} {} '.format(k, v) for k, v in spark_args_dict.items()])
+        args = ''.join(['{} {}'.format(k, v) for k, v in args.items()])
+        cmd = [exec_script, spark_pre_args, '-f', algorithm, args, sup_args]
+        cmd_string = ' '.join(cmd)
+
+    # Debug
+    # print(cmd_string)
+
+    time = subprocess_exec(cmd_string, 'time')
+
+    return time
+
+
+def parse_time(raw_logs):
+    """
+    Parses raw input list and extracts time
+
+    raw_logs : List
+        Each line obtained from the standard output is in the list
+
+    return: String
+        Extracted time in seconds or time_not_found
+    """
+    # Debug
+    # print(raw_logs)
+
+    for line in raw_logs:
+        if line.startswith('Total execution time'):
+            extract_time = re.findall(r'\d+', line)
+            total_time = '.'.join(extract_time)
+
+            return total_time
+
+    return 'time_not_found'
+
+
+def exec_test_data(exec_type, spark_args_dict, sup_args_dict, datagen_path, config):
+    """
+    Creates the test data split from the given input path
+
+    exec_type : String
+        Contains the execution type singlenode / hybrid_spark
+
+    path : String
+        Location of the input folder to pick X and Y
+    """
+    systemml_home = os.environ.get('SYSTEMML_HOME')
+    test_split_script = join(systemml_home, 'scripts', 'perftest', 'extractTestData')
+    path = join(datagen_path, config.split('/')[-1])
+    X = join(path, 'X.data')
+    Y = join(path, 'Y.data')
+    X_test = join(path, 'X_test.data')
+    Y_test = join(path, 'Y_test.data')
+    args = {'-args': ' '.join([X, Y, X_test, Y_test, 'csv'])}
+    exec_dml_and_parse_time(exec_type, test_split_script, args, spark_args_dict, sup_args_dict)
+
+
+def check_predict(current_algo, ml_predict):
+    """
+    To check if the current algorithm requires to run the predict
+
+    current_algo: String
+        Algorithm being processed
+
+    ml_predict: Dictionary
+        Key value pairs of algorithm and predict file to process
+    """
+    if current_algo in ml_predict.keys():
+        return True
+
+
+def get_folder_metrics(folder_name, action_mode):
+    """
+    Gets metrics from folder name for logging
+
+    folder_name: String
+        Folder from which we want to grab details
+
+    return: List(3)
+        A list with mat_type, mat_shape, intercept
+    """
+
+    if action_mode == 'data-gen':
+        split_name = folder_name.split('.')
+        mat_type = split_name[1]
+        mat_shape = split_name[2]
+        intercept = 'none'
+
+    try:
+        if action_mode == 'train':
+            split_name = folder_name.split('.')
+            mat_type = split_name[3]
+            mat_shape = split_name[2]
+            intercept = split_name[4]
+
+        if action_mode == 'predict':
+            split_name = folder_name.split('.')
+            mat_type = split_name[3]
+            mat_shape = split_name[2]
+            intercept = split_name[4]
+    except IndexError:
+        intercept = 'none'
+
+    return mat_type, mat_shape, intercept
+
+
+def mat_type_check(current_family, matrix_types, dense_algos):
+    """
+    Some Algorithms support different matrix_types. This function give us the right matrix_type given
+    an algorithm
+
+    current_family: String
+        Current family being porcessed in this function
+
+    matrix_type: List
+        Type of matrix to generate dense, sparse, all
+
+    dense_algos: List
+        Algorithms that support only dense matrix type
+
+    return: List
+        Return the list of right matrix types supported by the family
+    """
+    current_type = []
+    for current_matrix_type in matrix_types:
+        if current_matrix_type == 'all':
+            if current_family in dense_algos:
+                current_type.append('dense')
+            else:
+                current_type.append('dense')
+                current_type.append('sparse')
+
+        if current_matrix_type == 'sparse':
+            if current_family in dense_algos:
+                sys.exit('{} does not support {} matrix type'.format(current_family,
+                                                                     current_matrix_type))
+            else:
+                current_type.append(current_matrix_type)
+
+        if current_matrix_type == 'dense':
+            current_type.append(current_matrix_type)
+
+    return current_type
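Two of the smaller helpers above are easiest to see with concrete values (worked examples, not taken from the repo):

    from utils_misc import split_rowcol, mat_type_check

    # 'k' expands to three zeros and 'M' to six, so '10k_100' -> ('10000', '100')
    row, col = split_rowcol('10k_100')

    # A dense-only family collapses 'all' to ['dense']; otherwise both types are kept.
    mat_type_check('clustering', ['all'], dense_algos=['clustering'])   # ['dense']
    mat_type_check('binomial', ['all'], dense_algos=['clustering'])     # ['dense', 'sparse']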
