This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 24.0.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/24.0.1 by this push:
     new 4ffdea9014 Fix Overlord leader election when task lock re-acquisition 
fails (#13236)
4ffdea9014 is described below

commit 4ffdea901436bd976663022f30ac0413ecbd021a
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Oct 18 21:10:15 2022 +0530

    Fix Overlord leader election when task lock re-acquisition fails (#13236)
    
    Overlord leader election can sometimes fail due to task lock re-acquisition 
issues.
    This commit solves the issue by failing such tasks and clearing all their 
locks.
---
 .../druid/indexing/overlord/TaskLockbox.java       |  43 +++++++--
 .../indexing/overlord/TaskLockboxSyncResult.java   |  46 +++++++++
 .../apache/druid/indexing/overlord/TaskMaster.java |   1 -
 .../apache/druid/indexing/overlord/TaskQueue.java  |  15 +++
 .../druid/indexing/overlord/TaskLockboxTest.java   |  63 ++++++++++++
 .../druid/indexing/overlord/http/OverlordTest.java | 106 +++++++++++++--------
 6 files changed, 225 insertions(+), 49 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index a53af64591..41caf0620f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -108,17 +108,19 @@ public class TaskLockbox
 
   /**
    * Wipe out our current in-memory state and resync it from our bundled 
{@link TaskStorage}.
+   *
+   * @return SyncResult which needs to be processed by the caller
    */
-  public void syncFromStorage()
+  public TaskLockboxSyncResult syncFromStorage()
   {
     giant.lock();
 
     try {
       // Load stuff from taskStorage first. If this fails, we don't want to 
lose all our locks.
-      final Set<String> storedActiveTasks = new HashSet<>();
+      final Set<Task> storedActiveTasks = new HashSet<>();
       final List<Pair<Task, TaskLock>> storedLocks = new ArrayList<>();
       for (final Task task : taskStorage.getActiveTasks()) {
-        storedActiveTasks.add(task.getId());
+        storedActiveTasks.add(task);
         for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
           storedLocks.add(Pair.of(task, taskLock));
         }
@@ -138,7 +140,12 @@ public class TaskLockbox
       };
       running.clear();
       activeTasks.clear();
-      activeTasks.addAll(storedActiveTasks);
+      activeTasks.addAll(storedActiveTasks.stream()
+                                          .map(Task::getId)
+                                          .collect(Collectors.toSet())
+      );
+      // Set of task groups in which at least one task failed to re-acquire a 
lock
+      final Set<String> failedToReacquireLockTaskGroups = new HashSet<>();
       // Bookkeeping for a log message at the end
       int taskLockCount = 0;
       for (final Pair<Task, TaskLock> taskAndLock : 
byVersionOrdering.sortedCopy(storedLocks)) {
@@ -183,20 +190,39 @@ public class TaskLockbox
             );
           }
         } else {
-          throw new ISE(
-              "Could not reacquire lock on interval[%s] version[%s] for task: 
%s",
+          failedToReacquireLockTaskGroups.add(task.getGroupId());
+          log.error(
+              "Could not reacquire lock on interval[%s] version[%s] for task: 
%s from group %s.",
               savedTaskLockWithPriority.getInterval(),
               savedTaskLockWithPriority.getVersion(),
-              task.getId()
+              task.getId(),
+              task.getGroupId()
           );
+          continue;
         }
       }
+
+      Set<Task> tasksToFail = new HashSet<>();
+      for (Task task : storedActiveTasks) {
+        if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) {
+          tasksToFail.add(task);
+          activeTasks.remove(task.getId());
+        }
+      }
+
       log.info(
           "Synced %,d locks for %,d activeTasks from storage (%,d locks 
ignored).",
           taskLockCount,
           activeTasks.size(),
           storedLocks.size() - taskLockCount
       );
