Repository: incubator-hawq
Updated Branches:
  refs/heads/master 85690dc92 -> 00286d7f2


HAWQ-1104. extract and register tupcount/varblocknumber/eofuncompressed


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/00286d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/00286d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/00286d7f

Branch: refs/heads/master
Commit: 00286d7f2e3528efc57f494f4269a146f5eff5d7
Parents: 85690dc
Author: Wen Lin <[email protected]>
Authored: Thu Oct 27 11:25:55 2016 +0800
Committer: Wen Lin <[email protected]>
Committed: Thu Oct 27 11:25:55 2016 +0800

----------------------------------------------------------------------
 .../test_hawq_register_usage2_case1.cpp         |  64 +++++++----
 src/test/feature/lib/sql_util.cpp               |  11 ++
 src/test/feature/lib/sql_util.h                 |   2 +
 tools/bin/hawqextract                           |  24 ++---
 tools/bin/hawqregister                          | 106 +++++++++++++------
 5 files changed, 140 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
----------------------------------------------------------------------
diff --git 
a/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp 
b/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
index 0b5eabf..8411f0c 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_usage2_case1.cpp
@@ -15,44 +15,54 @@ using hawq::test::Command;
 using hawq::test::HdfsConfig;
 
 TEST_F(TestHawqRegister, TestUsage2Case1EmptyTable) {
-  SQLUtility util;
-  util.execute("drop table if exists t9;");
-  util.execute("create table t9(i int) with (appendonly=true, orientation=row) 
distributed randomly;");
-  EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) HAWQ_DB 
+ " -o t9.yml testhawqregister_testusage2case1emptytable.t9"));
-  EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c t9.yml testhawqregister_testusage2case1emptytable.nt9"));
-  util.query("select * from nt9;", 0);
-  EXPECT_EQ(0, Command::getCommandStatus("rm -rf t9.yml"));
-  util.execute("drop table t9;");
-  util.execute("drop table nt9;");
+    SQLUtility util;
+    util.execute("drop table if exists t9;");
+    util.execute("create table t9(i int) with (appendonly=true, 
orientation=row) distributed randomly;");
+    EXPECT_EQ(0, Command::getCommandStatus("hawq extract -d " + (string) 
HAWQ_DB + " -o t9.yml testhawqregister_testusage2case1emptytable.t9"));
+    EXPECT_EQ(0, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c t9.yml testhawqregister_testusage2case1emptytable.nt9"));
+    util.query("select * from nt9;", 0);
+    std::string reloid = getTableOid("nt9");
+    /* An empty table has no row in pg_aoseg.pg_aoseg_xxx table */
+    util.query(hawq::test::stringFormat("select * from pg_aoseg.pg_aoseg_%s;", 
reloid.c_str()), 0);
+    EXPECT_EQ(0, Command::getCommandStatus("rm -rf t9.yml"));
+    util.execute("drop table t9;");
+    util.execute("drop table nt9;");
 }
 
 TEST_F(TestHawqRegister, TestUsage2Case1IncorrectYaml) {
-  SQLUtility util;
-  string filePath = util.getTestRootPath() + "/ManagementTool/";
-
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_pagesize.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_rowgroupsize.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_filesize.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "wrong_schema.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_checksum.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "wrong_dfs_url.yml xx"));
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_bucketnum.yml xx"));
+    SQLUtility util;
+    string filePath = util.getTestRootPath() + "/ManagementTool/";
+
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_pagesize.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_rowgroupsize.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_filesize.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "wrong_schema.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_checksum.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "wrong_dfs_url.yml xx"));
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "missing_bucketnum.yml xx"));
 }
 
 TEST_F(TestHawqRegister, TestUsage2Case1MismatchFileNumber) {
-  SQLUtility util;
-  string filePath = util.getTestRootPath() + "/ManagementTool/";
-  EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx"));
+    SQLUtility util;
+    string filePath = util.getTestRootPath() + "/ManagementTool/";
+    EXPECT_EQ(1, Command::getCommandStatus("hawq register -d " + (string) 
HAWQ_DB + " -c " + filePath + "files_incomplete.yml xx"));
 }
 
 
 TEST_F(TestHawqRegister, TestUsage2Case1Expected) {
     SQLUtility util;
+    string fmt_prefix;
     std::vector<string> create_table_matrix = {"distributed by (i)", 
"distributed randomly"};
     std::vector<string> fmt_matrix = {"row", "parquet"};
     int suffix=0;
+
     for (auto & ddl : create_table_matrix) {
         for (auto & fmt : fmt_matrix) {
+            if (fmt.compare("row") == 0)
+                fmt_prefix = "aoseg";
+            else
+                fmt_prefix = "paqseg";
+
             suffix++;
             auto t = hawq::test::stringFormat("t_usage2_case1_%s", 
std::to_string(suffix).c_str());
             auto nt = hawq::test::stringFormat("nt_usage2_case1_%s", 
std::to_string(suffix).c_str());
@@ -63,10 +73,20 @@ TEST_F(TestHawqRegister, TestUsage2Case1Expected) {
             util.execute(hawq::test::stringFormat("create table %s(i int) with 
(appendonly=true, orientation=%s) %s;", t.c_str(), fmt.c_str(), ddl.c_str()));
             util.execute(hawq::test::stringFormat("insert into %s select 
generate_series(1, 100);", t.c_str()));
             util.query(hawq::test::stringFormat("select * from %s;", 
t.c_str()), 100);
+
+            // get pg_aoseg.pg_xxxseg_xxx table
+            std::string reloid1 = getTableOid(t.c_str());
+            string result1 = 
util.getQueryResultSetString(hawq::test::stringFormat("select * from 
pg_aoseg.pg_%s_%s order by segno;", fmt_prefix.c_str(), reloid1.c_str()));
+
             EXPECT_EQ(0, 
Command::getCommandStatus(hawq::test::stringFormat("hawq extract -d %s -o 
t_%s.yml testhawqregister_testusage2case1expected.%s", HAWQ_DB, 
std::to_string(suffix).c_str(), t.c_str())));
             EXPECT_EQ(0, 
Command::getCommandStatus(hawq::test::stringFormat("hawq register -d %s -c 
t_%s.yml testhawqregister_testusage2case1expected.%s", HAWQ_DB, 
std::to_string(suffix).c_str(), nt.c_str())));
             util.query(hawq::test::stringFormat("select * from %s;", 
nt.c_str()), 100);
 
+            // check pg_aoseg.pg_xxxseg_xxx table
+            std::string reloid2 = getTableOid(nt.c_str());
+            string result2 = 
util.getQueryResultSetString(hawq::test::stringFormat("select * from 
pg_aoseg.pg_%s_%s order by segno;", fmt_prefix.c_str(), reloid2.c_str()));
+            EXPECT_EQ(result1, result2);
+
             // hawq register -d hawq_feature_test -c t_usage2_case1_#.yml 
nt_usage2_case1_#, where nt_usage2_case1_# exists
             util.execute(hawq::test::stringFormat("drop table if exists %s;", 
t.c_str()));
             util.execute(hawq::test::stringFormat("create table %s(i int) with 
(appendonly=true, orientation=%s) %s;", t.c_str(), fmt.c_str(), ddl.c_str()));

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/lib/sql_util.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/lib/sql_util.cpp 
b/src/test/feature/lib/sql_util.cpp
index f0568a2..6eb34eb 100644
--- a/src/test/feature/lib/sql_util.cpp
+++ b/src/test/feature/lib/sql_util.cpp
@@ -268,6 +268,17 @@ std::string SQLUtility::getQueryResult(const std::string 
&query) {
   return value;
 }
 
+std::string SQLUtility::getQueryResultSetString(const std::string &query) {
+  const hawq::test::PSQLQueryResult &result = executeQuery(query);
+  std::vector<std::vector<string> > resultString = result.getRows();
+  string resultStr;
+  for (const auto &row : resultString) {
+    for (const auto &column : row) resultStr += column + "|";
+    resultStr += "\n";
+  }
+  return resultStr;
+}
+
 FilePath SQLUtility::splitFilePath(const string &filePath) const {
   FilePath fp;
   size_t found1 = filePath.find_last_of("/");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/src/test/feature/lib/sql_util.h
----------------------------------------------------------------------
diff --git a/src/test/feature/lib/sql_util.h b/src/test/feature/lib/sql_util.h
index 9bf1f90..e6d4768 100644
--- a/src/test/feature/lib/sql_util.h
+++ b/src/test/feature/lib/sql_util.h
@@ -91,6 +91,8 @@ class SQLUtility {
   // @return the query result
   std::string getQueryResult(const std::string &query);
 
+  std::string getQueryResultSetString(const std::string &query);
+
   // execute expect error message
   // @param sql the given sql command
   // @param errmsg the expected sql error message

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/tools/bin/hawqextract
----------------------------------------------------------------------
diff --git a/tools/bin/hawqextract b/tools/bin/hawqextract
index 3a6ef66..255db84 100644
--- a/tools/bin/hawqextract
+++ b/tools/bin/hawqextract
@@ -147,17 +147,17 @@ class GpMetadataAccessor:
 
     def get_aoseg_files(self, oid):
         '''
-        Return rows in pg_aoseg_`oid` table, excluding
-        rows whose content id is -1.
+        Return rows in pg_aoseg_`oid` table
 
         Example:
         >>> accessor.get_aoseg_files(35709)
-        >>> [{'fileno':'1', 'filesize':'320'},
-        ...  {'fileno':'2', 'filesize':'880'},
-        ...  {'fileno':'3', 'filesize':'160'}]
+        >>> [{'fileno':'1', 'filesize':'320', 'tupcount':'10', 
'varblockcount':'2', 'eofuncompressed':'320'},
+        ...  {'fileno':'2', 'filesize':'880', 'tupcount':'27', 
'varblockcount':'3', 'eofuncompressed':'880'},
+        ...  {'fileno':'3', 'filesize':'160', 'tupcount':'5', 
'varblockcount':'2', 'eofuncompressed':'160'}]
         '''
         qry = """
-        SELECT segno as fileno, eof as filesize
+        SELECT segno as fileno, eof as filesize, tupcount as tupcount,
+        varblockcount as varblockcount, eofuncompressed as eofuncompressed
         FROM pg_aoseg.pg_aoseg_%d
         ORDER by fileno;
         """ % oid
@@ -170,12 +170,12 @@ class GpMetadataAccessor:
 
         Example:
         >>> accessor.get_paqseg_files(35709)
-        >>> [{'fileno':'1', 'filesize':'320'},
-        ...  {'fileno':'2', 'filesize':'880'},
-        ...  {'fileno':'3', 'filesize':'160'}]
+        >>> [{'fileno':'1', 'filesize':'320', 'tupcount':'10', 
'eofuncompressed':'320'},
+        ...  {'fileno':'2', 'filesize':'880', 'tupcount':'27', 
'eofuncompressed':'880'},
+        ...  {'fileno':'3', 'filesize':'160', 'tupcount':'5', 
'eofuncompressed':'160'}]
         '''
         qry = """
-        SELECT segno as fileno, eof as filesize
+        SELECT segno as fileno, eof as filesize, tupcount, eofuncompressed
         FROM pg_aoseg.pg_paqseg_%d
         ORDER by fileno;
         """ % oid
@@ -357,7 +357,7 @@ def extract_metadata(conn, tbname):
                     relfilenode,
                     f['fileno']
             )
-            files.append({'path': path, 'size': int(f['filesize'])})
+            files.append({'path': path, 'size': int(f['filesize']), 
'tupcount': int(f['tupcount']), 'varblockcount': int(f['varblockcount']), 
'eofuncompressed': int(f['eofuncompressed'])})
         return files
 
     def get_parquet_table_files(oid, relfilenode):
@@ -375,7 +375,7 @@ def extract_metadata(conn, tbname):
                     relfilenode,
                     f['fileno']
             )
-            files.append({'path': path, 'size': int(f['filesize'])})
+            files.append({'path': path, 'size': int(f['filesize']), 
'tupcount': int(f['tupcount']), 'eofuncompressed': int(f['eofuncompressed'])})
         return files
 
     def extract_AO_metadata():

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/00286d7f/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 82ce274..46a9a8f 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -372,12 +372,15 @@ class HawqRegister(object):
             self.failure_handler.rollback()
             sys.exit(1)
 
-    def _set_yml_dataa(self, file_format, files, sizes, tablename, schema, 
distribution_policy, file_locations,\
+    def _set_yml_data(self, file_format, files, sizes, tupcounts, 
eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, 
file_locations,\
                       bucket_number, partitionby, partitions_constraint, 
partitions_name, partitions_compression_level,\
                       partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, encoding):
         self.file_format = file_format
         self.files = files
         self.sizes = sizes
+        self.tupcounts = tupcounts
+        self.eofuncompresseds = eofuncompresseds
+        self.varblockcounts = varblockcounts
         self.src_table_name = tablename
         self.schema = schema
         self.distribution_policy = distribution_policy
@@ -392,9 +395,6 @@ class HawqRegister(object):
         self.partitions_filepaths = partitions_filepaths
         self.partitions_filesizes = partitions_filesizes
         self.encoding = encoding
-        self.files_same_path = []
-        self.sizes_same_path = []
-        self.segnos_same_path = []
 
     def _option_parser_yml(self, yml_file):
         import yaml
@@ -414,7 +414,7 @@ class HawqRegister(object):
         partitions_checksum = []
         partitions_compression_level = []
         partitions_compression_type = []
-        files, sizes = [], []
+        files, sizes, tupcounts, eofuncompresseds, varblockcounts = [], [], 
[], [], []
 
         if params['FileFormat'].lower() == 'parquet':
             Format = 'Parquet'
@@ -437,11 +437,16 @@ class HawqRegister(object):
                     partitions_filesizes.append([item['size'] for item in 
pfile])
             partitions_name = [d['Name'] for d in 
params[Format_FileLocations]['Partitions']]
         if len(params[Format_FileLocations]['Files']):
-            files, sizes = [params['DFS_URL'] + d['path'] for d in 
params[Format_FileLocations]['Files']], [d['size'] for d in 
params[Format_FileLocations]['Files']]
+            for ele in params[Format_FileLocations]['Files']:
+                files.append(params['DFS_URL'] + ele['path'])
+                sizes.append(ele['size'])
+                tupcounts.append(ele['tupcount'] if ele.has_key('tupcount') 
else -1)
+                eofuncompresseds.append(ele['eofuncompressed'] if 
ele.has_key('eofuncompressed') else -1)
+                varblockcounts.append(ele['varblockcount'] if 
ele.has_key('varblockcount') else -1)
+
         encoding = params['Encoding']
         bucketNum = params['Bucketnum'] if 
params['Distribution_Policy'].startswith('DISTRIBUTED BY') else 6
-        self._set_yml_dataa(Format, files, sizes, params['TableName'], 
params['%s_Schema' % Format], params['Distribution_Policy'], 
params[Format_FileLocations], bucketNum, partitionby,\
-                      partitions_constraint, partitions_name, 
partitions_compression_level, partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, encoding)
+        self._set_yml_data(Format, files, sizes, tupcounts, eofuncompresseds, 
varblockcounts, params['TableName'], params['%s_Schema' % Format], 
params['Distribution_Policy'], params[Format_FileLocations], bucketNum, 
partitionby, partitions_constraint, partitions_name, 
partitions_compression_level, partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, encoding)
 
 
     # check conflicting distributed policy
@@ -524,6 +529,12 @@ class HawqRegister(object):
                     sys.exit(1)
 
     def _init(self):
+        self.files_same_path = []
+        self.sizes_same_path = []
+        self.tupcounts_same_path = []
+        self.varblockcounts_same_path = []
+        self.eofuncompresseds_same_path = []
+        self.segnos_same_path = []
         if self.yml:
             self._option_parser_yml(options.yml_config)
             self.filepath = self.files[0][:self.files[0].rfind('/')] if 
self.files else ''
@@ -537,9 +548,6 @@ class HawqRegister(object):
                 logger.error('-e option is only supported with single file 
case.')
                 sys.exit(1)
             self.file_format = 'Parquet'
-            self.files_same_path = []
-            self.sizes_same_path = []
-            self.segnos_same_path = []
             self._check_hash_type() # Usage1 only support randomly distributed 
table
         self.queries = "set allow_system_table_mods='dml';"
         self.queries += "begin transaction;"
@@ -602,9 +610,10 @@ class HawqRegister(object):
         if not self.yml:
             self._check_no_regex_filepath([self.filepath])
             self.files, self.sizes = self._get_files_in_hdfs(self.filepath)
+            self.tupcounts = self.eofuncompresseds = self.varblockcounts = [-1 
for i in range(len(self.files))]
 
-        self.do_not_move, self.files_update, self.sizes_update = False, [], []
-        self.files_append, self.sizes_append = [f for f in self.files], [sz 
for sz in self.sizes]
+        self.do_not_move, self.files_update, self.sizes_update, 
self.tupcounts_update, self.eofuncompresseds_update, self.varblockcounts_update 
= False, [], [], [], [], []
+        self.files_append, self.sizes_append, self.tupcounts_append, 
self.eofuncompresseds_append, self.varblockcounts_append = [f for f in 
self.files], [sz for sz in self.sizes], [tc for tc in self.tupcounts], [eof for 
eof in self.eofuncompresseds], [v for v in self.varblockcounts]
         if self.mode == 'force':
             if len(self.files) == len(existed_files):
                 if sorted(self.files) != sorted(existed_files):
@@ -612,8 +621,8 @@ class HawqRegister(object):
                     self.failure_handler.rollback()
                     sys.exit(1)
                 else:
-                    self.do_not_move, self.files_update, self.sizes_update = 
True, self.files, self.sizes
-                self.files_append, self.sizes_append = [],[]
+                    self.do_not_move, self.files_update, self.sizes_update, 
self.tupcounts_update, self.eofuncompresseds_update, self.varblockcounts_update 
= True, self.files, self.sizes, self.tupcounts, self.eofuncompresseds, 
self.varblockcounts
+                self.files_append, self.sizes_append, self.tupcounts_append, 
self.eofuncompresseds_append, self.varblockcounts_append = [], [], [], [], []
             elif len(self.files) < len(existed_files):
                 logger.error('In force mode, you should include existing table 
files in yaml configuration file. Otherwise you should drop the previous table 
before register --force.')
                 self.failure_handler.rollback()
@@ -623,8 +632,14 @@ class HawqRegister(object):
                     if f in existed_files:
                         self.files_update.append(self.files[k])
                         self.sizes_update.append(self.sizes[k])
+                        self.tupcounts_update.append(self.tupcounts[k])
+                        
self.eofuncompresseds_update.append(self.eofuncompresseds[k])
+                        
self.varblockcounts_update.append(self.varblockcounts[k])
                         self.files_append.remove(self.files[k])
                         self.sizes_append.remove(self.sizes[k])
+                        self.tupcounts_append.remove(self.tupcounts[k])
+                        
self.eofuncompresseds_append.remove(self.eofuncompresseds[k])
+                        
self.varblockcounts_append.remove(self.varblockcounts[k])
                 if sorted(self.files_update) != sorted(existed_files):
                     logger.error('In force mode, you should include existing 
table files in yaml configuration file. Otherwise you should drop the previous 
table before register --force.')
                     self.failure_handler.rollback()
@@ -729,6 +744,9 @@ class HawqRegister(object):
             if seg not in catalog_lst:
                 self.files_same_path.append(self.files_update[k])
                 self.sizes_same_path.append(self.sizes_update[k])
+                self.tupcounts_same_path.append(self.tupcounts_update[k])
+                
self.eofuncompresseds_same_path.append(self.eofuncompresseds_update[k])
+                
self.varblockcounts_same_path.append(self.varblockcounts_update[k])
             if seg in new_catalog_lst:
                 exist_catalog_lst.append(seg)
         for seg in update_segno_lst:
@@ -778,45 +796,67 @@ class HawqRegister(object):
         append_eofs = self.sizes_append
         update_eofs = self.sizes_update
         same_path_eofs = self.sizes_same_path
+        append_tupcounts = self.tupcounts_append
+        update_tupcounts = self.tupcounts_update
+        same_path_tupcounts = self.tupcounts_same_path
+        append_eofuncompresseds = self.eofuncompresseds_append
+        update_eofuncompresseds = self.eofuncompresseds_update
+        same_path_eofuncompresseds = self.eofuncompresseds_same_path
+        append_varblockcounts = self.varblockcounts_append
+        update_varblockcounts = self.varblockcounts_update
+        same_path_varblockcounts = self.varblockcounts_same_path
         update_segno_lst = [f.split('/')[-1] for f in self.files_update]
         same_path_segno_lst = [seg for seg in self.segnos_same_path]
         query = ""
+        
         if mode == 'force':
             query += "delete from pg_aoseg.%s;" % (self.seg_name)
         
         if self.file_format == 'Parquet': 
             if len(update_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % 
(self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1)
-                for k, update_eof in enumerate(update_eofs[1:]):
-                    query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], 
update_eof, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d)' % 
(self.seg_name, update_segno_lst[0], update_eofs[0], update_tupcounts[0], 
update_eofuncompresseds[0])
+                k = 0
+                for update_eof, update_tupcount, update_eofuncompressed in 
zip(update_eofs[1:], update_tupcounts[1:], update_eofuncompresseds[1:]):
+                    query += ',(%s, %d, %d, %d)' % (update_segno_lst[k + 1], 
update_eof, update_tupcount, update_eofuncompressed)
+                    k += 1
                 query += ';'
             if len(same_path_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % 
(self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1)
-                for k, same_path_eof in enumerate(same_path_eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 
1], same_path_eof, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % 
(self.seg_name, same_path_segno_lst[0], same_path_eofs[0], 
same_path_tupcounts[0], same_path_eofuncompresseds[0])
+                k = 0
+                for same_path_eof, same_path_tupcount, 
same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], 
same_path_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 
1], same_path_eof, same_path_tupcount, same_path_eofuncompressed)
+                    k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % 
(self.seg_name, segno, append_eofs[0], -1, -1)
-                for k, append_eof in enumerate(append_eofs[1:]):
-                    query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, 
-1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % 
(self.seg_name, segno, append_eofs[0], append_tupcounts[0], 
append_eofuncompresseds[0])
+                k = 0
+                for append_eof, append_tupcount, append_eofuncompressed in 
zip(append_eofs[1:], append_tupcounts[1:], append_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, 
append_tupcount, append_eofuncompressed)
+                    k += 1
                 query += ';'
         else:
             if len(update_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' 
% (self.seg_name, update_segno_lst[0], update_eofs[0], -1, -1, -1)
-                for k, update_eof in enumerate(update_eofs[1:]):
-                    query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 
1], update_eof, -1, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%s, %d, %d, %d, %d)' 
% (self.seg_name, update_segno_lst[0], update_eofs[0], update_tupcounts[0], 
update_varblockcounts[0], update_eofuncompresseds[0])
+                k = 0
+                for update_eof, update_tupcount, update_varblockcount, 
update_eofuncompressed in zip(update_eofs[1:], update_tupcounts[1:], 
update_varblockcounts[1:], update_eofuncompresseds[1:]):
+                    query += ',(%s, %d, %d, %d, %d)' % (update_segno_lst[k + 
1], update_eof, update_tupcount, update_varblockcount, update_eofuncompressed)
+                    k += 1
                 query += ';'
             if len(same_path_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' 
% (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], -1, -1, -1)
-                for k, same_path_eof in enumerate(same_path_eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k 
+ 1], same_path_eof, -1, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' 
% (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], 
same_path_tupcounts[0], same_path_varblockcounts[0], 
same_path_eofuncompresseds[0])
+                k = 0
+                for same_path_eof, same_path_tupcount, 
same_path_varblockcount, same_path_eofuncompressed in zip(same_path_eofs[1:], 
same_path_tupcounts[1:], same_path_varblockcounts[1:], 
same_path_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k 
+ 1], same_path_eof, same_path_tupcount, same_path_varblockcount, 
same_path_eofuncompressed)
+                    k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
-                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' 
% (self.seg_name, segno, append_eofs[0], -1, -1, -1)
-                for k, append_eof in enumerate(append_eofs[1:]):
-                    query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, 
append_eof, -1, -1, -1)
+                query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' 
% (self.seg_name, segno, append_eofs[0], append_tupcounts[0], 
append_varblockcounts[0], append_eofuncompresseds[0])
+                k = 0
+                for append_eof, append_tupcount, append_varblockcount, 
append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], 
append_varblockcounts[1:], append_eofuncompresseds[1:]):
+                    query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, 
append_eof, append_tupcount, append_varblockcount, append_eofuncompressed)
+                    k += 1
                 query += ';'
         self.queries += query
     

Reply via email to