This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ce100d2230c2f136092efb0d2ca7ef6d7a697326
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Tue May 26 16:49:42 2020 -0700

    Make the task scheduling decision independent of the PreviousAssignment (#994)
    
    In this commit, the previous scheduling logic, which was based on
    PreviousAssignment, has been changed so that it no longer depends on
    prevAssignment. Instead, task scheduling is now based solely on the
    CurrentState.
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 100 ++++++++++++++-------
 .../java/org/apache/helix/task/JobDispatcher.java  |  74 +++++----------
 .../helix/task/TestDropTerminalTasksUponReset.java |   3 +-
 3 files changed, 90 insertions(+), 87 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 60c2402..8934337 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -66,7 +66,7 @@ public abstract class AbstractTaskDispatcher {
   // Job Update related methods
 
   public void updatePreviousAssignedTasksStatus(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, 
Set<String> excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, 
Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext 
jobCtx, JobConfig jobCfg,
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
       Map<String, Set<Integer>> assignedPartitions, Set<Integer> 
partitionsToDropFromIs,
@@ -78,11 +78,11 @@ public abstract class AbstractTaskDispatcher {
     AssignableInstanceManager assignableInstanceManager = 
cache.getAssignableInstanceManager();
 
     // Iterate through all instances
-    for (String instance : prevInstanceToTaskAssignments.keySet()) {
+    for (String instance : currentInstanceToTaskAssignments.keySet()) {
       assignedPartitions.put(instance, new HashSet<>());
 
       // Set all dropping transitions first. These are tasks coming from 
Participant disconnects
-      // that have some active current state (INIT or RUNNING) and the 
requestedState of DROPPED.
+      // and have the requestedState of DROPPED.
       // These need to be prioritized over any other state transitions because 
of the race condition
       // with the same pId (task) running on other instances. This is because 
in paMap, we can only
       // define one transition per pId
@@ -99,7 +99,7 @@ public abstract class AbstractTaskDispatcher {
       }
 
       // If not an excluded instance, we must instantiate its entry in 
assignedPartitions
-      Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
 
       // We need to remove all task pId's to be dropped because we already 
made an assignment in
       // paMap above for them to be dropped. The following does this.
@@ -107,8 +107,7 @@ public abstract class AbstractTaskDispatcher {
         pSet.removeAll(tasksToDrop.get(instance));
       }
 
-      // Used to keep track of partitions that are in one of the final states: 
COMPLETED, TIMED_OUT,
-      // TASK_ERROR, ERROR.
+      // Used to keep track of partitions that are in either INIT or DROPPED 
states
       Set<Integer> donePartitions = new TreeSet<>();
       for (int pId : pSet) {
         final String pName = pName(jobResource, pId);
@@ -121,17 +120,6 @@ public abstract class AbstractTaskDispatcher {
               instance, pId);
           continue;
         }
-        // This avoids a race condition in the case that although currentState 
is in the following
-        // error condition, the pending message (INIT->RUNNNING) might still 
be present.
-        // This is undesirable because this prevents JobContext from getting 
the proper update of
-        // fields including task state and task's NUM_ATTEMPTS
-        if (currState == TaskPartitionState.ERROR || currState == 
TaskPartitionState.TASK_ERROR
-            || currState == TaskPartitionState.TIMED_OUT
-            || currState == TaskPartitionState.TASK_ABORTED) {
-          // Do not increment the task attempt count here - it will be 
incremented at scheduling
-          // time
-          markPartitionError(jobCtx, pId, currState);
-        }
 
         // Check for pending state transitions on this (partition, instance). 
If there is a pending
         // state transition, we prioritize this pending state transition and 
set the assignment from
@@ -242,16 +230,16 @@ public abstract class AbstractTaskDispatcher {
         }
           break;
         case COMPLETED: {
-          // The task has completed on this partition. Mark as such in the 
context object.
-          donePartitions.add(pId);
+          // The task has completed on this partition. Drop it from the 
instance and add it to assignedPartitions in
+          // order to avoid scheduling it again in this pipeline.
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has completed with state %s. Marking as 
such in rebalancer context.",
                 pName, currState));
           }
           partitionsToDropFromIs.add(pId);
-          markPartitionCompleted(jobCtx, pId);
-
           // This task is COMPLETED, so release this task
           assignableInstanceManager.release(instance, taskConfig, quotaType);
         }
@@ -263,7 +251,11 @@ public abstract class AbstractTaskDispatcher {
         case TASK_ABORTED:
 
         case ERROR: {
-          donePartitions.add(pId); // The task may be rescheduled on a 
different instance.
+          // First make this task which is in terminal state to be dropped.
+          // Later on, in next pipeline in handleAdditionalAssignments, the 
task will be retried if possible.
+          // (meaning it is not ABORTED and max number of attempts has not 
been reached yet)
+          assignedPartitions.get(instance).add(pId);
+          paMap.put(pId, new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                 "Task partition %s has error state %s with msg %s. Marking as 
such in rebalancer context.",
@@ -389,13 +381,59 @@ public abstract class AbstractTaskDispatcher {
       return stateFromContext == null ? TaskPartitionState.INIT : 
stateFromContext;
     }
     TaskPartitionState currentState = 
TaskPartitionState.valueOf(currentStateString);
+    // Update job context based on current state
+    updatePartitionInformationInJobContext(currentStateOutput, jobResource, 
currentState, jobCtx,
+        pId, pName, instance);
+    return currentState;
+  }
+
+  /**
+   * Based on the CurrentState of this task and Context information, the task 
information in the job
+   * context gets updated.
+   * @param currentStateOutput
+   * @param jobResource
+   * @param currentState
+   * @param jobCtx
+   * @param pId
+   * @param pName
+   * @param instance
+   */
+  private void updatePartitionInformationInJobContext(CurrentStateOutput 
currentStateOutput,
+      String jobResource, TaskPartitionState currentState, JobContext jobCtx, 
Integer pId,
+      String pName, String instance) {
+    // The assignedParticipant field needs to be updated regardless of the 
current state and context
+    // information because it will prevent controller to assign the task to 
the wrong participant
+    // for targeted tasks when two CurrentStates exist for one task.
+    // In the updatePreviousAssignedTasksStatus, we check
+    // instance.equals(jobCtx.getAssignedParticipant(pId)) and bypass the 
assignment if instance is
+    // not equal to job context's AssignedParticipant for this pId.
     jobCtx.setAssignedParticipant(pId, instance);
-    jobCtx.setPartitionState(pId, currentState);
-    String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
-    if (taskMsg != null) {
-      jobCtx.setPartitionInfo(pId, taskMsg);
+    // If job context needs to be updated with new state, update it accordingly
+    // This check is necessary because we are relying on current state and we 
do not want to update
+    // context as long as current state existed. We just want to update 
context information
+    // (specially finish time) once.
+    // This condition checks whether jobContext's state is out of date or not.
+    if (!currentState.equals(jobCtx.getPartitionState(pId))) {
+      jobCtx.setPartitionState(pId, currentState);
+      String taskMsg = currentStateOutput.getInfo(jobResource, new 
Partition(pName), instance);
+      if (taskMsg != null) {
+        jobCtx.setPartitionInfo(pId, taskMsg);
+      }
+      if (currentState == TaskPartitionState.COMPLETED) {
+        markPartitionCompleted(jobCtx, pId);
+      }
+      // This avoids a race condition in the case that although currentState 
is in the following
+      // error condition, the pending message (INIT->RUNNNING) might still be 
present.
+      // This is undesirable because this prevents JobContext from getting the 
proper update of
+      // fields including task state and task's NUM_ATTEMPTS
+      if (currentState == TaskPartitionState.ERROR || currentState == 
TaskPartitionState.TASK_ERROR
+          || currentState == TaskPartitionState.TIMED_OUT
+          || currentState == TaskPartitionState.TASK_ABORTED) {
+        // Do not increment the task attempt count here - it will be 
incremented at scheduling
+        // time
+        markPartitionError(jobCtx, pId, currentState);
+      }
     }
-    return currentState;
   }
 
   /**
@@ -511,7 +549,7 @@ public abstract class AbstractTaskDispatcher {
   // Compute real assignment from theoretical calculation with applied 
throttling
   // This is the actual assigning part
   protected void handleAdditionalTaskAssignment(
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, 
Set<String> excludedInstances,
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments, 
Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext 
jobCtx,
       final JobConfig jobCfg, final WorkflowConfig workflowConfig, 
WorkflowContext workflowCtx,
       final WorkflowControllerDataProvider cache,
@@ -580,7 +618,7 @@ public abstract class AbstractTaskDispatcher {
       // TODO: isRebalanceRunningTask() was originally put in place to allow 
users to move
       // ("rebalance") long-running tasks, but there hasn't been a clear use 
case for this
       // Previously, there was a bug in the condition above (it was || where 
it should have been &&)
-      dropRebalancedRunningTasks(tgtPartitionAssignments, 
prevInstanceToTaskAssignments, paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, 
currentInstanceToTaskAssignments, paMap,
           jobCtx);
     }
 
@@ -588,11 +626,11 @@ public abstract class AbstractTaskDispatcher {
     if (!TaskUtil.isGenericTaskJob(jobCfg) && 
existsLiveInstanceOrCurrentStateChange) {
       // Drop current jobs only if they are assigned to a different instance, 
regardless of
       // the jobCfg.isRebalanceRunningTask() setting
-      dropRebalancedRunningTasks(tgtPartitionAssignments, 
prevInstanceToTaskAssignments, paMap,
+      dropRebalancedRunningTasks(tgtPartitionAssignments, 
currentInstanceToTaskAssignments, paMap,
           jobCtx);
     }
     // Go through ALL instances and assign/throttle tasks accordingly
-    for (Map.Entry<String, SortedSet<Integer>> entry : 
prevInstanceToTaskAssignments.entrySet()) {
+    for (Map.Entry<String, SortedSet<Integer>> entry : 
currentInstanceToTaskAssignments.entrySet()) {
       String instance = entry.getKey();
       if (!tgtPartitionAssignments.containsKey(instance)) {
         // There is no assignment made for this instance, so it is safe to skip
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index c14cee9..b35252c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -238,21 +238,20 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
     // These dropping transitions will be prioritized above all task state 
transition assignments
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
-    Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
-        getPrevInstanceToTaskAssignments(liveInstances, 
prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource, tasksToDrop);
+    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
+        getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, 
jobResource, tasksToDrop);
 
-    updateInstanceToTaskAssignmentsFromContext(jobCtx, 
prevInstanceToTaskAssignments);
+    updateInstanceToTaskAssignmentsFromContext(jobCtx, 
currentInstanceToTaskAssignments);
 
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("All partitions: " + allPartitions + " taskAssignment: "
-          + prevInstanceToTaskAssignments + " excludedInstances: " + 
excludedInstances);
+          + currentInstanceToTaskAssignments + " excludedInstances: " + 
excludedInstances);
     }
 
     // Release resource for tasks in terminal state
-    updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, 
excludedInstances, jobResource,
+    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, 
excludedInstances, jobResource,
         currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, 
jobState,
         assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, 
skippedPartitions, cache,
         tasksToDrop);
@@ -318,7 +317,7 @@ public class JobDispatcher extends AbstractTaskDispatcher {
     // Make additional task assignments if needed.
     if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
         && jobTgtState == TargetState.START) {
-      handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, 
excludedInstances, jobResource,
+      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, 
excludedInstances, jobResource,
           currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
           prevTaskToInstanceStateAssignment, assignedPartitions, paMap, 
skippedPartitions,
           taskAssignmentCal, allPartitions, currentTime, liveInstances);
@@ -380,45 +379,24 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
 
   /**
    * @param liveInstances
-   * @param prevAssignment task partition -> (instance -> state)
-   * @param allTaskPartitions all task partitionIds
    * @param currStateOutput currentStates to make sure currentStates copied 
over expired sessions
    *          are accounted for
    * @param jobName job name
    * @param tasksToDrop instance -> pId's, to gather all pIds that need to be 
dropped
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
-  protected static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
-      Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName,
+  protected static Map<String, SortedSet<Integer>> 
getCurrentInstanceToTaskAssignments(
+      Iterable<String> liveInstances, CurrentStateOutput currStateOutput, 
String jobName,
       Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<>());
     }
 
-    // First, add all task partitions from prevAssignment
-    // TODO: Remove this portion to get rid of prevAssignment from Task 
Framework
-    for (Partition partition : prevAssignment.getMappedPartitions()) {
-      int pId = TaskUtil.getPartitionId(partition.getPartitionName());
-      if (allTaskPartitions.contains(pId)) {
-        Map<String, String> replicaMap = 
prevAssignment.getReplicaMap(partition);
-        for (String instance : replicaMap.keySet()) {
-          SortedSet<Integer> pIdSet = result.get(instance);
-          if (pIdSet != null) {
-            pIdSet.add(pId);
-          }
-        }
-      }
-    }
-
-    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source 
of truth
-
-    // Add all pIds existing in CurrentStateOutput as well because task 
currentStates copied over
-    // from previous sessions won't show up in prevInstanceToTaskAssignments
-    // We need to add these back here in order for these task partitions to be 
dropped (after a
-    // copy-over, the Controller will send a message to drop the state 
currentState)
-    // partitions: (partition -> instance -> currentState)
+    // Generate currentInstanceToTaskAssignment with CurrentStateOutput as 
source of truth
+    // Add all pIds existing in CurrentStateOutput
+    // We need to add these pIds to result and update their states in 
JobContext in
+    // updatePreviousAssignedTasksStatus method.
     Map<Partition, Map<String, String>> partitions = 
currStateOutput.getCurrentStateMap(jobName);
     for (Map.Entry<Partition, Map<String, String>> entry : 
partitions.entrySet()) {
       // Get all (instance -> currentState) mappings
@@ -426,26 +404,14 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), 
instance);
-        TaskPartitionState currState = 
TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
 
         if (result.containsKey(instance)) {
-          // We must add all active task pIds back here because dropping 
transition could overwrite
-          // an active transition in paMap
-          // Add all task partitions in the following states:
-          // currState = INIT, requestedState = RUNNING (bootstrap)
-          // currState = RUNNING, requestedState = ANY (active)
-          // ** for tasks that are just in INIT state, we do not add them here 
because old
-          // Participants, upon connection reset, set tasks' currentStates to 
INIT. We cannot
-          // consider those tasks active **
-          if (currState == TaskPartitionState.INIT && requestedState != null
-              && requestedState.equals(TaskPartitionState.RUNNING.name())
-              || currState == TaskPartitionState.RUNNING) {
-            result.get(instance).add(pId);
-          }
-
+          result.get(instance).add(pId);
           // Check if this task needs to be dropped. If so, we need to add to 
tasksToDrop no matter
           // what its current state is so that it will be dropped
+          // This is trying to drop tasks on a reconnected instance with a new 
sessionId that have
+          // all of their requestedState == DROPPED
           if (requestedState != null && 
requestedState.equals(TaskPartitionState.DROPPED.name())) {
             if (!tasksToDrop.containsKey(instance)) {
               tasksToDrop.put(instance, new HashSet<>());
@@ -462,10 +428,10 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
    * If partition is missing from prevInstanceToTaskAssignments (e.g. previous 
assignment is
    * deleted) it is added from context. Otherwise, the context won't be 
updated.
    * @param jobCtx Job Context
-   * @param prevInstanceToTaskAssignments instance -> partitionIds from 
previous assignment
+   * @param currentInstanceToTaskAssignments instance -> partitionIds from 
CurrentStateOutput
    */
   protected void updateInstanceToTaskAssignmentsFromContext(JobContext jobCtx,
-      Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments) {
+      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments) {
     for (Integer partition : jobCtx.getPartitionSet()) {
       // We must add all active task pIds back here
       // The states other than Running and Init do not need to be added.
@@ -474,9 +440,9 @@ public class JobDispatcher extends AbstractTaskDispatcher {
           || jobCtx.getPartitionState(partition) == TaskPartitionState.INIT) {
         String instance = jobCtx.getAssignedParticipant(partition);
         if (instance != null) {
-          if (prevInstanceToTaskAssignments.containsKey(instance)
-              && 
!prevInstanceToTaskAssignments.get(instance).contains(partition)) {
-            prevInstanceToTaskAssignments.get(instance).add(partition);
+          if (currentInstanceToTaskAssignments.containsKey(instance)
+              && 
!currentInstanceToTaskAssignments.get(instance).contains(partition)) {
+            currentInstanceToTaskAssignments.get(instance).add(partition);
           }
         }
       }
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
 
b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
index ca07e97..58dc2d1 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestDropTerminalTasksUponReset.java
@@ -90,8 +90,7 @@ public class TestDropTerminalTasksUponReset {
     Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
 
     // Call the static method we are testing
-    JobDispatcher.getPrevInstanceToTaskAssignments(liveInstances, 
prevAssignment, allTaskPartitions,
-        currentStateOutput, jobName, tasksToDrop);
+    JobDispatcher.getCurrentInstanceToTaskAssignments(liveInstances, 
currentStateOutput, jobName, tasksToDrop);
 
     // Check that tasksToDrop has (numTasks / 2) partitions as we intended 
regardless of what the
     // current states of the tasks were

Reply via email to