This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e2b7289dea4 Try to fetch the task status for an active from memory
(#15724)
e2b7289dea4 is described below
commit e2b7289dea4a1c9ec45af07091644038cb939484
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Feb 26 13:53:05 2024 +0530
Try to fetch the task status for an active from memory (#15724)
* Reduce metadata calls to fetch the status for an active task
---
.../kafka/supervisor/KafkaSupervisorTest.java | 19 ++++++
.../kinesis/supervisor/KinesisSupervisorTest.java | 16 +++++
.../client/IndexerWorkerManagerClient.java | 11 ++--
.../apache/druid/indexing/overlord/TaskQueue.java | 13 ++++
.../indexing/overlord/http/OverlordResource.java | 8 ++-
.../supervisor/SeekableStreamSupervisor.java | 18 ++---
.../druid/indexing/overlord/TaskQueueTest.java | 60 +++++++++++++++++
.../overlord/http/OverlordResourceTest.java | 76 ++++++++++++++++++++++
.../SeekableStreamSupervisorStateTest.java | 3 +
.../rpc/indexing/SpecificTaskServiceLocator.java | 30 +++++----
.../indexing/SpecificTaskServiceLocatorTest.java | 56 +++++++---------
11 files changed, 246 insertions(+), 64 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 21264a9e396..e4b4b2f9c94 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1445,6 +1445,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@@ -1475,6 +1476,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
@@ -2561,6 +2563,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@@ -2603,6 +2606,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
@@ -2647,6 +2651,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
@@ -2689,6 +2694,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.READING))
.anyTimes();
@@ -2809,6 +2815,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
@@ -2850,6 +2859,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.reset(taskRunner, taskClient, taskQueue);
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0,
15L, 1, 25L, 2, 30L)));
EasyMock.expect(taskClient.setEndOffsetsAsync("id2",
singlePartitionMap(topic, 0, 15L, 1, 25L, 2, 30L), true))
@@ -2871,6 +2883,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
replayAll();
@@ -2896,6 +2909,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
replayAll();
@@ -2952,6 +2966,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
replayAll();
@@ -3531,6 +3546,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
@@ -3589,6 +3607,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
replayAll();
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 87297dd8f28..a2f3e98deca 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -1496,6 +1496,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(null)
@@ -1524,6 +1525,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
@@ -1654,6 +1656,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(task)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
@@ -2200,6 +2203,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
@@ -2245,6 +2249,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getTask(task.getId())).andReturn(Optional.of(task)).anyTimes();
}
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
.anyTimes();
@@ -2308,6 +2313,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
@@ -2330,6 +2336,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.reset(taskStorage, taskRunner, taskClient, taskQueue);
+
EasyMock.expect(taskRunner.getTaskLocation(EasyMock.anyString())).andReturn(location).anyTimes();
+
TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(
SHARD_ID1,
@@ -2529,6 +2537,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1,
id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
@@ -2580,6 +2590,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.reset(taskRunner, taskClient, taskQueue);
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture(ImmutableMap.of(
SHARD_ID1,
@@ -3583,6 +3596,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(TaskLocation.unknown()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1,
id2, id3)).anyTimes();
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
index 2940fc4f9c3..1894336043d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java
@@ -19,7 +19,7 @@
package org.apache.druid.msq.indexing.client;
-import org.apache.druid.client.indexing.TaskStatusResponse;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -65,10 +65,13 @@ public class IndexerWorkerManagerClient implements
WorkerManagerClient
@Override
public TaskLocation location(String workerId)
{
- final TaskStatusResponse response =
FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true);
+ final TaskStatus response = FutureUtils.getUnchecked(
+ overlordClient.taskStatuses(ImmutableSet.of(workerId)),
+ true
+ ).get(workerId);
- if (response.getStatus() != null) {
- return response.getStatus().getLocation();
+ if (response != null) {
+ return response.getLocation();
} else {
return TaskLocation.unknown();
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 3a06af69d1e..830ae9b732b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -32,6 +32,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
@@ -671,6 +672,8 @@ public class TaskQueue
// Save status to metadata store first, so if we crash while doing the
rest of the shutdown, our successor
// remembers that this task has completed.
try {
+ //The code block is only called when a task completes,
+ //and we need to check to make sure the metadata store has the correct
status stored.
final Optional<TaskStatus> previousStatus =
taskStorage.getStatus(task.getId());
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete
task").addData("task", task.getId()).emit();
@@ -945,6 +948,16 @@ public class TaskQueue
}
}
+ public Optional<TaskStatus> getTaskStatus(final String taskId)
+ {
+ RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
+ if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
+ return
Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
+ } else {
+ return taskStorage.getStatus(taskId);
+ }
+ }
+
public CoordinatorRunStats getQueueStats()
{
final int queuedUpdates = statusUpdatesInQueue.get();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index b529aa45854..8416b2e0968 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -469,9 +469,15 @@ public class OverlordResource
return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds
provided.").build();
}
+ final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
Map<String, TaskStatus> result =
Maps.newHashMapWithExpectedSize(taskIds.size());
for (String taskId : taskIds) {
- Optional<TaskStatus> optional =
taskStorageQueryAdapter.getStatus(taskId);
+ final Optional<TaskStatus> optional;
+ if (taskQueue.isPresent()) {
+ optional = taskQueue.get().getTaskStatus(taskId);
+ } else {
+ optional = taskStorageQueryAdapter.getStatus(taskId);
+ }
if (optional.isPresent()) {
result.put(taskId, optional.get());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 5e937fe69eb..62d3caa83d6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -27,11 +27,9 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -876,14 +874,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
Preconditions.checkNotNull(id, "id");
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
- Optional<? extends TaskRunnerWorkItem> item = Iterables.tryFind(
- taskRunner.get().getRunningTasks(),
- (Predicate<TaskRunnerWorkItem>) taskRunnerWorkItem ->
id.equals(taskRunnerWorkItem.getTaskId())
- );
-
- if (item.isPresent()) {
- return item.get().getLocation();
- }
+ return taskRunner.get().getTaskLocation(id);
} else {
log.error("Failed to get task runner because I'm not the leader!");
}
@@ -894,7 +885,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Override
public Optional<TaskStatus> getTaskStatus(String id)
{
- return taskStorage.getStatus(id);
+ final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
+ if (taskQueue.isPresent()) {
+ return taskQueue.get().getTaskStatus(id);
+ } else {
+ return taskStorage.getStatus(id);
+ }
}
};
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index a1a93e29cbf..8ca1ff49f05 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -460,6 +460,66 @@ public class TaskQueueTest extends IngestionTestBase
Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
}
+ @Test
+ public void testGetTaskStatus()
+ {
+ final String newTask = "newTask";
+ final String waitingTask = "waitingTask";
+ final String pendingTask = "pendingTask";
+ final String runningTask = "runningTask";
+ final String successfulTask = "successfulTask";
+ final String failedTask = "failedTask";
+
+ TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
+ EasyMock.expect(taskStorage.getStatus(newTask))
+ .andReturn(Optional.of(TaskStatus.running(newTask)));
+ EasyMock.expect(taskStorage.getStatus(successfulTask))
+ .andReturn(Optional.of(TaskStatus.success(successfulTask)));
+ EasyMock.expect(taskStorage.getStatus(failedTask))
+ .andReturn(Optional.of(TaskStatus.failure(failedTask,
failedTask)));
+ EasyMock.replay(taskStorage);
+
+ TaskRunner taskRunner = EasyMock.createMock(HttpRemoteTaskRunner.class);
+ EasyMock.expect(taskRunner.getRunnerTaskState(newTask))
+ .andReturn(null);
+ EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask))
+ .andReturn(RunnerTaskState.WAITING);
+ EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
+ .andReturn(RunnerTaskState.PENDING);
+ EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
+ .andReturn(RunnerTaskState.RUNNING);
+ EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
+ .andReturn(RunnerTaskState.NONE);
+ EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
+ .andReturn(RunnerTaskState.NONE);
+ EasyMock.expect(taskRunner.getTaskLocation(waitingTask))
+ .andReturn(TaskLocation.unknown());
+ EasyMock.expect(taskRunner.getTaskLocation(pendingTask))
+ .andReturn(TaskLocation.unknown());
+ EasyMock.expect(taskRunner.getTaskLocation(runningTask))
+ .andReturn(TaskLocation.create("host", 8100, 8100));
+ EasyMock.replay(taskRunner);
+
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null, null),
+ new DefaultTaskConfig(),
+ taskStorage,
+ taskRunner,
+ createActionClientFactory(),
+ getLockbox(),
+ new StubServiceEmitter("druid/overlord", "testHost")
+ );
+ taskQueue.setActive(true);
+
+ Assert.assertEquals(TaskStatus.running(newTask),
taskQueue.getTaskStatus(newTask).get());
+ Assert.assertEquals(TaskStatus.running(waitingTask),
taskQueue.getTaskStatus(waitingTask).get());
+ Assert.assertEquals(TaskStatus.running(pendingTask),
taskQueue.getTaskStatus(pendingTask).get());
+ Assert.assertEquals(TaskStatus.running(runningTask),
taskQueue.getTaskStatus(runningTask).get());
+ Assert.assertEquals(TaskStatus.success(successfulTask),
taskQueue.getTaskStatus(successfulTask).get());
+ Assert.assertEquals(TaskStatus.failure(failedTask, failedTask),
taskQueue.getTaskStatus(failedTask).get());
+ }
+
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String>
runningTasks)
{
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 8c1b6765431..19eb1f8e7a2 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -1794,6 +1794,82 @@ public class OverlordResourceTest
Assert.assertEquals(expectedResourceActions, resourceActions);
}
+ @Test
+ public void testGetMultipleTaskStatuses_presentTaskQueue()
+ {
+ // Needed for teardown
+ EasyMock.replay(
+ authConfig,
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter
+ );
+
+ TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
+ EasyMock.expect(taskQueue.getTaskStatus("task"))
+ .andReturn(Optional.of(TaskStatus.running("task")));
+ EasyMock.replay(taskQueue);
+ TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
+ EasyMock.replay(taskMaster);
+ OverlordResource overlordResource = new OverlordResource(
+ taskMaster,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ final Object response =
overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
+ .getEntity();
+ Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")),
response);
+ }
+
+ @Test
+ public void testGetMultipleTaskStatuses_absentTaskQueue()
+ {
+ // Needed for teardown
+ EasyMock.replay(
+ authConfig,
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter
+ );
+
+ TaskStorageQueryAdapter taskStorageQueryAdapter =
EasyMock.createMock(TaskStorageQueryAdapter.class);
+ EasyMock.expect(taskStorageQueryAdapter.getStatus("task"))
+ .andReturn(Optional.of(TaskStatus.running("task")));
+ EasyMock.replay(taskStorageQueryAdapter);
+ TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
+ EasyMock.replay(taskMaster);
+ OverlordResource overlordResource = new OverlordResource(
+ taskMaster,
+ taskStorageQueryAdapter,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ final Object response =
overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
+ .getEntity();
+ Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")),
response);
+ }
+
private void expectAuthorizationTokenCheck()
{
expectAuthorizationTokenCheck(Users.DRUID);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 840b4e9f69a..ea083423eaa 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1101,6 +1101,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(location3).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
.andReturn(ImmutableList.of(id1, id2, id3))
.anyTimes();
diff --git
a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
index 0276e768ba9..163c7e14e01 100644
---
a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
+++
b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java
@@ -19,16 +19,16 @@
package org.apache.druid.rpc.indexing;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
@@ -36,9 +36,11 @@ import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
/**
- * Service locator for a specific task. Uses the {@link
OverlordClient#taskStatus} API to locate tasks.
+ * Service locator for a specific task. Uses the {@link
OverlordClient#taskStatuses(Set)} API to locate tasks.
*
* This locator has an internal cache that is updated if the last check has
been over {@link #LOCATION_CACHE_MS} ago.
*
@@ -85,10 +87,10 @@ public class SpecificTaskServiceLocator implements
ServiceLocator
} else if (closed || lastKnownState != TaskState.RUNNING) {
return Futures.immediateFuture(ServiceLocations.closed());
} else if (lastKnownLocation == null || lastUpdateTime +
LOCATION_CACHE_MS < System.currentTimeMillis()) {
- final ListenableFuture<TaskStatusResponse> taskStatusFuture;
+ final ListenableFuture<Map<String, TaskStatus>> taskStatusFuture;
try {
- taskStatusFuture = overlordClient.taskStatus(taskId);
+ taskStatusFuture =
overlordClient.taskStatuses(ImmutableSet.of(taskId));
}
catch (Exception e) {
throw new RuntimeException(e);
@@ -110,31 +112,31 @@ public class SpecificTaskServiceLocator implements
ServiceLocator
Futures.addCallback(
taskStatusFuture,
- new FutureCallback<TaskStatusResponse>()
+ new FutureCallback<Map<String, TaskStatus>>()
{
@Override
- public void onSuccess(final TaskStatusResponse taskStatus)
+ public void onSuccess(final Map<String, TaskStatus>
taskStatusMap)
{
synchronized (lock) {
if (pendingFuture != null) {
lastUpdateTime = System.currentTimeMillis();
- final TaskStatusPlus statusPlus = taskStatus.getStatus();
+ final TaskStatus status = taskStatusMap.get(taskId);
- if (statusPlus == null) {
+ if (status == null) {
// If the task status is unknown, we'll treat it as
closed.
lastKnownState = null;
lastKnownLocation = null;
} else {
- lastKnownState = statusPlus.getStatusCode();
+ lastKnownState = status.getStatusCode();
- if
(TaskLocation.unknown().equals(statusPlus.getLocation())) {
+ if (TaskLocation.unknown().equals(status.getLocation()))
{
lastKnownLocation = null;
} else {
lastKnownLocation = new ServiceLocation(
- statusPlus.getLocation().getHost(),
- statusPlus.getLocation().getPort(),
- statusPlus.getLocation().getTlsPort(),
+ status.getLocation().getHost(),
+ status.getLocation().getPort(),
+ status.getLocation().getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH,
StringUtils.urlEncode(taskId))
);
}
diff --git
a/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
b/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
index 85ae025c734..4888078af5d 100644
---
a/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
+++
b/server/src/test/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocatorTest.java
@@ -22,11 +22,9 @@ package org.apache.druid.rpc.indexing;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
-import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceLocations;
@@ -43,6 +41,7 @@ import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
public class SpecificTaskServiceLocatorTest
@@ -61,8 +60,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_noLocationYet() throws Exception
{
- Mockito.when(overlordClient.taskStatus(TASK_ID))
- .thenReturn(makeResponse(TaskState.RUNNING,
TaskLocation.unknown()));
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+ .thenReturn(status(TaskState.RUNNING, TaskLocation.unknown()));
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -72,8 +71,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskRunning() throws Exception
{
- Mockito.when(overlordClient.taskStatus(TASK_ID))
- .thenReturn(makeResponse(TaskState.RUNNING, TASK_LOCATION1));
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+ .thenReturn(status(TaskState.RUNNING, TASK_LOCATION1));
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
Assert.assertEquals(ServiceLocations.forLocation(SERVICE_LOCATION1),
locator.locate().get());
@@ -82,8 +81,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskNotFound() throws Exception
{
- Mockito.when(overlordClient.taskStatus(TASK_ID))
- .thenReturn(Futures.immediateFuture(new TaskStatusResponse(TASK_ID,
null)));
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+
.thenReturn(Futures.immediateFuture(Collections.singletonMap(TASK_ID, null)));
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -93,8 +92,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskSuccess() throws Exception
{
- Mockito.when(overlordClient.taskStatus(TASK_ID))
- .thenReturn(makeResponse(TaskState.SUCCESS,
TaskLocation.unknown()));
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+ .thenReturn(status(TaskState.SUCCESS, TaskLocation.unknown()));
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -104,8 +103,8 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_taskFailed() throws Exception
{
- Mockito.when(overlordClient.taskStatus(TASK_ID))
- .thenReturn(makeResponse(TaskState.FAILED, TaskLocation.unknown()));
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
+ .thenReturn(status(TaskState.FAILED, TaskLocation.unknown()));
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
final ListenableFuture<ServiceLocations> future = locator.locate();
@@ -115,7 +114,7 @@ public class SpecificTaskServiceLocatorTest
@Test
public void test_locate_overlordError()
{
- Mockito.when(overlordClient.taskStatus(TASK_ID))
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(Futures.immediateFailedFuture(new ISE("oh no")));
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
@@ -134,8 +133,8 @@ public class SpecificTaskServiceLocatorTest
public void test_locate_afterClose() throws Exception
{
// Overlord call will never return.
- final SettableFuture<TaskStatusResponse> overlordFuture =
SettableFuture.create();
- Mockito.when(overlordClient.taskStatus(TASK_ID))
+ final SettableFuture<Map<String, TaskStatus>> overlordFuture =
SettableFuture.create();
+ Mockito.when(overlordClient.taskStatuses(Collections.singleton(TASK_ID)))
.thenReturn(overlordFuture);
final SpecificTaskServiceLocator locator = new
SpecificTaskServiceLocator(TASK_ID, overlordClient);
@@ -147,26 +146,15 @@ public class SpecificTaskServiceLocatorTest
Assert.assertTrue(overlordFuture.isCancelled());
}
- private static ListenableFuture<TaskStatusResponse> makeResponse(final
TaskState state, final TaskLocation location)
+ private static ListenableFuture<Map<String, TaskStatus>> status(final
TaskState state, final TaskLocation location)
{
- final TaskStatusResponse response = new TaskStatusResponse(
+ final TaskStatus status = new TaskStatus(
TASK_ID,
- new TaskStatusPlus(
- TASK_ID,
- null,
- null,
- DateTimes.utc(0),
- DateTimes.utc(0),
- state,
- null,
- null,
- 1L,
- location,
- null,
- null
- )
+ state,
+ 1L,
+ null,
+ location
);
-
- return Futures.immediateFuture(response);
+ return Futures.immediateFuture(Collections.singletonMap(TASK_ID, status));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]