This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 24.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/24.0.1 by this push:
new 4ffdea9014 Fix Overlord leader election when task lock re-acquisition
fails (#13236)
4ffdea9014 is described below
commit 4ffdea901436bd976663022f30ac0413ecbd021a
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Oct 18 21:10:15 2022 +0530
Fix Overlord leader election when task lock re-acquisition fails (#13236)
Overlord leader election can sometimes fail due to task lock re-acquisition
issues.
This commit solves the issue by failing such tasks and clearing all their
locks.
---
.../druid/indexing/overlord/TaskLockbox.java | 43 +++++++--
.../indexing/overlord/TaskLockboxSyncResult.java | 46 +++++++++
.../apache/druid/indexing/overlord/TaskMaster.java | 1 -
.../apache/druid/indexing/overlord/TaskQueue.java | 15 +++
.../druid/indexing/overlord/TaskLockboxTest.java | 63 ++++++++++++
.../druid/indexing/overlord/http/OverlordTest.java | 106 +++++++++++++--------
6 files changed, 225 insertions(+), 49 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index a53af64591..41caf0620f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -108,17 +108,19 @@ public class TaskLockbox
/**
* Wipe out our current in-memory state and resync it from our bundled
{@link TaskStorage}.
+ *
+ * @return SyncResult which needs to be processed by the caller
*/
- public void syncFromStorage()
+ public TaskLockboxSyncResult syncFromStorage()
{
giant.lock();
try {
// Load stuff from taskStorage first. If this fails, we don't want to
lose all our locks.
- final Set<String> storedActiveTasks = new HashSet<>();
+ final Set<Task> storedActiveTasks = new HashSet<>();
final List<Pair<Task, TaskLock>> storedLocks = new ArrayList<>();
for (final Task task : taskStorage.getActiveTasks()) {
- storedActiveTasks.add(task.getId());
+ storedActiveTasks.add(task);
for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
storedLocks.add(Pair.of(task, taskLock));
}
@@ -138,7 +140,12 @@ public class TaskLockbox
};
running.clear();
activeTasks.clear();
- activeTasks.addAll(storedActiveTasks);
+ activeTasks.addAll(storedActiveTasks.stream()
+ .map(Task::getId)
+ .collect(Collectors.toSet())
+ );
+ // Set of task groups in which at least one task failed to re-acquire a
lock
+ final Set<String> failedToReacquireLockTaskGroups = new HashSet<>();
// Bookkeeping for a log message at the end
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock :
byVersionOrdering.sortedCopy(storedLocks)) {
@@ -183,20 +190,39 @@ public class TaskLockbox
);
}
} else {
- throw new ISE(
- "Could not reacquire lock on interval[%s] version[%s] for task:
%s",
+ failedToReacquireLockTaskGroups.add(task.getGroupId());
+ log.error(
+ "Could not reacquire lock on interval[%s] version[%s] for task:
%s from group %s.",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
- task.getId()
+ task.getId(),
+ task.getGroupId()
);
+ continue;
}
}
+
+ Set<Task> tasksToFail = new HashSet<>();
+ for (Task task : storedActiveTasks) {
+ if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) {
+ tasksToFail.add(task);
+ activeTasks.remove(task.getId());
+ }
+ }
+
log.info(
"Synced %,d locks for %,d activeTasks from storage (%,d locks
ignored).",
taskLockCount,
activeTasks.size(),
storedLocks.size() - taskLockCount
);
+
+ if (!failedToReacquireLockTaskGroups.isEmpty()) {
+ log.warn("Marking all tasks from task groups[%s] to be failed "
+ + "as they failed to reacquire at least one lock.",
failedToReacquireLockTaskGroups);
+ }
+
+ return new TaskLockboxSyncResult(tasksToFail);
}
finally {
giant.unlock();
@@ -207,7 +233,8 @@ public class TaskLockbox
* This method is called only in {@link #syncFromStorage()} and verifies the
given task and the taskLock have the same
* groupId, dataSource, and priority.
*/
- private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock
taskLock)
+ @VisibleForTesting
+ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock
taskLock)
{
giant.lock();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java
new file mode 100644
index 0000000000..b7273b6bde
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Result of TaskLockbox#syncFromStorage()
+ * Contains tasks which need to be forcefully failed to let the overlord
become the leader
+ */
+class TaskLockboxSyncResult
+{
+ private final Set<Task> tasksToFail;
+
+ TaskLockboxSyncResult(Set<Task> tasksToFail)
+ {
+ this.tasksToFail = tasksToFail;
+ }
+
+ /**
+ * Return set of tasks which need to be forcefully failed due to lock
re-acquisition failure
+ */
+ Set<Task> getTasksToFail()
+ {
+ return tasksToFail;
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index b52a3c5c03..7b9101cf1f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -113,7 +113,6 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
log.info("By the power of Grayskull, I have the power!");
try {
- taskLockbox.syncFromStorage();
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskLockConfig,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 07f136c723..c508876f0c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -34,6 +34,7 @@ import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
+import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
@@ -173,6 +174,13 @@ public class TaskQueue
Preconditions.checkState(!active, "queue must be stopped");
active = true;
syncFromStorage();
+ // Mark these tasks as failed as they could not reacquire the lock
+ // Clean up needs to happen after tasks have been synced from storage
+ Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+ for (Task task : tasksToFail) {
+ shutdown(task.getId(),
+ "Shutting down forcefully as task failed to reacquire lock
while becoming leader");
+ }
managerExec.submit(
new Runnable()
{
@@ -228,6 +236,13 @@ public class TaskQueue
}
);
requestManagement();
+ // Remove any unacquired locks from storage (shutdown only clears
entries for which a TaskLockPosse was acquired)
+ // This is called after requesting management as locks need to be
cleared after notifyStatus is processed
+ for (Task task : tasksToFail) {
+ for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+ taskStorage.removeLock(task.getId(), lock);
+ }
+ }
}
finally {
giant.unlock();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 1da1ae18f8..834bdf9429 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
@@ -1258,6 +1259,47 @@ public class TaskLockboxTest
);
}
+ @Test
+ public void testFailedToReacquireTaskLock() throws Exception
+ {
+ // Tasks to be failed have a group id with the substring
"FailingLockAcquisition"
+ // Please refer to NullLockPosseTaskLockbox
+ final Task taskWithFailingLockAcquisition0 =
NoopTask.withGroupId("FailingLockAcquisition");
+ final Task taskWithFailingLockAcquisition1 =
NoopTask.withGroupId("FailingLockAcquisition");
+ final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
+ taskStorage.insert(taskWithFailingLockAcquisition0,
TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
+ taskStorage.insert(taskWithFailingLockAcquisition1,
TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
+ taskStorage.insert(taskWithSuccessfulLockAcquisition,
TaskStatus.running(taskWithSuccessfulLockAcquisition.getId()));
+
+ TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage,
metadataStorageCoordinator);
+ testLockbox.add(taskWithFailingLockAcquisition0);
+ testLockbox.add(taskWithFailingLockAcquisition1);
+ testLockbox.add(taskWithSuccessfulLockAcquisition);
+
+ testLockbox.tryLock(taskWithFailingLockAcquisition0,
+ new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+
taskWithFailingLockAcquisition0,
+
Intervals.of("2017-07-01/2017-08-01"),
+ null
+ )
+ );
+
+ testLockbox.tryLock(taskWithSuccessfulLockAcquisition,
+ new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+
taskWithSuccessfulLockAcquisition,
+
Intervals.of("2017-07-01/2017-08-01"),
+ null
+ )
+ );
+
+ Assert.assertEquals(3, taskStorage.getActiveTasks().size());
+
+ // The tasks must be marked for failure
+ TaskLockboxSyncResult result = testLockbox.syncFromStorage();
+ Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0,
taskWithFailingLockAcquisition1),
+ result.getTasksToFail());
+ }
+
private Set<TaskLock> getAllLocks(List<Task> tasks)
{
return tasks.stream()
@@ -1383,4 +1425,25 @@ public class TaskLockboxTest
return TaskStatus.failure("how?", "Dummy task status err msg");
}
}
+
+ /**
+ * Extends TaskLockbox to return a null TaskLockPosse when the task's group
name contains "FailingLockAcquisition".
+ */
+ private static class NullLockPosseTaskLockbox extends TaskLockbox
+ {
+ public NullLockPosseTaskLockbox(
+ TaskStorage taskStorage,
+ IndexerMetadataStorageCoordinator metadataStorageCoordinator
+ )
+ {
+ super(taskStorage, metadataStorageCoordinator);
+ }
+
+ @Override
+ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock
taskLock)
+ {
+ return task.getGroupId()
+ .contains("FailingLockAcquisition") ? null :
super.verifyAndCreateOrFindLockPosse(task, taskLock);
+ }
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 02a71f8560..6fa7252df0 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -38,12 +38,15 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskRunner;
@@ -59,6 +62,8 @@ import
org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -82,6 +87,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -103,11 +109,15 @@ public class OverlordTest
private CountDownLatch announcementLatch;
private DruidNode druidNode;
private OverlordResource overlordResource;
- private CountDownLatch[] taskCompletionCountDownLatches;
- private CountDownLatch[] runTaskCountDownLatches;
+ private Map<String, CountDownLatch> taskCompletionCountDownLatches;
+ private Map<String, CountDownLatch> runTaskCountDownLatches;
private HttpServletRequest req;
private SupervisorManager supervisorManager;
+ // Bad task's id must be lexicographically greater than the good task's
+ private final String goodTaskId = "aaa";
+ private final String badTaskId = "zzz";
+
private void setupServerAndCurator() throws Exception
{
server = new TestingServer();
@@ -140,38 +150,52 @@ public class OverlordTest
req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
supervisorManager = EasyMock.createMock(SupervisorManager.class);
- taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
- taskLockbox.syncFromStorage();
- EasyMock.expectLastCall().atLeastOnce();
- taskLockbox.add(EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- taskLockbox.remove(EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
-
- // for second Noop Task directly added to deep storage.
- taskLockbox.add(EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
- taskLockbox.remove(EasyMock.anyObject());
- EasyMock.expectLastCall().atLeastOnce();
taskActionClientFactory =
EasyMock.createStrictMock(TaskActionClientFactory.class);
EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject()))
.andReturn(null).anyTimes();
- EasyMock.replay(taskLockbox, taskActionClientFactory, req);
+ EasyMock.replay(taskActionClientFactory, req);
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
- runTaskCountDownLatches = new CountDownLatch[2];
- runTaskCountDownLatches[0] = new CountDownLatch(1);
- runTaskCountDownLatches[1] = new CountDownLatch(1);
- taskCompletionCountDownLatches = new CountDownLatch[2];
- taskCompletionCountDownLatches[0] = new CountDownLatch(1);
- taskCompletionCountDownLatches[1] = new CountDownLatch(1);
+
+ IndexerMetadataStorageCoordinator mdc = new
TestIndexerMetadataStorageCoordinator();
+
+ taskLockbox = new TaskLockbox(taskStorage, mdc);
+
+ runTaskCountDownLatches = new HashMap<>();
+ runTaskCountDownLatches.put("0", new CountDownLatch(1));
+ runTaskCountDownLatches.put("1", new CountDownLatch(1));
+ taskCompletionCountDownLatches = new HashMap<>();
+ taskCompletionCountDownLatches.put("0", new CountDownLatch(1));
+ taskCompletionCountDownLatches.put("1", new CountDownLatch(1));
announcementLatch = new CountDownLatch(1);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
ServiceEmitter serviceEmitter = new NoopServiceEmitter();
+
+ // Add two tasks with conflicting locks
+ // The bad task (The one with a lexicographically larger name) must be
failed
+ Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0,
null, null, null);
+ TaskLock badLock = new TimeChunkLock(null, badTaskId, "datasource",
Intervals.ETERNITY, "version1", 50);
+ Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0,
null, null, null);
+ TaskLock goodLock = new TimeChunkLock(null, goodTaskId, "datasource",
Intervals.ETERNITY, "version0", 50);
+ taskStorage.insert(goodTask, TaskStatus.running(goodTaskId));
+ taskStorage.insert(badTask, TaskStatus.running(badTaskId));
+ taskStorage.addLock(badTaskId, badLock);
+ taskStorage.addLock(goodTaskId, goodLock);
+ runTaskCountDownLatches.put(badTaskId, new CountDownLatch(1));
+ runTaskCountDownLatches.put(goodTaskId, new CountDownLatch(1));
+ taskCompletionCountDownLatches.put(badTaskId, new CountDownLatch(1));
+ taskCompletionCountDownLatches.put(goodTaskId, new CountDownLatch(1));
+
+ TaskRunnerFactory taskRunnerFactory = (TaskRunnerFactory<MockTaskRunner>)
() ->
+ new MockTaskRunner(runTaskCountDownLatches,
taskCompletionCountDownLatches);
+
+ taskRunnerFactory.build().run(badTask);
+ taskRunnerFactory.build().run(goodTask);
+
taskMaster = new TaskMaster(
new TaskLockConfig(),
new TaskQueueConfig(null, new Period(1), null, new Period(10)),
@@ -180,14 +204,7 @@ public class OverlordTest
taskStorage,
taskActionClientFactory,
druidNode,
- new TaskRunnerFactory<MockTaskRunner>()
- {
- @Override
- public MockTaskRunner build()
- {
- return new MockTaskRunner(runTaskCountDownLatches,
taskCompletionCountDownLatches);
- }
- },
+ taskRunnerFactory,
new NoopServiceAnnouncer()
{
@Override
@@ -235,6 +252,13 @@ public class OverlordTest
Response response = overlordResource.getLeader();
Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
+ // BadTask must fail due to null task lock
+ waitForTaskStatus(badTaskId, TaskState.FAILED);
+
+ // GoodTask must successfully run
+ taskCompletionCountDownLatches.get(goodTaskId).countDown();
+ waitForTaskStatus(goodTaskId, TaskState.SUCCESS);
+
final String taskId_0 = "0";
NoopTask task_0 = NoopTask.create(taskId_0, 0);
response = overlordResource.taskPost(task_0, req);
@@ -262,7 +286,7 @@ public class OverlordTest
);
// Simulate completion of task_0
- taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown();
+ taskCompletionCountDownLatches.get(taskId_0).countDown();
// Wait for taskQueue to handle success status of task_0
waitForTaskStatus(taskId_0, TaskState.SUCCESS);
@@ -272,7 +296,7 @@ public class OverlordTest
NoopTask task_1 = NoopTask.create(taskId_1, 0);
taskStorage.insert(task_1, TaskStatus.running(taskId_1));
// Wait for task runner to run task_1
- runTaskCountDownLatches[Integer.parseInt(taskId_1)].await();
+ runTaskCountDownLatches.get(taskId_1).await();
response = overlordResource.getRunningTasks(null, req);
// 1 task that was manually inserted should be in running state
@@ -283,19 +307,20 @@ public class OverlordTest
Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());
// Simulate completion of task_1
- taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown();
+ taskCompletionCountDownLatches.get(taskId_1).countDown();
// Wait for taskQueue to handle success status of task_1
waitForTaskStatus(taskId_1, TaskState.SUCCESS);
// should return number of tasks which are not in running state
response = overlordResource.getCompleteTasks(null, req);
- Assert.assertEquals(2, (((List) response.getEntity()).size()));
+ Assert.assertEquals(4, (((List) response.getEntity()).size()));
response = overlordResource.getCompleteTasks(1, req);
Assert.assertEquals(1, (((List) response.getEntity()).size()));
+
taskMaster.stop();
Assert.assertFalse(taskMaster.isLeader());
- EasyMock.verify(taskLockbox, taskActionClientFactory);
+ EasyMock.verify(taskActionClientFactory);
}
/* Wait until the task with given taskId has the given Task Status
@@ -321,12 +346,12 @@ public class OverlordTest
public static class MockTaskRunner implements TaskRunner
{
- private CountDownLatch[] completionLatches;
- private CountDownLatch[] runLatches;
+ private Map<String, CountDownLatch> completionLatches;
+ private Map<String, CountDownLatch> runLatches;
private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
private List<String> runningTasks;
- public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[]
completionLatches)
+ public MockTaskRunner(Map<String, CountDownLatch> runLatches, Map<String,
CountDownLatch> completionLatches)
{
this.runLatches = runLatches;
this.completionLatches = completionLatches;
@@ -380,11 +405,11 @@ public class OverlordTest
// this is equivalent of getting process holder to run task in
ForkingTaskRunner
runningTasks.add(taskId);
if (runLatches != null) {
- runLatches[Integer.parseInt(taskId)].countDown();
+ runLatches.get(taskId).countDown();
}
// Wait for completion count down
if (completionLatches != null) {
- completionLatches[Integer.parseInt(taskId)].await();
+ completionLatches.get(taskId).await();
}
taskRunnerWorkItems.remove(taskId);
runningTasks.remove(taskId);
@@ -420,6 +445,7 @@ public class OverlordTest
@Override
public void shutdown(String taskid, String reason)
{
+ runningTasks.remove(taskid);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]