This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e1ff3ca289 Resume streaming tasks on Overlord switch (#13223)
e1ff3ca289 is described below

commit e1ff3ca28904913cb2cd3876f7ef232531b7c4ba
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Sat Oct 29 09:38:49 2022 +0530

    Resume streaming tasks on Overlord switch (#13223)
    
    * Resume streaming tasks on Overlord switch
    
    * Refactoring and better messages
    
    * Better docs
    
    * Add unit test
    
    * Fix tests' setup
    
    * Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * Better logs
    
    * Fix test again
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../kafka/supervisor/KafkaSupervisorTest.java      | 138 +++++++++++++++++++++
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   1 +
 .../supervisor/SeekableStreamSupervisor.java       |  52 ++++++++
 3 files changed, 191 insertions(+)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 78b010b3b3..e5f60db005 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -214,6 +214,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     taskRunner = createMock(TaskRunner.class);
     indexerMetadataStorageCoordinator = 
createMock(IndexerMetadataStorageCoordinator.class);
     taskClient = createMock(KafkaIndexTaskClient.class);
+    
EasyMock.expect(taskClient.resumeAsync(EasyMock.anyString())).andReturn(Futures.immediateFuture(true)).anyTimes();
     taskQueue = createMock(TaskQueue.class);
 
     topic = getTopic();
@@ -3921,6 +3922,143 @@ public class KafkaSupervisorTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testResumeAllActivelyReadingTasks() throws Exception
+  {
+    // Scenario: on the supervisor's first run (e.g. after an Overlord leader change) it finds four
+    // running tasks for DATASOURCE whose reported statuses are READING, PUBLISHING, PAUSED and PAUSED.
+    // resumeAsync is expected for the reading and paused tasks only, and the task whose resume
+    // completes with false is expected to be shut down via taskQueue.shutdown.
+    supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
+    // Replace the catch-all resumeAsync(anyString()) stub registered in setUp with the
+    // task-specific resumeAsync expectations declared below.
+    EasyMock.reset(taskClient);
+    addSomeEvents(100);
+
+    KafkaIndexTask readingTask = createKafkaIndexTask("readingTask",
+                                                      DATASOURCE,
+                                                      0,
+                                                      new 
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), 
Collections.emptySet()),
+                                                      new 
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+                                                      null,
+                                                      null,
+                                                      
supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask publishingTask = createKafkaIndexTask("publishingTask",
+                                                         DATASOURCE,
+                                                         1,
+                                                         new 
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), 
Collections.emptySet()),
+                                                         new 
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+                                                         null,
+                                                         null,
+                                                         
supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask pausedTask = createKafkaIndexTask("pausedTask",
+                                                     DATASOURCE,
+                                                     1,
+                                                     new 
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(1, 0L), 
Collections.emptySet()),
+                                                     new 
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, Long.MAX_VALUE)),
+                                                     null,
+                                                     null,
+                                                     
supervisor.getTuningConfig()
+    );
+
+    KafkaIndexTask failsToResumePausedTask = 
createKafkaIndexTask("failsToResumePausedTask",
+                                                                  DATASOURCE,
+                                                                  1,
+                                                                  new 
SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(1, 0L), 
Collections.emptySet()),
+                                                                  new 
SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(1, Long.MAX_VALUE)),
+                                                                  null,
+                                                                  null,
+                                                                  
supervisor.getTuningConfig()
+    );
+
+    // All four tasks are reported as active and running, each at its own host/port.
+    List<Task> tasks = ImmutableList.of(readingTask, publishingTask, 
pausedTask, failsToResumePausedTask);
+    Collection taskRunnerWorkItems = ImmutableList.of(
+        new TestTaskRunnerWorkItem(readingTask, null, new 
TaskLocation("testHost", 1001, -1)),
+        new TestTaskRunnerWorkItem(publishingTask, null, new 
TaskLocation("testHost", 1002, -1)),
+        new TestTaskRunnerWorkItem(pausedTask, null, new 
TaskLocation("testHost", 1003, -1)),
+        new TestTaskRunnerWorkItem(failsToResumePausedTask, null, new 
TaskLocation("testHost", 1004, -1))
+    );
+
+    DateTime startTime = DateTimes.nowUtc();
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(taskRunnerWorkItems).anyTimes();
+
+    
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(tasks).anyTimes();
+
+    EasyMock.expect(taskStorage.getStatus(readingTask.getId()))
+            
.andReturn(Optional.of(TaskStatus.running(readingTask.getId()))).anyTimes();
+    EasyMock.expect(taskStorage.getStatus(publishingTask.getId()))
+            
.andReturn(Optional.of(TaskStatus.running(publishingTask.getId()))).anyTimes();
+    EasyMock.expect(taskStorage.getStatus(pausedTask.getId()))
+            
.andReturn(Optional.of(TaskStatus.running(pausedTask.getId()))).anyTimes();
+    EasyMock.expect(taskStorage.getStatus(failsToResumePausedTask.getId()))
+            
.andReturn(Optional.of(TaskStatus.running(failsToResumePausedTask.getId()))).anyTimes();
+
+    EasyMock.expect(taskStorage.getTask(readingTask.getId()))
+            .andReturn(Optional.of(readingTask)).anyTimes();
+    EasyMock.expect(taskStorage.getTask(publishingTask.getId()))
+            .andReturn(Optional.of(publishingTask)).anyTimes();
+    EasyMock.expect(taskStorage.getTask(pausedTask.getId()))
+            .andReturn(Optional.of(pausedTask)).anyTimes();
+    EasyMock.expect(taskStorage.getTask(failsToResumePausedTask.getId()))
+            .andReturn(Optional.of(failsToResumePausedTask)).anyTimes();
+
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
+    // Status reported by each task: one reading, one publishing, two paused.
+    EasyMock.expect(taskClient.getStatusAsync(readingTask.getId()))
+            .andReturn(Futures.immediateFuture(Status.READING));
+    EasyMock.expect(taskClient.getStatusAsync(publishingTask.getId()))
+            .andReturn(Futures.immediateFuture(Status.PUBLISHING));
+    EasyMock.expect(taskClient.getStatusAsync(pausedTask.getId()))
+            .andReturn(Futures.immediateFuture(Status.PAUSED));
+    EasyMock.expect(taskClient.getStatusAsync(failsToResumePausedTask.getId()))
+            .andReturn(Futures.immediateFuture(Status.PAUSED));
+
+    
EasyMock.expect(taskClient.getEndOffsets(publishingTask.getId())).andReturn(ImmutableMap.of(0,
 0L));
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(readingTask.getId(), true))
+            .andReturn(Futures.immediateFuture(new TreeMap<>())).anyTimes();
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(pausedTask.getId(), true))
+            .andReturn(Futures.immediateFuture(new TreeMap<>()));
+
+    
EasyMock.expect(taskClient.getCheckpointsAsync(failsToResumePausedTask.getId(), 
true))
+            .andReturn(Futures.immediateFuture(new TreeMap<>()));
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+
+    // Only the actively reading (i.e. non-publishing) tasks are expected to be resumed;
+    // publishingTask deliberately has no resumeAsync expectation after the reset above.
+    
EasyMock.expect(taskClient.getStartTimeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(startTime));
+    
EasyMock.expect(taskClient.resumeAsync(readingTask.getId())).andReturn(Futures.immediateFuture(true));
+
+    
EasyMock.expect(taskClient.getStartTimeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
+    
EasyMock.expect(taskClient.resumeAsync(pausedTask.getId())).andReturn(Futures.immediateFuture(true));
+
+    
EasyMock.expect(taskClient.getStartTimeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(startTime));
+    // Simulate a resume request that completes with false for this task.
+    
EasyMock.expect(taskClient.resumeAsync(failsToResumePausedTask.getId())).andReturn(Futures.immediateFuture(false));
+
+    Capture<String> shutdownTaskId = EasyMock.newCapture();
+    // The task whose resume completed with false is expected to be shut down forcibly.
+    taskQueue.shutdown(EasyMock.capture(shutdownTaskId), EasyMock.anyString());
+    EasyMock.expectLastCall();
+
+    replayAll();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    // NOTE(review): the shutdown is triggered from a listener registered on the supervisor's
+    // workerExec; if that executor is asynchronous in the testable supervisor, this assertion
+    // could race with the listener — confirm the executor used by getTestableSupervisor.
+    Assert.assertEquals(failsToResumePausedTask.getId(), 
shutdownTaskId.getValue());
+
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
     // create topic manually
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 7496b69773..7ab920ce11 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -170,6 +170,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     taskRunner = createMock(TaskRunner.class);
     indexerMetadataStorageCoordinator = 
createMock(IndexerMetadataStorageCoordinator.class);
     taskClient = createMock(KinesisIndexTaskClient.class);
+    
EasyMock.expect(taskClient.resumeAsync(EasyMock.anyString())).andReturn(Futures.immediateFuture(true)).anyTimes();
     taskQueue = createMock(TaskQueue.class);
     supervisorRecordSupplier = createMock(KinesisRecordSupplier.class);
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 90258e386f..2754fe72f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1950,6 +1950,58 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // make sure the checkpoints are consistent with each other and with the 
metadata store
 
     verifyAndMergeCheckpoints(taskGroupsToVerify.values());
+
+    // A pause from the previous Overlord's supervisor, immediately before 
leader change,
+    // can lead to tasks being in a state where they are active but do not 
read.
+    resumeAllActivelyReadingTasks();
+  }
+
+  /**
+   * Resumes every task in {@code activelyReadingTaskGroups}, but only during the supervisor's first run
+   * (i.e. when {@code getState().isFirstRunOnly()} is true); on later runs this method does nothing.
+   * <p>
+   * A pause issued by the previous Overlord's supervisor immediately before a leader change can leave
+   * tasks active but not reading, so they are resumed here. Tasks in this set that are already reading
+   * also receive a resume request; per the original design this is not expected to affect them adversely.
+   * <p>
+   * All resume requests are sent asynchronously first. The outcome of each request is handled by a
+   * listener on {@code workerExec}. A task whose resume completes with {@code false}, or whose future
+   * fails, is killed via {@code killTask}. This method does not wait for the outcomes.
+   */
+  private void resumeAllActivelyReadingTasks()
+  {
+    if (!getState().isFirstRunOnly()) {
+      return;
+    }
+
+    final String killReason =
+        "Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.";
+
+    // Issue every resume request up front so they proceed concurrently; outcomes are handled below.
+    final Map<String, ListenableFuture<Boolean>> tasksToResume = new HashMap<>();
+    for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+      for (String taskId : taskGroup.tasks.keySet()) {
+        tasksToResume.put(taskId, taskClient.resumeAsync(taskId));
+      }
+    }
+
+    for (Map.Entry<String, ListenableFuture<Boolean>> entry : tasksToResume.entrySet()) {
+      final String taskId = entry.getKey();
+      final ListenableFuture<Boolean> future = entry.getValue();
+      future.addListener(
+          () -> {
+            try {
+              if (future.get()) {
+                log.info("Resumed task [%s] in first supervisor run.", taskId);
+              } else {
+                log.warn("Failed to resume task [%s] in first supervisor run.", taskId);
+                killTask(taskId, killReason);
+              }
+            }
+            catch (InterruptedException e) {
+              // Restore the interrupt flag for the executor thread. The resume outcome is unknown,
+              // so it is still treated as a failure, as for any other exception.
+              Thread.currentThread().interrupt();
+              log.warn(e, "Failed to resume task [%s] in first supervisor run.", taskId);
+              killTask(taskId, killReason);
+            }
+            catch (Exception e) {
+              log.warn(e, "Failed to resume task [%s] in first supervisor run.", taskId);
+              killTask(taskId, killReason);
+            }
+          },
+          workerExec
+      );
+    }
   }
 
   private void verifyAndMergeCheckpoints(final Collection<TaskGroup> 
taskGroupsToVerify)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to