Repository: incubator-hawq
Updated Branches:
  refs/heads/master 68570a720 -> 787c096d4


HAWQ-1145. Add UDF gp_relfile_insert_for_register and add insert metadata into 
gp_relfile_node and gp_persistent_relfile_node for HAWQ register


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/787c096d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/787c096d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/787c096d

Branch: refs/heads/master
Commit: 787c096d43e4b27a20cc484e422f53ac81093d56
Parents: 68570a7
Author: Chunling Wang <[email protected]>
Authored: Fri Nov 18 15:09:24 2016 +0800
Committer: xunzhang <[email protected]>
Committed: Tue Nov 22 16:52:50 2016 +0800

----------------------------------------------------------------------
 depends/libhdfs3/src/client/Hdfs.cpp            |   4 +-
 src/backend/cdb/cdbpersistentbuild.c            | 117 +++++++++++++++++++
 src/include/catalog/pg_proc.h                   |   4 +
 src/include/catalog/pg_proc.sql                 |   2 +
 src/include/utils/builtins.h                    |   1 +
 .../test_hawq_register_partition.cpp            |   7 ++
 .../data/upgrade41/upg2_catupgrade_41.sql.in    |  18 +++
 tools/bin/hawqregister                          |  45 +++++++
 8 files changed, 196 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/depends/libhdfs3/src/client/Hdfs.cpp
