This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 70b63558797d193164a08bc73eb99816cf3a6094
Author: narendly <[email protected]>
AuthorDate: Tue Feb 26 17:24:56 2019 -0800

    [HELIX-808] TASK: Fix double-booking of tasks with task CurrentStates
    
    It was observed that TestNoDoubleAssign was failing intermittently. Upon 
debugging with more detailed logs, we found a race condition between newly 
starting tasks and dropping tasks. To prevent this, dropping state transitions 
will be prioritized and prevInstanceToTaskAssignment will be built from 
CurrentStates. This is needed to make sure the right number of tasks are 
assigned in every run of the task pipeline and dropping transitions happen 
right away.
    
        Changelist:
        1. Change the logic for generating prevInstToTaskAssignment so that 
it's based on CurrentState
        2. Add a special check for not updating task partition state upon 
Participant connection loss
        3. TestNoDoubleAssign passes consistently
        4. Fix TestNoDoubleAssign so that there won't be any thread leak
---
 .../apache/helix/task/AbstractTaskDispatcher.java  | 35 +++++++++++++++--
 .../java/org/apache/helix/task/JobDispatcher.java  | 44 ++++++++++++++++++----
 .../helix/integration/task/TestNoDoubleAssign.java |  7 ++--
 3 files changed, 72 insertions(+), 14 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 1bfab8b..b853198 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -47,20 +47,42 @@ public abstract class AbstractTaskDispatcher {
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
       Map<String, Set<Integer>> assignedPartitions, Set<Integer> 
partitionsToDropFromIs,
       Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
-      Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache) {
+      Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
+      Map<String, Set<Integer>> tasksToDrop) {
 
     // Get AssignableInstanceMap for releasing resources for tasks in terminal 
states
     AssignableInstanceManager assignableInstanceManager = 
cache.getAssignableInstanceManager();
 
     // Iterate through all instances
     for (String instance : prevInstanceToTaskAssignments.keySet()) {
+      assignedPartitions.put(instance, new HashSet<Integer>());
+
+      // Set all dropping transitions first. These are tasks coming from 
Participant disconnects
+      // that have some active current state (INIT or RUNNING) and the 
requestedState of DROPPED.
+      // These need to be prioritized over any other state transitions because 
of the race condition
+      // with the same pId (task) running on other instances. This is because 
in paMap, we can only
+      // define one transition per pId
+      if (tasksToDrop.containsKey(instance)) {
+        for (int pIdToDrop : tasksToDrop.get(instance)) {
+          paMap.put(pIdToDrop,
+              new PartitionAssignment(instance, 
TaskPartitionState.DROPPED.name()));
+          assignedPartitions.get(instance).add(pIdToDrop);
+        }
+      }
+
       if (excludedInstances.contains(instance)) {
         continue;
       }
 
       // If not an excluded instance, we must instantiate its entry in 
assignedPartitions
-      assignedPartitions.put(instance, new HashSet<Integer>());
       Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
+
+      // We need to remove all task pId's to be dropped because we already 
made an assignment in
+      // paMap above for them to be dropped. The following does this.
+      if (tasksToDrop.containsKey(instance)) {
+        pSet.removeAll(tasksToDrop.get(instance));
+      }
+
       // Used to keep track of partitions that are in one of the final states: 
COMPLETED, TIMED_OUT,
       // TASK_ERROR, ERROR.
       Set<Integer> donePartitions = new TreeSet<>();
@@ -123,7 +145,14 @@ public abstract class AbstractTaskDispatcher {
             continue;
           }
 
-          paMap.put(pId, new PartitionAssignment(instance, 
requestedState.name()));
+          // This contains check is necessary because we have already 
traversed pIdsToDrop at the
+          // beginning of this method. If we already have a dropping 
transition, we do not want to
+          // overwrite it. Any other requestedState transitions (for example, 
INIT to RUNNING or
+          // RUNNING to COMPLETE) can wait without affecting correctness - 
they will be picked up
+          // in ensuing runs of the Task pipeline
+          if (!paMap.containsKey(pId)) {
+            paMap.put(pId, new PartitionAssignment(instance, 
requestedState.name()));
+          }
           assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java 
b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 2ae5b2f..596b54b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -204,9 +204,14 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       return new ResourceAssignment(jobResource);
     }
 
+    // This set contains all task pIds that need to be dropped because 
requestedState is DROPPED
+    // Newer versions of Participants, upon connection reset, set task 
requestedStates to DROPPED
+    // These dropping transitions will be prioritized above all task state 
transition assignments
+    Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
+
     Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments =
         getPrevInstanceToTaskAssignments(liveInstances, 
prevTaskToInstanceStateAssignment,
-            allPartitions, currStateOutput, jobResource);
+            allPartitions, currStateOutput, jobResource, tasksToDrop);
     long currentTime = System.currentTimeMillis();
 
     if (LOG.isDebugEnabled()) {
@@ -217,14 +222,15 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
     // Release resource for tasks in terminal state
     updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, 
excludedInstances, jobResource,
         currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, 
jobState,
-        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, 
skippedPartitions, cache);
+        assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, 
skippedPartitions, cache,
+        tasksToDrop);
 
     addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
 
     if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > 
