HAWQ-562. Refactor bucket number of external table.
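In short: the bucket number of a LOCATION-based external table is now derived from the number of location URIs, while a web external table (EXECUTE / ON clause) keeps its configured bucket number, and all web external tables in one query must agree on it. The standalone C sketch below restates that rule for illustration only; resolve_bucket_num and its parameters are hypothetical names, not actual HAWQ symbols.

#include <stdio.h>

/*
 * Illustrative sketch of the bucket-number rule introduced by this
 * commit (hypothetical names, not the actual HAWQ symbols):
 *  - a LOCATION-based external table gets one bucket per location URI
 *    (see DefineExternalRelation below);
 *  - a web external table keeps its configured bucket number, and the
 *    planner rejects queries that mix different web bucket numbers
 *    (see check_keep_hash_and_external_table below).
 */
static int
resolve_bucket_num(int is_web, int configured_bucketnum, int location_uri_count)
{
	if (!is_web && location_uri_count > 0)
		return location_uri_count;   /* one bucket per LOCATION URI */
	return configured_bucketnum;     /* web table: keep the configured value */
}

int
main(void)
{
	/* a LOCATION table with 4 URIs gets 4 buckets, whatever the default is */
	printf("%d\n", resolve_bucket_num(0, 6, 4));   /* prints 4 */
	/* a web external table keeps its configured bucket number */
	printf("%d\n", resolve_bucket_num(1, 6, 0));   /* prints 6 */
	return 0;
}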
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/7daee400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/7daee400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/7daee400

Branch: refs/heads/HAWQ-459
Commit: 7daee4002780797587c3dcca265c646fd0c92e1e
Parents: 695606f
Author: hzhang2 <[email protected]>
Authored: Mon Mar 21 10:26:11 2016 +0800
Committer: hzhang2 <[email protected]>
Committed: Mon Mar 21 16:29:42 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbcopy.c         |  2 +-
 src/backend/cdb/cdbdatalocality.c | 97 ++++++++++++++++++++--------------
 src/backend/cdb/cdbfilesplit.c    | 67 +++++++----------------
 src/backend/commands/tablecmds.c  | 11 ++--
 src/backend/parser/analyze.c      |  2 +-
 src/include/access/filesplit.h    |  2 +-
 src/include/cdb/cdbdatalocality.h |  1 -
 7 files changed, 88 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/cdb/cdbcopy.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbcopy.c b/src/backend/cdb/cdbcopy.c
index 17ffa96..ee2ff80 100644
--- a/src/backend/cdb/cdbcopy.c
+++ b/src/backend/cdb/cdbcopy.c
@@ -222,7 +222,7 @@ cdbCopyStart(CdbCopy *c, char *copyCmd, Oid relid, Oid relerror, List *err_aoseg
 		List *scantable_splits = NIL;
 		prepareDispatchedCatalogRelation(q->contextdisp, relid, FALSE, NULL);
 		scantable_splits = AssignAOSegFileSplitToSegment(relid, NIL,
-				true, c->partition_num,
+				c->partition_num,
 				scantable_splits);
 		((CopyStmt *)q->utilityStmt)->scantable_splits = scantable_splits;
 	}


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 3f8fcb5..4c67429 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -34,6 +34,7 @@
 #include "access/parquetsegfiles.h"
 #include "catalog/catalog.h"
 #include "catalog/catquery.h"
+#include "catalog/pg_exttable.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
 #include "cdb/cdbdatalocality.h"
@@ -296,11 +297,13 @@ typedef struct split_to_segment_mapping_context {
 	int64 split_size;
 	MemoryContext old_memorycontext;
 	MemoryContext datalocality_memorycontext;
-	int externTableSegNum; //expected virtual segment number when external table exists
+	int externTableOnClauseSegNum; //expected virtual segment number when a web (ON-clause) external table exists
+	int externTableLocationSegNum; //expected virtual segment number when a LOCATION-based external table exists
 	int tableFuncSegNum; //expected virtual segment number when a table function exists
 	int hashSegNum; // expected virtual segment number when there is a hash table in the from clause
 	int randomSegNum; // expected virtual segment number when there is a random table in the from clause
 	int resultRelationHashSegNum; // expected virtual segment number when a hash table is the result relation
+	int minimum_segment_num; //default is 1.
 	int64 randomRelSize; //all the random relation size
 	int64 hashRelSize; //all the hash relation size
@@ -529,13 +532,15 @@ static void init_datalocality_context(split_to_segment_mapping_context *context)
 	context->rtc_context.range_tables = NIL;
 	context->rtc_context.full_range_tables = NIL;
-	context->externTableSegNum = 0;
+	context->externTableOnClauseSegNum = 0;
+	context->externTableLocationSegNum = 0;
 	context->tableFuncSegNum = 0;
 	context->hashSegNum = 0;
 	context->resultRelationHashSegNum = 0;
 	context->randomSegNum = 0;
 	context->randomRelSize = 0;
 	context->hashRelSize = 0;
+	context->minimum_segment_num = 1;
 	/*
 	 * initialize the data distribution
 	 * static context.
@@ -762,17 +767,24 @@ static void check_keep_hash_and_external_table(
 	/* targetPolicy->bucketnum is the bucket number of the external table,
 	 * whose default value is set to default_segment_num
 	 */
-	if (context->externTableSegNum == 0) {
-		context->externTableSegNum = targetPolicy->bucketnum;
-	} else {
-		if (context->externTableSegNum < targetPolicy->bucketnum) {
-			context->externTableSegNum = targetPolicy->bucketnum;
-			/*
-			 * In this case, two external table join but with different bucket number
-			 * we cannot allocate the right segment number.
-			 */
-			//now we just restrict that vseg num > bucket number of external table
-			//elog(ERROR, "All external tables in one query must have the same bucket number!");
+	ExtTableEntry* extEnrty = GetExtTableEntry(rel->rd_id);
+	if (extEnrty->isweb) {
+		if (context->externTableOnClauseSegNum == 0) {
+			context->externTableOnClauseSegNum = targetPolicy->bucketnum;
+		} else {
+			if (context->externTableOnClauseSegNum != targetPolicy->bucketnum) {
+				/*
+				 * In this case, two external tables are joined but have different
+				 * bucket numbers, so we cannot allocate the right segment number.
+				 */
+				elog(ERROR, "All external tables in one query must have the same bucket number!");
+			}
+		}
+	}
+	else {
+		if (context->externTableLocationSegNum < targetPolicy->bucketnum) {
+			context->externTableLocationSegNum = targetPolicy->bucketnum;
+			context->minimum_segment_num = targetPolicy->bucketnum;
 		}
 	}
 }
@@ -4074,17 +4086,17 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 			&(context.rtc_context.range_tables), &isTableFunctionExists,
 			context.datalocality_memorycontext);
 
-	/*Table Function VSeg Number = default_segment_number(configured in GUC) if table function exists,
+	/* set expected virtual segment number for hash table and external table */
+	/* calculate hashSegNum, externTableSegNum, resultRelationHashSegNum */
+	check_keep_hash_and_external_table(&context, query, intoPolicy);
+
+	/* Table Function VSeg Number = default_segment_number (configured in GUC) if a table function or gpfdist exists,
 	 *0 Otherwise.
 	 */
 	if (isTableFunctionExists) {
 		context.tableFuncSegNum = GetUserDefinedFunctionVsegNum();
 	}
 
-	/* set expected virtual segment number for hash table and external table*/
-	/* calculate hashSegNum, externTableSegNum, resultRelationHashSegNum */
-	check_keep_hash_and_external_table(&context, query, intoPolicy);
-
 	/* get block location and calculate relation size*/
 	get_block_locations_and_claculte_table_size(&context);
 
@@ -4163,13 +4175,11 @@
 		context.randomSegNum = expected_segment_num_with_max_filecount;
 	}
 	/* Step 4: we use at least one segment*/
-	if (context.randomSegNum < minimum_segment_num) {
-		context.randomSegNum = minimum_segment_num;
+	if (context.randomSegNum < context.minimum_segment_num) {
+		context.randomSegNum = context.minimum_segment_num;
 	}
 
 	int maxExpectedNonRandomSegNum = 0;
-	if (maxExpectedNonRandomSegNum < context.externTableSegNum)
-		maxExpectedNonRandomSegNum = context.externTableSegNum;
 	if (maxExpectedNonRandomSegNum < context.tableFuncSegNum)
 		maxExpectedNonRandomSegNum = context.tableFuncSegNum;
 	if (maxExpectedNonRandomSegNum < context.hashSegNum)
@@ -4183,7 +4193,7 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 		fprintf(fpsegnum, "Result relation hash segment num : %d.\n", context.resultRelationHashSegNum);
 		fprintf(fpsegnum, "\n");
 		fprintf(fpsegnum, "Table function segment num : %d.\n", context.tableFuncSegNum);
-		fprintf(fpsegnum, "Extern table segment num : %d.\n", context.externTableSegNum);
+		fprintf(fpsegnum, "Extern table segment num : %d.\n", context.externTableOnClauseSegNum);
 		fprintf(fpsegnum, "From hash relation segment num : %d.\n", context.hashSegNum);
 		fprintf(fpsegnum, "MaxExpectedNonRandom segment num : %d.\n", maxExpectedNonRandomSegNum);
 		fprintf(fpsegnum, "\n");
@@ -4193,29 +4203,33 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 	int maxTargetSegmentNumber = 0;
 	/* we keep resultRelationHashSegNum in the highest priority*/
 	if (context.resultRelationHashSegNum != 0) {
-		if (context.resultRelationHashSegNum < context.externTableSegNum
-				&& context.externTableSegNum != 0) {
+		if ((context.resultRelationHashSegNum != context.externTableOnClauseSegNum
+				&& context.externTableOnClauseSegNum != 0)
+				|| (context.resultRelationHashSegNum < context.externTableLocationSegNum)) {
 			cleanup_allocation_algorithm(&context);
 			elog(ERROR, "Could not allocate enough memory! "
 					"bucket number of result hash table and external table should match each other");
 		}
 		maxTargetSegmentNumber = context.resultRelationHashSegNum;
 		minTargetSegmentNumber = context.resultRelationHashSegNum;
-	} else if (maxExpectedNonRandomSegNum > 0) {
+	}
+	else if (context.externTableOnClauseSegNum > 0) {
 		/* bucket number of external table must be the same as the number of virtual segments*/
-		if (maxExpectedNonRandomSegNum == context.externTableSegNum) {
-			context.externTableSegNum =
-					context.externTableSegNum < minimum_segment_num ?
-							minimum_segment_num : context.externTableSegNum;
-			maxTargetSegmentNumber = context.externTableSegNum;
-			minTargetSegmentNumber = context.externTableSegNum;
-		} else if (maxExpectedNonRandomSegNum == context.hashSegNum) {
+		if (context.externTableOnClauseSegNum < context.externTableLocationSegNum) {
+			cleanup_allocation_algorithm(&context);
+			elog(ERROR, "bucket numbers of external tables should match each other");
+		}
+		maxTargetSegmentNumber = context.externTableOnClauseSegNum;
+		minTargetSegmentNumber = context.externTableOnClauseSegNum;
+	}
+	else if (maxExpectedNonRandomSegNum > 0) {
+		if (maxExpectedNonRandomSegNum == context.hashSegNum) {
 			/* in general, we keep the bucket number of a hash table equal to the number of virtual segments,
 			 * but this rule can be broken when there is a large random table in the range table list
 			 */
 			context.hashSegNum =
-					context.hashSegNum < minimum_segment_num ?
-							minimum_segment_num : context.hashSegNum;
+					context.hashSegNum < context.minimum_segment_num ?
+							context.minimum_segment_num : context.hashSegNum;
 			double considerRandomWhenHashExistRatio = 1.5;
 			/* if the size of the random tables > 1.5 * the hash tables, we consider relaxing the restriction on the hash bucket number*/
 			if (context.randomRelSize
@@ -4224,7 +4238,7 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 				context.randomSegNum = context.hashSegNum;
 			}
 			maxTargetSegmentNumber = context.randomSegNum;
-			minTargetSegmentNumber = minimum_segment_num;
+			minTargetSegmentNumber = context.minimum_segment_num;
 		} else {
 			maxTargetSegmentNumber = context.hashSegNum;
 			minTargetSegmentNumber = context.hashSegNum;
@@ -4232,17 +4246,17 @@
 		} else if (maxExpectedNonRandomSegNum == context.tableFuncSegNum) {
 			/* if there is a table function, we should at least use tableFuncSegNum virtual segments*/
 			context.tableFuncSegNum =
-					context.tableFuncSegNum < minimum_segment_num ?
-							minimum_segment_num : context.tableFuncSegNum;
+					context.tableFuncSegNum < context.minimum_segment_num ?
+							context.minimum_segment_num : context.tableFuncSegNum;
 			if (context.randomSegNum < context.tableFuncSegNum) {
 				context.randomSegNum = context.tableFuncSegNum;
 			}
 			maxTargetSegmentNumber = context.randomSegNum;
-			minTargetSegmentNumber = minimum_segment_num;
+			minTargetSegmentNumber = context.minimum_segment_num;
 		}
 	} else {
 		maxTargetSegmentNumber = context.randomSegNum;
-		minTargetSegmentNumber = minimum_segment_num;
+		minTargetSegmentNumber = context.minimum_segment_num;
 	}
 
 	if (enforce_virtual_segment_number > 0) {
@@ -4254,6 +4268,9 @@
 		maxTargetSegmentNumber = fixedVsegNum;
 		minTargetSegmentNumber = fixedVsegNum;
 	}
+	if (maxTargetSegmentNumber < minTargetSegmentNumber) {
+		maxTargetSegmentNumber = minTargetSegmentNumber;
+	}
 	uint64_t before_rm_allocate_resource = gettime_microsec();
 	/* cost is used by RM to balance workload between hosts;
 	   the cost is at least one block size*/


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/cdb/cdbfilesplit.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbfilesplit.c b/src/backend/cdb/cdbfilesplit.c
index 4a2ad04..128f949 100644
--- a/src/backend/cdb/cdbfilesplit.c
+++ b/src/backend/cdb/cdbfilesplit.c
@@ -44,8 +44,7 @@ static void fileSplit_free(FileSplitNode *split);
 
 static List *
 computeSplitToSegmentMaps(Oid relid, GpPolicy *targetPolicy, List *splits,
-						List *segment_infos, bool keep_hash_policy,
-						int target_segment_num);
+						List *segment_infos, int target_segment_num);
 
 static List *
 postProcessSplitsPerSegment(List *oldSplitToSegmentMaps);
@@ -54,7 +53,7 @@ static int
 fileSplitNodeCmp(const void *left, const void *right);
 
 static SegFileSplitMapNode *
-AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_policy, int target_segment_num)
+AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, int target_segment_num)
 {
 	SegFileSplitMapNode *result = NULL;
 	char storageChar;
@@ -108,7 +107,7 @@ AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_ha
 	 * data locality into account.
 	 */
 	splitToSegmentMaps = computeSplitToSegmentMaps(relid, targetPolicy, splits, segment_infos,
-												keep_hash_policy, target_segment_num);
+												target_segment_num);
 	Assert(splitToSegmentMaps);
 
 	list_free(splits);
@@ -118,7 +117,7 @@ AssignSingleAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_ha
 }
 
 List *
-AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_policy, int target_segment_num, List *existings)
+AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, int target_segment_num, List *existings)
 {
 	SegFileSplitMapNode *result = NULL;
@@ -130,7 +129,7 @@ AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_pol
 		foreach (child, children)
 		{
 			Oid myrelid = lfirst_oid(child);
-			result = AssignSingleAOSegFileSplitToSegment(myrelid, segment_infos, keep_hash_policy,
+			result = AssignSingleAOSegFileSplitToSegment(myrelid, segment_infos,
 														target_segment_num);
 			existings = lappend(existings, result);
 		}
@@ -140,7 +139,7 @@ AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_pol
 	}
 	else
 	{
-		result = AssignSingleAOSegFileSplitToSegment(relid, segment_infos, keep_hash_policy,
+		result = AssignSingleAOSegFileSplitToSegment(relid, segment_infos,
 													target_segment_num);
 		existings = lappend(existings, result);
 	}
@@ -149,25 +148,14 @@ AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_pol
 }
 
 /*
- * If keep_hash_policy is false, then the table is treated as randomly
- * distributed, regardless of its distribution policy.
  *
  * If segment_infos is NIL, then data locality is not needed.
  */
 static List *
 computeSplitToSegmentMaps(Oid relid, GpPolicy *targetPolicy, List *splits, List *segment_infos,
-						bool keep_hash_policy, int target_segment_num)
+						int target_segment_num)
 {
 	List *splitToSegmentMaps = NIL;
 
-	if (keep_hash_policy)
-	{
-		/*
-		 * If we want to keep the hash distribution policy,
-		 * we need to make sure the data partition number
-		 * equals to the number of target segments.
-		 */
-		Assert(targetPolicy->bucketnum == target_segment_num);
-	}
 
 	if (segment_infos != NIL)
 	{
@@ -184,37 +172,22 @@ computeSplitToSegmentMaps(Oid relid, GpPolicy *targetPolicy, List *splits, List
 	 */
 	if (segment_infos == NIL)
 	{
-		/*
-		 * Sub case 1: we need to keep the hash distribution policy.
-		 *
-		 * In this case, we just need consider the segment file number.
-		 * This case works exactly the same as HAWQ1.x.
-		 */
-		if (keep_hash_policy)
+		ListCell *lc;
+		int i;
+		for (i = 0; i < target_segment_num; i++)
 		{
-			ListCell *lc;
-			int i;
-			for (i = 0; i < target_segment_num; i++)
-			{
-				splitToSegmentMaps = lappend(splitToSegmentMaps, NIL);
-			}
-
-			foreach(lc, splits)
-			{
-				int assigned_seg_no;
-				ListCell *per_seg_split;
-				FileSplit split = (FileSplitNode *)lfirst(lc);
-				Assert(split);
-				assigned_seg_no = (split->segno - 1) % target_segment_num;
-				per_seg_split = list_nth_cell(splitToSegmentMaps, assigned_seg_no);
-				lfirst(per_seg_split) = lappend((List *)lfirst(per_seg_split), split);
-			}
+			splitToSegmentMaps = lappend(splitToSegmentMaps, NIL);
 		}
-		else
+
+		foreach(lc, splits)
 		{
-			ereport(ERROR,
-					(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
-					errmsg("Assigning splits to segment without keeping hash distribution policy is not allowed")));
+			int assigned_seg_no;
+			ListCell *per_seg_split;
+			FileSplit split = (FileSplitNode *)lfirst(lc);
+			Assert(split);
+			assigned_seg_no = (split->segno - 1) % target_segment_num;
+			per_seg_split = list_nth_cell(splitToSegmentMaps, assigned_seg_no);
+			lfirst(per_seg_split) = lappend((List *)lfirst(per_seg_split), split);
 		}
 	}
 	else


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/commands/tablecmds.c
----------------------------------------------------------------------
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c47b828..259821b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -968,7 +968,12 @@ DefineExternalRelation(CreateExternalStmt *createExtStmt)
 			locationUris = transformLocationUris(exttypeDesc->location_list,
 												 createExtStmt->formatOpts,
 												 isweb, iswritable);
-
+			int locLength = list_length(exttypeDesc->location_list);
+			if (createStmt->policy && locLength > 0)
+			{
+				createStmt->policy->bucketnum = locLength;
+			}
+
 			break;
 
 		case EXTTBL_TYPE_EXECUTE:
@@ -6437,7 +6442,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
 												tab->relid,
 												false, NULL);
-		ar_tab->scantable_splits = AssignAOSegFileSplitToSegment(tab->relid, NIL, true,
+		ar_tab->scantable_splits = AssignAOSegFileSplitToSegment(tab->relid, NIL,
 												target_segment_num, ar_tab->scantable_splits);
 		/*
 		 * Specify the segno directly as we don't have segno mapping here.
@@ -16719,7 +16724,7 @@ ATPExecPartSplit(Relation rel,
 		 * Dispatch split-related metadata.
 		 */
 		scantable_splits = AssignAOSegFileSplitToSegment((Oid)intVal((Value *)pc->partid),
-												NIL, true, target_segment_num, scantable_splits);
+												NIL, target_segment_num, scantable_splits);
 		pc->scantable_splits = scantable_splits;
 		pc->newpart_aosegnos = segment_segnos;


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/backend/parser/analyze.c
----------------------------------------------------------------------
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index f17c90a..1f48c36 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -2724,7 +2724,7 @@ transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
 							 sizeof(p->attrs[0]));
 		p->ptype = POLICYTYPE_PARTITIONED;
 		p->nattrs = 0;
-		p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetDefaultPartitionNum());
+		p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum());
 		p->attrs[0] = 1;
 	}


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/include/access/filesplit.h
----------------------------------------------------------------------
diff --git a/src/include/access/filesplit.h b/src/include/access/filesplit.h
index e075b18..e627b07 100644
--- a/src/include/access/filesplit.h
+++ b/src/include/access/filesplit.h
@@ -65,7 +65,7 @@ typedef SegFileSplitMapNode *SegFileSplitMap;
  * table data.
  */
 List *
-AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, bool keep_hash_policy, int target_segment_num, List *existings);
+AssignAOSegFileSplitToSegment(Oid relid, List *segment_infos, int target_segment_num, List *existings);
 
 /*
  * Given the relid, and the segment index, return the splits assigned


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/7daee400/src/include/cdb/cdbdatalocality.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbdatalocality.h b/src/include/cdb/cdbdatalocality.h
index 224cb53..21e4c68 100644
--- a/src/include/cdb/cdbdatalocality.h
+++ b/src/include/cdb/cdbdatalocality.h
@@ -33,7 +33,6 @@
 #include "nodes/parsenodes.h"
 #include "executor/execdesc.h"
 
-#define minimum_segment_num 1
 /*
  * structure containing information about data residence
  * at the host.
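
A note on the cdbfilesplit.c change above: with keep_hash_policy removed, computeSplitToSegmentMaps() now always distributes segment files round-robin when no segment locality information is supplied. The self-contained sketch below only demonstrates that mapping; the segfile numbers are made up for illustration.

#include <stdio.h>

/* Round-robin mapping used by computeSplitToSegmentMaps(): the split
 * for segment file `segno` goes to virtual segment
 * (segno - 1) % target_segment_num. */
int
main(void)
{
	int target_segment_num = 3;
	int segnos[] = {1, 2, 3, 4, 5, 6, 7};   /* illustrative segfile numbers */
	int i;

	for (i = 0; i < (int) (sizeof(segnos) / sizeof(segnos[0])); i++)
		printf("segfile %d -> vseg %d\n", segnos[i],
			   (segnos[i] - 1) % target_segment_num);
	return 0;
}

Running it assigns segfiles 1, 4, 7 to vseg 0, segfiles 2, 5 to vseg 1, and segfiles 3, 6 to vseg 2, which is why a table whose segment-file count matches the bucket number keeps one file per virtual segment.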

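Finally, a simplified sketch of how calculate_planner_segment_num() picks the virtual-segment range after this change. choose_target_range is a hypothetical helper that compresses the branches above; the real code also weighs relation sizes and enforce_virtual_segment_number, but the priority order and the new max >= min clamp are as shown.

#include <stdio.h>

/* Simplified priority order mirroring calculate_planner_segment_num():
 * 1. the result relation's hash bucket number, if set;
 * 2. the ON-clause (web) external table bucket number, if set;
 * 3. the hash-table / table-function expectation, if set;
 * 4. otherwise the data-size-driven random estimate,
 * with max clamped so it never drops below min. */
static void
choose_target_range(int resultRelationHashSegNum, int externTableOnClauseSegNum,
                    int maxExpectedNonRandomSegNum, int randomSegNum,
                    int minimum_segment_num, int *min, int *max)
{
	if (resultRelationHashSegNum != 0)
		*min = *max = resultRelationHashSegNum;
	else if (externTableOnClauseSegNum > 0)
		*min = *max = externTableOnClauseSegNum;
	else if (maxExpectedNonRandomSegNum > 0)
		*min = *max = maxExpectedNonRandomSegNum;
	else
	{
		*min = minimum_segment_num;
		*max = randomSegNum;
	}
	if (*max < *min)
		*max = *min;   /* the clamp added by this commit */
}

int
main(void)
{
	int min, max;

	/* a LOCATION table raised minimum_segment_num to its bucket number (4)
	 * while the random estimate is only 2: the clamp lifts max up to min */
	choose_target_range(0, 0, 0, 2, 4, &min, &max);
	printf("min=%d max=%d\n", min, max);   /* prints min=4 max=4 */
	return 0;
}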