Repository: incubator-hawq Updated Branches: refs/heads/master 68570a720 -> 787c096d4
HAWQ-1145. Add UDF gp_relfile_insert_for_register and add insert metadata into gp_relfile_node and gp_persistent_relfile_node for HAWQ register Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/787c096d Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/787c096d Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/787c096d Branch: refs/heads/master Commit: 787c096d43e4b27a20cc484e422f53ac81093d56 Parents: 68570a7 Author: Chunling Wang <[email protected]> Authored: Fri Nov 18 15:09:24 2016 +0800 Committer: xunzhang <[email protected]> Committed: Tue Nov 22 16:52:50 2016 +0800 ---------------------------------------------------------------------- depends/libhdfs3/src/client/Hdfs.cpp | 4 +- src/backend/cdb/cdbpersistentbuild.c | 117 +++++++++++++++++++ src/include/catalog/pg_proc.h | 4 + src/include/catalog/pg_proc.sql | 2 + src/include/utils/builtins.h | 1 + .../test_hawq_register_partition.cpp | 7 ++ .../data/upgrade41/upg2_catupgrade_41.sql.in | 18 +++ tools/bin/hawqregister | 45 +++++++ 8 files changed, 196 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/depends/libhdfs3/src/client/Hdfs.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/src/client/Hdfs.cpp b/depends/libhdfs3/src/client/Hdfs.cpp index 9906a87..395f4f8 100644 --- a/depends/libhdfs3/src/client/Hdfs.cpp +++ b/depends/libhdfs3/src/client/Hdfs.cpp @@ -646,9 +646,9 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char * path, int flags, int bufferSize, if ((flags & O_CREAT) || (flags & O_APPEND) || (flags & O_WRONLY)) { int internalFlags = 0; - if ((flags & O_CREAT) && (flags & O_EXCL)) { + if (flags & O_CREAT) { internalFlags |= Hdfs::Create; - } else if ((flags & O_CREAT) || ((flags & O_APPEND) && (flags & O_WRONLY))) { + } 
else if ((flags & O_APPEND) && (flags & O_WRONLY)) { internalFlags |= Hdfs::Create; internalFlags |= Hdfs::Append; } else if (flags & O_WRONLY) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/backend/cdb/cdbpersistentbuild.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/cdbpersistentbuild.c b/src/backend/cdb/cdbpersistentbuild.c index 3fea2bb..17dba4b 100644 --- a/src/backend/cdb/cdbpersistentbuild.c +++ b/src/backend/cdb/cdbpersistentbuild.c @@ -831,6 +831,123 @@ gp_persistent_build_all(PG_FUNCTION_ARGS) PG_RETURN_INT32(1); } +/* + * gp_relfile_insert_for_register + * + * This function is an internal function, for hawq register to insert metadata + * into gp_relfile_node and gp_persistent_relfile_node. + */ +Datum +gp_relfile_insert_for_register(PG_FUNCTION_ARGS) +{ + Oid tablespace = PG_GETARG_OID(0); + Oid database = PG_GETARG_OID(1); + Oid relation = PG_GETARG_OID(2); + Oid relfilenode = PG_GETARG_OID(3); + Oid segfile = PG_GETARG_OID(4); + char *relname = PG_GETARG_CSTRING(5); + char relkind = PG_GETARG_CHAR(6); + char relstorage = PG_GETARG_CHAR(7); + Oid relam = PG_GETARG_OID(8); + + Relation gp_relfile_node; + + RelFileNode relFileNode; + + PersistentFileSysRelStorageMgr relStorageMgr; + + ItemPointerData persistentTid; + int64 persistentSerialNum; + + /* Fetch a copy of the tuple to scribble on */ + HeapTuple tuple = SearchSysCacheCopy(DATABASEOID, + ObjectIdGetDatum(database), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "could not find tuple for database %u", database); + Form_pg_database form_pg_database = (Form_pg_database) GETSTRUCT(tuple); + + Oid defaultTablespace = form_pg_database->dattablespace; + gp_relfile_node = + DirectOpen_GpRelfileNodeOpen( + defaultTablespace, + database); + + relFileNode.spcNode = tablespace; + relFileNode.dbNode = database; + relFileNode.relNode = relfilenode; + + relStorageMgr = ( + (relstorage == RELSTORAGE_AOROWS || + 
relstorage == RELSTORAGE_PARQUET) ? + PersistentFileSysRelStorageMgr_AppendOnly : + PersistentFileSysRelStorageMgr_BufferPool); + + gp_before_persistence_work = true; + + if (relStorageMgr == PersistentFileSysRelStorageMgr_BufferPool) { + PersistentFileSysRelStorageMgr localRelStorageMgr; + PersistentFileSysRelBufpoolKind relBufpoolKind; + + GpPersistentRelfileNode_GetRelfileInfo( + relkind, + relstorage, + relam, + &localRelStorageMgr, + &relBufpoolKind); + Assert(localRelStorageMgr == PersistentFileSysRelStorageMgr_BufferPool); + + /* + * Heap tables only ever add a single segment_file_num=0 + * entry to gp_persistent_relation regardless of how many + * segment files there really are. + */ + PersistentRelfile_AddCreated( + &relFileNode, + /* segmentFileNum */ 0, + relStorageMgr, + relBufpoolKind, + relname, + &persistentTid, + &persistentSerialNum, + /* flushToXLog */ false); + + InsertGpRelfileNodeTuple( + gp_relfile_node, + relation, //pg_class OID + relname, + relFileNode.relNode, //pg_class relfilenode + /* segmentFileNum */ 0, + /* updateIndex */ true, + &persistentTid, + persistentSerialNum); + } else { + PersistentRelfile_AddCreated( + &relFileNode, + segfile, + relStorageMgr, + PersistentFileSysRelBufpoolKind_None, + relname, + &persistentTid, + &persistentSerialNum, + /* flushToXLog */ false); + + InsertGpRelfileNodeTuple( + gp_relfile_node, + relation, //pg_class OID + relname, + relFileNode.relNode, //pg_class relfilenode + segfile, + /* updateIndex */ true, + &persistentTid, + persistentSerialNum); + } + + gp_before_persistence_work = false; + + PG_RETURN_INT32(1); +} + static void PersistentBuild_FindGpRelationNodeIndex( Oid database, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 5393282..0b1da34 100644 --- a/src/include/catalog/pg_proc.h +++ 
b/src/include/catalog/pg_proc.h @@ -9924,6 +9924,10 @@ DESCR("Remove an entry specified by TID from a persistent table for the current DATA(insert OID = 7182 ( gp_persistent_set_relation_bufpool_kind_all PGNSP PGUID 12 f f f f v 0 23 f "" _null_ _null_ _null_ gp_persistent_set_relation_bufpool_kind_all - _null_ n )); DESCR("Populate the gp_persistent_relation_node table's relation_bufpool_kind column for the whole database instance for upgrade from 4.0 to 4.1"); +/* gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid) => int4 */ +DATA(insert OID = 7185 ( gp_relfile_insert_for_register PGNSP PGUID 12 f f f f v 9 23 f "26 26 26 26 26 2275 18 18 26" _null_ _null_ _null_ gp_relfile_insert_for_register - _null_ n )); +DESCR("insert record into gp_persistent_relfile_node and gp_relfile_node"); + /* GIN array support */ /* ginarrayextract(anyarray, internal) => internal */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/catalog/pg_proc.sql ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql index 50985d4..1d79f36 100644 --- a/src/include/catalog/pg_proc.sql +++ b/src/include/catalog/pg_proc.sql @@ -5243,6 +5243,8 @@ CREATE FUNCTION gp_persistent_set_relation_bufpool_kind_all() RETURNS int4 LANGUAGE internal VOLATILE AS 'gp_persistent_set_relation_bufpool_kind_all' WITH (OID=7182, DESCRIPTION="Populate the gp_persistent_relation_node table's relation_bufpool_kind column for the whole database instance for upgrade from 4.0 to 4.1"); + CREATE FUNCTION gp_relfile_insert_for_register(Oid, Oid, Oid, Oid, Oid, cstring, char, char, Oid) RETURNS int4 LANGUAGE internal VOLATILE AS 'gp_relfile_insert_for_register' WITH (OID=7185, DESCRIPTION="insert record into gp_persistent_relfile_node and gp_relfile_node"); + -- GIN array support CREATE FUNCTION ginarrayextract(anyarray, internal) RETURNS internal LANGUAGE internal
IMMUTABLE STRICT AS 'ginarrayextract' WITH (OID=2743, DESCRIPTION="GIN array support"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/include/utils/builtins.h ---------------------------------------------------------------------- diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 0b7e30d..64b251b 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1106,6 +1106,7 @@ Datum gp_persistent_build_all(PG_FUNCTION_ARGS); Datum gp_persistent_reset_all(PG_FUNCTION_ARGS); Datum gp_persistent_repair_delete(PG_FUNCTION_ARGS); Datum gp_persistent_set_relation_bufpool_kind_all(PG_FUNCTION_ARGS); +Datum gp_relfile_insert_for_register(PG_FUNCTION_ARGS); /* utils/error/elog.c */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/feature/ManagementTool/test_hawq_register_partition.cpp ---------------------------------------------------------------------- diff --git a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp index bb5e970..196f2c3 100644 --- a/src/test/feature/ManagementTool/test_hawq_register_partition.cpp +++ b/src/test/feature/ManagementTool/test_hawq_register_partition.cpp @@ -51,6 +51,8 @@ void TestHawqRegister::runYamlCaseTableExistsPartition(std::string casename, std util.query("select * from nt;", 100); } else { util.query("select * from nt;", checknum); + util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;"); + util.query("select * from nt;", checknum + 100); } EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str()))); @@ -91,6 +93,8 @@ void TestHawqRegister::runYamlCaseTableNotExistsPartition(std::string casename, util.query("select * from pg_class where relname = 'nt';", 0); } else { util.query("select * from nt;", checknum); + util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;"); + 
util.query("select * from nt;", checknum + 100); } EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str()))); @@ -164,6 +168,9 @@ void TestHawqRegister::runYamlCaseForceModePartition(std::string casename, std:: EXPECT_EQ(result1_2_1 + result1_2_2, result2_2); EXPECT_EQ(result1_3_1 + result1_3_2, result2_3); + util.execute("insert into nt select generate_series(1, 100), 1, 1, 'M', 1;"); + util.query("select * from nt;", checknum + 100); + EXPECT_EQ(0, Command::getCommandStatus(hawq::test::stringFormat("rm -rf %s", t_yml.c_str()))); util.execute("drop table t;"); util.execute("drop table nt;"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in ---------------------------------------------------------------------- diff --git a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in index eb40c01..13d61db 100644 --- a/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in +++ b/src/test/regress/data/upgrade41/upg2_catupgrade_41.sql.in @@ -61,6 +61,24 @@ is 'Populate the gp_persistent_relation_node table''s relation_bufpool_kind colu select gp_persistent_set_relation_bufpool_kind_all(); select gp_persistent_set_relation_bufpool_kind_all() from gp_dist_random('gp_version_at_initdb'); +create function @[email protected]_relfile_insert_for_register( + @[email protected]) +returns @[email protected] +language internal AS 'gp_relfile_insert_for_register' volatile +with (oid = 7183); +select @[email protected](7183); +comment on function @[email protected]_relfile_insert_for_register( + @[email protected], + @[email protected], + @[email protected], + @[email protected], + @[email protected], + @[email protected], + @[email protected], + @[email protected], + @[email protected]) +is 'insert record in gp_persistent_relfile_node and gp_relfile_node'; + update @[email protected]_class set relhaspkey = 
true where oid = 5094; select catDML('update @[email protected]_class set relhaspkey = true where oid = 5094') from gp_dist_random('gp_version_at_initdb'); -- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/787c096d/tools/bin/hawqregister ---------------------------------------------------------------------- diff --git a/tools/bin/hawqregister b/tools/bin/hawqregister index 5f1c47c..4adbdcc 100755 --- a/tools/bin/hawqregister +++ b/tools/bin/hawqregister @@ -348,6 +348,18 @@ class GpRegisterAccessor(object): query = "select pg_encoding_to_char(%s);" % encoding_indx return self.exec_query(query)[0]['pg_encoding_to_char'] + def get_metadata_for_relfile_insert(self, database, tablename): + _, tablename = tablename_handler(tablename) + query = "select reltablespace from pg_class where relname = '%s';" % tablename + tablespace_oid = int(self.exec_query(query)[0]['reltablespace']) + if tablespace_oid == 0: + query = "select oid from pg_tablespace where spcname='dfs_default';" + tablespace_oid = int(self.exec_query(query)[0]['oid']) + query = "select oid from pg_database where datname='%s';" % database + database_oid = int(self.exec_query(query)[0]['oid']) + query = "select oid, relfilenode, relname, relkind, relstorage, relam from pg_class where relname='%s';" % tablename + return [tablespace_oid, database_oid, int(self.exec_query(query)[0]['oid']), int(self.exec_query(query)[0]['relfilenode']), str(self.exec_query(query)[0]['relname']), str(self.exec_query(query)[0]['relkind']), str(self.exec_query(query)[0]['relstorage']), int(self.exec_query(query)[0]['relam'])] + def update_catalog(self, query): self.conn.query(query) @@ -430,6 +442,9 @@ class HawqRegister(object): self.failure_handler.rollback() sys.exit(1) + def _get_metadata_for_relfile_insert(self): + return self.accessor.get_metadata_for_relfile_insert(self.database, self.tablename) + def _set_yml_data(self, file_format, files, sizes, tupcounts, eofuncompresseds, varblockcounts, tablename, schema, 
distribution_policy, file_locations,\ bucket_number, partitionby, partitions_constraint, partitions_name, partitions_compression_level,\ partitions_compression_type, partitions_checksum, partitions_filepaths, partitions_filesizes, \ @@ -885,7 +900,9 @@ class HawqRegister(object): same_path_varblockcounts = self.varblockcounts_same_path update_segno_lst = [f.split('/')[-1] for f in self.files_update] same_path_segno_lst = [seg for seg in self.segnos_same_path] + relfile_data = self._get_metadata_for_relfile_insert() query = "" + insert_relfile_segs = [] if mode == 'force': query += "delete from pg_aoseg.%s;" % (self.seg_name) @@ -900,17 +917,21 @@ class HawqRegister(object): query += ';' if len(same_path_eofs) > 0: query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_eofuncompresseds[0]) + insert_relfile_segs.append(int(same_path_segno_lst[0])); k = 0 for same_path_eof, same_path_tupcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_eofuncompresseds[1:]): query += ',(%d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_eofuncompressed) + insert_relfile_segs.append(int(same_path_segno_lst[k + 1])); k += 1 query += ';' segno += len(same_path_eofs) if len(append_eofs) > 0: query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_eofuncompresseds[0]) + insert_relfile_segs.append(segno); k = 0 for append_eof, append_tupcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_eofuncompresseds[1:]): query += ',(%d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_eofuncompressed) + insert_relfile_segs.append(segno + k + 1); k += 1 query += ';' else: @@ -923,20 +944,27 @@ class HawqRegister(object): query += ';' if len(same_path_eofs) > 0: query += 'insert into pg_aoseg.%s 
values(%d, %d, %d, %d, %d)' % (self.seg_name, same_path_segno_lst[0], same_path_eofs[0], same_path_tupcounts[0], same_path_varblockcounts[0], same_path_eofuncompresseds[0]) + insert_relfile_segs.append(int(same_path_segno_lst[0])); k = 0 for same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed in zip(same_path_eofs[1:], same_path_tupcounts[1:], same_path_varblockcounts[1:], same_path_eofuncompresseds[1:]): query += ',(%d, %d, %d, %d, %d)' % (same_path_segno_lst[k + 1], same_path_eof, same_path_tupcount, same_path_varblockcount, same_path_eofuncompressed) + insert_relfile_segs.append(int(same_path_segno_lst[k + 1])); k += 1 query += ';' segno += len(same_path_eofs) if len(append_eofs) > 0: query += 'insert into pg_aoseg.%s values(%d, %d, %d, %d, %d)' % (self.seg_name, segno, append_eofs[0], append_tupcounts[0], append_varblockcounts[0], append_eofuncompresseds[0]) + insert_relfile_segs.append(segno); k = 0 for append_eof, append_tupcount, append_varblockcount, append_eofuncompressed in zip(append_eofs[1:], append_tupcounts[1:], append_varblockcounts[1:], append_eofuncompresseds[1:]): query += ',(%d, %d, %d, %d, %d)' % (segno + k + 1, append_eof, append_tupcount, append_varblockcount, append_eofuncompressed) + insert_relfile_segs.append(segno + k + 1); k += 1 query += ';' self.queries += query + for seg in insert_relfile_segs: + self.queries += "select gp_relfile_insert_for_register(%d, %d, %d, %d, %d, '%s', '%s', '%s', %d);" % (relfile_data[0], relfile_data[1], relfile_data[2], relfile_data[3], seg, relfile_data[4], relfile_data[5], relfile_data[6], relfile_data[7]) + def _modify_metadata(self): try: @@ -1016,6 +1044,15 @@ class HawqRegisterPartition(HawqRegister): logger.error('Multi-level partition table is not supported!') sys.exit(1) + parent_tablename = self.tablename + parent_files = self.files + parent_sizes = self.sizes + parent_tupcounts = self.tupcounts + parent_eofuncompresseds = self.eofuncompresseds + 
parent_varblockcounts = self.varblockcounts + if self.yml: + self.filepath = self.files[0][:self.files[0].rfind('/')] if self.files else '' + self._check_file_not_folder() for k, pn in enumerate(self.partitions_name): self.tablename = pn self.files = self.partitions_filepaths[k] @@ -1034,6 +1071,14 @@ class HawqRegisterPartition(HawqRegister): self.queries = "set allow_system_table_mods='dml';" self.queries += "begin transaction;" self._check_duplicate_constraint() + self.tablename = parent_tablename + self.files = parent_files + self.sizes = parent_sizes + self.tupcounts = parent_tupcounts + self.eofuncompresseds = parent_eofuncompresseds + self.varblockcounts = parent_varblockcounts + self._do_check() + self._prepare_register() schemaname, _ = tablename_handler(self.dst_table_name) for k, pn in enumerate(self.partitions_name): self.constraint = self.partitions_constraint[k]
