This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b7289dea4 Try to fetch the task status for an active task from memory 
(#15724)
e2b7289dea4 is described below

commit e2b7289dea4a1c9ec45af07091644038cb939484
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Feb 26 13:53:05 2024 +0530

    Try to fetch the task status for an active task from memory (#15724)
    
    * Reduce metadata calls to fetch the status for an active task
---
 .../kafka/supervisor/KafkaSupervisorTest.java      | 19 ++++++
 .../kinesis/supervisor/KinesisSupervisorTest.java  | 16 +++++
 .../client/IndexerWorkerManagerClient.java         | 11 ++--
 .../apache/druid/indexing/overlord/TaskQueue.java  | 13 ++++
 .../indexing/overlord/http/OverlordResource.java   |  8 ++-
 .../supervisor/SeekableStreamSupervisor.java       | 18 ++---
 .../druid/indexing/overlord/TaskQueueTest.java     | 60 +++++++++++++++++
 .../overlord/http/OverlordResourceTest.java        | 76 ++++++++++++++++++++++
 .../SeekableStreamSupervisorStateTest.java         |  3 +
 .../rpc/indexing/SpecificTaskServiceLocator.java   | 30 +++++----
 .../indexing/SpecificTaskServiceLocatorTest.java   | 56 +++++++---------
 11 files changed, 246 insertions(+), 64 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 21264a9e396..e4b4b2f9c94 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1445,6 +1445,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
@@ -1475,6 +1476,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
     }
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             .andReturn(Futures.immediateFuture(Status.READING))
             .anyTimes();
@@ -2561,6 +2563,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
@@ -2603,6 +2606,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
     }
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             .andReturn(Futures.immediateFuture(Status.READING))
             .anyTimes();
@@ -2647,6 +2651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KafkaDataSourceMetadata(
@@ -2689,6 +2694,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
     }
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             .andReturn(Futures.immediateFuture(Status.READING))
             .anyTimes();
@@ -2809,6 +2815,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
             .andReturn(ImmutableList.of(id1, id2, id3))
             .anyTimes();
@@ -2850,6 +2859,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
     EasyMock.reset(taskRunner, taskClient, taskQueue);
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
     EasyMock.expect(taskClient.pauseAsync("id2"))
             .andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 
15L, 1, 25L, 2, 30L)));
     EasyMock.expect(taskClient.setEndOffsetsAsync("id2", 
singlePartitionMap(topic, 0, 15L, 1, 25L, 2, 30L), true))
@@ -2871,6 +2883,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     replayAll();
@@ -2896,6 +2909,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     replayAll();
@@ -2952,6 +2966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     replayAll();
@@ -3531,6 +3546,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
             .andReturn(ImmutableList.of(id1, id2, id3))
             .anyTimes();
@@ -3589,6 +3607,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
     replayAll();
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 87297dd8f28..a2f3e98deca 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -1496,6 +1496,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KinesisDataSourceMetadata(null)
@@ -1524,6 +1525,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
       
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
     }
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
             .anyTimes();
@@ -1654,6 +1656,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
@@ -2200,6 +2203,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KinesisDataSourceMetadata(
@@ -2245,6 +2249,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
       
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
     }
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
     EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
             
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
             .anyTimes();
@@ -2308,6 +2313,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
         new KinesisDataSourceMetadata(
@@ -2330,6 +2336,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue);
 
+    
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
+
     TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
     checkpoints1.put(0, ImmutableMap.of(
         SHARD_ID1,
@@ -2529,6 +2537,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1,
 id2, id3)).anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
