HAWQ-364. Make resource manager dynamically adjust minimum YARN container count in each segment
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/e22956c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/e22956c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/e22956c6 Branch: refs/heads/HAWQ-369 Commit: e22956c6f84cbeb79ea32615f5f7f9908ffca553 Parents: 898820b Author: YI JIN <y...@pivotal.io> Authored: Thu Jan 28 14:37:07 2016 +1100 Committer: YI JIN <y...@pivotal.io> Committed: Thu Jan 28 14:37:07 2016 +1100 ---------------------------------------------------------------------- .../communication/rmcomm_RM2RMSEG.c | 2 + .../resourcemanager/include/resourcepool.h | 1 + .../resourcemanager/include/resqueuemanager.h | 6 +- src/backend/resourcemanager/requesthandler.c | 2 + .../resourcebroker/resourcebroker_LIBYARN.c | 20 ++++- src/backend/resourcemanager/resourcemanager.c | 2 + src/backend/resourcemanager/resqueuemanager.c | 79 +++++++++++++++++++- 7 files changed, 108 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c index 3591ac1..e6b861b 100644 --- a/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c +++ b/src/backend/resourcemanager/communication/rmcomm_RM2RMSEG.c @@ -244,6 +244,7 @@ void receivedRUAliveResponse(AsyncCommMessageHandlerContext context, GET_SEGRESOURCE_HOSTNAME(segres)); refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); } else { elog(DEBUG3, "Resource manager find host %s is down already.", @@ -293,6 +294,7 @@ void sentRUAliveError(AsyncCommMessageHandlerContext context) GET_SEGRESOURCE_HOSTNAME(segres)); 
refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/include/resourcepool.h ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/include/resourcepool.h b/src/backend/resourcemanager/include/resourcepool.h index d63a6cb..b7b25a1 100644 --- a/src/backend/resourcemanager/include/resourcepool.h +++ b/src/backend/resourcemanager/include/resourcepool.h @@ -390,6 +390,7 @@ struct ResourcePoolData { */ ResourceBundleData FTSTotal; ResourceBundleData GRMTotal; + ResourceBundleData GRMTotalHavingNoHAWQNode; uint64_t LastUpdateTime; /* Last time the GRM cluster report is gotten. */ uint64_t LastRequestTime;/* Last time the GRM cluster report is sent. */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/include/resqueuemanager.h ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/include/resqueuemanager.h b/src/backend/resourcemanager/include/resqueuemanager.h index 0b38520..171b399 100644 --- a/src/backend/resourcemanager/include/resqueuemanager.h +++ b/src/backend/resourcemanager/include/resqueuemanager.h @@ -329,6 +329,8 @@ struct DynResourceQueueManagerData { int ForcedReturnGRMContainerCount; bool toRunQueryDispatch; bool hasResourceProblem[RESPROBLEM_COUNT]; + + int ActualMinGRMContainerPerSeg; }; typedef struct DynResourceQueueManagerData *DynResourceQueueManager; typedef struct DynResourceQueueManagerData DynResourceQueueManagerData; @@ -344,8 +346,10 @@ typedef struct DynResourceQueueManagerData DynResourceQueueManagerData; void initializeResourceQueueManager(void); /* collect resource queues' resource usage status from bottom up. */ void refreshMemoryCoreRatioLevelUsage(uint64_t curmicrosec); -/* Refresh reosurce queue resource capacity and adjusts all queued requests. 
*/ +/* Refresh resource queue resource capacity and adjusts all queued requests. */ void refreshResourceQueueCapacity(bool queuechanged); +/* Refresh actual minimum GRM container water level. */ +void refreshActualMinGRMContainerPerSeg(void); /* Dispatch resource to the queuing queries. */ void dispatchResourceToQueries(void); /* Time out the resource allocated whose QD owner does not have chance to return. */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/requesthandler.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/requesthandler.c b/src/backend/resourcemanager/requesthandler.c index c6e9a34..cc2a216 100644 --- a/src/backend/resourcemanager/requesthandler.c +++ b/src/backend/resourcemanager/requesthandler.c @@ -783,6 +783,7 @@ bool handleRMSEGRequestIMAlive(void **arg) { /* Refresh resource queue capacities. */ refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); /* Recalculate all memory/core ratio instances' limits. */ refreshMemoryCoreRatioLimits(); /* Refresh memory/core ratio level water mark. 
*/ @@ -1049,6 +1050,7 @@ bool handleRMRequestSegmentIsDown(void **arg) } refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); RPCResponseSegmentIsDownData response; response.Result = res; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c index c6d26af..c97e340 100644 --- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c +++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN.c @@ -649,12 +649,21 @@ int handleRB2RM_ClusterReport(void) setAllSegResourceGRMUnavailable(); /* - * Start to update resource pool content. + * Start to update resource pool content. The total size of the YARN + * cluster is also counted at the same time. */ + + resetResourceBundleData(&(PRESPOOL->GRMTotalHavingNoHAWQNode), 0, 0.0, 0); + MEMORY_CONTEXT_SWITCH_TO(PCONTEXT) while( list_length(segstats) > 0 ) { SegStat segstat = (SegStat)lfirst(list_head(segstats)); + + addResourceBundleData(&(PRESPOOL->GRMTotalHavingNoHAWQNode), + segstat->GRMTotalMemoryMB, + segstat->GRMTotalCore); + res = updateHAWQSegWithGRMSegStat(segstat); if ( res == FUNC_RETURN_OK ) { @@ -676,6 +685,14 @@ int handleRB2RM_ClusterReport(void) } MEMORY_CONTEXT_SWITCH_BACK + elog(LOG, "Resource manager YARN resource broker counted HAWQ cluster now " + "having (%d MB, %lf CORE) in a YARN cluster of total resource " + "(%d MB, %lf CORE).", + PRESPOOL->GRMTotal.MemoryMB, + PRESPOOL->GRMTotal.Core, + PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB, + PRESPOOL->GRMTotalHavingNoHAWQNode.Core); + /* * If the segment is not GRM available, RM should return all containers * located upon them.
@@ -695,6 +712,7 @@ int handleRB2RM_ClusterReport(void) PQUEMGR->GRMQueueResourceTight = response.ResourceTight > 0 ? true : false; refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); PRESPOOL->LastUpdateTime = gettime_microsec(); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/resourcemanager.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c index 819da83..b8a7bb5 100644 --- a/src/backend/resourcemanager/resourcemanager.c +++ b/src/backend/resourcemanager/resourcemanager.c @@ -2645,6 +2645,7 @@ void updateStatusOfAllNodes() if ( changedstatus ) { refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); } validateResourcePoolStatus(true); @@ -2808,6 +2809,7 @@ int loadHostInformationIntoResourcePool(void) /* Refresh resource queue capacities. */ refreshResourceQueueCapacity(false); + refreshActualMinGRMContainerPerSeg(); /* Recalculate all memory/core ratio instances' limits. */ refreshMemoryCoreRatioLimits(); /* Refresh memory/core ratio level water mark. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/e22956c6/src/backend/resourcemanager/resqueuemanager.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resqueuemanager.c b/src/backend/resourcemanager/resqueuemanager.c index e2a2f43..10a970b 100644 --- a/src/backend/resourcemanager/resqueuemanager.c +++ b/src/backend/resourcemanager/resqueuemanager.c @@ -258,6 +258,8 @@ void initializeResourceQueueManager(void) { PQUEMGR->hasResourceProblem[i] = false; } + + PQUEMGR->ActualMinGRMContainerPerSeg = rm_min_resource_perseg; } /* @@ -2472,6 +2474,77 @@ int returnResourceToResQueMgr(ConnectionTrack conntrack) return res; } +void refreshActualMinGRMContainerPerSeg(void) +{ + /*-------------------------------------------------------------------------- + * Three limits are considered, and the actual water level is the least of + * them: the mean per-segment GRM container count implied by the resource + * queue's normal capacity, the minimum of all segments' maximum GRM container + * counts, and the user setting rm_min_resource_perseg saved in the GUC. + * + *-------------------------------------------------------------------------- + */ + + /* STEP 1. Go through each segment to get segment maximum capacity.
*/ + int minctncount = INT32_MAX; + int normalctncount = INT32_MAX; + if ( DRMGlobalInstance->ImpType != NONE_HAWQ2 ) + { + List *allsegres = NULL; + ListCell *cell = NULL; + getAllPAIRRefIntoList(&(PRESPOOL->Segments), &allsegres); + + foreach(cell, allsegres) + { + SegResource segres = (SegResource)(((PAIR)lfirst(cell))->Value); + if ( !IS_SEGSTAT_FTSAVAILABLE(segres->Stat) || + !IS_SEGSTAT_GRMAVAILABLE(segres->Stat) ) + { + continue; + } + + if ( segres->Stat->GRMTotalCore < minctncount ) + { + minctncount = segres->Stat->GRMTotalCore; + } + } + freePAIRRefList(&(PRESPOOL->Segments), &allsegres); + + elog(RMLOG, "Resource manager finds minimum global resource manager " + "container count can contained by all segments is %d", + minctncount); + + /* STEP 2. check the queue normal capacity introduced water level. */ + if ( PRESPOOL->AvailNodeCount > 0 && + PQUEMGR->GRMQueueCapacity > 0 && + PRESPOOL->GRMTotalHavingNoHAWQNode.Core > 0 ) + { + normalctncount = trunc(PRESPOOL->GRMTotalHavingNoHAWQNode.Core * + PQUEMGR->GRMQueueCapacity / + PRESPOOL->AvailNodeCount); + + elog(RMLOG, "Resource manager calculates normal global resource " + "manager container count based on target queue capacity " + "is %d", + normalctncount); + } + } + + /* STEP 3. Get final water level result. */ + int oldval = PQUEMGR->ActualMinGRMContainerPerSeg; + int newval = minctncount < normalctncount ? minctncount : normalctncount; + newval = newval < rm_min_resource_perseg ? 
newval : rm_min_resource_perseg; + + if ( newval != oldval ) + { + elog(WARNING, "Resource manager adjusts minimum global resource manager " + "container count in each segment from %d to %d.", + oldval, + newval); + } + PQUEMGR->ActualMinGRMContainerPerSeg = newval; +} + void refreshResourceQueueCapacity(bool queuechanged) { static char errorbuf[ERRORMESSAGE_SIZE]; @@ -2510,8 +2583,10 @@ void refreshResourceQueuePercentageCapacity(bool queuechanged) { if ( DRMGlobalInstance->ImpType == YARN_LIBYARN ) { - mem = PRESPOOL->GRMTotal.MemoryMB * PQUEMGR->GRMQueueMaxCapacity; - core = PRESPOOL->GRMTotal.Core * PQUEMGR->GRMQueueMaxCapacity; + mem = PRESPOOL->GRMTotalHavingNoHAWQNode.MemoryMB * + PQUEMGR->GRMQueueMaxCapacity; + core = PRESPOOL->GRMTotalHavingNoHAWQNode.Core * + PQUEMGR->GRMQueueMaxCapacity; } else if ( DRMGlobalInstance->ImpType == NONE_HAWQ2 ) {