This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8e12001f02a5488c5df1e8c6b691033df954a7ad Author: Junkai Xue <[email protected]> AuthorDate: Mon Nov 12 12:47:02 2018 -0800 Apply quota constraint for saving computation time. The existing task framework does not apply the quota limit as a scheduling constraint. For example, if the quota is full, Helix should not go through the rest of the workflows trying to assign them, because there is no quota left for them to run. --- .../stages/task/TaskSchedulingStage.java | 30 +++++++--- .../apache/helix/task/AbstractTaskDispatcher.java | 27 ++++++--- .../helix/task/AssignableInstanceManager.java | 19 +++++-- .../java/org/apache/helix/task/JobDispatcher.java | 3 +- .../helix/task/TaskAssignmentCalculator.java | 2 +- .../org/apache/helix/task/WorkflowDispatcher.java | 6 +- .../TestQuotaConstraintSkipWorkflowAssignment.java | 65 ++++++++++++++++++++++ 7 files changed, 124 insertions(+), 28 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java index 5614e98..ff31240 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java @@ -26,6 +26,7 @@ import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.task.AssignableInstanceManager; import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskRebalancer; import org.apache.helix.task.WorkflowConfig; @@ -82,6 +83,8 @@ public class TaskSchedulingStage extends AbstractBaseStage { } // Current rest of resources including: only current state left over ones + // Original resource map contains workflows + jobs + other invalid resources + // After removing workflows + jobs, only leftover ones will go over old rebalance pipeline. 
for (Resource resource : restOfResources.values()) { if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) { failureResources.add(resource.getResourceName()); @@ -222,10 +225,7 @@ public class TaskSchedulingStage extends AbstractBaseStage { for (String workflowId : cache.getWorkflowConfigMap().keySet()) { WorkflowConfig workflowConfig = cache.getWorkflowConfig(workflowId); - String workflowType = workflowConfig.getWorkflowType(); - if (workflowType == null || !_quotaBasedWorkflowPQs.containsKey(workflowType)) { - workflowType = AssignableInstance.DEFAULT_QUOTA_TYPE; - } + String workflowType = getQuotaType(workflowConfig); // TODO: We can support customized sorting field for user. Currently sort by creation time _quotaBasedWorkflowPQs.get(workflowType) .add(new WorkflowObject(workflowId, workflowConfig.getRecord().getCreationTime())); @@ -241,6 +241,7 @@ public class TaskSchedulingStage extends AbstractBaseStage { private void scheduleWorkflows(Map<String, Resource> resourceMap, ClusterDataCache cache, Map<String, Resource> restOfResources, List<String> failureResources, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) { + AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager(); for (PriorityQueue<WorkflowObject> quotaBasedWorkflowPQ : _quotaBasedWorkflowPQs.values()) { Iterator<WorkflowObject> it = quotaBasedWorkflowPQ.iterator(); while (it.hasNext()) { @@ -255,10 +256,17 @@ public class TaskSchedulingStage extends AbstractBaseStage { _workflowDispatcher .updateWorkflowStatus(workflowId, cache.getWorkflowConfig(workflowId), context, currentStateOutput, bestPossibleOutput); - _workflowDispatcher - .assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId), context, - currentStateOutput, bestPossibleOutput, resourceMap); + String quotaType = getQuotaType(cache.getWorkflowConfig(workflowId)); restOfResources.remove(workflowId); + if 
(assignableInstanceManager.hasGlobalCapacity(quotaType)) { + _workflowDispatcher + .assignWorkflow(workflowId, cache.getWorkflowConfig(workflowId), context, + currentStateOutput, bestPossibleOutput, resourceMap); + } else { + LogUtil.logInfo(logger, _eventId, String.format( + "Fail to schedule new jobs assignment for Workflow %s due to quota %s is full", + workflowId, quotaType)); + } } catch (Exception e) { LogUtil.logError(logger, _eventId, "Error computing assignment for Workflow " + workflowId + ". Skipping.", e); @@ -279,4 +287,12 @@ public class TaskSchedulingStage extends AbstractBaseStage { resource.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); resourceMap.put(jobName, resource); } + + private String getQuotaType(WorkflowConfig workflowConfig) { + String workflowType = workflowConfig.getWorkflowType(); + if (workflowType == null || !_quotaBasedWorkflowPQs.containsKey(workflowType)) { + workflowType = AssignableInstance.DEFAULT_QUOTA_TYPE; + } + return workflowType; + } } diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index b2f1120..34832c7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -13,7 +13,6 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; -import org.apache.helix.common.caches.TaskDataCache; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.ClusterDataCache; @@ -21,7 +20,6 @@ import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; import org.apache.helix.model.Partition; -import 
org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.task.assigner.AssignableInstance; @@ -1021,10 +1019,28 @@ public abstract class AbstractTaskDispatcher { */ protected boolean isJobReadyToSchedule(String job, WorkflowConfig workflowCfg, WorkflowContext workflowCtx, int incompleteAllCount, Map<String, JobConfig> jobConfigMap, - ClusterDataCache clusterDataCache) { + ClusterDataCache clusterDataCache, AssignableInstanceManager assignableInstanceManager) { int notStartedCount = 0; int failedOrTimeoutCount = 0; int incompleteParentCount = 0; + JobConfig jobConfig = jobConfigMap.get(job); + + if (jobConfig == null) { + LOG.error(String.format("The job config is missing for job %s", job)); + return false; + } + + String quotaType = TaskAssignmentCalculator.getQuotaType(workflowCfg, jobConfig); + if (quotaType == null || !assignableInstanceManager.hasQuotaType(quotaType)) { + quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE; + } + + if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) { + LOG.info(String + .format("Job %s not ready to schedule due to not having enough quota for quota type %s", + job, quotaType)); + return false; + } for (String parent : workflowCfg.getJobDag().getDirectParents(job)) { TaskState jobState = workflowCtx.getJobState(parent); @@ -1048,11 +1064,6 @@ public abstract class AbstractTaskDispatcher { // If there is parent job failed, schedule the job only when ignore dependent // job failure enabled - JobConfig jobConfig = jobConfigMap.get(job); - if (jobConfig == null) { - LOG.error(String.format("The job config is missing for job %s", job)); - return false; - } if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) { markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, clusterDataCache); if (LOG.isDebugEnabled()) { diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java index fada4ad..242eab2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java +++ b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java @@ -275,15 +275,22 @@ public class AssignableInstanceManager { } /** - * Get remained global quota of certain quota type for skipping redundant computation + * Check remained global quota of certain quota type for skipping redundant computation * @param quotaType * @return */ - public int getGlobalCapacity(String quotaType) { - if (_globalThreadBasedQuotaMap.containsKey(quotaType)) { - return _globalThreadBasedQuotaMap.get(quotaType); - } - return QUOTA_TYPE_NOT_EXIST; + public boolean hasGlobalCapacity(String quotaType) { + return _globalThreadBasedQuotaMap.containsKey(quotaType) + && _globalThreadBasedQuotaMap.get(quotaType) > 0; + } + + /** + * Check whether quota maps contains the quota type or not + * @param quotaType + * @return + */ + public boolean hasQuotaType(String quotaType) { + return _globalThreadBasedQuotaMap.containsKey(quotaType); } /** diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 9d602a4..c15205b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -87,7 +87,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg, workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx), - _clusterDataCache.getJobConfigMap(), _clusterDataCache)) { + _clusterDataCache.getJobConfigMap(), _clusterDataCache, + _clusterDataCache.getAssignableInstanceManager())) { LOG.info("Job is not ready to run " 
+ jobName); return buildEmptyAssignment(jobName, currStateOutput); } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java index dc90502..62ed935 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java @@ -53,7 +53,7 @@ public abstract class TaskAssignmentCalculator { * @param jobConfig * @return */ - String getQuotaType(WorkflowConfig workflowConfig, JobConfig jobConfig) { + public static String getQuotaType(WorkflowConfig workflowConfig, JobConfig jobConfig) { String workflowType = workflowConfig.getWorkflowType(); if (workflowType == null || workflowType.equals("")) { // Workflow type is null, so we go by the job type diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index 319aea7..cb3161d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -241,12 +241,8 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { // check ancestor job status if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, inCompleteAllJobCount, jobConfigMap, - clusterDataCache)) { + clusterDataCache, clusterDataCache.getAssignableInstanceManager())) { JobConfig jobConfig = jobConfigMap.get(job); - if (jobConfig == null) { - LOG.error(String.format("The job config is missing for job %s", job)); - continue; - } // Since the start time is calculated base on the time of completion of parent jobs for this // job, the calculated start time should only be calculate once. 
Persist the calculated time diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java new file mode 100644 index 0000000..0bf2832 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java @@ -0,0 +1,65 @@ +package org.apache.helix.controller.stages; + +import java.util.Collections; +import java.util.HashMap; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.controller.stages.task.TaskSchedulingStage; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.assigner.AssignableInstance; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestQuotaConstraintSkipWorkflowAssignment extends TaskTestBase { + + @BeforeClass + public void beforeClass() throws Exception { + setSingleTestEnvironment(); + super.beforeClass(); + _controller.syncStop(); + } + + @Test + public void testQuotaConstraintSkipWorkflowAssignment() throws Exception { + ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown); + ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME); + JobConfig.Builder job = new JobConfig.Builder(); + + job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "100000")); + TaskDriver driver = new TaskDriver(_manager); + for (int i = 0; i < 10; i++) { + Workflow.Builder workflow = new Workflow.Builder("Workflow" + i); + job.setWorkflow("Workflow" + i); + TaskConfig taskConfig = + new 
TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), null, null); + job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), taskConfig)); + job.setJobId(TaskUtil.getNamespacedJobName("Workflow" + i, "JOB")); + workflow.addJob("JOB", job); + driver.start(workflow.build()); + } + ConfigAccessor accessor = new ConfigAccessor(_gZkClient); + ClusterConfig clusterConfig = accessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTaskQuotaRatio(AssignableInstance.DEFAULT_QUOTA_TYPE, 3); + clusterConfig.setTaskQuotaRatio("OtherType", 37); + accessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + cache.setTaskCache(true); + cache.refresh(_manager.getHelixDataAccessor()); + event.addAttribute(AttributeName.ClusterDataCache.name(), cache); + event.addAttribute(AttributeName.helixmanager.name(), _manager); + runStage(event, new ResourceComputationStage()); + runStage(event, new CurrentStateComputationStage()); + runStage(event, new TaskSchedulingStage()); + Assert.assertTrue(!cache.getAssignableInstanceManager() + .hasGlobalCapacity(AssignableInstance.DEFAULT_QUOTA_TYPE)); + BestPossibleStateOutput bestPossibleStateOutput = + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); + Assert.assertTrue(bestPossibleStateOutput.getStateMap().size() == 3); + } +}
