Replace BestPossibleStateCalcStage with TaskSchedulingStage

For the new task framework, we don't have to loop through all the ideal states. As a 
first step, introduce a simplified version of BestPossibleStateCalcStage for task scheduling.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/50c9aa3a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/50c9aa3a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/50c9aa3a

Branch: refs/heads/master
Commit: 50c9aa3a2eaf82e75743c693b7e40ad741e51198
Parents: c45d3a6
Author: Junkai Xue <[email protected]>
Authored: Tue Jul 17 17:56:36 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Mon Sep 17 15:45:45 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   2 +-
 .../stages/BestPossibleStateCalcStage.java      |  57 +-----
 .../controller/stages/TaskSchedulingStage.java  | 204 +++++++++++++++++++
 .../task/TestWorkflowTermination.java           |  26 +--
 4 files changed, 227 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 9f94755..48677d3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -306,7 +306,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       // TODO: Junkai will work on refactoring existing pipeline log into 
abstract logic and
       // extend the logic to separate pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
-      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+      rebalancePipeline.addStage(new TaskSchedulingStage());
       rebalancePipeline.addStage(new IntermediateStateCalcStage());
       rebalancePipeline.addStage(new MessageGenerationPhase());
       rebalancePipeline.addStage(new MessageSelectionStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index a6ba5b8..5cc9593 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -103,35 +103,25 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     ClusterDataCache cache = 
event.getAttribute(AttributeName.ClusterDataCache.name());
     BestPossibleStateOutput output = new BestPossibleStateOutput();
 
-    PriorityQueue<ResourcePriority> resourcePriorityQueue = new 
PriorityQueue<>();
-    TaskDriver taskDriver = null;
     HelixManager helixManager = 
event.getAttribute(AttributeName.helixmanager.name());
-    if (helixManager != null) {
-      taskDriver = new TaskDriver(helixManager);
-    }
-    for (Resource resource : resourceMap.values()) {
-      resourcePriorityQueue.add(new ResourcePriority(resource, 
cache.getIdealState(resource.getResourceName()),
-          taskDriver));
-    }
 
     final List<String> failureResources = new ArrayList<>();
-    Iterator<ResourcePriority> itr = resourcePriorityQueue.iterator();
+    Iterator<Resource> itr = resourceMap.values().iterator();
     while (itr.hasNext()) {
-      Resource resource = itr.next().getResource();
+      Resource resource = itr.next();
       if (!computeResourceBestPossibleState(event, cache, currentStateOutput, 
resource, output)) {
         failureResources.add(resource.getResourceName());
-        LogUtil.logWarn(logger, _eventId, "Failed to calculate best possible 
states for " + resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId,
+            "Failed to calculate best possible states for " + 
resource.getResourceName());
       }
     }
 
     // Check and report if resource rebalance has failure
-    if (!cache.isTaskCache()) {
-      ClusterStatusMonitor clusterStatusMonitor =
-          event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, 
clusterStatusMonitor,
-          "Failed to calculate best possible states for " + 
failureResources.size()
-              + " resources.");
-    }
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, 
clusterStatusMonitor,
+        "Failed to calculate best possible states for " + 
failureResources.size() + " resources.");
+
     return output;
   }
 
@@ -356,33 +346,4 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
 
     return mappingCalculator;
   }