@@ -2580,6 +2590,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
 
     EasyMock.reset(taskRunner, taskClient, taskQueue);
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
     EasyMock.expect(taskClient.pauseAsync("id2"))
             .andReturn(Futures.immediateFuture(ImmutableMap.of(
                 SHARD_ID1,
@@ -3583,6 +3596,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
     
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+    
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
     
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1,
 id2, id3)).anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
     
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
index 2940fc4f9c3..1894336043d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.msq.indexing.client;
 
-import org.apache.druid.client.indexing.TaskStatusResponse;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -65,10 +65,13 @@ public class IndexerWorkerManagerClient implements 
WorkerManagerClient
   @Override
   public TaskLocation location(String workerId)
   {
-    final TaskStatusResponse response = 
FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true);
+    final TaskStatus response = FutureUtils.getUnchecked(
+        overlordClient.taskStatuses(ImmutableSet.of(workerId)),
+        true
+    ).get(workerId);
 
-    if (response.getStatus() != null) {
-      return response.getStatus().getLocation();
+    if (response != null) {
+      return response.getLocation();
     } else {
       return TaskLocation.unknown();
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 3a06af69d1e..830ae9b732b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.annotations.SuppressFBWarnings;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
@@ -671,6 +672,8 @@ public class TaskQueue
     // Save status to metadata store first, so if we crash while doing the 
rest of the shutdown, our successor
     // remembers that this task has completed.
     try {
+      //The code block is only called when a task completes,
+      //and we need to check to make sure the metadata store has the correct 
status stored.
       final Optional<TaskStatus> previousStatus = 
taskStorage.getStatus(task.getId());
       if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
         log.makeAlert("Ignoring notification for already-complete 
task").addData("task", task.getId()).emit();
@@ -945,6 +948,16 @@ public class TaskQueue
     }
   }
 
+  public Optional<TaskStatus> getTaskStatus(final String taskId)
+  {
+    RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
+    if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
+      return 
Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
+    } else {
+      return taskStorage.getStatus(taskId);
+    }
+  }
+
   public CoordinatorRunStats getQueueStats()
   {
     final int queuedUpdates = statusUpdatesInQueue.get();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index b529aa45854..8416b2e0968 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -469,9 +469,15 @@ public class OverlordResource
       return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds 
provided.").build();
     }
 
+    final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     Map<String, TaskStatus> result = 
Maps.newHashMapWithExpectedSize(taskIds.size());
     for (String taskId : taskIds) {
-      Optional<TaskStatus> optional = 
taskStorageQueryAdapter.getStatus(taskId);
+      final Optional<TaskStatus> optional;
+      if (taskQueue.isPresent()) {
+        optional = taskQueue.get().getTaskStatus(taskId);
+      } else {
+        optional = taskStorageQueryAdapter.getStatus(taskId);
+      }
       if (optional.isPresent()) {
         result.put(taskId, optional.get());
       }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 5e937fe69eb..62d3caa83d6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -27,11 +27,9 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -876,14 +874,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         Preconditions.checkNotNull(id, "id");
         Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
         if (taskRunner.isPresent()) {
-          Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind(
-              taskRunner.get().getRunningTasks(),
-              (Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem -> 
id.equals(taskRunnerWorkItem.getTaskId())
-          );
-
-          if (item.isPresent()) {
-            return item.get().getLocation();
-          }
+          return taskRunner.get().getTaskLocation(id);
         } else {
           log.error("Failed to get task runner because I'm not the leader!");
         }
@@ -894,7 +885,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       @Override
       public Optional<TaskStatus> getTaskStatus(String id)
       {
-        return taskStorage.getStatus(id);
+        final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
+        if (taskQueue.isPresent()) {
+          return taskQueue.get().getTaskStatus(id);
+        } else {
+          return taskStorage.getStatus(id);
+        }
       }
     };
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index a1a93e29cbf..8ca1ff49f05 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -460,6 +460,66 @@ public class TaskQueueTest extends IngestionTestBase
     Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
   }
 
+  @Test
+  public void testGetTaskStatus()
+  {
+    final String newTask = "newTask";
+    final String waitingTask = "waitingTask";
+    final String pendingTask = "pendingTask";
+    final String runningTask = "runningTask";
+    final String successfulTask = "successfulTask";
+    final String failedTask = "failedTask";
+
+    TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
+    EasyMock.expect(taskStorage.getStatus(newTask))
+            .andReturn(Optional.of(TaskStatus.running(newTask)));
+    EasyMock.expect(taskStorage.getStatus(successfulTask))
+            .andReturn(Optional.of(TaskStatus.success(successfulTask)));
+    EasyMock.expect(taskStorage.getStatus(failedTask))
+            .andReturn(Optional.of(TaskStatus.failure(failedTask, 
failedTask)));
+    EasyMock.replay(taskStorage);
+
+    TaskRunner taskRunner = EasyMock.createMock(HttpRemoteTaskRunner.class);
+    EasyMock.expect(taskRunner.getRunnerTaskState(newTask))
+            .andReturn(null);
+    EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask))
+            .andReturn(RunnerTaskState.WAITING);
+    EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
+            .andReturn(RunnerTaskState.PENDING);
+    EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
+            .andReturn(RunnerTaskState.RUNNING);
+    EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
+            .andReturn(RunnerTaskState.NONE);
+    EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
+            .andReturn(RunnerTaskState.NONE);
+    EasyMock.expect(taskRunner.getTaskLocation(waitingTask))
+            .andReturn(TaskLocation.unknown());
+    EasyMock.expect(taskRunner.getTaskLocation(pendingTask))
+            .andReturn(TaskLocation.unknown());
+    EasyMock.expect(taskRunner.getTaskLocation(runningTask))
+            .andReturn(TaskLocation.create("host", 8100, 8100));
+    EasyMock.replay(taskRunner);
+
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null, null),
+        new DefaultTaskConfig(),
+        taskStorage,
+        taskRunner,
+        createActionClientFactory(),
+        getLockbox(),
+        new StubServiceEmitter("druid/overlord", "testHost")
+    );
+    taskQueue.setActive(true);
+
+    Assert.assertEquals(TaskStatus.running(newTask), 
taskQueue.getTaskStatus(newTask).get());
+    Assert.assertEquals(TaskStatus.running(waitingTask), 
taskQueue.getTaskStatus(waitingTask).get());
+    Assert.assertEquals(TaskStatus.running(pendingTask), 
taskQueue.getTaskStatus(pendingTask).get());
+    Assert.assertEquals(TaskStatus.running(runningTask), 
taskQueue.getTaskStatus(runningTask).get());
+    Assert.assertEquals(TaskStatus.success(successfulTask), 
taskQueue.getTaskStatus(successfulTask).get());
+    Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), 
taskQueue.getTaskStatus(failedTask).get());
+  }
+
   private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> 