jobCfg.getFailureThreshold()
         || (jobCfg.getTargetResource() != null
-        && cache.getIdealState(jobCfg.getTargetResource()) != null
-        && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
+            && cache.getIdealState(jobCfg.getTargetResource()) != null
+            && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
       if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
         failJob(jobResource, workflowCtx, jobCtx, workflowConfig, 
cache.getJobConfigMap(), cache);
         return buildEmptyAssignment(jobResource, currStateOutput);
@@ -345,17 +351,20 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
    * @param currStateOutput currentStates to make sure currentStates copied 
over expired sessions
    *          are accounted for
    * @param jobName job name
+   * @param tasksToDrop instance -> pId's, to gather all pIds that need to be 
dropped
    * @return instance -> partitionIds from previous assignment, if the 
instance is still live
    */
   private static Map<String, SortedSet<Integer>> 
getPrevInstanceToTaskAssignments(
       Iterable<String> liveInstances, ResourceAssignment prevAssignment,
-      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName) {
+      Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, 
String jobName,
+      Map<String, Set<Integer>> tasksToDrop) {
     Map<String, SortedSet<Integer>> result = new HashMap<>();
     for (String instance : liveInstances) {
       result.put(instance, new TreeSet<Integer>());
     }
 
     // First, add all task partitions from JobContext
+    // TODO: Remove this portion to get rid of prevAssignment from Task 
Framework
     for (Partition partition : prevAssignment.getMappedPartitions()) {
       int pId = TaskUtil.getPartitionId(partition.getPartitionName());
       if (allTaskPartitions.contains(pId)) {
@@ -369,6 +378,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       }
     }
 
+    // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source 
of truth
+
     // Add all pIds existing in CurrentStateOutput as well because task 
currentStates copied over
     // from previous sessions won't show up in prevInstanceToTaskAssignments
     // We need to add these back here in order for these task partitions to be 
dropped (after a
@@ -381,10 +392,27 @@ public class JobDispatcher extends AbstractTaskDispatcher 
{
         String instance = instanceToCurrState.getKey();
         String requestedState =
             currStateOutput.getRequestedState(jobName, entry.getKey(), 
instance);
+        TaskPartitionState currState = 
TaskPartitionState.valueOf(instanceToCurrState.getValue());
         int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName());
-        if (result.containsKey(instance) && requestedState != null
-            && requestedState.equals(TaskPartitionState.DROPPED.name())) {
-          // Only if this instance is live and requestedState is DROPPED
+
+        // We must add all active task pIds back here because dropping 
transition could overwrite an
+        // active transition in paMap
+        // Add all task partitions in the following states:
+        // currState = INIT, requestedState = RUNNING (bootstrap)
+        // currState = RUNNING, requestedState = ANY (active)
+        // ** for tasks that are just in INIT state, we do not add them here 
because old
+        // Participants, upon connection reset, set tasks' currentStates to 
INIT. We cannot consider
+        // those tasks active **
+        if (result.containsKey(instance) && (currState == 
TaskPartitionState.INIT
+            && requestedState != null && 
requestedState.equals(TaskPartitionState.RUNNING.name())
+            || currState == TaskPartitionState.RUNNING)) {
+          // Check if this is a dropping transition
+          if (requestedState != null && 
requestedState.equals(TaskPartitionState.DROPPED.name())) {
+            if (!tasksToDrop.containsKey(instance)) {
+              tasksToDrop.put(instance, new HashSet<Integer>());
+            }
+            tasksToDrop.get(instance).add(pId);
+          }
           result.get(instance).add(pId);
         }
       }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
index 8fa3f94..88da7fb 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
@@ -45,7 +45,7 @@ import com.google.common.collect.ImmutableMap;
 public class TestNoDoubleAssign extends TaskTestBase {
   private static final int THREAD_COUNT = 20;
   private static final long CONNECTION_DELAY = 100L;
-  private static final long POLL_DELAY = 100L;
+  private static final long POLL_DELAY = 50L;
   private static final String TASK_DURATION = "200";
   private static final Random RANDOM = new Random();
 
@@ -102,8 +102,6 @@ public class TestNoDoubleAssign extends TaskTestBase {
     pollForDoubleAssign();
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
 
-    Assert.assertFalse(_existsDoubleAssign.get());
-
     // Shut down thread pools
     _executorServicePoll.shutdown();
     _executorServiceConnection.shutdown();
@@ -118,6 +116,9 @@ public class TestNoDoubleAssign extends TaskTestBase {
       _executorServicePoll.shutdownNow();
       _executorServiceConnection.shutdownNow();
     }
+    Thread.sleep(500L);
+
+    Assert.assertFalse(_existsDoubleAssign.get());
   }
 
   /**

Reply via email to