This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e1ff3ca289 Resume streaming tasks on Overlord switch (#13223)
e1ff3ca289 is described below
commit e1ff3ca28904913cb2cd3876f7ef232531b7c4ba
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Sat Oct 29 09:38:49 2022 +0530
Resume streaming tasks on Overlord switch (#13223)
* Resume streaming tasks on Overlord switch
* Refactoring and better messages
* Better docs
* Add unit test
* Fix tests' setup
* Update
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
Co-authored-by: Kashif Faraz <[email protected]>
* Update
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
Co-authored-by: Kashif Faraz <[email protected]>
* Better logs
* Fix test again
Co-authored-by: Kashif Faraz <[email protected]>
---
.../kafka/supervisor/KafkaSupervisorTest.java | 138 +++++++++++++++++++++
.../kinesis/supervisor/KinesisSupervisorTest.java | 1 +
.../supervisor/SeekableStreamSupervisor.java | 52 ++++++++
3 files changed, 191 insertions(+)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 78b010b3b3..e5f60db005 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -214,6 +214,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner = createMock(TaskRunner.class);
indexerMetadataStorageCoordinator =
createMock(IndexerMetadataStorageCoordinator.class);
taskClient = createMock(KafkaIndexTaskClient.class);
+
EasyMock.expect(taskClient.resumeAsync(EasyMock.anyString())).andReturn(Futures.immediateFuture(true)).anyTimes();
taskQueue = createMock(TaskQueue.class);
topic = getTopic();
@@ -3921,6 +3922,143 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
+ @Test
+ public void testResumeAllActivelyReadingTasks() throws Exception
+ {
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
+ // Mock with task-based setup for resumeAsync
+ EasyMock.reset(taskClient);
+ addSomeEvents(100);
+
+ KafkaIndexTask readingTask = createKafkaIndexTask("readingTask",
+ DATASOURCE,
+ 0,
+ new
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L),
Collections.emptySet()),
+ new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+ null,
+ null,
+
supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask publishingTask = createKafkaIndexTask("publishingTask",
+ DATASOURCE,
+ 1,
+ new
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L),
Collections.emptySet()),
+ new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+ null,
+ null,
+
supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask pausedTask = createKafkaIndexTask("pausedTask",
+ DATASOURCE,
+ 1,
+ new
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(1, 0L),
Collections.emptySet()),
+ new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, Long.MAX_VALUE)),
+ null,
+ null,
+
supervisor.getTuningConfig()
+ );
+
+ KafkaIndexTask failsToResumePausedTask =
createKafkaIndexTask("failsToResumePausedTask",
+ DATASOURCE,
+ 1,
+ new
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(1, 0L),
Collections.emptySet()),
+ new
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, Long.MAX_VALUE)),
+ null,
+ null,
+
supervisor.getTuningConfig()
+ );
+
+ List<Task> tasks = ImmutableList.of(readingTask, publishingTask,
pausedTask, failsToResumePausedTask);
+ Collection taskRunnerWorkItems = ImmutableList.of(
+ new TestTaskRunnerWorkItem(readingTask, null, new
TaskLocation("testHost", 1001, -1)),
+ new TestTaskRunnerWorkItem(publishingTask, null, new
TaskLocation("testHost", 1002, -1)),
+ new TestTaskRunnerWorkItem(pausedTask, null, new
TaskLocation("testHost", 1003, -1)),
+ new TestTaskRunnerWorkItem(failsToResumePausedTask, null, new
TaskLocation("testHost", 1004, -1))
+ );
+
+ DateTime startTime = DateTimes.nowUtc();
+
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(taskRunnerWorkItems).anyTimes();
+
+
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();
+
+ EasyMock.expect(taskStorage.getStatus(readingTask.getId()))
+
.andReturn(Optional.of(TaskStatus.running(readingTask.getId()))).anyTimes();
+ EasyMock.expect(taskStorage.getStatus(publishingTask.getId()))
+
.andReturn(Optional.of(TaskStatus.running(publishingTask.getId()))).anyTimes();
+ EasyMock.expect(taskStorage.getStatus(pausedTask.getId()))
+
.andReturn(Optional.of(TaskStatus.running(pausedTask.getId()))).anyTimes();
+ EasyMock.expect(taskStorage.getStatus(failsToResumePausedTask.getId()))
+
.andReturn(Optional.of(TaskStatus.running(failsToResumePausedTask.getId()))).anyTimes();
+
+ EasyMock.expect(taskStorage.getTask(readingTask.getId()))
+ .andReturn(Optional.of(readingTask)).anyTimes();
+ EasyMock.expect(taskStorage.getTask(publishingTask.getId()))
+ .andReturn(Optional.of(publishingTask)).anyTimes();
+ EasyMock.expect(taskStorage.getTask(pausedTask.getId()))
+ .andReturn(Optional.of(pausedTask)).anyTimes();
+ EasyMock.expect(taskStorage.getTask(failsToResumePausedTask.getId()))
+ .andReturn(Optional.of(failsToResumePausedTask)).anyTimes();
+
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
+ EasyMock.expect(taskClient.getStatusAsync(readingTask.getId()))
+ .andReturn(Futures.immediateFuture(Status.READING));
+ EasyMock.expect(taskClient.getStatusAsync(publishingTask.getId()))
+ .andReturn(Futures.immediateFuture(Status.PUBLISHING));
+ EasyMock.expect(taskClient.getStatusAsync(pausedTask.getId()))
+ .andReturn(Futures.immediateFuture(Status.PAUSED));
+ EasyMock.expect(taskClient.getStatusAsync(failsToResumePausedTask.getId()))
+ .andReturn(Futures.immediateFuture(Status.PAUSED));
+
+
EasyMock.expect(taskClient.getEndOffsets(publishingTask.getId())).andReturn(ImmutableMap.of(0,
0L));
+
+ EasyMock.expect(taskClient.getCheckpointsAsync(readingTask.getId(), true))
+ .andReturn(Futures.immediateFuture(new TreeMap<>())).anyTimes();
+
+ EasyMock.expect(taskClient.getCheckpointsAsync(pausedTask.getId(), true))
+ .andReturn(Futures.immediateFuture(new TreeMap<>()));
+
+
EasyMock.expect(taskClient.getCheckpointsAsync(failsToResumePausedTask.getId(),
true))
+ .andReturn(Futures.immediateFuture(new TreeMap<>()));
+
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
+
+ // Only the active, i.e. non-publishing, tasks are resumed
+
EasyMock.expect(taskClient.getStartTimeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(startTime));
+
EasyMock.expect(taskClient.resumeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(true));
+
+
EasyMock.expect(taskClient.getStartTimeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
+
EasyMock.expect(taskClient.resumeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(true));
+
+
EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
+
EasyMock.expect(taskClient.resumeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(false));
+
+ Capture<String> shutdownTaskId = EasyMock.newCapture();
+ // The task which failed to resume is shut down forcibly.
+ taskQueue.shutdown(EasyMock.capture(shutdownTaskId), EasyMock.anyString());
+ EasyMock.expectLastCall();
+
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ Assert.assertEquals(failsToResumePausedTask.getId(),
shutdownTaskId.getValue());
+
+ verifyAll();
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
// create topic manually
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 7496b69773..7ab920ce11 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -170,6 +170,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskRunner = createMock(TaskRunner.class);
indexerMetadataStorageCoordinator =
createMock(IndexerMetadataStorageCoordinator.class);
taskClient = createMock(KinesisIndexTaskClient.class);
+
EasyMock.expect(taskClient.resumeAsync(EasyMock.anyString())).andReturn(Futures.immediateFuture(true)).anyTimes();
taskQueue = createMock(TaskQueue.class);
supervisorRecordSupplier = createMock(KinesisRecordSupplier.class);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 90258e386f..2754fe72f2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1950,6 +1950,58 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// make sure the checkpoints are consistent with each other and with the
metadata store
verifyAndMergeCheckpoints(taskGroupsToVerify.values());
+
+ // A pause from the previous Overlord's supervisor, immediately before
leader change,
+ // can lead to tasks being in a state where they are active but do not
read.
+ resumeAllActivelyReadingTasks();
+ }
+
+ /**
+ * If this is the first run, resume all tasks in the set of
activelyReadingTaskGroups.
+ * Paused tasks will be resumed.
+ * Other tasks in this set are not affected adversely by the resume operation.
+ */
+ private void resumeAllActivelyReadingTasks()
+ {
+ if (!getState().isFirstRunOnly()) {
+ return;
+ }
+
+ Map<String, ListenableFuture<Boolean>> tasksToResume = new HashMap<>();
+ for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+ for (String taskId : taskGroup.tasks.keySet()) {
+ tasksToResume.put(taskId, taskClient.resumeAsync(taskId));
+ }
+ }
+
+ for (Map.Entry<String, ListenableFuture<Boolean>> entry :
tasksToResume.entrySet()) {
+ String taskId = entry.getKey();
+ ListenableFuture<Boolean> future = entry.getValue();
+ future.addListener(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try {
+ if (entry.getValue().get()) {
+ log.info("Resumed task [%s] in first supervisor run.",
taskId);
+ } else {
+ log.warn("Failed to resume task [%s] in first supervisor
run.", taskId);
+ killTask(taskId,
+ "Killing forcefully as task could not be resumed in
the first supervisor run after Overlord change.");
+ }
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to resume task [%s] in first supervisor
run.", taskId);
+ killTask(taskId,
+ "Killing forcefully as task could not be resumed in
the first supervisor run after Overlord change.");
+ }
+ }
+ },
+ workerExec
+ );
+ }
}
private void verifyAndMergeCheckpoints(final Collection<TaskGroup>
taskGroupsToVerify)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]