HAWQ-529. Allocate resource for udf in resource negotiator.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/4d997b57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/4d997b57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/4d997b57

Branch: refs/heads/HAWQ-459
Commit: 4d997b5714bb09ed9020a56acfce13b1d6fd637c
Parents: 2c033d9
Author: hzhang2 <[email protected]>
Authored: Fri Mar 18 10:14:42 2016 +0800
Committer: hzhang2 <[email protected]>
Committed: Fri Mar 18 10:42:44 2016 +0800

----------------------------------------------------------------------
 src/backend/cdb/cdbdatalocality.c    | 213 ++++++++++++++++++++----------
 src/backend/executor/spi.c           |  71 ++++++++--
 src/backend/optimizer/plan/planner.c |  59 ++++++++-
 src/backend/optimizer/util/clauses.c |  11 +-
 src/backend/optimizer/util/walkers.c |   1 +
 src/include/cdb/cdbdatalocality.h    |  19 ++-
 src/include/executor/spi.h           |   7 +
 src/include/optimizer/planner.h      |   4 +
 8 files changed, 294 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c 
b/src/backend/cdb/cdbdatalocality.c
index eec87b4..3f8fcb5 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -33,6 +33,9 @@
 #include "access/filesplit.h"
 #include "access/parquetsegfiles.h"
 #include "catalog/catalog.h"
+#include "catalog/catquery.h"
+#include "catalog/pg_inherits.h"
+#include "catalog/pg_proc.h"
 #include "cdb/cdbdatalocality.h"
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
@@ -41,6 +44,7 @@
 #include "utils/tqual.h"
 #include "utils/memutils.h"
 #include "executor/execdesc.h"
+#include "executor/spi.h"
 #include "nodes/nodes.h"
 #include "nodes/parsenodes.h"
 #include "optimizer/walkers.h"
@@ -54,11 +58,7 @@
 #include "catalog/pg_proc.h"
 #include "postgres.h"
 #include "resourcemanager/utils/hashtable.h"
-#include "catalog/pg_inherits.h"
-//#include "utils/misc/guc.c"
 
-#define PRONAME 1
-#define PROISAGG 5
 /* We need to build a mapping from host name to host index */
 
 extern bool            optimizer; /* Enable the optimizer */