-
-  class ResourcePriority implements Comparable<ResourcePriority> {
-    final Resource _resource;
-    // By default, non-job resources and new jobs are assigned lowest priority
-    Long _priority = Long.MAX_VALUE;
-
-    Resource getResource() {
-      return _resource;
-    }
-
-    public ResourcePriority(Resource resource, IdealState idealState, 
TaskDriver taskDriver) {
-      _resource = resource;
-
-      if (taskDriver != null && idealState != null
-          && idealState.getRebalancerClassName() != null
-          && 
idealState.getRebalancerClassName().equals(JobRebalancer.class.getName())) {
-        // Update priority for job resources, note that older jobs will be 
processed earlier
-        JobContext jobContext = 
taskDriver.getJobContext(resource.getResourceName());
-        if (jobContext != null && jobContext.getStartTime() != 
WorkflowContext.UNSTARTED) {
-          _priority = jobContext.getStartTime();
-        }
-      }
-    }
-
-    @Override
-    public int compareTo(ResourcePriority otherJob) {
-      return _priority.compareTo(otherJob._priority);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
new file mode 100644
index 0000000..e2596b1
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskSchedulingStage.java
@@ -0,0 +1,204 @@
+package org.apache.helix.controller.stages;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobRebalancer;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskRebalancer;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline stage that computes the best possible state for task framework resources and stores it
+ * in the event as {@code BEST_POSSIBLE_STATE}.
+ *
+ * <p>This is a simplified replacement for {@link BestPossibleStateCalcStage} for the task
+ * framework. A resource is scheduled only if its ideal state names a rebalancer class that is both
+ * a {@link TaskRebalancer} and a {@link MappingCalculator}; other resources are logged and skipped.
+ * Started job resources (rebalanced by {@link JobRebalancer}) are processed in order of their start
+ * time, oldest first; all remaining resources follow in no particular order.
+ */
+public class TaskSchedulingStage extends AbstractBaseStage {
+  private static final Logger logger = LoggerFactory.getLogger(TaskSchedulingStage.class.getName());
+
+  /**
+   * Resets the cache's active task counts from the current state (used for throttling), computes
+   * the task assignment for every resource in {@code RESOURCES_TO_REBALANCE}, and publishes the
+   * result as {@code BEST_POSSIBLE_STATE}.
+   *
+   * @param event the cluster event carrying the pipeline attributes
+   * @throws StageException if the current state, resources to rebalance or data cache is missing
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final Map<String, Resource> resourceMap =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+
+    if (currentStateOutput == null || resourceMap == null || cache == null) {
+      throw new StageException(
+          "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+    }
+
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    cache.resetActiveTaskCount(currentStateOutput);
+
+    final BestPossibleStateOutput bestPossibleStateOutput =
+        compute(event, resourceMap, currentStateOutput);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+  }
+
+  /**
+   * Computes assignments for all resources, processing them in {@link ResourcePriority} order.
+   * Resources whose assignment cannot be computed are logged and left out of the output.
+   */
+  private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput) {
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    BestPossibleStateOutput output = new BestPossibleStateOutput();
+
+    TaskDriver taskDriver = null;
+    HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
+    if (helixManager != null) {
+      taskDriver = new TaskDriver(helixManager);
+    }
+
+    PriorityQueue<ResourcePriority> resourcePriorityQueue = new PriorityQueue<>();
+    for (Resource resource : resourceMap.values()) {
+      resourcePriorityQueue.add(new ResourcePriority(resource,
+          cache.getIdealState(resource.getResourceName()), taskDriver));
+    }
+
+    // PriorityQueue#iterator() does NOT traverse elements in priority order; drain the queue with
+    // poll() so that older jobs are really processed first.
+    // TODO: Replace this looping available resources with Workflow Queues
+    ResourcePriority next;
+    while ((next = resourcePriorityQueue.poll()) != null) {
+      Resource resource = next.getResource();
+      if (!computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output)) {
+        LogUtil.logWarn(logger, _eventId, "Failed to assign tasks for " + resource.getResourceName());
+      }
+    }
+
+    return output;
+  }
+
+  /**
+   * Computes the assignment of a single resource with its configured task rebalancer and records it
+   * in {@code output}.
+   *
+   * @return {@code true} if the assignment was computed and stored; {@code false} if the resource
+   *     has no usable task rebalancer or the computation threw (both cases are logged)
+   */
+  private boolean computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache,
+      CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
+    String resourceName = resource.getResourceName();
+    LogUtil.logDebug(logger, _eventId, "Processing resource:" + resourceName);
+    // Ideal state may be gone. In that case we need to get the state model name
+    // from the current state
+    IdealState idealState = cache.getIdealState(resourceName);
+    if (idealState == null) {
+      // if ideal state is deleted, use an empty one
+      LogUtil.logInfo(logger, _eventId, "resource:" + resourceName + " does not exist anymore");
+      idealState = new IdealState(resourceName);
+      idealState.setStateModelDefRef(resource.getStateModelDefRef());
+    }
+
+    Rebalancer rebalancer = loadRebalancer(resourceName, idealState.getRebalancerClassName());
+    if (rebalancer == null) {
+      // No rebalancer configured, or it could not be instantiated (already logged)
+      return false;
+    }
+    // Check the types up front instead of letting a cast throw: an unchecked ClassCastException
+    // here would abort the whole stage for every remaining resource.
+    if (!(rebalancer instanceof TaskRebalancer)) {
+      LogUtil.logWarn(logger, _eventId, "Rebalancer " + rebalancer.getClass().getName()
+          + " is not a TaskRebalancer, skipping resource: " + resourceName);
+      return false;
+    }
+    if (!(rebalancer instanceof MappingCalculator)) {
+      LogUtil.logWarn(logger, _eventId,
+          "Rebalancer does not have a mapping calculator, skipping resource: " + resourceName);
+      return false;
+    }
+    MappingCalculator mappingCalculator = (MappingCalculator) rebalancer;
+    ((TaskRebalancer) rebalancer).setClusterStatusMonitor(
+        (ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name()));
+
+    ResourceAssignment partitionStateAssignment = null;
+    try {
+      HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+      rebalancer.init(manager);
+
+      // Use the internal MappingCalculator interface to compute the final assignment
+      // The next release will support rebalancers that compute the mapping from start to finish
+      partitionStateAssignment = mappingCalculator
+          .computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
+      for (Partition partition : resource.getPartitions()) {
+        Map<String, String> newStateMap = partitionStateAssignment.getReplicaMap(partition);
+        output.setState(resourceName, partition, newStateMap);
+      }
+      return true;
+    } catch (Exception e) {
+      LogUtil.logError(logger, _eventId,
+          "Error computing assignment for resource " + resourceName + ". Skipping.", e);
+      // TODO : remove this part after debugging NPE
+      // Rebalancer, ideal state, mapping calculator and output are non-null by construction here,
+      // so only the values that can actually be null are reported.
+      LogUtil.logError(logger, _eventId, String.format(
+          "HelixManager is null : %s\nPartitionAssignment is null : %s\n",
+          event.getAttribute(AttributeName.helixmanager.name()) == null,
+          partitionStateAssignment == null));
+      return false;
+    }
+  }
+
+  /**
+   * Instantiates the rebalancer class named by an ideal state.
+   *
+   * @return a new rebalancer instance, or {@code null} if no class is configured or it cannot be
+   *     loaded or instantiated (the failure is logged)
+   */
+  private Rebalancer loadRebalancer(String resourceName, String rebalancerClassName) {
+    if (rebalancerClassName == null) {
+      return null;
+    }
+    if (logger.isDebugEnabled()) {
+      LogUtil.logDebug(logger, _eventId,
+          "resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
+    }
+    try {
+      return Rebalancer.class
+          .cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+    } catch (Exception e) {
+      LogUtil.logError(logger, _eventId,
+          "Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+      return null;
+    }
+  }
+
+  /**
+   * Scheduling priority of a resource; smaller values are processed first. A job resource
+   * (ideal state rebalanced by {@link JobRebalancer}) whose context has a start time is prioritized
+   * by that start time, so older jobs come first. Every other resource, including jobs that have
+   * not started or whose context is unavailable, gets the lowest priority ({@link Long#MAX_VALUE}).
+   */
+  static final class ResourcePriority implements Comparable<ResourcePriority> {
+    private final Resource _resource;
+    private final long _priority;
+
+    public ResourcePriority(Resource resource, IdealState idealState, TaskDriver taskDriver) {
+      _resource = resource;
+      _priority = computePriority(resource, idealState, taskDriver);
+    }
+
+    Resource getResource() {
+      return _resource;
+    }
+
+    /** Returns the job start time for started job resources, otherwise {@link Long#MAX_VALUE}. */
+    private static long computePriority(Resource resource, IdealState idealState,
+        TaskDriver taskDriver) {
+      if (taskDriver == null || idealState == null
+          || !JobRebalancer.class.getName().equals(idealState.getRebalancerClassName())) {
+        return Long.MAX_VALUE;
+      }
+      JobContext jobContext = taskDriver.getJobContext(resource.getResourceName());
+      if (jobContext != null && jobContext.getStartTime() != WorkflowContext.UNSTARTED) {
+        return jobContext.getStartTime();
+      }
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public int compareTo(ResourcePriority otherJob) {
+      return Long.compare(_priority, otherJob._priority);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/50c9aa3a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index f303c52..1a08468 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -130,10 +130,10 @@ public class TestWorkflowTermination extends TaskTestBase 
{
   public void testWorkflowPausedTimeout() throws InterruptedException {
     String workflowName = TestHelper.getTestMethodName();
     long workflowExpiry = 2000; // 2sec expiry time
-    long timeout = 2000;
+    long timeout = 5000;
     String notStartedJobName = JOB_NAME + "-NotStarted";
 
-    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 
100);
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 
5000);
     jobBuilder.setWorkflow(workflowName);
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
         .setWorkflowConfig(
@@ -151,7 +151,7 @@ public class TestWorkflowTermination extends TaskTestBase {
 
     // Wait a bit for the job to get scheduled. Job runs for 100ms so this 
will very likely
     // to trigger a job stopped
-    Thread.sleep(40);
+    Thread.sleep(100);
 
     // Pause the queue
     _driver.waitToStop(workflowName, 10000L);
@@ -208,17 +208,17 @@ public class TestWorkflowTermination extends TaskTestBase 
{
     String job2 = JOB_NAME + "2";
     String job3 = JOB_NAME + "3";
     String job4 = JOB_NAME + "4";
-    long workflowExpiry = 2000;
-    long timeout = 5000;
+    long workflowExpiry = 10000;
+    long timeout = 8000;
 
-    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 
50);
-    JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, 
true, 10);
+    JobConfig.Builder jobBuilder = createJobConfigBuilder(workflowName, false, 
1);
+    JobConfig.Builder failedJobBuilder = createJobConfigBuilder(workflowName, 
true, 1);
 
     Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName)
         .setWorkflowConfig(
             new WorkflowConfig.Builder(workflowName)
                 .setWorkFlowType(WORKFLOW_TYPE)
-                .setTimeout(timeout)
+                .setTimeout(timeout).setParallelJobs(4)
                 .setFailureThreshold(1)
                 .build()
         )
@@ -234,26 +234,26 @@ public class TestWorkflowTermination extends TaskTestBase 
{
 
     _driver.start(workflowBuilder.build());
 
-    _driver.pollForWorkflowState(workflowName, 5000L, TaskState.FAILED);
+    _driver.pollForWorkflowState(workflowName, 10000L, TaskState.FAILED);
 
     // Timeout is longer than fail time, so the failover should occur earlier
     WorkflowContext context = _driver.getWorkflowContext(workflowName);
     Assert.assertTrue(context.getFinishTime() - context.getStartTime() < 
timeout);
 
     // job1 will complete
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job1), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job1), 10000L,
         TaskState.COMPLETED);
 
     // Possible race between 2 and 3 so it's likely for job2 to stay in either 
COMPLETED or ABORTED
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job2), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job2), 10000L,
         TaskState.COMPLETED, TaskState.ABORTED);
 
     // job3 meant to fail
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job3), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job3), 10000L,
         TaskState.FAILED);
 
     // because job4 has dependency over job3, it will fail as well
-    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job4), 5000L,
+    _driver.pollForJobState(workflowName, getJobNameToPoll(workflowName, 
job4), 10000L,
         TaskState.FAILED);
 
     // Check MBean is updated

Reply via email to