runningTasks)
   {
     HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new 
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 8c1b6765431..19eb1f8e7a2 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -1794,6 +1794,82 @@ public class OverlordResourceTest
     Assert.assertEquals(expectedResourceActions, resourceActions);
   }
 
+  @Test
+  public void testGetMultipleTaskStatuses_presentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
+    EasyMock.expect(taskQueue.getTaskStatus("task"))
+            .andReturn(Optional.of(TaskStatus.running("task")));
+    EasyMock.replay(taskQueue);
+    TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
+    EasyMock.replay(taskMaster);
+    OverlordResource overlordResource = new OverlordResource(
+        taskMaster,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    final Object response = 
overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
+                                            .getEntity();
+    Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), 
response);
+  }
+
+  @Test
+  public void testGetMultipleTaskStatuses_absentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskStorageQueryAdapter taskStorageQueryAdapter = 
EasyMock.createMock(TaskStorageQueryAdapter.class);
+    EasyMock.expect(taskStorageQueryAdapter.getStatus("task"))
+            .andReturn(Optional.of(TaskStatus.running("task")));
+    EasyMock.replay(taskStorageQueryAdapter);
+    TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
+    EasyMock.replay(taskMaster);
+    OverlordResource overlordResource = new OverlordResource(
+        taskMaster,
+        taskStorageQueryAdapter,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    final Object response = 
overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
+                                            .getEntity();
+    Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), 
response);
+  }
+
   private void expectAuthorizationTokenCheck()
   {
     expectAuthorizationTokenCheck(Users.DRUID);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 840b4e9f69a..ea083423eaa 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1101,6 +1101,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
 
     EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+    EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+    EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(location3).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
             .andReturn(ImmutableList.of(id1, id2, id3))
             .anyTimes();
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
index 0276e768ba9..163c7e14e01 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
@@ -19,16 +19,16 @@
 
 package org.apache.druid.rpc.indexing;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.rpc.ServiceLocation;
@@ -36,9 +36,11 @@ import org.apache.druid.rpc.ServiceLocations;
 import org.apache.druid.rpc.ServiceLocator;
 
 import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 
 /**
- * Service locator for a specific task. Uses the {@link OverlordClient#taskStatus} API to locate tasks.
+ * Service locator for a specific task. Uses the {@link OverlordClient#taskStatuses(Set)} API to locate tasks.
  *
  * This locator has an internal cache that is updated if the last check has been over {@link #LOCATION_CACHE_MS} ago.
  *
@@ -85,10 +87,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator
       } else if (closed || lastKnownState != TaskState.RUNNING) {
         return Futures.immediateFuture(ServiceLocations.closed());
       } else if (lastKnownLocation == null || lastUpdateTime + LOCATION_CACHE_MS < System.currentTimeMillis()) {
-        final ListenableFuture<TaskStatusResponse> taskStatusFuture;
+        final ListenableFuture<Map<String, TaskStatus>> taskStatusFuture;
 
         try {
-          taskStatusFuture = overlordClient.taskStatus(taskId);
+          taskStatusFuture = overlordClient.taskStatuses(ImmutableSet.of(taskId));
         }
         catch (Exception e) {
           throw new RuntimeException(e);
@@ -110,31 +112,31 @@ public class SpecificTaskServiceLocator implements ServiceLocator
 
         Futures.addCallback(
             taskStatusFuture,
-            new FutureCallback<TaskStatusResponse>()
+            new FutureCallback<Map<String, TaskStatus>>()
             {
               @Override
-              public void onSuccess(final TaskStatusResponse taskStatus)
+              public void onSuccess(final Map<String, TaskStatus> taskStatusMap)
               {
                 synchronized (lock) {
                   if (pendingFuture != null) {
                     lastUpdateTime = System.currentTimeMillis();
 
-                    final TaskStatusPlus statusPlus = taskStatus.getStatus();
+                    final TaskStatus status = taskStatusMap.get(taskId);
 
-                    if (statusPlus == null) {
+                    if (status == null) {
                       // If the task status is unknown, we'll treat it as closed.
                       lastKnownState = null;
                       lastKnownLocation = null;
                     } else {
-                      lastKnownState = statusPlus.getStatusCode();
+                      lastKnownState = status.getStatusCode();
 
-                      if (TaskLocation.unknown().equals(statusPlus.getLocation())) {
+                      if (TaskLocation.unknown().equals(status.getLocation())) {
                         lastKnownLocation = null;
                       } else {
                         lastKnownLocation = new ServiceLocation(
-                            statusPlus.getLocation().getHost(),
-                            statusPlus.getLocation().getPort(),
-                            statusPlus.getLocation().getTlsPort(),
+                            status.getLocation().getHost(),
+                            status.getLocation().getPort(),
+                            status.getLocation().getTlsPort(),
                             StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
                         );
                       }
diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
index 85ae025c734..4888078af5d 100644
--- a/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
+++ b/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
@@ -22,11 +22,9 @@ package org.apache.druid.rpc.indexing;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.rpc.ServiceLocation;
 import org.apache.druid.rpc.ServiceLocations;
@@ -43,6 +41,7 @@ import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public class SpecificTaskServiceLocatorTest
@@ -61,8 +60,8 @@ public class SpecificTaskServiceLocatorTest
   @Test
   public void test_locate_noLocationYet() throws Exception
   {
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
-           .thenReturn(makeResponse(TaskState.RUNNING, TaskLocation.unknown()));
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+           .thenReturn(status(TaskState.RUNNING, TaskLocation.unknown()));
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
     final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -72,8 +71,8 @@ public class SpecificTaskServiceLocatorTest
   @Test
   public void test_locate_taskRunning() throws Exception
   {
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
-           .thenReturn(makeResponse(TaskState.RUNNING, TASK_LOCATION1));
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+           .thenReturn(status(TaskState.RUNNING, TASK_LOCATION1));
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
     Assert.assertEquals(ServiceLocations.forLocation(SERVICE_LOCATION1), locator.locate().get());
@@ -82,8 +81,8 @@ public class SpecificTaskServiceLocatorTest
   @Test
   public void test_locate_taskNotFound() throws Exception
   {
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
-           .thenReturn(Futures.immediateFuture(new TaskStatusResponse(TASK_ID, null)));
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+           .thenReturn(Futures.immediateFuture(Collections.singletonMap(TASK_ID, null)));
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
     final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -93,8 +92,8 @@ public class SpecificTaskServiceLocatorTest
   @Test
   public void test_locate_taskSuccess() throws Exception
   {
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
-           .thenReturn(makeResponse(TaskState.SUCCESS, TaskLocation.unknown()));
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+           .thenReturn(status(TaskState.SUCCESS, TaskLocation.unknown()));
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
     final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -104,8 +103,8 @@ public class SpecificTaskServiceLocatorTest
   @Test
   public void test_locate_taskFailed() throws Exception
   {
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
-           .thenReturn(makeResponse(TaskState.FAILED, TaskLocation.unknown()));
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+           .thenReturn(status(TaskState.FAILED, TaskLocation.unknown()));
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
     final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -115,7 +114,7 @@ public class SpecificTaskServiceLocatorTest
   @Test
   public void test_locate_overlordError()
   {
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
            .thenReturn(Futures.immediateFailedFuture(new ISE("oh no")));
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
@@ -134,8 +133,8 @@ public class SpecificTaskServiceLocatorTest
   public void test_locate_afterClose() throws Exception
   {
     // Overlord call will never return.
-    final SettableFuture<TaskStatusResponse> overlordFuture = SettableFuture.create();
-    Mockito.when(overlordClient.taskStatus(TASK_ID))
+    final SettableFuture<Map<String, TaskStatus>> overlordFuture = SettableFuture.create();
+    Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
            .thenReturn(overlordFuture);
 
     final SpecificTaskServiceLocator locator = new SpecificTaskServiceLocator(TASK_ID, overlordClient);
@@ -147,26 +146,15 @@ public class SpecificTaskServiceLocatorTest
     Assert.assertTrue(overlordFuture.isCancelled());
   }
 
-  private static ListenableFuture<TaskStatusResponse> makeResponse(final TaskState state, final TaskLocation location)
+  private static ListenableFuture<Map<String, TaskStatus>> status(final TaskState state, final TaskLocation location)
   {
-    final TaskStatusResponse response = new TaskStatusResponse(
+    final TaskStatus status = new TaskStatus(
         TASK_ID,
-        new TaskStatusPlus(
-            TASK_ID,
-            null,
-            null,
-            DateTimes.utc(0),
-            DateTimes.utc(0),
-            state,
-            null,
-            null,
-            1L,
-            location,
-            null,
-            null
-        )
+        state,
+        1L,
+        null,
+        location
     );
-
-    return Futures.immediateFuture(response);
+    return Futures.immediateFuture(Collections.singletonMap(TASK_ID, status));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to