Repository: incubator-hawq Updated Branches: refs/heads/master 9b7f90b74 -> fd0b25dd7
HAWQ-991. Bugfix for hawq register under --force mode. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/fd0b25dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/fd0b25dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/fd0b25dd Branch: refs/heads/master Commit: fd0b25dd7aa9ce964dad8453d341b8ab97ec8f7c Parents: 9b7f90b Author: xunzhang <[email protected]> Authored: Tue Sep 27 15:34:36 2016 +0800 Committer: rlei <[email protected]> Committed: Tue Sep 27 15:58:22 2016 +0800 ---------------------------------------------------------------------- tools/bin/hawqregister | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd0b25dd/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 6c01b77..cc65491 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -117,7 +117,7 @@ def register_yaml_dict_check(D, table_column_num, src_tablename): yml_column_num = len(D['AO_Schema']) if table_column_num != yml_column_num and table_column_num > 0: logger.error('Column number of table in yaml file is not equals to the column number of table %s.' % src_tablename) - sys.exit(1) + sys.exit(1) class FailureHandler(object): @@ -178,7 +178,7 @@ class GpRegisterAccessor(object): def get_table_existed(self, tablename): qry = """select count(*) from pg_class where relname = '%s';""" % tablename.split('.')[-1].lower() return self.exec_query(qry)[0]['count'] == 1 - + def get_table_column_num(self, tablename): qry = """select count(*) from pg_attribute ,pg_class where pg_class.relname = '%s' and pg_class.oid = pg_attribute.attrelid and attnum > 0;""" % tablename.split('.')[-1].lower() return self.exec_query(qry)[0]['count'] @@ -277,6 +277,11 @@ class GpRegisterAccessor(object): tabledir = '/'.join([D['location'].strip(), str(D['tablespace_oid']), str(D['database_oid']), str(D['relfilenode']), '']) return firstsegno, tabledir + def get_metadata_from_seg_name(self, seg_name): + query = 'select segno, eof from pg_aoseg.%s;' % seg_name + rows = self.exec_query(query) + return [str(row['segno']) for row in rows], [int(row['eof']) for row in rows] + def get_database_encoding_indx(self, database): query = "select encoding from pg_database where datname = '%s';" % database return self.exec_query(query)[0]['encoding'] @@ -361,6 +366,9 @@ class HawqRegister(object): def get_metadata(): return self.accessor.get_metadata_from_database(self.tablename, self.seg_name) + def get_metadata_from_table(): + return self.accessor.get_metadata_from_seg_name(self.seg_name) + def get_distribution_policy(): return self.accessor.get_distribution_policy_info(self.tablename) @@ -544,7 +552,8 @@ class HawqRegister(object): logger.error("In repair mode, file path from yaml file should be the same with table's path.") self.failure_handler.rollback() sys.exit(1) - existed_files, existed_sizes = self._get_files_in_hdfs(self.filepath) + seg_list, existed_sizes = get_metadata_from_table() + existed_files = [self.tabledir + seg for seg in seg_list] existed_info = {} for k, fn in enumerate(existed_files): existed_info[fn] = existed_sizes[k] @@ -567,11 +576,12 @@ class HawqRegister(object): if not self.yml: check_no_regex_filepath([self.filepath]) self.files, self.sizes = self._get_files_in_hdfs(self.filepath) - + self.do_not_move, self.files_update, self.sizes_update = False, [], [] self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes] if self.mode == 'force': - existed_files, _ = self._get_files_in_hdfs(self.tabledir) + seg_list, _ = get_metadata_from_table() + existed_files = [self.tabledir + seg for seg in seg_list] if len(self.files) == len(existed_files): if sorted(self.files) != sorted(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') @@ -605,7 +615,7 @@ class HawqRegister(object): self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir) - print 'New file(s) to be registered: ', self.files + print 'New file(s) to be registered: ', self.newfiles if self.files_update: print 'Catalog info need to be updated for these files: ', self.files_update @@ -734,15 +744,15 @@ class HawqRegister(object): query += "begin transaction;" segno_lst = [f.split('/')[-1] for f in self.files] query += "delete from pg_aoseg.%s;" % (self.seg_name) - self.firstsegno = 1 + firstsegno = 1 if self.file_format == 'Parquet': - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1) + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1) for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1) + query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1) else: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1) + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1, -1) for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1) + query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1) query += ';' query += "end transaction;" elif mode == 'update':