+
+      if (!failedToReacquireLockTaskGroups.isEmpty()) {
+        log.warn("Marking all tasks from task groups[%s] to be failed "
+                 + "as they failed to reacquire at least one lock.", 
failedToReacquireLockTaskGroups);
+      }
+
+      return new TaskLockboxSyncResult(tasksToFail);
     }
     finally {
       giant.unlock();
@@ -207,7 +233,8 @@ public class TaskLockbox
    * This method is called only in {@link #syncFromStorage()} and verifies the 
given task and the taskLock have the same
    * groupId, dataSource, and priority.
    */
-  private TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock 
taskLock)
+  @VisibleForTesting
+  protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock 
taskLock)
   {
     giant.lock();
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java
new file mode 100644
index 0000000000..b7273b6bde
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import org.apache.druid.indexing.common.task.Task;
+
+import java.util.Set;
+
+/**
+ * Result of TaskLockbox#syncFromStorage()
+ * Contains tasks which need to be forcefully failed to let the overlord 
become the leader
+ */
+class TaskLockboxSyncResult
+{
+  private final Set<Task> tasksToFail;
+
+  TaskLockboxSyncResult(Set<Task> tasksToFail)
+  {
+    this.tasksToFail = tasksToFail;
+  }
+
+  /**
+   * Return set of tasks which need to be forcefully failed due to lock 
re-acquisition failure
+   */
+  Set<Task> getTasksToFail()
+  {
+    return tasksToFail;
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index b52a3c5c03..7b9101cf1f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -113,7 +113,6 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
         log.info("By the power of Grayskull, I have the power!");
 
         try {
-          taskLockbox.syncFromStorage();
           taskRunner = runnerFactory.build();
           taskQueue = new TaskQueue(
               taskLockConfig,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 07f136c723..c508876f0c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -34,6 +34,7 @@ import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
+import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
@@ -173,6 +174,13 @@ public class TaskQueue
       Preconditions.checkState(!active, "queue must be stopped");
       active = true;
       syncFromStorage();
+      // Mark these tasks as failed as they could not reacquire the lock
+      // Clean up needs to happen after tasks have been synced from storage
+      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+      for (Task task : tasksToFail) {
+        shutdown(task.getId(),
+                 "Shutting down forcefully as task failed to reacquire lock 
while becoming leader");
+      }
       managerExec.submit(
           new Runnable()
           {
@@ -228,6 +236,13 @@ public class TaskQueue
           }
       );
       requestManagement();
+      // Remove any unacquired locks from storage (shutdown only clears 
entries for which a TaskLockPosse was acquired)
+      // This is called after requesting management as locks need to be 
cleared after notifyStatus is processed
+      for (Task task : tasksToFail) {
+        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+          taskStorage.removeLock(task.getId(), lock);
+        }
+      }
     }
     finally {
       giant.unlock();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 1da1ae18f8..834bdf9429 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
@@ -1258,6 +1259,47 @@ public class TaskLockboxTest
     );
   }
 
+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    // Tasks to be failed have a group id with the substring 
"FailingLockAcquisition"
+    // Please refer to NullLockPosseTaskLockbox
+    final Task taskWithFailingLockAcquisition0 = 
NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithFailingLockAcquisition1 = 
NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
+    taskStorage.insert(taskWithFailingLockAcquisition0, 
TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
+    taskStorage.insert(taskWithFailingLockAcquisition1, 
TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
+    taskStorage.insert(taskWithSuccessfulLockAcquisition, 
TaskStatus.running(taskWithSuccessfulLockAcquisition.getId()));
+
+    TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, 
metadataStorageCoordinator);
+    testLockbox.add(taskWithFailingLockAcquisition0);
+    testLockbox.add(taskWithFailingLockAcquisition1);
+    testLockbox.add(taskWithSuccessfulLockAcquisition);
+
+    testLockbox.tryLock(taskWithFailingLockAcquisition0,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 
taskWithFailingLockAcquisition0,
+                                                 
Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    testLockbox.tryLock(taskWithSuccessfulLockAcquisition,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 
taskWithSuccessfulLockAcquisition,
+                                                 
Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    Assert.assertEquals(3, taskStorage.getActiveTasks().size());
+
+    // The tasks must be marked for failure
+    TaskLockboxSyncResult result = testLockbox.syncFromStorage();
+    Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, 
taskWithFailingLockAcquisition1),
+                        result.getTasksToFail());
+  }
+
   private Set<TaskLock> getAllLocks(List<Task> tasks)
   {
     return tasks.stream()
@@ -1383,4 +1425,25 @@ public class TaskLockboxTest
       return TaskStatus.failure("how?", "Dummy task status err msg");
     }
   }
+
+  /**
+   * Extends TaskLockbox to return a null TaskLockPosse when the task's group 
name contains "FailingLockAcquisition".
+   */
+  private static class NullLockPosseTaskLockbox extends TaskLockbox
+  {
+    public NullLockPosseTaskLockbox(
+        TaskStorage taskStorage,
+        IndexerMetadataStorageCoordinator metadataStorageCoordinator
+    )
+    {
+      super(taskStorage, metadataStorageCoordinator);
+    }
+
+    @Override
+    protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock 
taskLock)
+    {
+      return task.getGroupId()
+                 .contains("FailingLockAcquisition") ? null : 
super.verifyAndCreateOrFindLockPosse(task, taskLock);
+    }
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 02a71f8560..6fa7252df0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -38,12 +38,15 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskRunner;
@@ -59,6 +62,8 @@ import 
org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
+import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -82,6 +87,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -103,11 +109,15 @@ public class OverlordTest
   private CountDownLatch announcementLatch;
   private DruidNode druidNode;
   private OverlordResource overlordResource;
-  private CountDownLatch[] taskCompletionCountDownLatches;
-  private CountDownLatch[] runTaskCountDownLatches;
+  private Map<String, CountDownLatch> taskCompletionCountDownLatches;
+  private Map<String, CountDownLatch> runTaskCountDownLatches;
   private HttpServletRequest req;
   private SupervisorManager supervisorManager;
 
+  // Bad task's id must be lexicographically greater than the good task's
+  private final String goodTaskId = "aaa";
+  private final String badTaskId = "zzz";
+
   private void setupServerAndCurator() throws Exception
   {
     server = new TestingServer();
@@ -140,38 +150,52 @@ public class OverlordTest
     req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
     EasyMock.expectLastCall().anyTimes();
     supervisorManager = EasyMock.createMock(SupervisorManager.class);
-    taskLockbox = EasyMock.createStrictMock(TaskLockbox.class);
-    taskLockbox.syncFromStorage();
-    EasyMock.expectLastCall().atLeastOnce();
-    taskLockbox.add(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    taskLockbox.remove(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-
-    // for second Noop Task directly added to deep storage.
-    taskLockbox.add(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
-    taskLockbox.remove(EasyMock.anyObject());
-    EasyMock.expectLastCall().atLeastOnce();
 
     taskActionClientFactory = 
EasyMock.createStrictMock(TaskActionClientFactory.class);
     EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject()))
             .andReturn(null).anyTimes();
-    EasyMock.replay(taskLockbox, taskActionClientFactory, req);
+    EasyMock.replay(taskActionClientFactory, req);
 
     taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-    runTaskCountDownLatches = new CountDownLatch[2];
-    runTaskCountDownLatches[0] = new CountDownLatch(1);
-    runTaskCountDownLatches[1] = new CountDownLatch(1);
-    taskCompletionCountDownLatches = new CountDownLatch[2];
-    taskCompletionCountDownLatches[0] = new CountDownLatch(1);
-    taskCompletionCountDownLatches[1] = new CountDownLatch(1);
+
+    IndexerMetadataStorageCoordinator mdc = new 
TestIndexerMetadataStorageCoordinator();
+
+    taskLockbox = new TaskLockbox(taskStorage, mdc);
+
+    runTaskCountDownLatches = new HashMap<>();
+    runTaskCountDownLatches.put("0", new CountDownLatch(1));
+    runTaskCountDownLatches.put("1", new CountDownLatch(1));
+    taskCompletionCountDownLatches = new HashMap<>();
+    taskCompletionCountDownLatches.put("0", new CountDownLatch(1));
+    taskCompletionCountDownLatches.put("1", new CountDownLatch(1));
     announcementLatch = new CountDownLatch(1);
     setupServerAndCurator();
     curator.start();
     curator.blockUntilConnected();
     druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
     ServiceEmitter serviceEmitter = new NoopServiceEmitter();
+
+    // Add two tasks with conflicting locks
+    // The bad task (The one with a lexicographically larger name) must be 
failed
+    Task badTask = new NoopTask(badTaskId, badTaskId, "datasource", 10_000, 0, 
null, null, null);
+    TaskLock badLock = new TimeChunkLock(null, badTaskId, "datasource", 
Intervals.ETERNITY, "version1", 50);
+    Task goodTask = new NoopTask(goodTaskId, goodTaskId, "datasource", 0, 0, 
null, null, null);
+    TaskLock goodLock = new TimeChunkLock(null, goodTaskId, "datasource", 
Intervals.ETERNITY, "version0", 50);
+    taskStorage.insert(goodTask, TaskStatus.running(goodTaskId));
+    taskStorage.insert(badTask, TaskStatus.running(badTaskId));
+    taskStorage.addLock(badTaskId, badLock);
+    taskStorage.addLock(goodTaskId, goodLock);
+    runTaskCountDownLatches.put(badTaskId, new CountDownLatch(1));
+    runTaskCountDownLatches.put(goodTaskId, new CountDownLatch(1));
+    taskCompletionCountDownLatches.put(badTaskId, new CountDownLatch(1));
+    taskCompletionCountDownLatches.put(goodTaskId, new CountDownLatch(1));
+
+    TaskRunnerFactory taskRunnerFactory = (TaskRunnerFactory<MockTaskRunner>) 
() ->
+        new MockTaskRunner(runTaskCountDownLatches, 
taskCompletionCountDownLatches);
+
+    taskRunnerFactory.build().run(badTask);
+    taskRunnerFactory.build().run(goodTask);
+
     taskMaster = new TaskMaster(
         new TaskLockConfig(),
         new TaskQueueConfig(null, new Period(1), null, new Period(10)),
@@ -180,14 +204,7 @@ public class OverlordTest
         taskStorage,
         taskActionClientFactory,
         druidNode,
-        new TaskRunnerFactory<MockTaskRunner>()
-        {
-          @Override
-          public MockTaskRunner build()
-          {
-            return new MockTaskRunner(runTaskCountDownLatches, 
taskCompletionCountDownLatches);
-          }
-        },
+        taskRunnerFactory,
         new NoopServiceAnnouncer()
         {
           @Override
@@ -235,6 +252,13 @@ public class OverlordTest
     Response response = overlordResource.getLeader();
     Assert.assertEquals(druidNode.getHostAndPort(), response.getEntity());
 
+    // BadTask must fail due to null task lock
+    waitForTaskStatus(badTaskId, TaskState.FAILED);
+
+    // GoodTask must successfully run
+    taskCompletionCountDownLatches.get(goodTaskId).countDown();
+    waitForTaskStatus(goodTaskId, TaskState.SUCCESS);
+
     final String taskId_0 = "0";
     NoopTask task_0 = NoopTask.create(taskId_0, 0);
     response = overlordResource.taskPost(task_0, req);
@@ -262,7 +286,7 @@ public class OverlordTest
     );
 
     // Simulate completion of task_0
-    taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown();
+    taskCompletionCountDownLatches.get(taskId_0).countDown();
     // Wait for taskQueue to handle success status of task_0
     waitForTaskStatus(taskId_0, TaskState.SUCCESS);
 
@@ -272,7 +296,7 @@ public class OverlordTest
     NoopTask task_1 = NoopTask.create(taskId_1, 0);
     taskStorage.insert(task_1, TaskStatus.running(taskId_1));
     // Wait for task runner to run task_1
-    runTaskCountDownLatches[Integer.parseInt(taskId_1)].await();
+    runTaskCountDownLatches.get(taskId_1).await();
 
     response = overlordResource.getRunningTasks(null, req);
     // 1 task that was manually inserted should be in running state
@@ -283,19 +307,20 @@ public class OverlordTest
     Assert.assertEquals(TASK_LOCATION, taskResponseObject.getLocation());
 
     // Simulate completion of task_1
-    taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown();
+    taskCompletionCountDownLatches.get(taskId_1).countDown();
     // Wait for taskQueue to handle success status of task_1
     waitForTaskStatus(taskId_1, TaskState.SUCCESS);
 
     // should return number of tasks which are not in running state
     response = overlordResource.getCompleteTasks(null, req);
-    Assert.assertEquals(2, (((List) response.getEntity()).size()));
+    Assert.assertEquals(4, (((List) response.getEntity()).size()));
 
     response = overlordResource.getCompleteTasks(1, req);
     Assert.assertEquals(1, (((List) response.getEntity()).size()));
+
     taskMaster.stop();
     Assert.assertFalse(taskMaster.isLeader());
-    EasyMock.verify(taskLockbox, taskActionClientFactory);
+    EasyMock.verify(taskActionClientFactory);
   }
 
   /* Wait until the task with given taskId has the given Task Status
@@ -321,12 +346,12 @@ public class OverlordTest
 
   public static class MockTaskRunner implements TaskRunner
   {
-    private CountDownLatch[] completionLatches;
-    private CountDownLatch[] runLatches;
+    private Map<String, CountDownLatch> completionLatches;
+    private Map<String, CountDownLatch> runLatches;
     private ConcurrentHashMap<String, TaskRunnerWorkItem> taskRunnerWorkItems;
     private List<String> runningTasks;
 
-    public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] 
completionLatches)
+    public MockTaskRunner(Map<String, CountDownLatch> runLatches, Map<String, 
CountDownLatch> completionLatches)
     {
       this.runLatches = runLatches;
       this.completionLatches = completionLatches;
@@ -380,11 +405,11 @@ public class OverlordTest
               // this is equivalent of getting process holder to run task in 
ForkingTaskRunner
               runningTasks.add(taskId);
               if (runLatches != null) {
-                runLatches[Integer.parseInt(taskId)].countDown();
+                runLatches.get(taskId).countDown();
               }
               // Wait for completion count down
               if (completionLatches != null) {
-                completionLatches[Integer.parseInt(taskId)].await();
+                completionLatches.get(taskId).await();
               }
               taskRunnerWorkItems.remove(taskId);
               runningTasks.remove(taskId);
@@ -420,6 +445,7 @@ public class OverlordTest
     @Override
     public void shutdown(String taskid, String reason)
     {
+      runningTasks.remove(taskid);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to