@@ -376,7 +376,7 @@ static Block_Host_Index * update_data_dist_stat(
 static HostDataVolumeInfo *search_host_in_stat_context(
                split_to_segment_mapping_context *context, char *hostname);
 
-static bool IsAggFunction(char* funcName);
+static bool IsBuildInFunction(Oid funcOid);
 
 static bool allocate_hash_relation(Relation_Data* rel_data,
                Assignment_Log_Context *log_context, TargetSegmentIDMap* idMap,
@@ -638,39 +638,35 @@ static void collect_range_tables(Query *query, List* 
full_range_table,
 /*
  *
  */
-static bool IsAggFunction(char* funcName) {
-       if (funcName == NULL) {
-               return false;
-       }
-       Relation pg_proc_rel;
-       TupleDesc pg_proc_dsc;
-       HeapTuple tuple;
-       SysScanDesc pg_proc_scan;
-
-       pg_proc_rel = heap_open(ProcedureRelationId, AccessShareLock);
-       pg_proc_dsc = RelationGetDescr(pg_proc_rel);
-       ScanKeyData skey;
+static bool IsBuildInFunction(Oid foid) {
 
-       ScanKeyInit(&skey, PRONAME, BTEqualStrategyNumber,
-       F_NAMEEQ, CStringGetDatum(funcName));
-
-       pg_proc_scan = systable_beginscan(pg_proc_rel, InvalidOid, FALSE,
-                       ActiveSnapshot, 1, &skey);
-       while (HeapTupleIsValid(tuple = systable_getnext(pg_proc_scan))) {
+       cqContext  *pcqCtx;
+       HeapTuple       procedureTuple;
+       Form_pg_proc procedureStruct;
 
-               bool isAgg = DatumGetBool(fastgetattr(tuple, PROISAGG, 
pg_proc_dsc, NULL));
-               systable_endscan(pg_proc_scan);
-               heap_close(pg_proc_rel, AccessShareLock);
-               if (isAgg) {
-                       return true;
-               } else {
-                       return false;
-               }
+       /*
+        * get the procedure tuple corresponding to the given function Oid
+        */
+       pcqCtx = caql_beginscan(
+                       NULL,
+                       cql("SELECT * FROM pg_proc "
+                               " WHERE oid = :1 ",
+                               ObjectIdGetDatum(foid)));
+
+       procedureTuple = caql_getnext(pcqCtx);
+
+       if (!HeapTupleIsValid(procedureTuple))
+               elog(ERROR, "cache lookup failed for function %u", foid);
+       procedureStruct = (Form_pg_proc) GETSTRUCT(procedureTuple);
+       caql_endscan(pcqCtx);
+       /* we treat a proc with namespace = 11 as a built-in function. */
+       if (procedureStruct->pronamespace == 11) {
+               return true;
+       } else {
+               return false;
        }
-       systable_endscan(pg_proc_scan);
-       heap_close(pg_proc_rel, AccessShareLock);
-       return true;
 }
+
 /*
  *
  */
@@ -684,25 +680,6 @@ static void 
convert_range_tables_to_oids_and_check_table_functions(List **range_
        foreach(old_lc, *range_tables)
        {
                RangeTblEntry *entry = (RangeTblEntry *) lfirst(old_lc);
-               if (entry->rtekind == RTE_FUNCTION || entry->rtekind == 
RTE_TABLEFUNCTION) {
-                       *isTableFunctionExists = true;
-               }
-               if (entry->rtekind == RTE_SUBQUERY) {
-                       Query* subQuery = entry->subquery;
-                       ListCell *lc;
-                       foreach(lc, subQuery->targetList)
-                       {
-                               TargetEntry *te = (TargetEntry *) lfirst(lc);
-                               bool isAggFunc = IsAggFunction(te->resname);
-                               // if target list of subquery contains non 
aggregate function,
-                               // then we consider the query contains and use 
default_segment_num guc
-                               // as the number of virtual segment
-                               if (!isAggFunc) {
-                                       *isTableFunctionExists = true;
-                               }
-                       }
-
-               }
                if (entry->rtekind != RTE_RELATION) {
                        continue;
                }
@@ -1833,7 +1810,7 @@ static int 
select_random_host_algorithm(Relation_Assignment_Context *context,
        }
        if (debug_fake_datalocality) {
                fprintf(fp,
-                               "cur_size_of_whole_query is:"INT64_FORMAT", 
avg_size_of_whole_query is: %.3f",
+                               "cur_size_of_whole_query is:%.0f, 
avg_size_of_whole_query is: %.3f",
                                context->totalvols_with_penalty[minindex] + 
net_disk_ratio * splitsize,
                                context->avg_size_of_whole_query);
        }
@@ -2393,7 +2370,7 @@ static Relation_File** 
change_file_order_based_on_continuity(
                Relation_Data *rel_data, TargetSegmentIDMap* idMap, int 
host_num,
                int* fileCount, Relation_Assignment_Context 
*assignment_context) {
 
-       Relation_File** file_vector;
+       Relation_File** file_vector = NULL;
        int* isBlockContinue = (int *) palloc(sizeof(int) * host_num);
        for (int i = 0; i < host_num; i++) {
                isBlockContinue[i] = 0;
@@ -3429,45 +3406,45 @@ static void 
print_datalocality_overall_log_information(SplitAllocResult *result,
                        if(log_context->minSegmentNumofHost > 0 ){
                                fprintf(fpratio, 
"segmentnumber_perhost_max/min=%.2f\n", 
(double)(log_context->maxSegmentNumofHost / log_context->minSegmentNumofHost));
                        }else{
-                               fprintf(fpratio, 
"segmentnumber_perhost_max/min="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, 
"segmentnumber_perhost_max/min=%lld\n", INT64_MAX);
                        }
                        if(log_context->avgSegmentNumofHost > 0 ){
                                fprintf(fpratio, 
"segmentnumber_perhost_max/avg=%.2f\n", 
(double)(log_context->maxSegmentNumofHost / log_context->avgSegmentNumofHost));
                        }else{
-                               fprintf(fpratio, 
"segmentnumber_perhost_max/avg="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, 
"segmentnumber_perhost_max/avg=%lld\n", INT64_MAX);
                        }
 
                        if (log_context->minSizeSegmentOverall > 0){
                                fprintf(fpratio, 
"segments_size_max/min=%.5f\n", (double)log_context->maxSizeSegmentOverall / 
(double)log_context->minSizeSegmentOverall);
                        }else{
-                               fprintf(fpratio, 
"segments_size_max/min="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, 
"segments_size_max/min=%lld\n", INT64_MAX);
                        }
                        if (log_context->avgSizeOverall > 0){
                                fprintf(fpratio, 
"segments_size_max/avg=%.5f\n", log_context->maxSizeSegmentOverall / 
log_context->avgSizeOverall);
                        }else{
-                               fprintf(fpratio, 
"segments_size_max/avg="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, 
"segments_size_max/avg=%lld\n", INT64_MAX);
                        }
 
                        if (log_context->minSizeSegmentOverallPenalty > 0){
                                fprintf(fpratio, 
"segments_size_penalty_max/min=%.5f\n",(double)log_context->maxSizeSegmentOverallPenalty
 / (double)log_context->minSizeSegmentOverallPenalty);
                        }else{
-                               fprintf(fpratio, 
"segments_size_penalty_max/min="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, 
"segments_size_penalty_max/min=%lld\n", INT64_MAX);
                        }
                        if (log_context->avgSizeOverallPenalty > 0){
                                fprintf(fpratio, 
"segments_size_penalty_max/avg=%.5f\n",log_context->maxSizeSegmentOverallPenalty
 / log_context->avgSizeOverallPenalty);
                        }else{
-                               fprintf(fpratio, 
"segments_size_penalty_max/avg="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, 
"segments_size_penalty_max/avg=%lld\n", INT64_MAX);
                        }
 
                        if (log_context->minContinuityOverall > 0){
                                fprintf(fpratio, 
"continuity_max/min=%.5f\n",log_context->maxContinuityOverall / 
log_context->minContinuityOverall);
                        }else{
-                               fprintf(fpratio, 
"continuity_max/min="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, "continuity_max/min=%lld\n", 
INT64_MAX);
                        }
                        if (log_context->avgContinuityOverall > 0){
                                fprintf(fpratio, 
"continuity_max/avg=%.5f\n",log_context->maxContinuityOverall / 
log_context->avgContinuityOverall);
                        }else{
-                               fprintf(fpratio, 
"continuity_max/avg="INT64_FORMAT"\n", INT64_MAX);
+                               fprintf(fpratio, "continuity_max/avg=%lld\n", 
INT64_MAX);
                        }
                        fflush(fpratio);
                        fclose(fpratio);
@@ -3976,11 +3953,54 @@ static void cleanup_allocation_algorithm(
 }
 
 /*
+ * udf_collector_walker: the routine to find udfs.
+ */
+bool udf_collector_walker(Node *node,
+               udf_collector_context *context) {
+       if (node == NULL) {
+               return false;
+       }
+
+       if (IsA(node, Query)) {
+               return query_tree_walker((Query *) node, udf_collector_walker,
+                               (void *) context,
+                               QTW_EXAMINE_RTES);
+       }
+
+       /* For Aggref, we don't consider it as a udf. */
+
+       if(IsA(node,FuncExpr)){
+               if(!IsBuildInFunction(((FuncExpr *) node)->funcid)){
+                       context->udf_exist = true;
+               }
+               return false;
+       }
+
+       return expression_tree_walker(node, udf_collector_walker,
+                       (void *) context);
+
+       return false;
+}
+
+/*
+ * find_udf: collect all udf, and store them into the udf_collector_context.
+ */
+void find_udf(Query *query, udf_collector_context *context) {
+
+       query_tree_walker(query, udf_collector_walker, (void *) context,
+       QTW_EXAMINE_RTES);
+
+       return;
+}
+
+
+/*
  * calculate_planner_segment_num
+ * fixedVsegNum is used by PBE, since all executions should use the same 
number of vsegs.
  */
 SplitAllocResult *
 calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
-               List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum) {
+               List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum, int 
fixedVsegNum) {
        SplitAllocResult *result = NULL;
        QueryResource *resource = NULL;
        QueryResourceParameters *resource_parameters = NULL;
@@ -3989,14 +4009,14 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
        List *alloc_result = NIL;
        split_to_segment_mapping_context context;
 
-       int planner_segments = -1; /*virtual segments number for explain 
statement */
+       int planner_segments = 0; /*virtual segments number for explain 
statement */
 
        result = (SplitAllocResult *) palloc(sizeof(SplitAllocResult));
        result->resource = NULL;
        result->resource_parameters = NULL;
        result->alloc_results = NIL;
        result->relsType = NIL;
-       result->planner_segments = -1;
+       result->planner_segments = 0;
        result->datalocalityInfo = makeStringInfo();
     result->datalocalityTime = 0;
 
@@ -4014,7 +4034,7 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
                result->resource_parameters = NULL;
                result->alloc_results = NIL;
                result->relsType = NIL;
-               result->planner_segments = -1;
+               result->planner_segments = 0;
                return result;
        }
 
@@ -4039,6 +4059,11 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
                 * 5 data size of random "from" relation
                 */
 
+               udf_collector_context udf_context;
+               udf_context.udf_exist = false;
+
+               find_udf(query, &udf_context);
+               isTableFunctionExists = udf_context.udf_exist;
                /*convert range table list to oid list and check whether table 
function exists
                 *we keep a full range table list and a range table list 
without result relation separately
                 */
@@ -4065,7 +4090,15 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
 
                /*use inherit resource*/
                if (resourceLife == QRL_INHERIT) {
-                       resource = AllocateResource(resourceLife, sliceNum, 0, 
0, 0, NULL, 0);
+
+                       if ( SPI_IsInPrepare() && (GetActiveQueryResource() == 
NULL) )
+                       {
+                               resource = NULL;
+                       }
+                       else
+                       {
+                               resource = AllocateResource(resourceLife, 
sliceNum, 0, 0, 0, NULL, 0);
+                       }
 
                        saveQueryResourceParameters(
                                                        resource_parameters,  
/* resource_parameters */
@@ -4216,6 +4249,11 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
                                maxTargetSegmentNumber = 
enforce_virtual_segment_number;
                                minTargetSegmentNumber = 
enforce_virtual_segment_number;
                        }
+                       /* in PBE mode, the execute should use the same vseg 
number. */
+                       if(fixedVsegNum > 0 ){
+                               maxTargetSegmentNumber = fixedVsegNum;
+                               minTargetSegmentNumber = fixedVsegNum;
+                       }
                        uint64_t before_rm_allocate_resource = 
gettime_microsec();
 
                        /* cost is use by RM to balance workload between hosts. 
the cost is at least one block size*/
@@ -4223,9 +4261,40 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
                        mincost <<= 20;
                        int64 queryCost = context.total_size < mincost ? 
mincost : context.total_size;
                        if (QRL_NONE != resourceLife) {
-                               resource = AllocateResource(QRL_ONCE, sliceNum, 
queryCost,
-                                               maxTargetSegmentNumber, 
minTargetSegmentNumber,
-                                               
context.host_context.hostnameVolInfos, context.host_context.size);
+
+                               if (SPI_IsInPrepare())
+                               {
+                                       resource = NULL;
+                                       /*
+                                        * prepare need to get resource quota 
from RM
+                                        * and pass quota(planner_segments) to 
Orca or Planner to generate plan
+                                        * the following executes(in PBE) 
should reallocate the same number
+                                        * of resources.
+                                        */
+                                       uint32 seg_num;
+                                       uint32 seg_num_min;
+                                       uint32 seg_memory_mb;
+                                       double seg_core;
+
+                                       GetResourceQuota(maxTargetSegmentNumber,
+                                                        minTargetSegmentNumber,
+                                                        &seg_num,
+                                                        &seg_num_min,
+                                                        &seg_memory_mb,
+                                                        &seg_core);
+
+                                       planner_segments = seg_num;
+                                       minTargetSegmentNumber = 
planner_segments;
+                                       maxTargetSegmentNumber = 
planner_segments;
+                               }
+                               else
+                               {
+                                       resource = AllocateResource(QRL_ONCE, 
sliceNum, queryCost,
+                                                                   
maxTargetSegmentNumber,
+                                                                   
minTargetSegmentNumber,
+                                                                   
context.host_context.hostnameVolInfos,
+                                                                   
context.host_context.size);
+                               }
 
                                saveQueryResourceParameters(
                                                                
resource_parameters,                   /* resource_parameters */
@@ -4258,7 +4327,7 @@ calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
 
                        if (resource == NULL) {
                                result->resource = NULL;
-                               result->resource_parameters = NULL;
+                               result->resource_parameters = 
resource_parameters;
                                result->alloc_results = NIL;
                                result->relsType = NIL;
                                result->planner_segments = planner_segments;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/executor/spi.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 07812aa..d0029fc 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -46,6 +46,7 @@
 #include "cdb/memquota.h"
 #include "executor/nodeFunctionscan.h"
 #include "nodes/stack.h"
+#include "cdb/cdbdatalocality.h"
 
 extern char *savedSeqServerHost;
 extern int savedSeqServerPort;
@@ -74,6 +75,7 @@ static int    _SPI_curid = -1;
 
 static PGconn *_QD_conn = NULL; /* To call back to the QD for SQL execution */
 static char *_QD_currently_prepared_stmt = NULL;
+static int SPI_prepare_depth = 0;
 
 static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan);
 
@@ -106,6 +108,32 @@ static bool _SPI_checktuples(void);
 
 /* =================== interface functions =================== */
 
+bool SPI_IsInPrepare(void)
+{
+       if (SPI_prepare_depth > 0)
+       {
+               return true;
+       }
+       else if (SPI_prepare_depth < 0)
+       {
+               elog(ERROR, "Invalid SPI_prepare_depth %d while getting SPI 
prepare depth",
+                           SPI_prepare_depth);
+       }
+
+       return false;
+}
+
+void SPI_IncreasePrepareDepth(void)
+{
+       SPI_prepare_depth++;
+}
+
+void SPI_DecreasePrepareDepth(void)
+{
+       SPI_prepare_depth--;
+}
+
+
 int
 SPI_connect(void)
 {
@@ -566,6 +594,8 @@ SPI_prepare(const char *src, int nargs, Oid *argtypes)
        _SPI_plan       plan;
        _SPI_plan  *result;
 
+       SPI_IncreasePrepareDepth();
+
        if (src == NULL || nargs < 0 || (nargs > 0 && argtypes == NULL))
        {
                SPI_result = SPI_ERROR_ARGUMENT;
@@ -591,9 +621,13 @@ SPI_prepare(const char *src, int nargs, Oid *argtypes)
 
                /* copy plan to procedure context */
                result = _SPI_copy_plan(&plan, _SPI_CPLAN_PROCXT);
+
+               SPI_DecreasePrepareDepth();
        }
        PG_CATCH();
        {
+               SPI_DecreasePrepareDepth();
+
                _SPI_end_call(true);
                PG_RE_THROW();
        }
@@ -1112,6 +1146,20 @@ SPI_cursor_open(const char *name, SPIPlanPtr plan,
        qtlist = copyObject(qtlist);
        ptlist = copyObject(ptlist);
 
+       PlannedStmt* stmt = (PlannedStmt*)linitial(ptlist);
+
+       if ( (Gp_role == GP_ROLE_DISPATCH) &&
+                        (stmt->resource_parameters != NULL) )
+       {
+               /*
+                * Now, we want to allocate resource.
+                */
+               stmt->resource = 
AllocateResource(stmt->resource_parameters->life, 
stmt->resource_parameters->slice_size,
+                               stmt->resource_parameters->iobytes, 
stmt->resource_parameters->max_target_segment_num,
+                               
stmt->resource_parameters->min_target_segment_num, 
stmt->resource_parameters->vol_info,
+                               stmt->resource_parameters->vol_info_size);
+       }
+
        /* If the plan has parameters, set them up */
        if (spiplan->nargs > 0)
        {
@@ -1812,22 +1860,21 @@ _SPI_execute_plan(_SPI_plan * plan, Datum *Values, 
const char *Nulls,
                                 * We only allocate resource for multiple 
executions of queries, NOT for utility commands.
                                 * SELECT/INSERT are supported at present.
                                 */
-                               if( (queryTree->commandType == CMD_SELECT) ||
-                                   (queryTree->commandType == CMD_INSERT) )
+                               if((queryTree->commandType == CMD_SELECT) ||
+                                               (queryTree->commandType == 
CMD_INSERT))
                                {
-                                       if ( (Gp_role == GP_ROLE_DISPATCH) &&
-                                            (stmt->resource == NULL) &&
-                                            (stmt->resource_parameters != 
NULL) )
+                                       if ((Gp_role == GP_ROLE_DISPATCH) &&
+                                                       (stmt->resource == 
NULL) &&
+                                                       
(stmt->resource_parameters != NULL))
                                        {
                                                stmt->resource = 
AllocateResource(stmt->resource_parameters->life,
-                                                                       
stmt->resource_parameters->slice_size,
-                                                                       
stmt->resource_parameters->iobytes,
-                                                                       
stmt->resource_parameters->max_target_segment_num,
-                                                                       
stmt->resource_parameters->min_target_segment_num,
-                                                                       
stmt->resource_parameters->vol_info,
-                                                                       
stmt->resource_parameters->vol_info_size);
+                                                               
stmt->resource_parameters->slice_size,
+                                                               
stmt->resource_parameters->iobytes,
+                                                               
stmt->resource_parameters->max_target_segment_num,
+                                                               
stmt->resource_parameters->min_target_segment_num,
+                                                               
stmt->resource_parameters->vol_info,
+                                                               
stmt->resource_parameters->vol_info_size);
                                        }
-
                                        originalStmt->resource = NULL;
                                }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/optimizer/plan/planner.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/planner.c 
b/src/backend/optimizer/plan/planner.c
index 6b2e13e..ebad24f 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -91,6 +91,8 @@ planner_hook_type planner_hook = NULL;
 
 ParamListInfo PlannerBoundParamList = NULL;            /* current boundParams 
*/
 
+static int PlanningDepth = 0;          /* Planning depth */
+
 /* Expression kind codes for preprocess_expression */
 #define EXPRKIND_QUAL                  0
 #define EXPRKIND_TARGET                        1
@@ -165,6 +167,32 @@ static void sort_canonical_gs_list(List *gs, int *p_nsets, 
Bitmapset ***p_sets);
 static Plan *pushdown_preliminary_limit(Plan *plan, Node *limitCount, int64 
count_est, Node *limitOffset, int64 offset_est);
 bool is_dummy_plan(Plan *plan);
 
+
+bool is_in_planning_phase(void)
+{
+       if (PlanningDepth > 0)
+       {
+               return true;
+       }
+       else if (PlanningDepth < 0)
+       {
+               elog(ERROR, "Invalid PlanningDepth %d while getting planning 
phase", PlanningDepth);
+       }
+
+       return false;
+}
+
+void increase_planning_depth(void)
+{
+       PlanningDepth++;
+}
+
+void decrease_planning_depth(void)
+{
+       PlanningDepth--;
+}
+
+
 #ifdef USE_ORCA
 /**
  * Logging of optimization outcome
@@ -285,7 +313,7 @@ planner(Query *parse, int cursorOptions,
        PlannedStmt *result = NULL;
        instr_time      starttime, endtime;
        ResourceNegotiatorResult *ppResult = (ResourceNegotiatorResult *) 
palloc(sizeof(ResourceNegotiatorResult));
-       SplitAllocResult initResult = {NULL, NULL, NIL, -1, NIL, NULL};
+       SplitAllocResult initResult = {NULL, NULL, NIL, 0, NIL, NULL};
        ppResult->saResult = initResult;
        ppResult->stmt = NULL;
        static int plannerLevel = 0;
@@ -300,6 +328,8 @@ planner(Query *parse, int cursorOptions,
         * resource to run this query. After gaining the resource, we can 
perform the
         * actual optimization.
         */
+       increase_planning_depth();
+
        plannerLevel++;
        if (!resourceNegotiateDone)
        {
@@ -309,6 +339,8 @@ planner(Query *parse, int cursorOptions,
       {
         resource_negotiator(parse, cursorOptions, boundParams, resourceLife, 
&ppResult);
 
+               decrease_planning_depth();
+
                if(ppResult->stmt && ppResult->stmt->planTree)
                {
                        isDispatchParallel = ppResult->stmt->planTree->dispatch 
== DISPATCH_PARALLEL;
@@ -318,6 +350,8 @@ planner(Query *parse, int cursorOptions,
          }
          PG_CATCH();
          {
+               decrease_planning_depth();
+
                if ((ppResult != NULL))
                {
                  pfree(ppResult);
@@ -430,7 +464,7 @@ planner(Query *parse, int cursorOptions,
          }
          else
          {
-           gp_segments_for_planner = -1;
+           gp_segments_for_planner = 0;
          }
          SetActiveQueryResource(savedQueryResource);
          if ((ppResult != NULL))
@@ -450,7 +484,7 @@ planner(Query *parse, int cursorOptions,
                }
                else
                {
-                       gp_segments_for_planner = -1;
+                       gp_segments_for_planner = 0;
                }
                SetActiveQueryResource(savedQueryResource);
 
@@ -471,6 +505,7 @@ planner(Query *parse, int cursorOptions,
        return result;
 }
 
+
 /*
  * The new framework for HAWQ 2.0 query optimizer
  */
@@ -481,6 +516,8 @@ resource_negotiator(Query *parse, int cursorOptions, 
ParamListInfo boundParams,
   PlannedStmt *plannedstmt = NULL;
   do
   {
+               udf_collector_context udf_context;
+               udf_context.udf_exist = false;
     SplitAllocResult *allocResult = NULL;
     Query *my_parse = copyObject(parse);
     ParamListInfo my_boundParams = copyParamList(boundParams);
@@ -496,12 +533,26 @@ resource_negotiator(Query *parse, int cursorOptions, 
ParamListInfo boundParams,
        */
       allocResult = calculate_planner_segment_num(my_parse, resourceLife,
                                           plannedstmt->rtable, 
plannedstmt->intoPolicy,
-                                          plannedstmt->nMotionNodes + 
plannedstmt->nInitPlans + 1);
+                                          plannedstmt->nMotionNodes + 
plannedstmt->nInitPlans + 1,
+                                          -1);
 
       Assert(allocResult);
 
       (*result)->saResult = *allocResult;
       pfree(allocResult);
+    }else{
+               find_udf(my_parse, &udf_context);
+               if(udf_context.udf_exist){
+                       if ((resourceLife == QRL_ONCE) || (resourceLife == 
QRL_NONE)) {
+                               int64 mincost = min_cost_for_each_query;
+                               mincost <<= 20;
+                               int avgSliceNum = 3;
+                               (*result)->saResult.resource = 
AllocateResource(QRL_ONCE, avgSliceNum, mincost,
+                                               
GetUserDefinedFunctionVsegNum(),GetUserDefinedFunctionVsegNum(),NULL, 0);
+                       }else{
+                               (*result)->saResult.resource = 
AllocateResource(resourceLife, 0, 0, 0, 0, NULL, 0);
+                       }
+               }
     }
   } while (0);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/optimizer/util/clauses.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/util/clauses.c 
b/src/backend/optimizer/util/clauses.c
index a80737f..a3d67f2 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2831,8 +2831,15 @@ simplify_function(Oid funcid, Oid result_type, List 
*args,
        if (!HeapTupleIsValid(func_tuple))
                elog(ERROR, "cache lookup failed for function %u", funcid);
 
-       newexpr = evaluate_function(funcid, result_type, args,
-                                                               func_tuple, 
context);
+       if (is_in_planning_phase())
+       {
+               newexpr = NULL;
+       }
+       else
+       {
+               newexpr = evaluate_function(funcid, result_type, args,
+                                           func_tuple, context);
+       }
 
        if (large_const(newexpr, context->max_size))
        {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/backend/optimizer/util/walkers.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/util/walkers.c 
b/src/backend/optimizer/util/walkers.c
index 6f1ee51..a8222fb 100644
--- a/src/backend/optimizer/util/walkers.c
+++ b/src/backend/optimizer/util/walkers.c
@@ -162,6 +162,7 @@ expression_tree_walker(Node *node,
                case T_PartBoundExpr:
                case T_PartBoundInclusionExpr:
                case T_PartBoundOpenExpr:
+               case T_RangeTblEntry    :
                        /* primitive node types with no expression subnodes */
                        break;
                case T_Aggref:

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/include/cdb/cdbdatalocality.h
----------------------------------------------------------------------
diff --git a/src/include/cdb/cdbdatalocality.h 
b/src/include/cdb/cdbdatalocality.h
index d28fc3e..224cb53 100644
--- a/src/include/cdb/cdbdatalocality.h
+++ b/src/include/cdb/cdbdatalocality.h
@@ -50,6 +50,13 @@ typedef struct SplitAllocResult
 } SplitAllocResult;
 
 /*
+ * context used while walking a query tree to record whether any
+ * user defined function (udf) is present.
+ */
+typedef struct udf_collector_context {
+       bool udf_exist;
+} udf_collector_context;
+
+/*
  * structure containing rel and type when execution
  */
 typedef struct CurrentRelType {
@@ -87,7 +94,17 @@ void saveQueryResourceParameters(
  * we calculate the appropriate planner segment_num.
  */
 SplitAllocResult * calculate_planner_segment_num(Query *query, 
QueryResourceLife resourceLife,
-                                                List *rtable, GpPolicy 
*intoPolicy, int sliceNum);
+                                                List *rtable, GpPolicy 
*intoPolicy, int sliceNum, int fixedVsegNum);
+
+/*
+ * udf_collector_walker: the routine to find udfs.
+ */
+bool udf_collector_walker(Node *node,  udf_collector_context *context);
+
+/*
+ * find_udf: collect all udfs and record the result in the udf_collector_context.
+ */
+void find_udf(Query *query, udf_collector_context *context);
 
 FILE *fp;
 FILE *fpaoseg;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/include/executor/spi.h
----------------------------------------------------------------------
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 867daf5..7b4ba1b 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -155,4 +155,11 @@ extern uint64 SPI_GetMemoryReservation(void);
 extern void SPI_ReserveMemory(uint64 mem_reserved);
 extern bool SPI_IsMemoryReserved(void);
 
+/**
+ * Query resource related routines.
+ */
+extern bool SPI_IsInPrepare(void);
+extern void SPI_IncreasePrepareCounter(void);
+extern void SPI_DecreasePrepareCounter(void);
+
 #endif   /* SPI_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4d997b57/src/include/optimizer/planner.h
----------------------------------------------------------------------
diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h
index 48de348..cc2eb7d 100644
--- a/src/include/optimizer/planner.h
+++ b/src/include/optimizer/planner.h
@@ -50,4 +50,8 @@ extern bool choose_hashed_grouping(PlannerInfo *root,
                                                                   double 
dNumGroups, 
                                                                   
AggClauseCounts *agg_counts);
 
+extern bool is_in_planning_phase(void);
+extern void increase_planning_depth(void);
+extern void decrease_planning_depth(void);
+
 #endif   /* PLANNER_H */

Reply via email to