This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit ddb3690486a631efa9db704531781745d02ee546
Author:     narendly <naren...@gmail.com>
AuthorDate: Mon Feb 25 17:49:45 2019 -0800

    [HELIX-794] TASK: Fix double-booking of tasks upon Participant disconnect
    
    It has been observed in production use cases that during transient
    Participant connection issues, the Controller fails to honor the
    maxNumberOfTasksPerInstance limit. That is, if the user wants only 1 task
    from a job per instance (the limit is set to 1), Helix must assign at most
    1 task from that job to an instance. Upon short Participant disconnects,
    however, 2 such tasks were observed RUNNING on the same instance at the
    same time. The cause is an incorrect calculation of jobCfgLimitation in
    AbstractTaskDispatcher. This change fixes it by using a per-instance Map
    (assignedPartitions) to calculate the correct number of tasks to assign.
    
    Changelist:
    1. Modify an internal data structure (assignedPartitions)
    2. Fix the logic that calculates the number of tasks to assign
---
 .../apache/helix/task/AbstractTaskDispatcher.java  |  38 +++--
 .../java/org/apache/helix/task/JobDispatcher.java  |   3 +-
 .../helix/integration/task/TestNoDoubleAssign.java | 171 +++++++++++++++++++++
 3 files changed, 195 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
index 6fe5f7b..1bfab8b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java
@@ -45,7 +45,7 @@ public abstract class AbstractTaskDispatcher {
       Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
       ResourceAssignment prevTaskToInstanceStateAssignment, TaskState jobState,
-      Set<Integer> assignedPartitions, Set<Integer> partitionsToDropFromIs,
+      Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
       Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
       Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache) {
@@ -58,6 +58,8 @@ public abstract class AbstractTaskDispatcher {
         continue;
       }
 
+      // If not an excluded instance, we must instantiate its entry in assignedPartitions
+      assignedPartitions.put(instance, new HashSet<Integer>());
       Set<Integer> pSet = prevInstanceToTaskAssignments.get(instance);
       // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
       // TASK_ERROR, ERROR.
@@ -122,7 +124,7 @@ public abstract class AbstractTaskDispatcher {
           }
 
           paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 String.format("Instance %s requested a state transition to %s for partition %s.",
@@ -146,7 +148,7 @@ public abstract class AbstractTaskDispatcher {
           }
 
           paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
                 pName, nextState, instance));
@@ -170,7 +172,7 @@ public abstract class AbstractTaskDispatcher {
             assignableInstanceManager.release(instance, taskConfig, quotaType);
           }
           paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
           if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
@@ -252,7 +254,7 @@ public abstract class AbstractTaskDispatcher {
           // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
           paMap.put(pId,
               new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
-          assignedPartitions.add(pId);
+          assignedPartitions.get(instance).add(pId);
         }
       }
 
@@ -335,7 +337,7 @@ public abstract class AbstractTaskDispatcher {
   private void processTaskWithPendingMessage(ResourceAssignment prevAssignment, Integer pId,
       String pName, String instance, Message pendingMessage, TaskState jobState,
       TaskPartitionState currState, Map<Integer, PartitionAssignment> paMap,
-      Set<Integer> assignedPartitions) {
+      Map<String, Set<Integer>> assignedPartitions) {
 
     // stateMap is a mapping of Instance -> TaskPartitionState (String)
     Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
@@ -352,7 +354,7 @@ public abstract class AbstractTaskDispatcher {
       // While job is timing out, if the task is pending on INIT->RUNNING, set it back to INIT,
       // so that Helix will cancel the transition.
       paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
-      assignedPartitions.add(pId);
+      assignedPartitions.get(instance).add(pId);
       if (LOG.isDebugEnabled()) {
         LOG.debug(String.format(
             "Task partition %s has a pending state transition on instance %s INIT->RUNNING. Previous state %s"
@@ -363,7 +365,7 @@ public abstract class AbstractTaskDispatcher {
       // Otherwise, Just copy forward
       // the state assignment from the previous ideal state.
       paMap.put(pId, new PartitionAssignment(instance, prevState));
-      assignedPartitions.add(pId);
+      assignedPartitions.get(instance).add(pId);
       if (LOG.isDebugEnabled()) {
         LOG.debug(String.format(
            "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
 
@@ -440,11 +442,11 @@ public abstract class AbstractTaskDispatcher {
   protected void handleAdditionalTaskAssignment(
       Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments, Set<String> excludedInstances,
       String jobResource, CurrentStateOutput currStateOutput, JobContext jobCtx, JobConfig jobCfg,
-      WorkflowConfig workflowConfig, WorkflowContext workflowCtx, WorkflowControllerDataProvider cache,
-      ResourceAssignment prevTaskToInstanceStateAssignment, Set<Integer> assignedPartitions,
-      Map<Integer, PartitionAssignment> paMap, Set<Integer> skippedPartitions,
-      TaskAssignmentCalculator taskAssignmentCal, Set<Integer> allPartitions, long currentTime,
-      Collection<String> liveInstances) {
+      WorkflowConfig workflowConfig, WorkflowContext workflowCtx,
+      WorkflowControllerDataProvider cache, ResourceAssignment prevTaskToInstanceStateAssignment,
+      Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
+      Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
+      Set<Integer> allPartitions, long currentTime, Collection<String> liveInstances) {
 
     // See if there was LiveInstance change and cache LiveInstances from this iteration of pipeline
     boolean existsLiveInstanceOrCurrentStateChange =
@@ -453,7 +455,11 @@ public abstract class AbstractTaskDispatcher {
     // The excludeSet contains the set of task partitions that must be excluded from consideration
     // when making any new assignments.
     // This includes all completed, failed, delayed, and already assigned partitions.
-    Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
+    Set<Integer> excludeSet = Sets.newTreeSet();
+    // Add all assigned partitions to excludeSet
+    for (Set<Integer> assignedSet : assignedPartitions.values()) {
+      excludeSet.addAll(assignedSet);
+    }
     addCompletedTasks(excludeSet, jobCtx, allPartitions);
     addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg);
     excludeSet.addAll(skippedPartitions);
@@ -540,8 +546,8 @@ public abstract class AbstractTaskDispatcher {
       }
       // 1. throttled by job configuration
       // Contains the set of task partitions currently assigned to the instance.
-      Set<Integer> pSet = entry.getValue();
-      int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
+      int jobCfgLimitation =
+          jobCfg.getNumConcurrentTasksPerInstance() - assignedPartitions.get(instance).size();
       // 2. throttled by participant capacity
       int participantCapacity = cache.getInstanceConfigMap().get(instance).getMaxConcurrentTask();
       if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index 59a37ba..408c9f1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -176,7 +176,8 @@ public class JobDispatcher extends AbstractTaskDispatcher {
       Set<Integer> partitionsToDropFromIs, WorkflowControllerDataProvider cache) {
 
     // Used to keep track of tasks that have already been assigned to instances.
-    Set<Integer> assignedPartitions = new HashSet<>();
+    // InstanceName -> Set of task partitions assigned to that instance in this iteration
+    Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
 
     // Used to keep track of tasks that have failed, but whose failure is acceptable
     Set<Integer> skippedPartitions = new HashSet<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
new file mode 100644
index 0000000..8fa3f94
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestNoDoubleAssign.java
@@ -0,0 +1,171 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestNoDoubleAssign extends TaskTestBase {
+  private static final int THREAD_COUNT = 20;
+  private static final long CONNECTION_DELAY = 100L;
+  private static final long POLL_DELAY = 100L;
+  private static final String TASK_DURATION = "200";
+  private static final Random RANDOM = new Random();
+
+  private ScheduledExecutorService _executorServicePoll;
+  private ScheduledExecutorService _executorServiceConnection;
+  private AtomicBoolean _existsDoubleAssign = new AtomicBoolean(false);
+  private Set<String> _jobNames = new HashSet<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 0;
+    _numPartitions = 0;
+    _numReplicas = 0;
+    super.beforeClass();
+  }
+
+  /**
+   * Tests that no Participant has more tasks from the same job than is specified in the config
+   * (MaxConcurrentTaskPerInstance, default value = 1).
+   * NOTE: this test is expected to generate a lot of Participant-side ERROR messages (ZkClient
+   * already closed!) because we are disconnecting them on purpose.
+   */
+  @Test
+  public void testNoDoubleAssign() throws InterruptedException {
+    // Some arbitrary workload that creates a reasonably large number of tasks
+    int workload = 10;
+
+    // Create a workflow with jobs and tasks
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    for (int i = 0; i < workload; i++) {
+      List<TaskConfig> taskConfigs = new ArrayList<>();
+      for (int j = 0; j < workload; j++) {
+        String taskID = "JOB_" + i + "_TASK_" + j;
+        TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
+        taskConfigBuilder.setTaskId(taskID).setCommand(MockTask.TASK_COMMAND)
+            .addConfig(MockTask.JOB_DELAY, TASK_DURATION);
+        taskConfigs.add(taskConfigBuilder.build());
+      }
+      String jobName = "JOB_" + i;
+      _jobNames.add(workflowName + "_" + jobName); // Add the namespaced job name
+      JobConfig.Builder jobBuilder =
+          new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(10000)
+              .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG)
+              .addTaskConfigs(taskConfigs).setIgnoreDependentJobFailure(true)
+              .setFailureThreshold(100000)
+              .setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, TASK_DURATION));
+      builder.addJob(jobName, jobBuilder);
+    }
+    // Start the workflow
+    _driver.start(builder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+    breakConnection();
+    pollForDoubleAssign();
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    Assert.assertFalse(_existsDoubleAssign.get());
+
+    // Shut down thread pools
+    _executorServicePoll.shutdown();
+    _executorServiceConnection.shutdown();
+    try {
+      if (!_executorServicePoll.awaitTermination(60, TimeUnit.SECONDS)) {
+        _executorServicePoll.shutdownNow();
+      }
+      if (!_executorServiceConnection.awaitTermination(60, TimeUnit.SECONDS)) {
+        _executorServiceConnection.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      _executorServicePoll.shutdownNow();
+      _executorServiceConnection.shutdownNow();
+    }
+  }
+
+  /**
+   * Fetch the JobContext for all jobs in ZK and check that no two tasks are running on the same
+   * Participant.
+   */
+  private void pollForDoubleAssign() {
+    _executorServicePoll = Executors.newScheduledThreadPool(THREAD_COUNT);
+    _executorServicePoll.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        if (!_existsDoubleAssign.get()) {
+          // Get JobContexts and test that they are assigned to disparate Participants
+          for (String job : _jobNames) {
+            JobContext jobContext = _driver.getJobContext(job);
+            if (jobContext == null) {
+              continue;
+            }
+            Set<String> instanceCache = new HashSet<>();
+            for (int partition : jobContext.getPartitionSet()) {
+              if (jobContext.getPartitionState(partition) == TaskPartitionState.RUNNING) {
+                if (instanceCache.contains(jobContext.getAssignedParticipant(partition))) {
+                  // Two tasks running on the same instance at the same time
+                  _existsDoubleAssign.set(true);
+                  return;
+                }
+                instanceCache.add(jobContext.getAssignedParticipant(partition));
+              }
+            }
+          }
+        }
+      }
+    }, 0L, POLL_DELAY, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Randomly causes Participants to lose connection temporarily.
+   */
+  private void breakConnection() {
+    _executorServiceConnection = Executors.newScheduledThreadPool(THREAD_COUNT);
+    _executorServiceConnection.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        // Randomly pick a Participant and cause a transient connection issue
+        int participantIndex = RANDOM.nextInt(_numNodes);
+        _participants[participantIndex].syncStop();
+        startParticipant(participantIndex);
+      }
+    }, 0L, CONNECTION_DELAY, TimeUnit.MILLISECONDS);
+  }
+}
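
---

For readers skimming the change, the following is a minimal, self-contained sketch of the per-instance bookkeeping idea the patch describes; it is not Helix code, and the class and method names below are hypothetical. Tasks assigned in the current pipeline iteration are tracked per instance, and the remaining per-instance headroom for a job is computed from that map rather than from a single flat set, which is what previously allowed a second task from the same job onto one instance.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the per-instance throttling idea in this patch.
public class PerInstanceThrottleSketch {
  // instanceName -> task partitions assigned to that instance in this pipeline iteration
  private final Map<String, Set<Integer>> assignedPartitions = new HashMap<>();

  // Record that a task partition was assigned (or carried over) on an instance.
  void recordAssignment(String instance, int partitionId) {
    assignedPartitions.computeIfAbsent(instance, k -> new HashSet<>()).add(partitionId);
  }

  // Remaining number of tasks from this job that may still be placed on the instance,
  // given the job-level limit (e.g. MaxConcurrentTaskPerInstance = 1).
  int remainingCapacity(String instance, int numConcurrentTasksPerInstance) {
    int alreadyAssigned =
        assignedPartitions.getOrDefault(instance, Collections.emptySet()).size();
    return Math.max(0, numConcurrentTasksPerInstance - alreadyAssigned);
  }

  public static void main(String[] args) {
    PerInstanceThrottleSketch sketch = new PerInstanceThrottleSketch();
    // A task from the job is already RUNNING on instance_1 (e.g. carried over after a reconnect).
    sketch.recordAssignment("instance_1", 0);
    // With a limit of 1, no additional task from the same job may be placed on instance_1.
    System.out.println(sketch.remainingCapacity("instance_1", 1)); // prints 0
    // A different instance still has headroom.
    System.out.println(sketch.remainingCapacity("instance_2", 1)); // prints 1
  }
}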