Repository: incubator-hawq Updated Branches: refs/heads/master c2280debb -> 7e0c63adc
HAWQ-1012. Check whether the input yaml file for hawq register is valid.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/31c3cde5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/31c3cde5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/31c3cde5

Branch: refs/heads/master
Commit: 31c3cde5565a26023bd314c64cccfcd669032680
Parents: c2280de
Author: xunzhang <[email protected]>
Authored: Wed Aug 24 11:20:44 2016 +0800
Committer: Ruilong Huo <[email protected]>
Committed: Thu Aug 25 10:24:34 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 110 ++++++++++++++++++++++++++++++--------------
 1 file changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/31c3cde5/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 7a20906..c2692d8 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -54,59 +54,96 @@ def option_parser():
     return parser
 
 
+def register_yaml_dict_check(D):
+    # check exists
+    check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName']
+    for attr in check_list:
+        if D.get(attr) == None:
+            logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr)
+            sys.exit(1)
+    if D['FileFormat'] in ['Parquet', 'AO']:
+        prefix = D['FileFormat']
+        local_check_list = ['%s_FileLocations' % prefix, '%s_Schema' % prefix]
+        for attr in local_check_list:
+            if D.get(attr) == None:
+                logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr)
+                sys.exit(1)
+        if D['%s_FileLocations' % prefix].get('Files') == None:
+            logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files' % prefix)
+            sys.exit(1)
+        for d in D['%s_FileLocations' % prefix]['Files']:
+            if d.get('path') == None:
+                logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files.path' % prefix)
+                sys.exit(1)
+            if d.get('size') == None:
+                logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files.size' % prefix)
+                sys.exit(1)
+    else:
+        logger.error('hawq register only support Parquet and AO formats. Format %s is not supported.' % D['FileFormat'])
+        sys.exit(1)
+    prefix = D['FileFormat']
+    if D.get('%s_Schema' % prefix) == None:
+        logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema' % prefix)
+        sys.exit(1)
+    for d in D['%s_Schema' % prefix]:
+        if d.get('name') == None:
+            logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema.name' % prefix)
+            sys.exit(1)
+        if d.get('type') == None:
+            logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema.type' % prefix)
+            sys.exit(1)
+    if D['FileFormat'] == 'Parquet':
+        sub_check_list = ['CompressionLevel', 'CompressionType', 'PageSize', 'RowGroupSize']
+        for attr in sub_check_list:
+            if not D['Parquet_FileLocations'].has_key(attr):
+                logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % 'Parquet_FileLocations.%s' % attr)
+                sys.exit(1)
+    else:
+        sub_check_list = ['Checksum', 'CompressionLevel', 'CompressionType']
+        for attr in sub_check_list:
+            if not D['AO_FileLocations'].has_key(attr):
+                logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % 'AO_FileLocations.%s' % attr)
+                sys.exit(1)
+
+
 def option_parser_yml(yml_file):
     import yaml
     with open(yml_file, 'r') as f:
         params = yaml.load(f)
-    # check if valid configuration yaml file
-    attrs = ['FileFormat', 'DFS_URL', 'Distribution_Policy']
-    for attr in attrs:
-        if attr not in params.keys():
-            logger.error('Wrong configuration yaml file format, see example in "hawq register --help"')
-            sys.exit(1)
+    register_yaml_dict_check(params)
     if params['FileFormat'] == 'Parquet':
-        attrs = ['Parquet_FileLocations', 'Parquet_Schema']
-        for attr in attrs:
-            if attr not in params.keys():
-                logger.error('Wrong configuration yaml file format, see example in "hawq register --help"')
-                sys.exit(1)
-        if not params['Parquet_FileLocations'].get('Files'):
-            logger.error('Wrong configuration yaml file format, see example in "hawq register --help"')
-            sys.exit(1)
         if not len(params['Parquet_FileLocations']['Files']):
-            return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy']
+            return 'Parquet', '', params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations']
         offset = params['Parquet_FileLocations']['Files'][0]['path'].rfind('/')
         filepath = (params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'][:offset]
                     if len(params['Parquet_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['Parquet_FileLocations']['Files'][0]['path'])
-        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy']
-    attrs = ['AO_FileLocations', 'AO_Schema']
-    for attr in attrs:
-        if attr not in params.keys():
-            logger.error('Wrong configuration yaml file format, see example in "hawq register --help"')
-            sys.exit(1)
-    if not (params['AO_FileLocations']['Files']):
-        return 'AO', '', params['AO_Schema'], params['Distribution_Policy']
-    if not params['AO_FileLocations'].get('Files'):
-        logger.error('Wrong configuration yaml file format, see example in "hawq register --help"')
-        sys.exit(1)
+        return 'Parquet', filepath, params['Parquet_Schema'], params['Distribution_Policy'], params['Parquet_FileLocations']
+    if not len(params['AO_FileLocations']['Files']):
+        return 'AO', '', params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations']
     offset = params['AO_FileLocations']['Files'][0]['path'].rfind('/')
     filepath = (params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'][:offset]
                 if len(params['AO_FileLocations']['Files']) != 1 else params['DFS_URL'] + params['AO_FileLocations']['Files'][0]['path'])
-    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy']
+    return 'AO', filepath, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations']
 
 
-def create_table(dburl, tablename, schema_info, fmt, distrbution_policy):
+def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations):
     try:
         schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info])
         fmt = 'ROW' if fmt == 'AO' else fmt
-        query = 'create table %s(%s) with (appendonly=true, orientation=%s) %s;' % (tablename, schema, fmt, distrbution_policy)
+        if fmt == 'ROW':
+            query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s) %s;'
+                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], distrbution_policy))
+        else: # Parquet
+            query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s) %s;'
+                     % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], distrbution_policy))
         conn = dbconn.connect(dburl, False)
         rows = dbconn.execSQL(conn, query)
         conn.commit()
     except DatabaseError, ex:
-        logger.error('Failed to execute query ""%s"' % query)
+        logger.error('Failed to execute query "%s"' % query)
         sys.exit(1)
 
 
@@ -190,6 +227,8 @@ def get_metadata_from_database(dburl, tablename, seg_name):
 
 def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir):
     '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster'''
+    if not filepath:
+        return
     # check whether the files to be registered is in hdfs
     filesystem = filepath.split('://')
     if filesystem[0] != 'hdfs':
@@ -305,21 +344,22 @@ if __name__ == '__main__':
     filepath, database, tablename = options.filepath, options.database, args[0]
 
     if options.yml_config: # Usage2
-        fileformat, filepath, schema, distribution_policy = option_parser_yml(options.yml_config)
-        create_table(dburl, tablename, schema, fileformat, distribution_policy)
+        fileformat, filepath, schema, distribution_policy, file_locations = option_parser_yml(options.yml_config)
+        create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations)
     else:
         fileformat = 'Parquet'
         check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table
-
+    if not filepath:
+        sys.exit(0)
 
     seg_name = get_seg_name(dburl, tablename, database, fileformat)
     firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name)
 
+    sizes = 0
     check_files_and_table_in_same_hdfs_cluster(filepath, tabledir)
-
     files, sizes = get_files_in_hdfs(filepath)
     print 'File(s) to be registered:', files
 
     if fileformat == 'Parquet':
         check_parquet_format(files)
-        print files
+    print files
     move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True)
 
     insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes)
     logger.info('Hawq Register Succeed.')
