Repository: incubator-hawq
Updated Branches:
  refs/heads/master 981c0a939 -> ef2aef879
HAWQ-1034. Implement --repair option for hawq register.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ef2aef87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ef2aef87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ef2aef87

Branch: refs/heads/master
Commit: ef2aef87958941082a016afeea45b7bbcccb9779
Parents: 637f9d5
Author: xunzhang <[email protected]>
Authored: Sat Sep 17 20:20:43 2016 +0800
Committer: Lili Ma <[email protected]>
Committed: Sun Sep 18 14:39:54 2016 +0800

----------------------------------------------------------------------
 tools/bin/hawqregister | 115 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 99 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ef2aef87/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index ee5275b..ffae437 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -54,8 +54,8 @@ def option_parser():
     parser.add_option('-f', '--filepath', dest='filepath', help='file name in HDFS')
     parser.add_option('-e', '--eof', dest='filesize', type='int', default=0, help='eof of the file to be registered')
     parser.add_option('-c', '--config', dest='yml_config', default='', help='configuration file in YAML format')
-    parser.add_option('--force', action='store_true', default=False)
-    parser.add_option('--repair', action='store_true', default=False)
+    parser.add_option('-F', '--force', dest='force', action='store_true', default=False)
+    parser.add_option('-R', '--repair', dest='repair', action='store_true', default=False)
     return parser
 
 
@@ -166,7 +166,7 @@ class GpRegisterAccessor(object):
         qry = """select attrnums from gp_distribution_policy, pg_class where pg_class.relname = '%s' and pg_class.oid = gp_distribution_policy.localoid;""" % tablename
         rows = self.exec_query(qry)
         if len(rows) == 0:
-            logger.error('Table %s not found in table gp_distribution_policy.' % tablename)
+            logger.error('Table %s is not an append-only table. There is no record in gp_distribution_policy table.' % tablename)
             sys.exit(1)
         if rows[0]['attrnums']:
             logger.error('Cannot register file(s) to a table which is hash distribuetd.')
@@ -196,6 +196,14 @@ class GpRegisterAccessor(object):
         rows = self.exec_query(query)
         return rows[0]['attrnums']
 
+    def get_bucket_number(self, tablename):
+        query = "select oid from pg_class where relname = '%s';" % tablename.split('.')[-1].lower()
+        rows = self.exec_query(query)
+        oid = rows[0]['oid']
+        query = "select * from gp_distribution_policy where localoid = '%s';" % oid
+        rows = self.exec_query(query)
+        return rows[0]['bucketnum']
+
     def get_metadata_from_database(self, tablename, seg_name):
         query = 'select segno from pg_aoseg.%s;' % seg_name
         firstsegno = len(self.exec_query(query)) + 1
@@ -233,7 +241,7 @@ class HawqRegister(object):
                 return 'force'
             elif repair:
                 if not table_existed():
-                    logger.error('--repair mode asserts the table is already create.')
+                    logger.error('--repair mode asserts the table has been already created.')
                     sys.exit(1)
                 return 'repair'
             else:
@@ -261,32 +269,76 @@ class HawqRegister(object):
         def get_metadata():
             return self.accessor.get_metadata_from_database(self.tablename, self.seg_name)
 
+        def get_distribution_policy():
+            return self.accessor.get_distribution_policy_info(self.tablename)
+
+        def check_policy_consistency():
+            policy = get_distribution_policy() # "" or "{1,3}"
+            if policy is None:
+                if ' '.join(self.distribution_policy.strip().split()).lower() == 'distributed randomly':
+                    return
+                else:
+                    logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
+                    sys.exit(1)
+            tmp_dict = {}
+            for i, d in enumerate(self.schema):
+                tmp_dict[d['name']] = i + 1
+            # 'DISTRIBUETD BY (1,3)' -> {1,3}
+            cols = self.distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
+            original_policy = ','.join([str(tmp_dict[col]) for col in cols])
+            if policy.strip('{').strip('}') != original_policy:
+                logger.error('Distribution policy of %s from yaml file is not consistent with the policy of existing table.' % self.tablename)
+                sys.exit(1)
+
+        def check_bucket_number():
+            def get_bucket_number():
+                return self.accessor.get_bucket_number(self.tablename)
+
+            if self.bucket_number != get_bucket_number():
+                logger.error('Bucket number of %s is not consistent with previous bucket number.' % self.tablename)
+                sys.exit(1)
+
         if self.yml:
             self.file_format, self.files, self.sizes, self.schema, self.distribution_policy, self.file_locations, self.bucket_number = option_parser_yml(self.yml)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else ''
             check_distribution_policy()
-            if self.mode != 'force':
+            if self.mode != 'force' and self.mode != 'repair':
                 if not create_table():
                     self.mode = 'second_exist'
         else:
             self.file_format = 'Parquet'
             check_hash_type() # Usage1 only support randomly distributed table
 
         if not self.filepath:
+            if self.mode == 'first':
+                logger('Please specify filepath with -f option.')
+            else:
+                logger.info('Hawq Register Succeed.')
             sys.exit(0)
 
-        if self.mode == 'repair':
-            # TODO
-            # check distribution policy consistency
-            # check bucketnum, pagesize, rowgroupsize, etc
-            # check filesize smaller
-            pass
-
         self.seg_name = get_seg_name()
         self.firstsegno, self.tabledir = get_metadata()
+        if self.mode == 'repair':
+            if self.tabledir.strip('/') != self.filepath.strip('/'):
+                logger.error("In repair mode, file path from yaml file should be the same with table's path.")
+                sys.exit(1)
+            check_policy_consistency()
+            check_bucket_number()
+            existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath)
+            existed_info = {}
+            for k, fn in enumerate(existed_files):
+                existed_info[fn] = existed_sizes[k]
+            for k, fn in enumerate(self.files):
+                if fn not in existed_files:
+                    logger.error('Can not register in repair mode since giving non-existing file: %s.' % fn)
+                    sys.exit(1)
+                if self.sizes[k] > existed_info[fn]:
+                    logger.error('Can not register in repair mode since giving larger file size: %s' % self.sizes[k])
+                    sys.exit(1)
+
         if self.mode == 'second_exist':
             if self.tabledir.strip('/') == self.filepath.strip('/'):
-                logger.error('Files to be registeted in this case should not be the same with table path.')
+                logger.error('Files to be registered should not be the same with table path.')
                 sys.exit(1)
 
         self.do_not_move, self.files_update, self.sizes_update = False, [], []
@@ -294,7 +346,7 @@ class HawqRegister(object):
             existed_files, _ = self._get_files_in_hdfs(self.tabledir)
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
-                    logger.error('In this case, you should include previous table files.\nOtherwise you should drop the previous table before registering --force.')
+                    logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.')
                     sys.exit(1)
             else:
                 self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes
@@ -307,6 +359,14 @@ class HawqRegister(object):
                         self.sizes_update.append(sizes_old[k])
                         self.files.remove(files_old[k])
                         self.sizes.remove(sizes_old[k])
+        elif self.mode == 'repair':
+            self.do_not_move = True
+            self.files_update, self.sizes_update = [fn for fn in self.files], [sz for sz in self.sizes]
+            self.files_delete = []
+            for fn in existed_files:
+                if fn not in self.files:
+                    self.files_delete.append(fn)
+            self.files, self.sizes = [], []
 
         self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir)
 
@@ -332,13 +392,13 @@ class HawqRegister(object):
         # check whether the files to be registered is in hdfs
         filesystem = filepath.split('://')
         if filesystem[0] != 'hdfs':
-            logger.error('Only support to register file(s) in hdfs')
+            logger.error('Only support registering file(s) in hdfs.')
             sys.exit(1)
         fileroot = filepath.split('/')
         tableroot = tabledir.split('/')
         # check the root url of them. eg: for 'hdfs://localhost:8020/temp/tempfile', we check 'hdfs://localohst:8020'
         if fileroot[0:3] != tableroot[0:3]:
-            logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'" % (filepath, tabledir))
+            logger.error("Files to be registered and the table are not in the same hdfs cluster.\nFile(s) to be registered: '%s'\nTable path in HDFS: '%s'." % (filepath, tabledir))
             sys.exit(1)
 
     def _get_files_in_hdfs(self, filepath):
@@ -393,6 +453,15 @@ class HawqRegister(object):
                 logger.error('Fail to move %s to %s' % (srcfile, dstfile))
                 sys.exit(1)
 
+    def _delete_files_in_hdfs(self):
+        for fn in self.files_delete:
+            hdfscmd = 'hdfs dfs -rm %s' % fn
+            sys.stdout.write('hdfscmd: "%s"\n' % hdfscmd)
+            result = local_ssh(hdfscmd, logger)
+            if result != 0:
+                logger.error('Fail to delete %s ' % fn)
+                sys.exit(1)
+
     def _modify_metadata(self, mode):
         if mode == 'insert':
             eofs = self.sizes
@@ -434,6 +503,15 @@ class HawqRegister(object):
         query += "end transaction;"
         return self.utility_accessor.update_catalog(query)
 
+    def _delete_metadata(self):
+        query = "set allow_system_table_mods='dml';"
+        query += "begin transaction;"
+        segno_lst = [fn.strip().split('/')[-1] for fn in self.files_delete]
+        for seg in segno_lst:
+            query += "delete from pg_aoseg.%s where segno = '%s';" % (self.seg_name, seg)
+        query += "end transaction;"
+        return self.utility_accessor.update_catalog(query)
+
     def register(self):
         if not self.do_not_move:
             self._move_files_in_hdfs()
@@ -442,6 +520,11 @@ class HawqRegister(object):
         else:
             if self.mode == 'force':
                 self._modify_metadata('update')
+            elif self.mode == 'repair':
+                self._modify_metadata('update')
+                if self.files_delete:
+                    self._delete_files_in_hdfs()
+                    self._delete_metadata()
             else:
                 self._modify_metadata('insert')
         logger.info('Hawq Register Succeed.')
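
Note on the policy check added above: check_policy_consistency() compares the DISTRIBUTED BY clause taken from the yaml file against the attrnums string stored in gp_distribution_policy. Below is a standalone Python sketch of that mapping; it is not part of the commit, and the function name policy_to_attrnums plus the sample schema are made up for illustration.

    # Sketch of the column-name -> attribute-number mapping performed by
    # check_policy_consistency(): 'DISTRIBUTED BY (a,c)' with schema (a, b, c)
    # becomes '1,3', which is then compared against the catalog value '{1,3}'
    # with the braces stripped.
    def policy_to_attrnums(distribution_policy, schema):
        # None stands for a randomly distributed table
        if ' '.join(distribution_policy.strip().split()).lower() == 'distributed randomly':
            return None
        ordinals = dict((d['name'], i + 1) for i, d in enumerate(schema))
        # 'DISTRIBUTED BY (a,c)' -> ['a', 'c']
        cols = distribution_policy.strip().split()[-1].strip('(').strip(')').split(',')
        return ','.join(str(ordinals[col]) for col in cols)

    if __name__ == '__main__':
        schema = [{'name': 'a'}, {'name': 'b'}, {'name': 'c'}]
        print(policy_to_attrnums('DISTRIBUTED BY (a,c)', schema))   # prints: 1,3
        print(policy_to_attrnums('DISTRIBUTED RANDOMLY', schema))   # prints: None

A mismatch here, or a bucket number that differs from the one recorded in gp_distribution_policy, aborts the register with the errors shown in the diff.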

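For reference, the repair path added in this commit is meant to roll a table back to the state described in the yaml file: every file listed in the yaml must already exist under the table directory and its recorded eof must not exceed the size currently in HDFS, while files found in HDFS but missing from the yaml are removed (see _delete_files_in_hdfs and _delete_metadata). The following simplified sketch of that pre-check is illustrative only; check_repair_files and the existing dict (hdfs path -> current size in bytes) are hypothetical, not code from the commit.

    # Simplified sketch of the repair-mode pre-checks: reject the run if a
    # yaml file is missing from HDFS or claims a size larger than what is on
    # disk; anything on disk that the yaml no longer lists would then be
    # deleted and its pg_aoseg row removed.
    def check_repair_files(yaml_files, yaml_sizes, existing):
        to_delete = [fn for fn in existing if fn not in yaml_files]
        for path, size in zip(yaml_files, yaml_sizes):
            if path not in existing:
                raise SystemExit('non-existing file: %s' % path)
            if size > existing[path]:
                raise SystemExit('size %d larger than existing %d for %s'
                                 % (size, existing[path], path))
        return to_delete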