Repository: incubator-hawq Updated Branches: refs/heads/master 85690dc92 -> 00286d7f2
HAWQ-1104. extract and register tupcount/varblocknumber/eofuncompressed Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/00286d7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/00286d7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/00286d7f Branch: refs/heads/master Commit: 00286d7f2e3528efc57f494f4269a146f5eff5d7 Parents: 85690dc Author: Wen Lin <[email protected]> Authored: Thu Oct 27 11:25:55 2016 +0800 Committer: Wen Lin <[email protected]> Committed: Thu Oct 27 11:25:55 2016 +0800 ---------------------------------------------------------------------- .../test_hawq_register_usage2_case1.cpp | 64 +++++++---- src/test/feature/lib/sql_util.cpp | 11 ++ src/test/feature/lib/sql_util.h | 2 + tools/bin/hawqextract | 24 ++--- tools/bin/hawqregister | 106 +++++++++++++------ 5 files changed, 140 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp ---------------------------------------------------------------------- diff --git a/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp b/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp index 0b5eabf..8411f0c 100644 --- a/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp +++ b/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp @@ -15,44 +15,54 @@ using hawq::test::Command; using hawq::test::HdfsConfig; TEST_F(TestHawqRegister, TestUsage2Case1EmptyTable) { - SQLUtility util; - util.execute("drop table if exists t9;"); - util.execute("create table t9(i int) with (appendonly=true, orientation=row) distributed randomly;"); - EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o t9.yml 
testhawqregister_testusage2case1emptytable.t9")); - EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c t9.yml testhawqregister_testusage2case1emptytable.nt9")); - util.query("select * from nt9;", 0); - EXPECT_EQ(0, Command::getCommandStatus("rm -rf t9.yml")); - util.execute("drop table t9;"); - util.execute("drop table nt9;"); + SQLUtility util; + util.execute("drop table if exists t9;"); + util.execute("create table t9(i int) with (appendonly=true, orientation=row) distributed randomly;"); + EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB + " -o t9.yml testhawqregister_testusage2case1emptytable.t9")); + EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c t9.yml testhawqregister_testusage2case1emptytable.nt9")); + util.query("select * from nt9;", 0); + std::string reloid = getTableOid("nt9"); + /* An empty table has no row in pg_aoseg.pg_aoseg_xxx table */ + util.query(hawq::test::stringFormat("select * from pg_aoseg.pg_aoseg_%s;", reloid.c_str()), 0); + EXPECT_EQ(0, Command::getCommandStatus("rm -rf t9.yml")); + util.execute("drop table t9;"); + util.execute("drop table nt9;"); } TEST_F(TestHawqRegister, TestUsage2Case1IncorrectYaml) { - SQLUtility util; - string filePath = util.getTestRootPath() + "/ManagementTool/"; - - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_pagesize.yml xx")); - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_rowgroupsize.yml xx")); - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_filesize.yml xx")); - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_schema.yml xx")); - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_checksum.yml xx")); - 
EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_dfs_url.yml xx")); - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_bucketnum.yml xx")); + SQLUtility util; + string filePath = util.getTestRootPath() + "/ManagementTool/"; + + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_pagesize.yml xx")); + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_rowgroupsize.yml xx")); + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_filesize.yml xx")); + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_schema.yml xx")); + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_checksum.yml xx")); + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "wrong_dfs_url.yml xx")); + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "missing_bucketnum.yml xx")); } TEST_F(TestHawqRegister, TestUsage2Case1MismatchFileNumber) { - SQLUtility util; - string filePath = util.getTestRootPath() + "/ManagementTool/"; - EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx")); + SQLUtility util; + string filePath = util.getTestRootPath() + "/ManagementTool/"; + EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx")); } TEST_F(TestHawqRegister, TestUsage2Case1Expected) { SQLUtility util; + string fmt_prefix; std::vector<string> create_table_matrix = {"distributed by (i)", "distributed randomly"}; std::vector<string> fmt_matrix = {"row", "parquet"}; int 
suffix=0; + for (auto & ddl : create_table_matrix) { for (auto & fmt : fmt_matrix) { + if (fmt.compare("row") == 0) + fmt_prefix = "aoseg"; + else + fmt_prefix = "paqseg"; + suffix++; auto t = hawq::test::stringFormat("t_usage2_case1_%s", std::to_string(suffix).c_str()); auto nt = hawq::test::stringFormat("nt_usage2_case1_%s", std::to_string(suffix).c_str()); @@ -63,10 +73,20 @@ TEST_F(TestHawqRegister, TestUsage2Case1Expected) { util.execute(hawq::test::stringFormat("create table %s(i int) with (appendonly=true, orientation=%s) %s;", t.c_str(), fmt.c_str(), ddl.c_str())); util.execute(hawq::test::stringFormat("insert into %s select generate_series(1, 100);", t.c_str())); util.query(hawq::test::stringFormat("select * from %s;", t.c_str()), 100); + + // get pg_aoseg.pg_xxxseg_xxx table + std::string reloid1 = getTableOid(t.c_str()); + string result1 = util.getQueryResultSetString(hawq::test::stringFormat("select * from pg_aoseg.pg_%s_%s order by segno;", fmt_prefix.c_str(), reloid1.c_str())); + EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("hawq extract -d %s -o t_%s.yml testhawqregister_testusage2case1expected.%s", HAWQ_DB, std::to_string(suffix).c_str(), t.c_str()))); EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("hawq register -d %s -c t_%s.yml testhawqregister_testusage2case1expected.%s", HAWQ_DB, std::to_string(suffix).c_str(), nt.c_str()))); util.query(hawq::test::stringFormat("select * from %s;", nt.c_str()), 100); + // check pg_aoseg.pg_xxxseg_xxx table + std::string reloid2 = getTableOid(nt.c_str()); + string result2 = util.getQueryResultSetString(hawq::test::stringFormat("select * from pg_aoseg.pg_%s_%s order by segno;", fmt_prefix.c_str(), reloid2.c_str())); + EXPECT_EQ(result1, result2); + // hawq register -d hawq_feature_test -c t_usage2_case1_#.yml nt_usage2_case1_#, where nt_usage2_case1_# exists util.execute(hawq::test::stringFormat("drop table if exists %s;", t.c_str())); 
util.execute(hawq::test::stringFormat("create table %s(i int) with (appendonly=true, orientation=%s) %s;", t.c_str(), fmt.c_str(), ddl.c_str())); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/lib/sql_util.cpp ---------------------------------------------------------------------- diff --git a/src/test/feature/lib/sql_util.cpp b/src/test/feature/lib/sql_util.cpp index f0568a2..6eb34eb 100644 --- a/src/test/feature/lib/sql_util.cpp +++ b/src/test/feature/lib/sql_util.cpp @@ -268,6 +268,17 @@ std::string SQLUtility::getQueryResult(const std::string &query) { return value; } +std::string SQLUtility::getQueryResultSetString(const std::string &query) { + const hawq::test::PSQLQueryResult &result = executeQuery(query); + std::vector<std::vector<string> > resultString = result.getRows(); + string resultStr; + for (auto row : result.getRows()) { + for (auto column : row) resultStr += column + "|"; + resultStr += "\n"; + } + return resultStr; +} + FilePath SQLUtility::splitFilePath(const string &filePath) const { FilePath fp; size_t found1 = filePath.find_last_of("/"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/lib/sql_util.h ---------------------------------------------------------------------- diff --git a/src/test/feature/lib/sql_util.h b/src/test/feature/lib/sql_util.h index 9bf1f90..e6d4768 100644 --- a/src/test/feature/lib/sql_util.h +++ b/src/test/feature/lib/sql_util.h @@ -91,6 +91,8 @@ class SQLUtility { // @return the query result std::string getQueryResult(const std::string &query); + std::string getQueryResultSetString(const std::string &query); + // execute expect error message // @param sql the given sql command // @param errmsg the expected sql error message http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/tools/bin/hawqextract ---------------------------------------------------------------------- diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract index 
3a6ef66..255db84 100644 --- a/tools/bin/hawqextract +++ b/tools/bin/hawqextract @@ -147,17 +147,17 @@ class GpMetadataAccessor: def get_aoseg_files(self, oid): ''' - Return rows in pg_aoseg_`oid` table, excluding - rows whose content id is -1. + Return rows in pg_aoseg_`oid` table Example: >>> accessor.get_aoseg_files(35709) - >>> [{'fileno':'1', 'filesize':'320'}, - ... {'fileno':'2', 'filesize':'880'}, - ... {'fileno':'3', 'filesize':'160'}] + >>> [{'fileno':'1', 'filesize':'320', 'tupcount':'10', 'varblockcount':'2', eofuncompressed:'320'}, + ... {'fileno':'2', 'filesize':'880', 'tupcount':'27', 'varblockcount':'3', eofuncompressed:'880'}, + ... {'fileno':'3', 'filesize':'160', 'tupcount':'5', 'varblockcount':'2', eofuncompressed:'160'}] ''' qry = """ - SELECT segno as fileno, eof as filesize + SELECT segno as fileno, eof as filesize, tupcount as tupcount, + varblockcount as varblockcount, eofuncompressed as eofuncompressed FROM pg_aoseg.pg_aoseg_%d ORDER by fileno; """ % oid @@ -170,12 +170,12 @@ class GpMetadataAccessor: Example: >>> accessor.get_paqseg_files(35709) - >>> [{'fileno':'1', 'filesize':'320'}, - ... {'fileno':'2', 'filesize':'880'}, - ... {'fileno':'3', 'filesize':'160'}] + >>> [{'fileno':'1', 'filesize':'320', 'tupcount':'10', 'eofuncompressed':'320'}, + ... {'fileno':'2', 'filesize':'880', 'tupcount':'27', 'eofuncompressed':'880'}, + ... 
{'fileno':'3', 'filesize':'160', 'tupcount':'5', 'eofuncompressed':'160'}] ''' qry = """ - SELECT segno as fileno, eof as filesize + SELECT segno as fileno, eof as filesize, tupcount, eofuncompressed FROM pg_aoseg.pg_paqseg_%d ORDER by fileno; """ % oid @@ -357,7 +357,7 @@ def extract_metadata(conn, tbname): relfilenode, f['fileno'] ) - files.append({'path': path, 'size': int(f['filesize'])}) + files.append({'path': path, 'size': int(f['filesize']), 'tupcount': int(f['tupcount']), 'varblockcount': int(f['varblockcount']), 'eofuncompressed': int(f['eofuncompressed'])}) return files def get_parquet_table_files(oid, relfilenode): @@ -375,7 +375,7 @@ def extract_metadata(conn, tbname): relfilenode, f['fileno'] ) - files.append({'path': path, 'size': int(f['filesize'])}) + files.append({'path': path, 'size': int(f['filesize']), 'tupcount': int(f['tupcount']), 'eofuncompressed': int(f['eofuncompressed'])}) return files def extract_AO_metadata(): http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 82ce274..46a9a8f 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -372,12 +372,15 @@ class HawqRegister(object): self.failure_handler.rollback() sys.exit(1) - def _set_yml_dataa(self, file_format, files, sizes, tablename, schema, distribution_policy, file_locations,\ + def _set_yml_data(self, file_format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, file_locations,\ bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\ partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding): self.file_format = file_format self.files = files self.sizes = sizes + self.tupcounts = tupcounts + self.eofuncompresseds = eofuncompresseds + self.varblockcounts = 
varblockcounts self.src_table_name = tablename self.schema = schema self.distribution_policy = distribution_policy @@ -392,9 +395,6 @@ class HawqRegister(object): self.partitions_filepaths = partitions_filepaths self.partitions_filesizes = partitions_filesizes self.encoding = encoding - self.files_same_path = [] - self.sizes_same_path = [] - self.segnos_same_path = [] def _option_parser_yml(self, yml_file): import yaml @@ -414,7 +414,7 @@ class HawqRegister(object): partitions_checksum = [] partitions_compression_level = [] partitions_compression_type = [] - files, sizes = [], [] + files, sizes, tupcounts, eofuncompresseds, varblockcounts = [], [], [], [], [] if params['FileFormat'].lower() == 'parquet': Format = 'Parquet' @@ -437,11 +437,16 @@ class HawqRegister(object): partitions_filesizes.append([item['size'] for item in pfile]) partitions_name = [d['Name'] for d in params[Format_FileLocations]['Partitions']] if len(params[Format_FileLocations]['Files']): - files, sizes = [params['DFS_URL'] + d['path'] for d in params[Format_FileLocations]['Files']], [d['size'] for d in params[Format_FileLocations]['Files']] + for ele in params[Format_FileLocations]['Files']: + files.append(params['DFS_URL'] + ele['path']) + sizes.append(ele['size']) + tupcounts.append(ele['tupcount'] if ele.has_key('tupcount') else -1) + eofuncompresseds.append(ele['eofuncompressed'] if ele.has_key('eofuncompressed') else -1) + varblockcounts.append(ele['varblockcount'] if ele.has_key('varblockcount') else -1) + encoding = params['Encoding'] bucketNum = params['Bucketnum'] if params['Distribution_Policy'].startswith('DISTRIBUTED BY') else 6 - self._set_yml_dataa(Format, files, sizes, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], bucketNum, partitionby,\ - partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding) + 
self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, params['TableName'], params['%s_Schema' % Format], params['Distribution_Policy'], params[Format_FileLocations], bucketNum, partitionby, partitions_constraint, partitions_name, partitions_compression_level, partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, encoding) # check conflicting distributed policy @@ -524,6 +529,12 @@ class HawqRegister(object): sys.exit(1) def _init(self): + self.files_same_path = [] + self.sizes_same_path = [] + self.tupcounts_same_path = [] + self.varblockcounts_same_path = [] + self.eofuncompresseds_same_path = [] + self.segnos_same_path = [] if self.yml: self._option_parser_yml(options.yml_config) self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else '' @@ -537,9 +548,6 @@ class HawqRegister(object): logger.error('-e option is only supported with single file case.') sys.exit(1) self.file_format = 'Parquet' - self.files_same_path = [] - self.sizes_same_path = [] - self.segnos_same_path = [] self._check_hash_type() # Usage1 only support randomly distributed table self.queries = "set allow_system_table_mods='dml';" self.queries += "begin transaction;" @@ -602,9 +610,10 @@ class HawqRegister(object): if not self.yml: self._check_no_regex_filepath([self.filepath]) self.files, self.sizes = self._get_files_in_hdfs(self.filepath) + self.tupcounts = self.eofuncompresseds = self.varblockcounts = [-1 for i in range(len(self.files))] - self.do_not_move, self.files_update, self.sizes_update = False, [], [] - self.files_append, self.sizes_append = [f for f in self.files], [sz for sz in self.sizes] + self.do_not_move, self.files_update, self.sizes_update, self.tupcounts_update, self.eofuncompresseds_update, self.varblockcounts_update = False, [], [], [], [], [] + self.files_append, self.sizes_append, self.tupcounts_append, self.eofuncompresseds_append, self.varblockcounts_append = [f for f in 
self.files], [sz for sz in self.sizes], [tc for tc in self.tupcounts], [eof for eof in self.eofuncompresseds], [v for v in self.varblockcounts] if self.mode == 'force': if len(self.files) == len(existed_files): if sorted(self.files) != sorted(existed_files): @@ -612,8 +621,8 @@ class HawqRegister(object): self.failure_handler.rollback() sys.exit(1) else: - self.do_not_move, self.files_update, self.sizes_update = True, self.files, self.sizes - self.files_append, self.sizes_append = [],[] + self.do_not_move, self.files_update, self.sizes_update, self.tupcounts_update, self.eofuncompresseds_update, self.varblockcounts_update = True, self.files, self.sizes, self.tupcounts, self.eofuncompresseds, self.varblockcounts + self.files_append, self.sizes_append, self.tupcounts_append, self.eofuncompresseds_append, self.varblockcounts_append = [], [], [], [], [] elif len(self.files) < len(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. Otherwise you should drop the previous table before register --force.') self.failure_handler.rollback() @@ -623,8 +632,14 @@ class HawqRegister(object): if f in existed_files: self.files_update.append(self.files[k]) self.sizes_update.append(self.sizes[k]) + self.tupcounts_update.append(self.tupcounts[k]) + self.eofuncompresseds_update.append(self.eofuncompresseds[k]) + self.varblockcounts_update.append(self.varblockcounts[k]) self.files_append.remove(self.files[k]) self.sizes_append.remove(self.sizes[k]) + self.tupcounts_append.remove(self.tupcounts[k]) + self.eofuncompresseds_append.remove(self.eofuncompresseds[k]) + self.varblockcounts_append.remove(self.varblockcounts[k]) if sorted(self.files_update) != sorted(existed_files): logger.error('In force mode, you should include existing table files in yaml configuration file. 
Otherwise you should drop the previous table before register --force.') self.failure_handler.rollback() @@ -729,6 +744,9 @@ class HawqRegister(object): if seg not in catalog_lst: self.files_same_path.append(self.files_update[k]) self.sizes_same_path.append(self.sizes_update[k]) + self.tupcounts_same_path.append(self.tupcounts_update[k]) + self.eofuncompresseds_same_path.append(self.eofuncompresseds_update[k]) + self.varblockcounts_same_path.append(self.varblockcounts_update[k]) if seg in new_catalog_lst: exist_catalog_lst.append(seg) for seg in update_segno_lst: @@ -778,45 +796,67 @@ class HawqRegister(object): append_eofs = self.sizes_append update_eofs = self.sizes_update same_path_eofs = self.sizes_same_path + append_tupcounts = self.tupcounts_append + update_tupcounts = self.tupcounts_update + same_path_tupcounts = self.tupcounts_same_path + append_eofuncompresseds = self.eofuncompresseds_append + update_eofuncompresseds = self.eofuncompresseds_update + same_path_eofuncompresseds = self.eofuncompresseds_same_path + append_varblockcounts = self.varblockcounts_append + update_varblockcounts = self.varblockcounts_update + same_path_varblockcounts = self.varblockcounts_same_path update_segno_lst = [f.split('/')[-1] for f in self.files_update] same_path_segno_lst = [seg for seg in self.segnos_same_path] query = "" + if mode == 'force': query += "delete from pg_aoseg.%s;" % (self.seg_name) if self.file_format == 'Parquet': if len(update_eofs) > 0: - query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1) - for k, update_eof in enumerate(update_eofs[1:]): - query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, -1, -1) + query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], update_tupcounts[0], update_eofuncompresseds[0]) + k = 0 + for update_eof, update_tupcount, update_eofuncompressed in zip(update_eofs[1:], update_tupcounts[1:], 
update_eofuncompresseds[1:]) + query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, update_tupcount, update_eofuncompressed) + k += 1 query += ';' if len(same_path_eofs) > 0: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1) - for k, same_path_eof in enumerate(same_path_eofs[1:]): + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_eofuncompresseds[0]) + k = 0 + for same_path_eof, same_path_tupcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_eofuncompresseds[1:]): + query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_eofuncompressed) + k += 1 query += ';' segno += len(same_path_eofs) if len(append_eofs) > 0: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1) - for k, append_eof in enumerate(append_eofs[1:]): + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_eofuncompresseds[0]) + k = 0 + for append_eof, append_tupcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_eofuncompresseds[1:]): + query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_eofuncompressed) + k += 1 query += ';' else: if len(update_eofs) > 0: - query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1, -1) - for k, update_eof in enumerate(update_eofs[1:]): + query += 'insert into pg_aoseg.%s values(%s, %d, %d, 
%d, %d)' % (self.seg_name, update_segno_lst[0], update_eofs[0], update_tupcounts[0], update_varblockcounts[0], update_eofuncompresseds[0]) + k = 0 + for update_eof, update_tupcount, update_varblockcount, update_eofuncompressed in zip(update_eofs[1:], update_tupcounts[1:], update_varblockcounts[1:], update_eofuncompresseds[1:]): + query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 1], update_eof, update_tupcount, update_varblockcount, update_eofuncompressed) + k += 1 query += ';' if len(same_path_eofs) > 0: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1, -1) - for k, same_path_eof in enumerate(same_path_eofs[1:]): + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_varblockcounts[0], same_path_eofuncompresseds[0]) + k = 0 + for same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_varblockcounts[1:], same_path_eofuncompresseds[1:]): + query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed) + k += 1 query += ';' segno += len(same_path_eofs) if len(append_eofs) > 0: - query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], -1, -1, -1) - for k, append_eof in enumerate(append_eofs[1:]): + query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_varblockcounts[0], append_eofuncompresseds[0]) + k = 0 + for append_eof, append_tupcount, append_varblockcount, append_eofuncompressed in zip(append_eofs[1:], 
append_tupcounts[1:], append_varblockcounts[1:], append_eofuncompresseds[1:]): + query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_varblockcount, append_eofuncompressed) + k += 1 query += ';' self.queries += query
