This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 8e12001f02a5488c5df1e8c6b691033df954a7ad
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Nov 12 12:47:02 2018 -0800

    Apply quota constraint to save computation time
    
    The existing task framework does not check quota limits before computing
    assignments. For example, if a quota is already full, Helix should not go
    through the rest of the workflows of that quota type trying to assign them,
    since they would only run out of quota.
---
 .../stages/task/TaskSchedulingStage.java           | 30 +++++++---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 27 ++++++---
 .../helix/task/AssignableInstanceManager.java      | 19 +++++--
 .../java/org/apache/helix/task/JobDispatcher.java  |  3 +-
 .../helix/task/TaskAssignmentCalculator.java       |  2 +-
 .../org/apache/helix/task/WorkflowDispatcher.java  |  6 +-
 .../TestQuotaConstraintSkipWorkflowAssignment.java | 65 ++++++++++++++++++++++
 7 files changed, 124 insertions(+), 28 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
index 5614e98..ff31240 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/task/TaskSchedulingStage.java
@@ -26,6 +26,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.AssignableInstanceManager;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.task.WorkflowConfig;
@@ -82,6 +83,8 @@ public class TaskSchedulingStage extends AbstractBaseStage {
     }
 
     // Current rest of resources including: only current state left over ones
+    // Original resource map contains workflows + jobs + other invalid 
resources
+    // After removing workflows + jobs, only leftover ones will go over old 
rebalance pipeline.
     for (Resource resource : restOfResources.values()) {
       if (!computeResourceBestPossibleState(event, cache, currentStateOutput, 
resource, output)) {
         failureResources.add(resource.getResourceName());
@@ -222,10 +225,7 @@ public class TaskSchedulingStage extends AbstractBaseStage 
{
 
     for (String workflowId : cache.getWorkflowConfigMap().keySet()) {
       WorkflowConfig workflowConfig = cache.getWorkflowConfig(workflowId);
-      String workflowType = workflowConfig.getWorkflowType();
-      if (workflowType == null || 
!_quotaBasedWorkflowPQs.containsKey(workflowType)) {
-        workflowType = AssignableInstance.DEFAULT_QUOTA_TYPE;
-      }
+      String workflowType = getQuotaType(workflowConfig);
       // TODO: We can support customized sorting field for user. Currently 
sort by creation time
       _quotaBasedWorkflowPQs.get(workflowType)
           .add(new WorkflowObject(workflowId, 
workflowConfig.getRecord().getCreationTime()));
@@ -241,6 +241,7 @@ public class TaskSchedulingStage extends AbstractBaseStage {
   private void scheduleWorkflows(Map<String, Resource> resourceMap, 
ClusterDataCache cache,
       Map<String, Resource> restOfResources, List<String> failureResources,
       CurrentStateOutput currentStateOutput, BestPossibleStateOutput 
bestPossibleOutput) {
+    AssignableInstanceManager assignableInstanceManager = 
cache.getAssignableInstanceManager();
     for (PriorityQueue<WorkflowObject> quotaBasedWorkflowPQ : 
_quotaBasedWorkflowPQs.values()) {
       Iterator<WorkflowObject> it = quotaBasedWorkflowPQ.iterator();
       while (it.hasNext()) {
@@ -255,10 +256,17 @@ public class TaskSchedulingStage extends 
AbstractBaseStage {
             _workflowDispatcher
                 .updateWorkflowStatus(workflowId, 
cache.getWorkflowConfig(workflowId), context,
                     currentStateOutput, bestPossibleOutput);
-            _workflowDispatcher
-                .assignWorkflow(workflowId, 
cache.getWorkflowConfig(workflowId), context,
-                    currentStateOutput, bestPossibleOutput, resourceMap);
+            String quotaType = 
getQuotaType(cache.getWorkflowConfig(workflowId));
             restOfResources.remove(workflowId);
+            if (assignableInstanceManager.hasGlobalCapacity(quotaType)) {
+              _workflowDispatcher
+                  .assignWorkflow(workflowId, 
cache.getWorkflowConfig(workflowId), context,
+                      currentStateOutput, bestPossibleOutput, resourceMap);
+            } else {
+              LogUtil.logInfo(logger, _eventId, String.format(
+                  "Fail to schedule new jobs assignment for Workflow %s due to 
quota %s is full",
+                  workflowId, quotaType));
+            }
           } catch (Exception e) {
             LogUtil.logError(logger, _eventId,
                 "Error computing assignment for Workflow " + workflowId + ". 
Skipping.", e);
@@ -279,4 +287,12 @@ public class TaskSchedulingStage extends AbstractBaseStage 
{
     
resource.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
     resourceMap.put(jobName, resource);
   }
+
+  private String getQuotaType(WorkflowConfig workflowConfig) {
+    String workflowType = workflowConfig.getWorkflowType();
+    if (workflowType == null || 
!_quotaBasedWorkflowPQs.containsKey(workflowType)) {
+      workflowType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+    return workflowType;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index b2f1120..34832c7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -13,7 +13,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
-import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterDataCache;
@@ -21,7 +20,6 @@ import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.assigner.AssignableInstance;
@@ -1021,10 +1019,28 @@ public abstract class AbstractTaskDispatcher {
    */
   protected boolean isJobReadyToSchedule(String job, WorkflowConfig 
workflowCfg,
       WorkflowContext workflowCtx, int incompleteAllCount, Map<String, 
JobConfig> jobConfigMap,
-      ClusterDataCache clusterDataCache) {
+      ClusterDataCache clusterDataCache, AssignableInstanceManager 
assignableInstanceManager) {
     int notStartedCount = 0;
     int failedOrTimeoutCount = 0;
     int incompleteParentCount = 0;
+    JobConfig jobConfig = jobConfigMap.get(job);
+
+    if (jobConfig == null) {
+      LOG.error(String.format("The job config is missing for job %s", job));
+      return false;
+    }
+
+    String quotaType = TaskAssignmentCalculator.getQuotaType(workflowCfg, 
jobConfig);
+    if (quotaType == null || 
!assignableInstanceManager.hasQuotaType(quotaType)) {
+      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+    }
+
+    if (!assignableInstanceManager.hasGlobalCapacity(quotaType)) {
+      LOG.info(String
+          .format("Job %s not ready to schedule due to not having enough quota 
for quota type %s",
+              job, quotaType));
+      return false;
+    }
 
     for (String parent : workflowCfg.getJobDag().getDirectParents(job)) {
       TaskState jobState = workflowCtx.getJobState(parent);
@@ -1048,11 +1064,6 @@ public abstract class AbstractTaskDispatcher {
 
     // If there is parent job failed, schedule the job only when ignore 
dependent
     // job failure enabled
-    JobConfig jobConfig = jobConfigMap.get(job);
-    if (jobConfig == null) {
-      LOG.error(String.format("The job config is missing for job %s", job));
-      return false;
-    }
     if (failedOrTimeoutCount > 0 && !jobConfig.isIgnoreDependentJobFailure()) {
       markJobFailed(job, null, workflowCfg, workflowCtx, jobConfigMap, 
clusterDataCache);
       if (LOG.isDebugEnabled()) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index fada4ad..242eab2 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -275,15 +275,22 @@ public class AssignableInstanceManager {
   }
 
   /**
-   * Get remained global quota of certain quota type for skipping redundant 
computation
+   * Check remained global quota of certain quota type for skipping redundant 
computation
    * @param quotaType
    * @return
    */
-  public int getGlobalCapacity(String quotaType) {
-    if (_globalThreadBasedQuotaMap.containsKey(quotaType)) {
-      return _globalThreadBasedQuotaMap.get(quotaType);
-    }
-    return QUOTA_TYPE_NOT_EXIST;
+  public boolean hasGlobalCapacity(String quotaType) {
+    return _globalThreadBasedQuotaMap.containsKey(quotaType)
+        && _globalThreadBasedQuotaMap.get(quotaType) > 0;
+  }
+
+  /**
+   * Check whether quota maps contains the quota type or not
+   * @param quotaType
+   * @return
+   */
+  public boolean hasQuotaType(String quotaType) {
+    return _globalThreadBasedQuotaMap.containsKey(quotaType);
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 9d602a4..c15205b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -87,7 +87,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
 
     if (!TaskUtil.isJobStarted(jobName, workflowCtx) && 
!isJobReadyToSchedule(jobName, workflowCfg,
         workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
-        _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
+        _clusterDataCache.getJobConfigMap(), _clusterDataCache,
+        _clusterDataCache.getAssignableInstanceManager())) {
       LOG.info("Job is not ready to run " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index dc90502..62ed935 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -53,7 +53,7 @@ public abstract class TaskAssignmentCalculator {
    * @param jobConfig
    * @return
    */
-  String getQuotaType(WorkflowConfig workflowConfig, JobConfig jobConfig) {
+  public static String getQuotaType(WorkflowConfig workflowConfig, JobConfig 
jobConfig) {
     String workflowType = workflowConfig.getWorkflowType();
     if (workflowType == null || workflowType.equals("")) {
       // Workflow type is null, so we go by the job type
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
index 319aea7..cb3161d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java
@@ -241,12 +241,8 @@ public class WorkflowDispatcher extends 
AbstractTaskDispatcher {
 
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx, 
inCompleteAllJobCount, jobConfigMap,
-          clusterDataCache)) {
+          clusterDataCache, clusterDataCache.getAssignableInstanceManager())) {
         JobConfig jobConfig = jobConfigMap.get(job);
-        if (jobConfig == null) {
-          LOG.error(String.format("The job config is missing for job %s", 
job));
-          continue;
-        }
 
         // Since the start time is calculated base on the time of completion 
of parent jobs for this
         // job, the calculated start time should only be calculate once. 
Persist the calculated time
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java
new file mode 100644
index 0000000..0bf2832
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestQuotaConstraintSkipWorkflowAssignment.java
@@ -0,0 +1,65 @@
+package org.apache.helix.controller.stages;
+
+import java.util.Collections;
+import java.util.HashMap;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.stages.task.TaskSchedulingStage;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.assigner.AssignableInstance;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestQuotaConstraintSkipWorkflowAssignment extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+    _controller.syncStop();
+  }
+
+  @Test
+  public void testQuotaConstraintSkipWorkflowAssignment() throws Exception {
+    ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    JobConfig.Builder job = new JobConfig.Builder();
+
+    job.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, 
"100000"));
+    TaskDriver driver = new TaskDriver(_manager);
+    for (int i = 0; i < 10; i++) {
+      Workflow.Builder workflow = new Workflow.Builder("Workflow" + i);
+      job.setWorkflow("Workflow" + i);
+      TaskConfig taskConfig =
+          new TaskConfig(MockTask.TASK_COMMAND, new HashMap<String, String>(), 
null, null);
+      job.addTaskConfigMap(Collections.singletonMap(taskConfig.getId(), 
taskConfig));
+      job.setJobId(TaskUtil.getNamespacedJobName("Workflow" + i, "JOB"));
+      workflow.addJob("JOB", job);
+      driver.start(workflow.build());
+    }
+    ConfigAccessor accessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = accessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTaskQuotaRatio(AssignableInstance.DEFAULT_QUOTA_TYPE, 3);
+    clusterConfig.setTaskQuotaRatio("OtherType", 37);
+    accessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    cache.setTaskCache(true);
+    cache.refresh(_manager.getHelixDataAccessor());
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    event.addAttribute(AttributeName.helixmanager.name(), _manager);
+    runStage(event, new ResourceComputationStage());
+    runStage(event, new CurrentStateComputationStage());
+    runStage(event, new TaskSchedulingStage());
+    Assert.assertTrue(!cache.getAssignableInstanceManager()
+        .hasGlobalCapacity(AssignableInstance.DEFAULT_QUOTA_TYPE));
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    Assert.assertTrue(bestPossibleStateOutput.getStateMap().size() == 3);
+  }
+}

Reply via email to