----------------------------------------------------------------------
diff --git a/depends/libhdfs3/src/client/Hdfs.cpp 
b/depends/libhdfs3/src/client/Hdfs.cpp
index 9906a87..395f4f8 100644
--- a/depends/libhdfs3/src/client/Hdfs.cpp
+++ b/depends/libhdfs3/src/client/Hdfs.cpp
@@ -646,9 +646,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char * path, int 
flags, int bufferSize,
         if ((flags & O_CREAT) || (flags & O_APPEND) || (flags & O_WRONLY)) {
             int internalFlags = 0;
 
-            if ((flags & O_CREAT) && (flags & O_EXCL)) {
+            if (flags & O_CREAT)  {
                 internalFlags |= Hdfs::Create;
-            } else if ((flags & O_CREAT) || ((flags & O_APPEND) && (flags & 
O_WRONLY))) {
+            } else if ((flags & O_APPEND) && (flags & O_WRONLY)) {
                 internalFlags |= Hdfs::Create;
                 internalFlags |= Hdfs::Append;
             } else if (flags & O_WRONLY) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/backend/cdb/cdbpersistentbuild.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpersistentbuild.c 
b/src/backend/cdb/cdbpersistentbuild.c
index 3fea2bb..17dba4b 100644
--- a/src/backend/cdb/cdbpersistentbuild.c
+++ b/src/backend/cdb/cdbpersistentbuild.c
@@ -831,6 +831,123 @@ gp_persistent_build_all(PG_FUNCTION_ARGS)
        PG_RETURN_INT32(1);
 }
 
+/*
+ * gp_relfile_insert_for_register
+ * 
+ * This function is an internal function, for hawq register to insert metadata
+ * into gp_relfile_node and gp_persistent_relfile_node.
+ */
+Datum
+gp_relfile_insert_for_register(PG_FUNCTION_ARGS)
+{
+       Oid             tablespace = PG_GETARG_OID(0);
+       Oid             database = PG_GETARG_OID(1);
+       Oid             relation = PG_GETARG_OID(2);
+       Oid             relfilenode = PG_GETARG_OID(3);
+       Oid             segfile = PG_GETARG_OID(4);
+       char           *relname = PG_GETARG_CSTRING(5);
+       char            relkind = PG_GETARG_CHAR(6);
+       char            relstorage = PG_GETARG_CHAR(7);
+       Oid             relam = PG_GETARG_OID(8);
+
+       Relation        gp_relfile_node;
+
+       RelFileNode     relFileNode;
+
+       PersistentFileSysRelStorageMgr relStorageMgr;
+
+       ItemPointerData persistentTid;
+       int64           persistentSerialNum;
+
+       /* Fetch a copy of the tuple to scribble on */
+       HeapTuple       tuple = SearchSysCacheCopy(DATABASEOID,
+                                            ObjectIdGetDatum(database),
+                                            0, 0, 0);
+       if (!HeapTupleIsValid(tuple))
+               elog(ERROR, "could not find tuple for database %u", database);
+       Form_pg_database form_pg_database = (Form_pg_database) GETSTRUCT(tuple);
+
+       Oid             defaultTablespace = form_pg_database->dattablespace;
+       gp_relfile_node =
+               DirectOpen_GpRelfileNodeOpen(
+                                            defaultTablespace,
+                                            database);
+
+       relFileNode.spcNode = tablespace;
+       relFileNode.dbNode = database;
+       relFileNode.relNode = relfilenode;
+
+       relStorageMgr = (
+                        (relstorage == RELSTORAGE_AOROWS ||
+                         relstorage == RELSTORAGE_PARQUET) ?
+                        PersistentFileSysRelStorageMgr_AppendOnly :
+                        PersistentFileSysRelStorageMgr_BufferPool);
+
+       gp_before_persistence_work = true;
+
+       if (relStorageMgr == PersistentFileSysRelStorageMgr_BufferPool) {
+               PersistentFileSysRelStorageMgr localRelStorageMgr;
+               PersistentFileSysRelBufpoolKind relBufpoolKind;
+
+               GpPersistentRelfileNode_GetRelfileInfo(
+                                                      relkind,
+                                                      relstorage,
+                                                      relam,
+                                                      &localRelStorageMgr,
+                                                      &relBufpoolKind);
+               Assert(localRelStorageMgr == 
PersistentFileSysRelStorageMgr_BufferPool);
+
+               /*
+                * Heap tables only ever add a single segment_file_num=0
+                * entry to gp_persistent_relation regardless of how many
+                * segment files there really are.
+                */
+               PersistentRelfile_AddCreated(
+                                            &relFileNode,
+                                             /* segmentFileNum */ 0,
+                                            relStorageMgr,
+                                            relBufpoolKind,
+                                            relname,
+                                            &persistentTid,
+                                            &persistentSerialNum,
+                                             /* flushToXLog */ false);
+
+               InsertGpRelfileNodeTuple(
+                                        gp_relfile_node,
+                                        relation, //pg_class OID
+                                        relname,
+                                relFileNode.relNode, //pg_class relfilenode
+                                         /* segmentFileNum */ 0,
+                                         /* updateIndex */ true,
+                                        &persistentTid,
+                                        persistentSerialNum);
+       } else {
+               PersistentRelfile_AddCreated(
+                                            &relFileNode,
+                                            segfile,
+                                            relStorageMgr,
+                                      PersistentFileSysRelBufpoolKind_None,
+                                            relname,
+                                            &persistentTid,
+                                            &persistentSerialNum,
+                                             /* flushToXLog */ false);
+
+               InsertGpRelfileNodeTuple(
+                                        gp_relfile_node,
+                                        relation, //pg_class OID
+                                        relname,
+                                relFileNode.relNode, //pg_class relfilenode
+                                        segfile,
+                                         /* updateIndex */ true,
+                                        &persistentTid,
+                                        persistentSerialNum);
+       }
+
+       gp_before_persistence_work = false;
+
+       PG_RETURN_INT32(1);
+}
+
 static void
 PersistentBuild_FindGpRelationNodeIndex(
        Oid                             database,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.h
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 5393282..0b1da34 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -9924,6 +9924,10 @@ DESCR("Remove an entry specified by TID from a 
persistent table for the current
 DATA(insert OID = 7182 ( gp_persistent_set_relation_bufpool_kind_all  PGNSP 
PGUID 12 f f f f v 0 23 f "" _null_ _null_ _null_ 
gp_persistent_set_relation_bufpool_kind_all - _null_ n ));
 DESCR("Populate the gp_persistent_relation_node table's relation_bufpool_kind 
column for the whole database instance for upgrade from 4.0 to 4.1");
 
+/* gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, 
char, Oid) => int4 */
+DATA(insert OID = 7185 ( gp_relfile_insert_for_register  PGNSP PGUID 12 f f f 
f v 9 23 f "26 26 26 26 26 2275 18 18 26" _null_ _null_ _null_ 
gp_relfile_insert_for_register - _null_ n ));
+DESCR("insert record into gp_persistent_relfile_node and gp_relfile_node");
+
 
 /* GIN array support */
 /* ginarrayextract(anyarray, internal) => internal */ 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.sql
----------------------------------------------------------------------
diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql
index 50985d4..1d79f36 100644
--- a/src/include/catalog/pg_proc.sql
+++ b/src/include/catalog/pg_proc.sql
@@ -5243,6 +5243,8 @@
 
  CREATE FUNCTION gp_persistent_set_relation_bufpool_kind_all() RETURNS int4 
LANGUAGE internal VOLATILE AS 'gp_persistent_set_relation_bufpool_kind_all' 
WITH (OID=7182, DESCRIPTION="Populate the gp_persistent_relation_node table's 
relation_bufpool_kind column for the whole database instance for upgrade from 
4.0 to 4.1");
 
+ CREATE FUNCTION gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, 
cstring, char, char, Oid) RETURNS int4 LANGUAGE internal VOLATILE AS 
'gp_relfile_insert_for_register' WITH (OID=7185, DESCRIPTION="insert record into 
gp_persistent_relfile_node and gp_relfile_node");
+
 -- GIN array support
  CREATE FUNCTION ginarrayextract(anyarray, internal) RETURNS internal LANGUAGE 
internal IMMUTABLE STRICT AS 'ginarrayextract' WITH (OID=2743, DESCRIPTION="GIN 
array support");
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/utils/builtins.h
----------------------------------------------------------------------
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 0b7e30d..64b251b 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1106,6 +1106,7 @@ Datum gp_persistent_build_all(PG_FUNCTION_ARGS);
 Datum gp_persistent_reset_all(PG_FUNCTION_ARGS);
 Datum gp_persistent_repair_delete(PG_FUNCTION_ARGS);
 Datum gp_persistent_set_relation_bufpool_kind_all(PG_FUNCTION_ARGS);
+Datum gp_relfile_insert_for_register(PG_FUNCTION_ARGS);
 
 
 /* utils/error/elog.c */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp 
b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
index bb5e970..196f2c3 100644
--- a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
+++ b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp
@@ -51,6 +51,8 @@ void 
TestHawqRegister::runYamlCaseTableExistsPartition(std::string casename, std
         util.query("select * from nt;", 100);
     } else {
         util.query("select * from nt;", checknum);
+        util.execute("insert into nt select generate_series(1, 100), 1, 1, 
'M', 1;");
+        util.query("select * from nt;", checknum + 100);
     }
     
     EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf 
%s", t_yml.c_str())));
@@ -91,6 +93,8 @@ void 
TestHawqRegister::runYamlCaseTableNotExistsPartition(std::string casename,
         util.query("select * from pg_class where relname = 'nt';", 0);
     } else {
         util.query("select * from nt;", checknum);
+        util.execute("insert into nt select generate_series(1, 100), 1, 1, 
'M', 1;");
+        util.query("select * from nt;", checknum + 100);
     }
     
     EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf 
%s", t_yml.c_str())));
@@ -164,6 +168,9 @@ void 
TestHawqRegister::runYamlCaseForceModePartition(std::string casename, std::
     EXPECT_EQ(result1_2_1 + result1_2_2, result2_2);
     EXPECT_EQ(result1_3_1 + result1_3_2, result2_3);
 
+    util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 
1;");
+    util.query("select * from nt;", checknum + 100);
+
     EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf 
%s", t_yml.c_str())));
     util.execute("drop table t;");
     util.execute("drop table nt;");

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
----------------------------------------------------------------------
diff --git a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in 
b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
index eb40c01..13d61db 100644
--- a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
+++ b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in
@@ -61,6 +61,24 @@ is 'Populate the gp_persistent_relation_node table''s 
relation_bufpool_kind colu
 select gp_persistent_set_relation_bufpool_kind_all();
 select gp_persistent_set_relation_bufpool_kind_all() from 
gp_dist_random('gp_version_at_initdb');
 
+create function @gpupgradeschemaname@.gp_relfile_insert_for_register(
+     Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid)
+returns int4
+language internal AS 'gp_relfile_insert_for_register' volatile
+with (oid = 7185);
+select @gpupgradeschemaname@.addProcPIN(7185);
+comment on function @gpupgradeschemaname@.gp_relfile_insert_for_register(
+     Oid,
+     Oid,
+     Oid,
+     Oid,
+     Oid,
+     cstring,
+     char,
+     char,
+     Oid)
+is 'insert record into gp_persistent_relfile_node and gp_relfile_node';
+
 update @gpupgradeschemaname@.pg_class set relhaspkey = true where oid = 5094;
 select catDML('update @gpupgradeschemaname@.pg_class set relhaspkey = true 
where oid = 5094') from gp_dist_random('gp_version_at_initdb');
 --

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/tools/bin/hawqregister
----------------------------------------------------------------------
diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister
index 5f1c47c..4adbdcc 100755
--- a/tools/bin/hawqregister
+++ b/tools/bin/hawqregister
@@ -348,6 +348,18 @@ class GpRegisterAccessor(object):
         query = "select pg_encoding_to_char(%s);" % encoding_indx
         return self.exec_query(query)[0]['pg_encoding_to_char']
 
+    def get_metadata_for_relfile_insert(self, database, tablename):
+        _, tablename = tablename_handler(tablename)
+        query = "select reltablespace from pg_class where relname = '%s';" % 
tablename
+        tablespace_oid = int(self.exec_query(query)[0]['reltablespace'])
+        if tablespace_oid == 0:
+            query = "select oid from pg_tablespace where 
spcname='dfs_default';"
+            tablespace_oid = int(self.exec_query(query)[0]['oid'])
+        query = "select oid from pg_database where datname='%s';" % database
+        database_oid = int(self.exec_query(query)[0]['oid'])
+        query = "select oid, relfilenode, relname, relkind, relstorage, relam 
from pg_class where relname='%s';" % tablename
+        return [tablespace_oid, database_oid, 
int(self.exec_query(query)[0]['oid']), 
int(self.exec_query(query)[0]['relfilenode']), 
str(self.exec_query(query)[0]['relname']), 
str(self.exec_query(query)[0]['relkind']), 
str(self.exec_query(query)[0]['relstorage']), 
int(self.exec_query(query)[0]['relam'])]
+
     def update_catalog(self, query):
         self.conn.query(query)
 
@@ -430,6 +442,9 @@ class HawqRegister(object):
             self.failure_handler.rollback()
             sys.exit(1)
 
+    def _get_metadata_for_relfile_insert(self):
+        return self.accessor.get_metadata_for_relfile_insert(self.database, 
self.tablename)
+
     def _set_yml_data(self, file_format, files, sizes, tupcounts, 
eofuncompresseds, varblockcounts, tablename, schema, distribution_policy, 
file_locations,\
                       bucket_number, partitionby, partitions_constraint, 
partitions_name, partitions_compression_level,\
                       partitions_compression_type, partitions_checksum, 
partitions_filepaths, partitions_filesizes, \
@@ -885,7 +900,9 @@ class HawqRegister(object):
         same_path_varblockcounts = self.varblockcounts_same_path
         update_segno_lst = [f.split('/')[-1] for f in self.files_update]
         same_path_segno_lst = [seg for seg in self.segnos_same_path]
+        relfile_data = self._get_metadata_for_relfile_insert()
         query = ""
+        insert_relfile_segs = []
         
         if mode == 'force':
             query += "delete from pg_aoseg.%s;" % (self.seg_name)
@@ -900,17 +917,21 @@ class HawqRegister(object):
                 query += ';'
             if len(same_path_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % 
(self.seg_name, same_path_segno_lst[0], same_path_eofs[0], 
same_path_tupcounts[0], same_path_eofuncompresseds[0])
+                insert_relfile_segs.append(int(same_path_segno_lst[0]));
                 k = 0
                 for same_path_eof, same_path_tupcount, 
same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], 
same_path_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 
1], same_path_eof, same_path_tupcount, same_path_eofuncompressed)
+                    insert_relfile_segs.append(int(same_path_segno_lst[k + 
1]));
                     k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % 
(self.seg_name, segno, append_eofs[0], append_tupcounts[0], 
append_eofuncompresseds[0])
+                insert_relfile_segs.append(segno);
                 k = 0
                 for append_eof, append_tupcount, append_eofuncompressed in 
zip(append_eofs[1:], append_tupcounts[1:], append_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, 
append_tupcount, append_eofuncompressed)
+                    insert_relfile_segs.append(segno + k + 1);
                     k += 1
                 query += ';'
         else:
@@ -923,20 +944,27 @@ class HawqRegister(object):
                 query += ';'
             if len(same_path_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' 
% (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], 
same_path_tupcounts[0], same_path_varblockcounts[0], 
same_path_eofuncompresseds[0])
+                insert_relfile_segs.append(int(same_path_segno_lst[0]));
                 k = 0
                 for same_path_eof, same_path_tupcount, 
same_path_varblockcount, same_path_eofuncompressed in zip(same_path_eofs[1:], 
same_path_tupcounts[1:], same_path_varblockcounts[1:], 
same_path_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k 
+ 1], same_path_eof, same_path_tupcount, same_path_varblockcount, 
same_path_eofuncompressed)
+                    insert_relfile_segs.append(int(same_path_segno_lst[k + 
1]));
                     k += 1
                 query += ';'
             segno += len(same_path_eofs)
             if len(append_eofs) > 0:
                 query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' 
% (self.seg_name, segno, append_eofs[0], append_tupcounts[0], 
append_varblockcounts[0], append_eofuncompresseds[0])
+                insert_relfile_segs.append(segno);
                 k = 0
                 for append_eof, append_tupcount, append_varblockcount, 
append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], 
append_varblockcounts[1:], append_eofuncompresseds[1:]):
                     query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, 
append_eof, append_tupcount, append_varblockcount, append_eofuncompressed)
+                    insert_relfile_segs.append(segno + k + 1);
                     k += 1
                 query += ';'
         self.queries += query
+        for seg in insert_relfile_segs:
+            self.queries += "select gp_relfile_insert_for_register(%d, %d, %d, 
%d, %d, '%s', '%s', '%s', %d);" % (relfile_data[0], relfile_data[1], 
relfile_data[2], relfile_data[3], seg, relfile_data[4], relfile_data[5], 
relfile_data[6], relfile_data[7])
+
     
     def _modify_metadata(self):
         try:
@@ -1016,6 +1044,15 @@ class HawqRegisterPartition(HawqRegister):
             logger.error('Multi-level partition table is not supported!')
             sys.exit(1)
 
+        parent_tablename = self.tablename
+        parent_files = self.files
+        parent_sizes = self.sizes
+        parent_tupcounts = self.tupcounts
+        parent_eofuncompresseds = self.eofuncompresseds
+        parent_varblockcounts = self.varblockcounts
+        if self.yml:
+            self.filepath = self.files[0][:self.files[0].rfind('/')] if 
self.files else ''
+            self._check_file_not_folder()
         for k, pn in enumerate(self.partitions_name):
             self.tablename = pn
             self.files = self.partitions_filepaths[k]
@@ -1034,6 +1071,14 @@ class HawqRegisterPartition(HawqRegister):
         self.queries = "set allow_system_table_mods='dml';"
         self.queries += "begin transaction;"
         self._check_duplicate_constraint()
+        self.tablename = parent_tablename
+        self.files = parent_files
+        self.sizes = parent_sizes
+        self.tupcounts = parent_tupcounts
+        self.eofuncompresseds = parent_eofuncompresseds
+        self.varblockcounts = parent_varblockcounts
+        self._do_check()
+        self._prepare_register()
         schemaname, _ = tablename_handler(self.dst_table_name)
         for k, pn in enumerate(self.partitions_name):
             self.constraint = self.partitions_constraint[k]

Reply via email to