HAWQ-1060. Refactor hawq register with better readability and quality.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/637f9d57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/637f9d57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/637f9d57

Branch: refs/heads/master
Commit: 637f9d5787aec24d72a3159b356d388ba116991e
Parents: 981c0a9
Author: xunzhang <[email protected]>
Authored: Fri Sep 16 18:48:39 2016 +0800
Committer: Lili Ma <[email protected]>
Committed: Sun Sep 18 14:39:54 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 680 ++++++++++++++++++++++----------------------
 1 file changed, 335 insertions(+), 345 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/637f9d57/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index bbdc946..ee5275b 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -20,80 +20,81 @@
 # Usage1: hawq register [-h hostname] [-p port] [-U username] [-d database] [-f filepath] [-e eof] <tablename>
 # Usage2: hawq register [-h hostname] [-p port] [-U username] [-d database] [-c config] [--force] [--repair] <tablename>
 
-import os, sys, optparse, getpass, re, urlparse
+import os
+import sys
 
 try:
     from gppylib.commands.unix import getLocalHostname, getUserName
     from gppylib.db import dbconn
     from gppylib.gplog import get_default_logger, setup_tool_logging
     from gppylib.gpparseopts import OptParser, OptChecker
     from pygresql import pg
-    from pygresql.pgdb import DatabaseError
    from hawqpylib.hawqlib import local_ssh, local_ssh_output
 except ImportError, e:
     print e
-    sys.stderr.write('cannot import module, please check that you have source greenplum_path.sh\n')
+    sys.stderr.write('Cannot import module, please check that you have source greenplum_path.sh\n')
     sys.exit(2)
 
 # setup logging
 logger = get_default_logger()
 EXECNAME = os.path.split(__file__)[-1]
-setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
+setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
 
 
 def option_parser():
+    '''option parser'''
     parser = OptParser(option_class=OptChecker,
                        usage='usage: %prog [options] table_name',
                        version='%prog version $Revision: #1 $')
     parser.remove_option('-h')
-    parser.add_option('-?', '--help', action = 'help')
-    parser.add_option('-h', '--host', help = 'host of the target DB')
-    parser.add_option('-p', '--port', help = 'port of the target DB', type = 'int', default = 0)
-    parser.add_option('-U', '--user', help = 'username of the target DB')
-    parser.add_option('-d', '--database', default = 'postgres', dest = 'database', help='database name')
-    parser.add_option('-f', '--filepath', dest = 'filepath', help = 'file name in HDFS')
-    parser.add_option('-e', '--eof', dest = 'filesize', type = 'int', default = 0, help = 'eof of the file to be registered')
-    parser.add_option('-c', '--config', dest = 'yml_config', default = '', help = 'configuration file in YAML format')
-    parser.add_option('--force', action = 'store_true', default = False)
-    parser.add_option('--repair', action = 'store_true', default = False)
+    parser.add_option('-?', '--help', action='help')
+    parser.add_option('-h', '--host', help='host of the target DB')
+    parser.add_option('-p', '--port', help='port of the target DB', type='int', default=0)
+    parser.add_option('-U', '--user',
help='username of the target DB') + parser.add_option('-d', '--database', default='postgres', dest='database', help='database name') + parser.add_option('-f', '--filepath', dest='filepath', help='file name in HDFS') + parser.add_option('-e', '--eof', dest='filesize', type='int', default=0, help='eof of the file to be registered') + parser.add_option('-c', '--config', dest='yml_config', default='', help='configuration file in YAML format') + parser.add_option('--force', action='store_true', default=False) + parser.add_option('--repair', action='store_true', default=False) return parser def register_yaml_dict_check(D): - # check exists + '''check exists''' check_list = ['DFS_URL', 'Distribution_Policy', 'FileFormat', 'TableName', 'Bucketnum'] for attr in check_list: - if D.get(attr) == None: + if D.get(attr) is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr) sys.exit(1) if D['FileFormat'] in ['Parquet', 'AO']: prefix = D['FileFormat'] local_check_list = ['%s_FileLocations' % prefix, '%s_Schema' % prefix] for attr in local_check_list: - if D.get(attr) == None: + if D.get(attr) is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % attr) sys.exit(1) - if D['%s_FileLocations' % prefix].get('Files') == None: + if D['%s_FileLocations' % prefix].get('Files') is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files' % prefix) sys.exit(1) for d in D['%s_FileLocations' % prefix]['Files']: - if d.get('path') == None: + if d.get('path') is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files.path' % prefix) sys.exit(1) - if d.get('size') == None: + if d.get('size') is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_FileLocations.Files.size' % prefix) sys.exit(1) else: logger.error('hawq register only support Parquet and AO formats. Format %s is not supported.' % D['FileFormat']) sys.exit(1) prefix = D['FileFormat'] - if D.get('%s_Schema' % prefix) == None: + if D.get('%s_Schema' % prefix) is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema' % prefix) sys.exit(1) for d in D['%s_Schema' % prefix]: - if d.get('name') == None: + if d.get('name') is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' % '%s_Schema.name' % prefix) sys.exit(1) - if d.get('type') == None: + if d.get('type') is None: logger.error('Wrong configuration yaml file format: "%s" attribute does not exist.\n See example in "hawq register --help".' 
% '%s_Schema.type' % prefix) sys.exit(1) if D['FileFormat'] == 'Parquet': @@ -126,182 +127,263 @@ def option_parser_yml(yml_file): return 'AO', files, sizes, params['AO_Schema'], params['Distribution_Policy'], params['AO_FileLocations'], params['Bucketnum'] -def create_table(dburl, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number): - try: - query = "select count(*) from pg_class where relname = '%s';" % tablename.split('.')[-1].lower() - conn = dbconn.connect(dburl, False) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() - for row in rows: - if row[0] != 0: - return False - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) - - try: +class GpRegisterAccessor(object): + def __init__(self, conn): + self.conn = conn + rows = self.exec_query(""" + SELECT oid, datname, dat2tablespace, + pg_encoding_to_char(encoding) encoding + FROM pg_database WHERE datname=current_database()""") + self.dbid = rows[0]['oid'] + self.dbname = rows[0]['datname'] + self.spcid = rows[0]['dat2tablespace'] + self.dbencoding = rows[0]['encoding'] + self.dbversion = self.exec_query('select version()')[0]['version'] + + def exec_query(self, sql): + '''execute query and return dict result''' + return self.conn.query(sql).dictresult() + + def get_table_existed(self, tablename): + qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower() + return self.exec_query(qry)[0]['count'] == 1 + + def do_create_table(self, tablename, schema_info, fmt, distrbution_policy, file_locations, bucket_number): + if self.get_table_existed(tablename): + return False schema = ','.join([k['name'] + ' ' + k['type'] for k in schema_info]) fmt = 'ROW' if fmt == 'AO' else fmt if fmt == 'ROW': query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, checksum=%s, bucketnum=%s) %s;' - % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy)) + % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['Checksum'], bucket_number, distrbution_policy)) else: # Parquet query = ('create table %s(%s) with (appendonly=true, orientation=%s, compresstype=%s, compresslevel=%s, pagesize=%s, rowgroupsize=%s, bucketnum=%s) %s;' - % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy)) - conn = dbconn.connect(dburl, False) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() + % (tablename, schema, fmt, file_locations['CompressionType'], file_locations['CompressionLevel'], file_locations['PageSize'], file_locations['RowGroupSize'], bucket_number, distrbution_policy)) + self.conn.query(query) return True - except DatabaseError, ex: - print DatabaseError, ex - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) + def check_hash_type(self, tablename): + qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename + rows = self.exec_query(qry) + if len(rows) == 0: + logger.error('Table %s not found in table gp_distribution_policy.' 
% tablename) + sys.exit(1) + if rows[0]['attrnums']: + logger.error('Cannot register file(s) to a table which is hash distribuetd.') + sys.exit(1) -def get_seg_name(dburl, tablename, database, fmt): - try: - relname = '' + # pg_paqseg_# + def get_seg_name(self, tablename, database, fmt): tablename = tablename.split('.')[-1] query = ("select pg_class2.relname from pg_class as pg_class1, pg_appendonly, pg_class as pg_class2 " "where pg_class1.relname ='%s' and pg_class1.oid = pg_appendonly.relid and pg_appendonly.segrelid = pg_class2.oid;") % tablename - conn = dbconn.connect(dburl, True) - rows = dbconn.execSQL(conn, query) - conn.commit() - if not rows.rowcount: + rows = self.exec_query(query) + if len(rows) == 0: logger.error('table "%s" not found in db "%s"' % (tablename, database)) sys.exit(1) - for row in rows: - relname = row[0] - conn.close() - except DatabaseError, ex: - logger.error('Failed to run query "%s" with dbname "%s"' % (query, database)) - sys.exit(1) - if fmt == 'Parquet': - if relname.find("paq") == -1: - logger.error("table '%s' is not parquet format" % tablename) - sys.exit(1) - - return relname - - -def check_hash_type(dburl, tablename): - '''Check whether target table is hash distributed, in that case simple insertion does not work''' - try: - query = "select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;" % tablename - conn = dbconn.connect(dburl, False) - rows = dbconn.execSQL(conn, query) - conn.commit() - if not rows.rowcount: - logger.error('Table %s not found in table gp_distribution_policy.' % tablename) - sys.exit(1) - for row in rows: - if row[0]: - logger.error('Cannot register file(s) to a table which is hash distribuetd.') + relname = rows[0]['relname'] + if fmt == 'Parquet': + if relname.find('paq') == -1: + logger.error("table '%s' is not parquet format" % tablename) sys.exit(1) - conn.close() - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) + return relname + def get_distribution_policy_info(self, tablename): + query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower() + rows = self.exec_query(query) + oid = rows[0]['oid'] + query = "select * from gp_distribution_policy where localoid = '%s';" % oid + rows = self.exec_query(query) + return rows[0]['attrnums'] -def get_metadata_from_database(dburl, tablename, seg_name): - '''Get the metadata to be inserted from hdfs''' - try: + def get_metadata_from_database(self, tablename, seg_name): query = 'select segno from pg_aoseg.%s;' % seg_name - conn = dbconn.connect(dburl, False) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) - - firstsegno = rows.rowcount + 1 - - try: + firstsegno = len(self.exec_query(query)) + 1 # get the full path of correspoding file for target table query = ("select location, gp_persistent_tablespace_node.tablespace_oid, database_oid, relfilenode from pg_class, gp_persistent_relation_node, " "gp_persistent_tablespace_node, gp_persistent_filespace_node where relname = '%s' and pg_class.relfilenode = " "gp_persistent_relation_node.relfilenode_oid and gp_persistent_relation_node.tablespace_oid = gp_persistent_tablespace_node.tablespace_oid " "and gp_persistent_filespace_node.filespace_oid = gp_persistent_filespace_node.filespace_oid;") % tablename.split('.')[-1] - conn = dbconn.connect(dburl, False) - rows = 
dbconn.execSQL(conn, query) - conn.commit() - conn.close() - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) - for row in rows: - tabledir = '/'.join([row[0].strip(), str(row[1]), str(row[2]), str(row[3]), '']) - return firstsegno, tabledir - - -def check_files_and_table_in_same_hdfs_cluster(filepath, tabledir): - '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster''' - if not filepath: - return - # check whether the files to be registered is in hdfs - filesystem = filepath.split('://') - if filesystem[0] != 'hdfs': - logger.error('Only support to register file(s) in hdfs') - sys.exit(1) - fileroot = filepath.split('/') - tableroot = tabledir.split('/') - # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020' - if fileroot[0:3] != tableroot[0:3]: - logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir)) - sys.exit(1) + D = self.exec_query(query)[0] + tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), '']) + return firstsegno, tabledir + + def update_catalog(self, query): + self.conn.query(query) + + +class HawqRegister(object): + def __init__(self, options, table, utility_conn, conn): + self.yml = options.yml_config + self.filepath = options.filepath + self.database = options.database + self.tablename = table + self.filesize = options.filesize + self.accessor = GpRegisterAccessor(conn) + self.utility_accessor = GpRegisterAccessor(utility_conn) + self.mode = self._init_mode(options.force, options.repair) + self._init() + + def _init_mode(self, force, repair): + def table_existed(): + return self.accessor.get_table_existed(self.tablename) + + if self.yml: + if force: + return 'force' + elif repair: + if not table_existed(): + logger.error('--repair mode asserts the table is already create.') + sys.exit(1) + return 'repair' + else: + return 'second_normal' + else: + return 'first' + def _init(self): + def check_hash_type(): + self.accessor.check_hash_type(self.tablename) -def get_files_in_hdfs(filepath): - '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files''' - files = [] - sizes = [] - hdfscmd = "hdfs dfs -test -e %s" % filepath - result = local_ssh(hdfscmd, logger) - if result != 0: - logger.error("Path '%s' does not exist in hdfs" % filepath) - sys.exit(1) - hdfscmd = "hdfs dfs -ls -R %s" % filepath - result, out, err = local_ssh_output(hdfscmd) - outlines = out.splitlines() - # recursively search all the files under path 'filepath' - for line in outlines: - lineargs = line.split() - if len(lineargs) == 8 and lineargs[0].find ("d") == -1: - files.append(lineargs[7]) - sizes.append(int(lineargs[4])) - if len(files) == 0: - logger.error("Dir '%s' is empty" % filepath) - sys.exit(1) - return files, sizes - - -def check_parquet_format(files): - '''Check whether the file to be registered is parquet format''' - for f in files: - hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f - rc, out, err = local_ssh_output(hdfscmd) - if out == '0': - continue - hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f - result1 = local_ssh(hdfscmd, logger) - hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f - result2 = local_ssh(hdfscmd, logger) - if result1 or result2: - logger.error('File %s is not 
parquet format' % f) + # check conflicting distributed policy + def check_distribution_policy(): + if self.distribution_policy.startswith('DISTRIBUTED BY'): + if len(self.files) % self.bucket_number != 0: + logger.error('Files to be registered must be multiple times to the bucket number of hash table.') + sys.exit(1) + + def create_table(): + return self.accessor.do_create_table(self.tablename, self.schema, self.file_format, self.distribution_policy, self.file_locations, self.bucket_number) + + def get_seg_name(): + return self.utility_accessor.get_seg_name(self.tablename, self.database, self.file_format) + + def get_metadata(): + return self.accessor.get_metadata_from_database(self.tablename, self.seg_name) + + if self.yml: + self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml) + self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else '' + check_distribution_policy() + if self.mode != 'force': + if not create_table(): + self.mode = 'second_exist' + else: + self.file_format = 'Parquet' + check_hash_type() # Usage1 only support randomly distributed table + if not self.filepath: + sys.exit(0) + + if self.mode == 'repair': + # TODO + # check distribution policy consistency + # check bucketnum, pagesize, rowgroupsize, etc + # check filesize smaller + pass + + self.seg_name = get_seg_name() + self.firstsegno, self.tabledir = get_metadata() + + if self.mode == 'second_exist': + if self.tabledir.strip('/') == self.filepath.strip('/'): + logger.error('Files to be registeted in this case should not be the same with table path.') + sys.exit(1) + + self.do_not_move, self.files_update, self.sizes_update = False, [], [] + if self.mode == 'force': + existed_files, _ = self._get_files_in_hdfs(self.tabledir) + if len(self.files) == len(existed_files): + if sorted(self.files) != sorted(existed_files): + logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.') + sys.exit(1) + else: + self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes + self.files, self.sizes = [], [] + else: + files_old, sizes_old = [f for f in self.files], [sz for sz in self.sizes] + for k, f in enumerate(files_old): + if f in existed_files: + self.files_update.append(files_old[k]) + self.sizes_update.append(sizes_old[k]) + self.files.remove(files_old[k]) + self.sizes.remove(sizes_old[k]) + + self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir) + + if not self.yml: + self.files, self.sizes = self._get_files_in_hdfs(self.filepath) + print 'New file(s) to be registered: ', self.files + if self.files_update: + print 'Catalog info need to be updated for these files: ', self.files_update + + if self.filesize: + if len(self.files) != 1: + logger.error('-e option is only supported with single file case.') + sys.exit(1) + self.sizes = [self.filesize] + + if self.file_format == 'Parquet': + self._check_parquet_format(self.files) + + def _check_files_and_table_in_same_hdfs_cluster(self, filepath, tabledir): + '''Check whether all the files refered by 'filepath' and the location corresponding to the table are in the same hdfs cluster''' + if not filepath: + return + # check whether the files to be registered is in hdfs + filesystem = filepath.split('://') + if filesystem[0] != 'hdfs': + logger.error('Only support to register file(s) in hdfs') + sys.exit(1) + fileroot = filepath.split('/') 
+ tableroot = tabledir.split('/') + # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020' + if fileroot[0:3] != tableroot[0:3]: + logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir)) sys.exit(1) + def _get_files_in_hdfs(self, filepath): + '''Get all the files refered by 'filepath', which could be a file or a directory containing all the files''' + files, sizes = [], [] + hdfscmd = "hdfs dfs -test -e %s" % filepath + result = local_ssh(hdfscmd, logger) + if result != 0: + logger.error("Path '%s' does not exist in hdfs" % filepath) + sys.exit(1) + hdfscmd = "hdfs dfs -ls -R %s" % filepath + result, out, err = local_ssh_output(hdfscmd) + outlines = out.splitlines() + # recursively search all the files under path 'filepath' + for line in outlines: + lineargs = line.split() + if len(lineargs) == 8 and lineargs[0].find("d") == -1: + files.append(lineargs[7]) + sizes.append(int(lineargs[4])) + if len(files) == 0: + logger.error("Dir '%s' is empty" % filepath) + sys.exit(1) + return files, sizes -def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, normal): - '''Move file(s) in src path into the folder correspoding to the target table''' - if normal: - segno = firstsegno + def _check_parquet_format(self, files): + '''Check whether the file to be registered is parquet format''' for f in files: + hdfscmd = 'hdfs dfs -du -h %s | head -c 1' % f + rc, out, err = local_ssh_output(hdfscmd) + if out == '0': + continue + hdfscmd = 'hdfs dfs -cat %s | head -c 4 | grep PAR1' % f + result1 = local_ssh(hdfscmd, logger) + hdfscmd = 'hdfs dfs -cat %s | tail -c 4 | grep PAR1' % f + result2 = local_ssh(hdfscmd, logger) + if result1 or result2: + logger.error('File %s is not parquet format' % f) + sys.exit(1) + + def _move_files_in_hdfs(self): + '''Move file(s) in src path into the folder correspoding to the target table''' + segno = self.firstsegno + for f in self.files: srcfile = f - dstfile = tabledir + str(segno) + dstfile = self.tabledir + str(segno) segno += 1 if srcfile != dstfile: hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile) @@ -310,190 +392,98 @@ def move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, nor if result != 0: logger.error('Fail to move %s to %s' % (srcfile, dstfile)) sys.exit(1) - else: - segno = firstsegno - for f in files: - dstfile = f - srcfile = tabledir + str(segno) - segno += 1 - if srcfile != dstfile: - hdfscmd = 'hdfs dfs -mv %s %s' % (srcfile, dstfile) - sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd) - result = local_ssh(hdfscmd, logger) - if result != 0: - logger.error('Fail to move "%s" to "%s"' % (srcfile, dstfile)) - sys.exit(1) - -def insert_metadata_into_database(dburl, databasename, tablename, seg_name, firstsegno, tabledir, eofs, fmt): - '''Insert the metadata into database''' - try: - query = "set allow_system_table_mods='dml';" - if fmt == 'Parquet': - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1) + def _modify_metadata(self, mode): + if mode == 'insert': + eofs = self.sizes + query = "set allow_system_table_mods='dml';" + if self.file_format == 'Parquet': + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1) + for k, eof in enumerate(eofs[1:]): 
+ query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1) + else: + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1) + for k, eof in enumerate(eofs[1:]): + query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1) + query += ';' + elif mode == 'update': + eofs = self.sizes_update + query = "set allow_system_table_mods='dml';" + query += "begin transaction;" + segno_lst = [f.split('/')[-1] for f in self.files_update] + for i, eof in enumerate(eofs): + query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i]) + query += "end transaction;" + else: # update_and_insert + eofs = self.sizes + query = "set allow_system_table_mods='dml';" + query += "begin transaction;" + if self.file_format == 'Parquet': + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1) + for k, eof in enumerate(eofs[1:]): + query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1) + else: + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1) + for k, eof in enumerate(eofs[1:]): + query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1) + query += ';' + + segno_lst = [f.split('/')[-1] for f in self.files_update] + for i, eof in enumerate(self.sizes_update): + query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (self.seg_name, eof, segno_lst[i]) + query += "end transaction;" + return self.utility_accessor.update_catalog(query) + + def register(self): + if not self.do_not_move: + self._move_files_in_hdfs() + if (not self.do_not_move) and self.mode == 'force': + self._modify_metadata('update_and_insert') else: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1) - query += ';' - conn = dbconn.connect(dburl, True) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False) - sys.exit(1) - -def update_metadata_into_database(dburl, seg_name, files, eofs): - '''Update the catalog table in --force case''' - try: - query = "set allow_system_table_mods='dml';" - query += "begin transaction;" - segno_lst = [f.split('/')[-1] for f in files] - for i, eof in enumerate(eofs): - query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i]) - query += "end transaction;" - conn = dbconn.connect(dburl, True) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - sys.exit(1) - - -def update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, eofs, fmt, update_files, update_eofs): - '''Insert and update the catalog table in --force case''' + if self.mode == 'force': + self._modify_metadata('update') + else: + self._modify_metadata('insert') + logger.info('Hawq Register Succeed.') + + +def main(options, args): + def connectdb(options): + ''' + Trying to connect database, return a connection object. 
+ If failed to connect, raise a pg.InternalError + ''' + url = dbconn.DbURL(hostname=options.host, port=options.port, + dbname=options.database, username=options.user) + logger.info('try to connect database %s:%s %s' % (url.pghost, url.pgport, url.pgdb)) + utility_conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport, + user=url.pguser, passwd=url.pgpass, opt='-c gp_session_role=utility') + conn = pg.connect(dbname=url.pgdb, host=url.pghost, port=url.pgport, + user=url.pguser, passwd=url.pgpass) + return utility_conn, conn + + # connect db try: - query = "set allow_system_table_mods='dml';" - query += "begin transaction;" - if fmt == 'Parquet': - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1) - else: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (seg_name, firstsegno, eofs[0], -1, -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1) - query += ';' - - segno_lst = [f.split('/')[-1] for f in update_files] - for i, eof in enumerate(update_eofs): - query += "update pg_aoseg.%s set eof = '%s' where segno = '%s';" % (seg_name, eof, segno_lst[i]) - query += "end transaction;" - conn = dbconn.connect(dburl, True) - rows = dbconn.execSQL(conn, query) - conn.commit() - conn.close() - except DatabaseError, ex: - logger.error('Failed to execute query "%s"' % query) - move_files_in_hdfs(databasename, tablename, files, firstsegno, tabledir, False) - sys.exit(1) + utility_conn, conn = connectdb(options) + except pg.InternalError: + logger.error('Fail to connect to database, this script can only be run when database is up.') + return 1 + # register + ins = HawqRegister(options, args[0], utility_conn, conn) + ins.register() + conn.close() if __name__ == '__main__': - parser = option_parser() options, args = parser.parse_args() - - if len(args) != 1 or ((options.yml_config or options.force or options.repair) and options.filepath) or (options.force and options.repair): + if len(args) != 1 or (options.force and options.repair): parser.print_help(sys.stderr) sys.exit(1) - if local_ssh('hdfs', logger): - logger.error('command "hdfs" is not available.') + if (options.yml_config or options.force or options.repair) and options.filepath: + parser.print_help(sys.stderr) sys.exit(1) - - dburl = dbconn.DbURL(hostname = options.host, port = options.port, username = options.user, dbname = options.database) - filepath, database, tablename = options.filepath, options.database, args[0] - - second_normal_mode, second_exist_mode, force_mode, repair_mode = False, False, False, False - if options.yml_config: # Usage2 - if options.force: - force_mode = True - elif options.repair: - repair_mode = True - else: - second_normal_mode = True - fileformat, files, sizes, schema, distribution_policy, file_locations, bucket_number = option_parser_yml(options.yml_config) - filepath = files[0][:files[0].rfind('/')] if files else '' - # check conflicting distributed policy - if distribution_policy.startswith('DISTRIBUTED BY'): - if len(files) % bucket_number != 0: - logger.error('Files to be registered must be multiple times to the bucket number of hash table.') - sys.exit(1) - if not force_mode: - if not create_table(dburl, tablename, schema, fileformat, distribution_policy, file_locations, bucket_number): - second_normal_mode, second_exist_mode = False, True - else: - fileformat = 
'Parquet' - check_hash_type(dburl, tablename) # Usage1 only support randomly distributed table - - if repair_mode: - # check distribution policy consistency - # check bucketnum, pagesize, rowgroupsize, etc - # check filesize smaller - pass - - # check filepath - if not filepath: - sys.exit(0) - - seg_name = get_seg_name(dburl, tablename, database, fileformat) - firstsegno, tabledir = get_metadata_from_database(dburl, tablename, seg_name) - - if second_exist_mode: - if tabledir.strip('/') == filepath.strip('/'): - logger.error('Files to be registered in this case should not be the same with table path.') - sys.exit(1) - - do_not_move, files_update, sizes_update = False, [], [] - if force_mode: - existed_files, _ = get_files_in_hdfs(tabledir) - if len(files) == len(existed_files): - if sorted(files) != sorted(existed_files): - logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.') - sys.exit(1) - else: - do_not_move, files_update, sizes_update = True, files, sizes - files, sizes = [], [] - else: - files_old, sizes_old = [f for f in files], [sz for sz in sizes] - for k, f in enumerate(files_old): - if f in existed_files: - files_update.append(files_old[k]) - sizes_update.append(sizes_old[k]) - files.remove(files_old[k]) - sizes.remove(sizes_old[k]) - - check_files_and_table_in_same_hdfs_cluster(filepath, tabledir) - - if not options.yml_config: - files, sizes = get_files_in_hdfs(filepath) - print 'New file(s) to be registered:', files - if files_update: - print 'Files(s) catalog info need to be update:', files_update - - # set specified eofs - if options.filesize: - if len(files) != 1: - logger.error('-e option is only supported with single file case.') - sys.exit(1) - sizes = [options.filesize] - - if fileformat == 'Parquet': - check_parquet_format(files) - if not do_not_move: - move_files_in_hdfs(database, tablename, files, firstsegno, tabledir, True) - - if (not do_not_move) and force_mode: - update_insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat, files_update, sizes_update) - else: - if force_mode: - update_metadata_into_database(dburl, seg_name, files_update, sizes_update) - else: - insert_metadata_into_database(dburl, database, tablename, seg_name, firstsegno, tabledir, sizes, fileformat) - - logger.info('Hawq Register Succeed.') + if local_ssh('hdfs'): + logger.error('Command "hdfs" is not available.') + sys.exit(1) + main(options, args)
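
----------------------------------------------------------------------

Notes (illustrative sketches, not part of the patch):

The core of the refactor is that the module-level helpers built around dbconn.connect()/dbconn.execSQL() are folded into the GpRegisterAccessor class, which runs its SQL through a plain pygresql connection and reads rows back as dictionaries. A minimal sketch of that pattern, with placeholder connection parameters:

    from pygresql import pg

    # A bare pygresql connection; dbname/host/port are only examples.
    conn = pg.connect(dbname='postgres', host='localhost', port=5432)

    # GpRegisterAccessor.exec_query() boils down to query(...).dictresult(),
    # so each row comes back as a dict keyed by column name.
    rows = conn.query("select oid, datname from pg_database "
                      "where datname = current_database()").dictresult()
    print(rows[0]['datname'])
    conn.close()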
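The attribute checks in register_yaml_dict_check() together with the options read by do_create_table() define what a Usage2 configuration must contain once the YAML file is parsed. For a Parquet table the resulting dictionary looks roughly like the sketch below; every key is taken from this patch, while the table name, paths, and sizes are made-up examples:

    # Illustrative only: the dict shape register_yaml_dict_check() expects
    # for FileFormat 'Parquet'. All concrete values are hypothetical.
    example_config = {
        'TableName': 'public.sales',
        'DFS_URL': 'hdfs://localhost:8020',
        'Distribution_Policy': 'DISTRIBUTED RANDOMLY',
        'FileFormat': 'Parquet',
        'Bucketnum': 6,
        'Parquet_Schema': [
            {'name': 'id', 'type': 'int4'},
            {'name': 'amount', 'type': 'float8'},
        ],
        'Parquet_FileLocations': {
            'CompressionType': 'none',
            'CompressionLevel': 0,
            'PageSize': 1048576,
            'RowGroupSize': 8388608,
            'Files': [
                {'path': '/hawq_default/16385/16387/24576/1', 'size': 1024},
            ],
        },
    }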
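The three branches of _modify_metadata() differ only in the pg_aoseg statement they assemble. To make the generated SQL visible, the snippet below rebuilds the 'insert' branch for a Parquet table using a hypothetical segment relation name and two file sizes:

    # seg_name, firstsegno and sizes are placeholder values for illustration.
    seg_name, firstsegno, sizes = 'pg_paqseg_24576', 1, [1024, 2048]

    query = "set allow_system_table_mods='dml';"
    query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (seg_name, firstsegno, sizes[0], -1, -1)
    for k, eof in enumerate(sizes[1:]):
        query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1)
    query += ';'

    print(query)
    # -> set allow_system_table_mods='dml';insert into pg_aoseg.pg_paqseg_24576 values(1, 1024, -1, -1),(2, 2048, -1, -1);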
