Revert "Revert "HAWQ-532. Optimise vseg number for copy to statement.""

This reverts commit 86d9b0c03de349398ddb0337f7ac5dac3b58d2b9.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/695606ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/695606ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/695606ff

Branch: refs/heads/HAWQ-459
Commit: 695606ff442afcf4e9317451e02e3a6a31f900ff
Parents: 802d537
Author: hzhang2 <[email protected]>
Authored: Fri Mar 18 14:24:41 2016 +0800
Committer: hzhang2 <[email protected]>
Committed: Mon Mar 21 16:29:41 2016 +0800

----------------------------------------------------------------------
 src/backend/commands/copy.c | 97 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 89 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/695606ff/src/backend/commands/copy.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index bcd5384..e37fb83 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -91,6 +91,11 @@
 #include "postmaster/autovacuum.h"
 #include "cdb/dispatcher.h"
 
+/*
+ * in dbsize.c
+ */
+extern int64 calculate_relation_size(Relation rel);
+
 /* DestReceiver for COPY (SELECT) TO */
 typedef struct
 {
@@ -137,6 +142,7 @@ static void copy_in_error_callback(void *arg);
 static void CopyInitPartitioningState(EState *estate);
 static void CopyInitDataParser(CopyState cstate);
 static bool CopyCheckIsLastLine(CopyState cstate);
+static int calculate_virtual_segment_number(List *candidateOids); /* list of relation OIDs */
 
 /* ==========================================================================
  * The follwing macros aid in major refactoring of data processing code (in
@@ -1556,7 +1562,16 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 
                        target_policy = GpPolicyFetch(CurrentMemoryContext, 
relid);
                        Assert(target_policy);
-                       target_segment_num = target_policy->bucketnum;
+                       /*
+                        * For hash table we use table bucket number to request 
vsegs
+                        * For random table, we use a fixed GUC value to 
request vsegs.
+                        */
+                       if(target_policy->nattrs > 0){
+                               target_segment_num = target_policy->bucketnum;
+                       }
+                       else{
+                               target_segment_num = 
hawq_rm_nvseg_for_copy_from_perquery;
+                       }
                        pfree(target_policy);
 
                        cstate->resource = AllocateResource(QRL_ONCE, 1, 1, 
target_segment_num, target_segment_num,NULL,0);
@@ -1749,6 +1764,63 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 }
 
 /*
+ * Calculate the virtual segment (vseg) number to request for a COPY statement.
+ * If any relation in candidateOids is hash distributed (nattrs > 0), use the
+ * largest bucket number among them; otherwise request one vseg per 128MB of
+ * total relation size. The result is capped at GetQueryVsegNum(). If a
+ * relation has no distribution policy, return GetAnalyzeVSegNumLimit().
+ */
+static int calculate_virtual_segment_number(List* candidateOids) {
+       ListCell* le1;
+       int64 vsegNumber = 1;
+       int64 totalDataSize = 0;
+       bool isHashRelationExist = false;
+       int maxHashBucketNumber = 0;
+
+       foreach (le1, candidateOids)
+       {
+               Oid candidateOid = lfirst_oid(le1);
+               /* check the OID before trying to open the relation */
+               if (candidateOid == InvalidOid) {
+                       continue;
+               }
+               Relation rel = relation_open(candidateOid, AccessShareLock);
+               GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, candidateOid);
+               if (targetPolicy == NULL) {
+                       /* close the relation on the early exit too */
+                       relation_close(rel, AccessShareLock);
+                       return GetAnalyzeVSegNumLimit();
+               }
+               if (targetPolicy->nattrs > 0) {
+                       isHashRelationExist = true;
+                       if (maxHashBucketNumber < targetPolicy->bucketnum) {
+                               maxHashBucketNumber = targetPolicy->bucketnum;
+                       }
+               }
+               pfree(targetPolicy);
+               /* sizes only matter while no hash relation has been seen */
+               if (!isHashRelationExist) {
+                       totalDataSize += calculate_relation_size(rel);
+               }
+               relation_close(rel, AccessShareLock);
+       }
+
+       if (isHashRelationExist) {
+               vsegNumber = maxHashBucketNumber;
+       } else {
+               /* one virtual segment for each 128MB (2^27 bytes) of data */
+               vsegNumber = (totalDataSize >> 27) + 1;
+       }
+       Assert(vsegNumber > 0);
+       /* cap while still 64-bit so narrowing to int cannot truncate */
+       if (vsegNumber > GetQueryVsegNum()) {
+               vsegNumber = GetQueryVsegNum();
+       }
+
+       return (int) vsegNumber;
+}
+
+/*
  * This intermediate routine exists mainly to localize the effects of setjmp
  * so we don't need to plaster a lot of variables with "volatile".
  */
@@ -1809,13 +1881,22 @@ DoCopyTo(CopyState cstate)
                 */
                if (Gp_role == GP_ROLE_DISPATCH && cstate->rel && 
cstate->rel->rd_cdbpolicy)
                {
-                       GpPolicy *target_policy = NULL;
                        int target_segment_num = 0;
+                       /*
+                        * copy hash table use table bucket number
+                        * copy random table use table size.
+                        */
+                       PartitionNode *pn = get_parts(cstate->rel->rd_id, 0 
/*level*/ ,
+                                                                               
                        0 /*parent*/, false /* inctemplate */, 
CurrentMemoryContext, true /*includesubparts*/);
+                       List            *lFullRelOids = NIL;
+                       if(pn){
+                               lFullRelOids = all_leaf_partition_relids(pn);
+                               lFullRelOids = list_concat(lFullRelOids, 
all_interior_partition_relids(pn)); /* interior partitions */
+                       }
+                       lFullRelOids = lappend_oid(lFullRelOids, 
cstate->rel->rd_id);
 
-                       target_policy = GpPolicyFetch(CurrentMemoryContext, 
cstate->rel->rd_id);
-                       Assert(target_policy);
-                       target_segment_num = target_policy->bucketnum;
-                       pfree(target_policy);
+                       target_segment_num = 
calculate_virtual_segment_number(lFullRelOids);
+                       elog(LOG, "virtual segment number of copy to is: %d\n", 
target_segment_num);
 
                        cstate->resource = AllocateResource(QRL_ONCE, 1, 1, 
target_segment_num, target_segment_num,NULL,0);
                        CopyToDispatch(cstate);
@@ -4200,7 +4281,7 @@ CopyFrom(CopyState cstate)
                                        if (cstate->oids && file_has_oids)
                                                MemTupleSetOid(tuple, 
resultRelInfo->ri_aoInsertDesc->mt_bind, loaded_oid);
                                }
-                               else if ((relstorage == RELSTORAGE_PARQUET))
+                               else if (relstorage == RELSTORAGE_PARQUET)
                                {
                     tuple = NULL;
                                }
@@ -4231,7 +4312,7 @@ CopyFrom(CopyState cstate)
                                {
                                        HeapTuple       newtuple;
 
-                                       if((relstorage == RELSTORAGE_PARQUET))
+                                       if(relstorage == RELSTORAGE_PARQUET)
                                        {
                                                Assert(!tuple);
                                                elog(ERROR, "triggers are not 
supported on tables that use column-oriented storage");

Reply via email to