This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 70b63558797d193164a08bc73eb99816cf3a6094 Author: narendly <[email protected]> AuthorDate: Tue Feb 26 17:24:56 2019 -0800 [HELIX-808] TASK: Fix double-booking of tasks with task CurrentStates It was observed that TestNoDoubleAssign was failing intermittently. Upon debugging with more detailed logs, there was a race condition between newly starting tasks and dropping tasks. To prevent this, dropping state transitions will be prioritized and prevInstanceToTaskAssignment will be built from CurrentStates. This is needed to make sure the right number of tasks are assigned every task pipeline and dropping transitions happen right away. Changelist: 1. Change the logic for generating prevInstToTaskAssignment so that it's based on CurrentState 2. Add a special check for not updating task partition state upon Participant connection loss 3. TestNoDoubleAssign passes consistently 4. Fix TestNoDoubleAssign so that there won't be any thread leak --- .../apache/helix/task/AbstractTaskDispatcher.java | 35 +++++++++++++++-- .../java/org/apache/helix/task/JobDispatcher.java | 44 ++++++++++++++++++---- .../helix/integration/task/TestNoDoubleAssign.java | 7 ++-- 3 files changed, 72 insertions(+), 14 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java index 1bfab8b..b853198 100644 --- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java @@ -47,20 +47,42 @@ public abstract class AbstractTaskDispatcher { ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState, Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs, Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState, - Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache) { + Set<Integer> skippedPartitions, 
WorkflowControllerDataProvider cache, + Map<String, Set<Integer>> tasksToDrop) { // Get AssignableInstanceMap for releasing resources for tasks in terminal states AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager(); // Iterate through all instances for (String instance : prevInstanceToTaskAssignments.keySet()) { + assignedPartitions.put(instance, new HashSet<Integer>()); + + // Set all dropping transitions first. These are tasks coming from Participant disconnects + // that have some active current state (INIT or RUNNING) and the requestedState of DROPPED. + // These need to be prioritized over any other state transitions because of the race condition + // with the same pId (task) running on other instances. This is because in paMap, we can only + // define one transition per pId + if (tasksToDrop.containsKey(instance)) { + for (int pIdToDrop : tasksToDrop.get(instance)) { + paMap.put(pIdToDrop, + new PartitionAssignment(instance, TaskPartitionState.DROPPED.name())); + assignedPartitions.get(instance).add(pIdToDrop); + } + } + if (excludedInstances.contains(instance)) { continue; } // If not an excluded instance, we must instantiate its entry in assignedPartitions - assignedPartitions.put(instance, new HashSet<Integer>()); Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance); + + // We need to remove all task pId's to be dropped because we already made an assignment in + // paMap above for them to be dropped. The following does this. + if (tasksToDrop.containsKey(instance)) { + pSet.removeAll(tasksToDrop.get(instance)); + } + // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, // TASK_ERROR, ERROR. 
Set<Integer> donePartitions = new TreeSet<>(); @@ -123,7 +145,14 @@ public abstract class AbstractTaskDispatcher { continue; } - paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); + // This contains check is necessary because we have already traversed pIdsToDrop at the + // beginning of this method. If we already have a dropping transition, we do not want to + // overwrite it. Any other requestedState transitions (for example, INIT to RUNNING or + // RUNNING to COMPLETE, can wait without affecting correctness - they will be picked up + // in ensuing runs of the Task pipeline) + if (!paMap.containsKey(pId)) { + paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); + } assignedPartitions.get(instance).add(pId); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 2ae5b2f..596b54b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -204,9 +204,14 @@ public class JobDispatcher extends AbstractTaskDispatcher { return new ResourceAssignment(jobResource); } + // This set contains all task pIds that need to be dropped because requestedState is DROPPED + // Newer versions of Participants, upon connection reset, sets task requestedStates to DROPPED + // These dropping transitions will be prioritized above all task state transition assignments + Map<String, Set<Integer>> tasksToDrop = new HashMap<>(); + Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments = getPrevInstanceToTaskAssignments(liveInstances, prevTaskToInstanceStateAssignment, - allPartitions, currStateOutput, jobResource); + allPartitions, currStateOutput, jobResource, tasksToDrop); long currentTime = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { @@ -217,14 +222,15 @@ public class JobDispatcher extends 
AbstractTaskDispatcher { // Release resource for tasks in terminal state updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, jobResource, currStateOutput, jobCtx, jobCfg, prevTaskToInstanceStateAssignment, jobState, - assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache); + assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, + tasksToDrop); addGiveupPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg); if (jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold() || (jobCfg.getTargetResource() != null - && cache.getIdealState(jobCfg.getTargetResource()) != null - && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) { + && cache.getIdealState(jobCfg.getTargetResource()) != null + && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) { if (isJobFinished(jobCtx, jobResource, currStateOutput)) { failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache); return buildEmptyAssignment(jobResource, currStateOutput); @@ -345,17 +351,20 @@ public class JobDispatcher extends AbstractTaskDispatcher { * @param currStateOutput currentStates to make sure currentStates copied over expired sessions * are accounted for * @param jobName job name + * @param tasksToDrop instance -> pId's, to gather all pIds that need to be dropped * @return instance -> partitionIds from previous assignment, if the instance is still live */ private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments( Iterable<String> liveInstances, ResourceAssignment prevAssignment, - Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName) { + Set<Integer> allTaskPartitions, CurrentStateOutput currStateOutput, String jobName, + Map<String, Set<Integer>> tasksToDrop) { Map<String, SortedSet<Integer>> result = new HashMap<>(); for (String instance : liveInstances) { 
result.put(instance, new TreeSet<Integer>()); } // First, add all task partitions from JobContext + // TODO: Remove this portion to get rid of prevAssignment from Task Framework for (Partition partition : prevAssignment.getMappedPartitions()) { int pId = TaskUtil.getPartitionId(partition.getPartitionName()); if (allTaskPartitions.contains(pId)) { @@ -369,6 +378,8 @@ public class JobDispatcher extends AbstractTaskDispatcher { } } + // Generate prevInstanceToTaskAssignment with CurrentStateOutput as source of truth + // Add all pIds existing in CurrentStateOutput as well because task currentStates copied over // from previous sessions won't show up in prevInstanceToTaskAssignments // We need to add these back here in order for these task partitions to be dropped (after a @@ -381,10 +392,27 @@ public class JobDispatcher extends AbstractTaskDispatcher { String instance = instanceToCurrState.getKey(); String requestedState = currStateOutput.getRequestedState(jobName, entry.getKey(), instance); + TaskPartitionState currState = TaskPartitionState.valueOf(instanceToCurrState.getValue()); int pId = TaskUtil.getPartitionId(entry.getKey().getPartitionName()); - if (result.containsKey(instance) && requestedState != null - && requestedState.equals(TaskPartitionState.DROPPED.name())) { - // Only if this instance is live and requestedState is DROPPED + + // We must add all active task pIds back here because dropping transition could overwrite an + // active transition in paMap + // Add all task partitions in the following states: + // currState = INIT, requestedState = RUNNING (bootstrap) + // currState = RUNNING, requestedState = ANY (active) + // ** for tasks that are just in INIT state, we do not add them here because old + // Participants, upon connection reset, set tasks' currentStates to INIT. 
We cannot consider + // those tasks active ** + if (result.containsKey(instance) && (currState == TaskPartitionState.INIT + && requestedState != null && requestedState.equals(TaskPartitionState.RUNNING.name()) + || currState == TaskPartitionState.RUNNING)) { + // Check if this is a dropping transition + if (requestedState != null && requestedState.equals(TaskPartitionState.DROPPED.name())) { + if (!tasksToDrop.containsKey(instance)) { + tasksToDrop.put(instance, new HashSet<Integer>()); + } + tasksToDrop.get(instance).add(pId); + } result.get(instance).add(pId); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java index 8fa3f94..88da7fb 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java @@ -45,7 +45,7 @@ import com.google.common.collect.ImmutableMap; public class TestNoDoubleAssign extends TaskTestBase { private static final int THREAD_COUNT = 20; private static final long CONNECTION_DELAY = 100L; - private static final long POLL_DELAY = 100L; + private static final long POLL_DELAY = 50L; private static final String TASK_DURATION = "200"; private static final Random RANDOM = new Random(); @@ -102,8 +102,6 @@ public class TestNoDoubleAssign extends TaskTestBase { pollForDoubleAssign(); _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); - Assert.assertFalse(_existsDoubleAssign.get()); - // Shut down thread pools _executorServicePoll.shutdown(); _executorServiceConnection.shutdown(); @@ -118,6 +116,9 @@ public class TestNoDoubleAssign extends TaskTestBase { _executorServicePoll.shutdownNow(); _executorServiceConnection.shutdownNow(); } + Thread.sleep(500L); + + Assert.assertFalse(_existsDoubleAssign.get()); } /**
