Repository: incubator-hawq Updated Branches: refs/heads/master 3bcc55791 -> e6fdfd3e3
HAWQ-1113. Fix bug when files in yaml is disordered, hawq register error in force mode. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e6fdfd3e Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e6fdfd3e Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e6fdfd3e Branch: refs/heads/master Commit: e6fdfd3e3817368317c7c72cf8f112ce9d22da37 Parents: 3bcc557 Author: Chunling Wang <[email protected]> Authored: Fri Oct 21 13:36:32 2016 +0800 Committer: Wen Lin <[email protected]> Committed: Mon Oct 24 11:43:34 2016 +0800 ---------------------------------------------------------------------- .../test_hawq_register_usage2_case2.cpp | 10 +- tools/bin/hawqregister | 189 ++++++++++--------- 2 files changed, 103 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e6fdfd3e/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp ---------------------------------------------------------------------- diff --git a/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp b/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp index 4cce61f..492dbd3 100644 --- a/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp +++ b/src/test/feature/ManagementTool/test_hawq_register_usage2_case2.cpp @@ -246,12 +246,12 @@ TEST_F(TestHawqRegister, TestUsage2Case2TableExistNoData) { util.execute("drop table nt;"); } -TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2NormalYamlConfig) { +TEST_F(TestHawqRegister, TestUsage2Case2NormalYamlConfig) { runYamlCaseForceMode("testusage2case2normalyamlconfig", "usage2case2/normal_yaml_config", 0, 50, 150); } -TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2NormalYamlNoUpdateConfig) { - runYamlCaseForceMode("testusage2case2normalyamlnoupdateconfig", "usage2case2/normal_yaml_no_update_config", 0, 50, 150); +TEST_F(TestHawqRegister, TestUsage2Case2NormalYamlNoUpdateConfig) { + runYamlCaseForceMode("testusage2case2normalyamlnoupdateconfig", "usage2case2/normal_yaml_no_update_config", 0, 50, 100); } TEST_F(TestHawqRegister, TestUsage2Case2FileNotIncludedInYamlConfig) { @@ -270,8 +270,8 @@ TEST_F(TestHawqRegister, TestUsage2Case2HDFSFilePathContainErrorSymbol) { runYamlCaseForceMode("testusage2case2hdfsfilepathcontainerrorsymbol", "usage2case2/contain_error_symbol"); } -TEST_F(TestHawqRegister, DISABLED_TestUsage2Case2ZeroEof) { - runYamlCaseForceMode("testusage2case2zeroeof", "usage2case2/zero_eof", 0, 50, 150); +TEST_F(TestHawqRegister, TestUsage2Case2ZeroEof) { + runYamlCaseForceMode("testusage2case2zeroeof", "usage2case2/zero_eof", 0, 50, 143); } TEST_F(TestHawqRegister, TestUsage2Case2LargerEof) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e6fdfd3e/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 245d4d8..27b1e94 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -392,6 +392,9 @@ class HawqRegister(object): self.partitions_filepaths = partitions_filepaths self.partitions_filesizes = partitions_filesizes self.encoding = encoding + self.files_same_path = [] + self.sizes_same_path = [] + self.segnos_same_path = [] def _option_parser_yml(self, yml_file): import yaml @@ -558,14 +561,21 @@ class HawqRegister(object): self.failure_handler.rollback() sys.exit(1) self.firstsegno, self.tabledir = self._get_metadata() - + + seg_list, existed_sizes = self._get_metadata_from_table() + existed_files = [self.tabledir + seg for seg in seg_list] + # check if file numbers in hdfs is consistent with the record count of pg_aoseg. + hdfs_file_no_lst = [f.split('/')[-1] for f in existed_files] + for k in range(1, self.firstsegno - 1): + if self.firstsegno - 1 > len(existed_files) or str(k) not in hdfs_file_no_lst: + logger.error("Hawq aoseg metadata doesn't consistent with file numbers in hdfs.") + self.failure_handler.rollback() + sys.exit(1) if self.mode == 'repair': if self.tabledir.strip('/') != self.filepath.strip('/'): logger.error("In repair mode, file path from yaml file should be the same with table's path.") self.failure_handler.rollback() sys.exit(1) - seg_list, existed_sizes = self._get_metadata_from_table() - existed_files = [self.tabledir + seg for seg in seg_list] existed_info = {} for k, fn in enumerate(existed_files): existed_info[fn] = existed_sizes[k] @@ -590,10 +600,8 @@ class HawqRegister(object): self.files, self.sizes = self._get_files_in_hdfs(self.filepath) self.do_not_move, self.files_update, self.sizes_update = False, [], [] - self.newfiles, self.newsizes = [f for f in self.files], [sz for sz in self.sizes] + self.files_append, self.sizes_append = [f for f in self.files], [sz for sz in self.sizes] if self.mode == 'force': - seg_list, _ = self._get_metadata_from_table() - existed_files = [self.tabledir + seg for seg in seg_list] if len(self.files) == len(existed_files): if sorted(self.files) != sorted(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') @@ -601,6 +609,7 @@ class HawqRegister(object): sys.exit(1) else: self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes + self.files_append, self.sizes_append = [],[] elif len(self.files) < len(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') self.failure_handler.rollback() @@ -610,8 +619,8 @@ class HawqRegister(object): if f in existed_files: self.files_update.append(self.files[k]) self.sizes_update.append(self.sizes[k]) - self.newfiles.remove(self.files[k]) - self.newsizes.remove(self.sizes[k]) + self.files_append.remove(self.files[k]) + self.sizes_append.remove(self.sizes[k]) if sorted(self.files_update) != sorted(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') self.failure_handler.rollback() @@ -623,11 +632,11 @@ class HawqRegister(object): for fn in existed_files: if fn not in self.files: self.files_delete.append(fn) - self.files, self.sizes = [], [] + self.files_append, self.sizes_append = [], [] self._check_files_and_table_in_same_hdfs_cluster(self.filepath, self.tabledir) - print 'New file(s) to be registered: ', self.newfiles + print 'New file(s) to be registered: ', self.files_append if self.files_update: print 'Catalog info need to be updated for these files: ', self.files_update @@ -636,6 +645,7 @@ class HawqRegister(object): logger.error('-e option is only supported with single file case.') self.failure_handler.rollback() sys.exit(1) + self.sizes_append = [self.filesize] self.sizes = [self.filesize] self._check_sizes_valid() @@ -706,7 +716,31 @@ class HawqRegister(object): def _set_move_files_in_hdfs(self): segno = self.firstsegno - for f in self.newfiles: + # set self.files_same_path, self.sizes_same_path and self.segnos_same_path, which are for files existed in HDFS but not in catalog metadata + update_segno_lst = [f.split('/')[-1] for f in self.files_update] + catalog_lst = [str(i) for i in range(1, segno)] + new_catalog_lst = [str(i) for i in range(segno, len(self.files_update) + 1)] + exist_catalog_lst = [] + for k, seg in enumerate(update_segno_lst): + if seg not in catalog_lst: + self.files_same_path.append(self.files_update[k]) + self.sizes_same_path.append(self.sizes_update[k]) + if seg in new_catalog_lst: + exist_catalog_lst.append(seg) + for seg in update_segno_lst: + if seg not in catalog_lst: + if seg in exist_catalog_lst: + self.segnos_same_path.append(int(seg)) + else: + while (str(segno) in exist_catalog_lst): + segno += 1 + self.segnos_same_path.append(segno) + + for k, f in enumerate(self.files_same_path): + self.srcfiles.append(f) + self.dstfiles.append(self.tabledir + str(self.segnos_same_path[k])) + + for f in self.files_append: self.srcfiles.append(f) self.dstfiles.append(self.tabledir + str(segno)) segno += 1 @@ -736,68 +770,53 @@ class HawqRegister(object): sys.exit(1) def _set_modify_metadata(self, mode): - if mode == 'insert': - eofs = self.sizes - query = "" - if self.file_format == 'Parquet': - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1) - else: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1) - query += ';' - self.queries += query - elif mode == 'force': - eofs = self.sizes - segno_lst = [f.split('/')[-1] for f in self.files] - query = "delete from pg_aoseg.%s;" % (self.seg_name) - firstsegno = 1 - if self.file_format == 'Parquet': - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1) - else: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, firstsegno, eofs[0], -1, -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d, %d)' % (firstsegno + k + 1, eof, -1, -1, -1) - query += ';' - self.queries += query - elif mode == 'update': - eofs = self.sizes_update - query = "" - segno_lst = [f.split('/')[-1] for f in self.files_update] - if self.file_format == 'Parquet': - for i, eof in enumerate(eofs): - query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i]) - else: - for i, eof in enumerate(eofs): - query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i]) - self.queries += query - else: # update_and_insert - eofs = self.sizes - query = "" - if self.file_format == 'Parquet': - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1) - else: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, self.firstsegno, eofs[0], -1, -1, -1) - for k, eof in enumerate(eofs[1:]): - query += ',(%d, %d, %d, %d, %d)' % (self.firstsegno + k + 1, eof, -1, -1, -1) - query += ';' - - segno_lst = [f.split('/')[-1] for f in self.files_update] - if self.file_format == 'Parquet': - for i, eof in enumerate(self.sizes_update): - query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, segno_lst[i]) - else: - for i, eof in enumerate(self.sizes_update): - query += "update pg_aoseg.%s set eof = '%s', tupcount = '%s', varblockcount = '%s', eofuncompressed = '%s' where segno = '%s';" % (self.seg_name, eof, -1, -1, -1, segno_lst[i]) - self.queries += query + segno = self.firstsegno + append_eofs = self.sizes_append + update_eofs = self.sizes_update + same_path_eofs = self.sizes_same_path + update_segno_lst = [f.split('/')[-1] for f in self.files_update] + same_path_segno_lst = [seg for seg in self.segnos_same_path] + query = "" + if mode == 'force': + query += "delete from pg_aoseg.%s;" % (self.seg_name) + + if self.file_format == 'Parquet': + if len(update_eofs) > 0: + query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1) + for k, update_eof in enumerate(update_eofs[1:]): + query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1) + query += ';' + if len(same_path_eofs) > 0: + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1) + for k, same_path_eof in enumerate(same_path_eofs[1:]): + query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, -1, -1) + query += ';' + segno += len(same_path_eofs) + if len(append_eofs) > 0: + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1) + for k, append_eof in enumerate(append_eofs[1:]): + query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1) + query += ';' + else: + if len(update_eofs) > 0: + query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1, -1) + for k, update_eof in enumerate(update_eofs[1:]): + query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1, -1) + query += ';' + if len(same_path_eofs) > 0: + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1, -1) + for k, same_path_eof in enumerate(same_path_eofs[1:]): + query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, -1, -1, -1) + query += ';' + segno += len(same_path_eofs) + if len(append_eofs) > 0: + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1, -1) + for k, append_eof in enumerate(append_eofs[1:]): + query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, -1, -1, -1) + query += ';' + self.queries += query - def _modify_metadata(self, mode): + def _modify_metadata(self): try: self.utility_accessor.update_catalog(self.queries) except pg.DatabaseError as e: @@ -817,31 +836,19 @@ class HawqRegister(object): def _prepare_register(self): if not self.do_not_move: self._set_move_files_in_hdfs() - if (not self.do_not_move) and self.mode == 'force': + if self.mode == 'force' or self.mode == 'repair': self._set_modify_metadata('force') else: - if self.mode == 'force': - self._set_modify_metadata('force') - elif self.mode == 'repair': - self._set_modify_metadata('update') - else: - self._set_modify_metadata('insert') + self._set_modify_metadata('insert') def register(self): if not self.do_not_move: self._move_files_in_hdfs() - if (not self.do_not_move) and self.mode == 'force': - self._modify_metadata('force') - else: - if self.mode == 'force': - self._modify_metadata('force') - elif self.mode == 'repair': - self._modify_metadata('update') - if self.files_delete: - self._delete_files_in_hdfs() - self._delete_metadata() - else: - self._modify_metadata('insert') + self._modify_metadata() + if self.mode == 'repair': + if self.files_delete: + self._delete_files_in_hdfs() + self._delete_metadata() logger.info('Hawq Register Succeed.')
