Revert "Revert "HAWQ-532. Optimise vseg number for copy to statement.""
This reverts commit 86d9b0c03de349398ddb0337f7ac5dac3b58d2b9.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/695606ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/695606ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/695606ff

Branch: refs/heads/HAWQ-459
Commit: 695606ff442afcf4e9317451e02e3a6a31f900ff
Parents: 802d537
Author: hzhang2 <[email protected]>
Authored: Fri Mar 18 14:24:41 2016 +0800
Committer: hzhang2 <[email protected]>
Committed: Mon Mar 21 16:29:41 2016 +0800

----------------------------------------------------------------------
 src/backend/commands/copy.c | 97 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 89 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/695606ff/src/backend/commands/copy.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index bcd5384..e37fb83 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -91,6 +91,11 @@
 #include "postmaster/autovacuum.h"
 #include "cdb/dispatcher.h"

+/*
+ * in dbsize.c
+ */
+extern int64 calculate_relation_size(Relation rel);
+
 /* DestReceiver for COPY (SELECT) TO */
 typedef struct
 {
@@ -137,6 +142,7 @@ static void copy_in_error_callback(void *arg);
 static void CopyInitPartitioningState(EState *estate);
 static void CopyInitDataParser(CopyState cstate);
 static bool CopyCheckIsLastLine(CopyState cstate);
+static int calculate_virtual_segment_number(List* candidateRelations);

 /* ==========================================================================
  * The follwing macros aid in major refactoring of data processing code (in
@@ -1556,7 +1562,16 @@ DoCopy(const CopyStmt *stmt, const char *queryString)

 			target_policy =
GpPolicyFetch(CurrentMemoryContext, relid);
 			Assert(target_policy);
-			target_segment_num = target_policy->bucketnum;
+			/*
+			 * For hash table we use table bucket number to request vsegs
+			 * For random table, we use a fixed GUC value to request vsegs.
+			 */
+			if(target_policy->nattrs > 0){
+				target_segment_num = target_policy->bucketnum;
+			}
+			else{
+				target_segment_num = hawq_rm_nvseg_for_copy_from_perquery;
+			}
 			pfree(target_policy);

 			cstate->resource = AllocateResource(QRL_ONCE, 1, 1, target_segment_num, target_segment_num,NULL,0);
@@ -1749,6 +1764,63 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 }

 /*
+ * calculate virtual segment number for copy statement.
+ * if there is hash distributed relations exist, use the max bucket number.
+ * if all relation are random, use the data size to determine vseg number.
+ */
+static int calculate_virtual_segment_number(List* candidateOids) {
+	ListCell* le1;
+	int vsegNumber = 1;
+	int64 totalDataSize = 0;
+	bool isHashRelationExist = false;
+	int maxHashBucketNumber = 0;
+
+	foreach (le1, candidateOids)
+	{
+		Oid				candidateOid	  = InvalidOid;
+		candidateOid = lfirst_oid(le1);
+
+		//Relation rel = (Relation)lfirst(le1);
+		Relation rel = relation_open(candidateOid, AccessShareLock);
+		if (candidateOid > 0 ) {
+			GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext,
+					candidateOid);
+			if(targetPolicy == NULL){
+				return GetAnalyzeVSegNumLimit();
+			}
+			if (targetPolicy->nattrs > 0) {
+				isHashRelationExist = true;
+				if(maxHashBucketNumber < targetPolicy->bucketnum){
+					maxHashBucketNumber = targetPolicy->bucketnum;
+				}
+			}
+			/*
+			 * if no hash relation, we calculate the data size of all the relations.
+			 */
+			if (!isHashRelationExist) {
+				totalDataSize += calculate_relation_size(rel);
+			}
+		}
+		relation_close(rel, AccessShareLock);
+	}
+
+	if (isHashRelationExist) {
+		vsegNumber = maxHashBucketNumber;
+	} else {
+		/*we allocate one virtual segment for each 128M data */
+		totalDataSize >>= 27;
+		vsegNumber = totalDataSize + 1;
+	}
+	Assert(vsegNumber > 0);
+	/*vsegNumber should be less than GetUtilPartitionNum*/
+	if(vsegNumber > GetQueryVsegNum()){
+		vsegNumber = GetQueryVsegNum();
+	}
+
+	return vsegNumber;
+}
+
+/*
  * This intermediate routine exists mainly to localize the effects of setjmp
  * so we don't need to plaster a lot of variables with "volatile".
  */
@@ -1809,13 +1881,22 @@ DoCopyTo(CopyState cstate)
 		 */
 		if (Gp_role == GP_ROLE_DISPATCH && cstate->rel && cstate->rel->rd_cdbpolicy)
 		{
-			GpPolicy *target_policy = NULL;
 			int target_segment_num = 0;
+			/*
+			 * copy hash table use table bucket number
+			 * copy random table use table size.
+			 */
+			PartitionNode *pn = get_parts(cstate->rel->rd_id, 0 /*level*/ ,
+							0 /*parent*/, false /* inctemplate */, CurrentMemoryContext, true /*includesubparts*/);
+			List *lFullRelOids = NIL;
+			if(pn){
+				lFullRelOids = all_leaf_partition_relids(pn);
+				lFullRelOids = list_concat(lFullRelOids, all_interior_partition_relids(pn)); /* interior partitions */
+			}
+			lFullRelOids = lappend_oid(lFullRelOids, cstate->rel->rd_id);

-			target_policy = GpPolicyFetch(CurrentMemoryContext, cstate->rel->rd_id);
-			Assert(target_policy);
-			target_segment_num = target_policy->bucketnum;
-			pfree(target_policy);
+			target_segment_num = calculate_virtual_segment_number(lFullRelOids);
+			elog(LOG, "virtual segment number of copy to is: %d\n", target_segment_num);

 			cstate->resource = AllocateResource(QRL_ONCE, 1, 1, target_segment_num, target_segment_num,NULL,0);
 			CopyToDispatch(cstate);
@@ -4200,7 +4281,7 @@ CopyFrom(CopyState cstate)
 					if (cstate->oids && file_has_oids)
 						MemTupleSetOid(tuple, resultRelInfo->ri_aoInsertDesc->mt_bind, loaded_oid);
 				}
-				else if
((relstorage == RELSTORAGE_PARQUET))
+				else if (relstorage == RELSTORAGE_PARQUET)
 				{
 					tuple = NULL;
 				}
@@ -4231,7 +4312,7 @@ CopyFrom(CopyState cstate)
 				{
 					HeapTuple	newtuple;

-					if((relstorage == RELSTORAGE_PARQUET))
+					if(relstorage == RELSTORAGE_PARQUET)
 					{
 						Assert(!tuple);
 						elog(ERROR, "triggers are not supported on tables that use column-oriented storage");
