This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 19ed5c863f5 Enhance rolling Supervisor restarts at taskDuration (#15859)
19ed5c863f5 is described below

commit 19ed5c863f5ae660111c4ec6d595bb93d4154f95
Author: YongGang <[email protected]>
AuthorDate: Wed Feb 14 15:44:34 2024 -0800

    Enhance rolling Supervisor restarts at taskDuration (#15859)
---
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   2 +
 .../supervisor/KinesisSupervisorIOConfigTest.java  |   2 +
 .../supervisor/SeekableStreamSupervisor.java       |  85 +++++---
 .../SeekableStreamSupervisorIOConfig.java          |  16 +-
 .../SeekableStreamSupervisorStateTest.java         | 222 +++++++++++++++++++++
 5 files changed, 291 insertions(+), 36 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 231705418f5..c2aee78b3cb 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -79,6 +79,8 @@ public class KafkaSupervisorIOConfigTest
     Assert.assertNull(config.getTopicPattern());
     Assert.assertEquals(1, (int) config.getReplicas());
     Assert.assertEquals(1, (int) config.getTaskCount());
+    Assert.assertNull(config.getStopTaskCount());
+    Assert.assertEquals((int) config.getTaskCount(), 
config.getMaxAllowedStops());
     Assert.assertEquals(Duration.standardMinutes(60), 
config.getTaskDuration());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", 
"localhost:9092"), config.getConsumerProperties());
     Assert.assertEquals(100, config.getPollTimeout());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index 4de35cf5e5d..9f5c0bf7504 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -62,6 +62,8 @@ public class KinesisSupervisorIOConfigTest
     Assert.assertEquals(KinesisRegion.US_EAST_1.getEndpoint(), 
config.getEndpoint());
     Assert.assertEquals(1, (int) config.getReplicas());
     Assert.assertEquals(1, (int) config.getTaskCount());
+    Assert.assertNull(config.getStopTaskCount());
+    Assert.assertEquals((int) config.getTaskCount(), 
config.getMaxAllowedStops());
     Assert.assertEquals(Duration.standardMinutes(60), 
config.getTaskDuration());
     Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
     Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 37596bffde9..5e937fe69eb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3105,45 +3105,59 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> 
futures = new ArrayList<>();
     final List<Integer> futureGroupIds = new ArrayList<>();
 
-    boolean stopTasksEarly = false;
+    boolean stopTasksEarly;
     if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || 
earlyStopTime.isEqualNow())) {
       log.info("Early stop requested - signalling tasks to complete");
 
       earlyStopTime = null;
       stopTasksEarly = true;
-    }
-
-    int stoppedTasks = 0;
-    for (Entry<Integer, TaskGroup> entry : 
activelyReadingTaskGroups.entrySet()) {
-      Integer groupId = entry.getKey();
-      TaskGroup group = entry.getValue();
-
-      if (stopTasksEarly) {
-        log.info("Stopping task group [%d] early. It has run for [%s]", 
groupId, ioConfig.getTaskDuration());
-        futureGroupIds.add(groupId);
-        futures.add(checkpointTaskGroup(group, true));
-      } else {
-        // find the longest running task from this group
-        DateTime earliestTaskStart = DateTimes.nowUtc();
-        for (TaskData taskData : group.tasks.values()) {
-          if (taskData.startTime != null && 
earliestTaskStart.isAfter(taskData.startTime)) {
-            earliestTaskStart = taskData.startTime;
-          }
-        }
-
-        if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
-          // if this task has run longer than the configured duration
-          // as long as the pending task groups are less than the configured 
stop task count.
-          if 
(pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum()
 + stoppedTasks
-                 < ioConfig.getStopTaskCount()) {
-            log.info("Task group [%d] has run for [%s]. Stopping.", groupId, 
ioConfig.getTaskDuration());
+    } else {
+      stopTasksEarly = false;
+    }
+
+    AtomicInteger stoppedTasks = new AtomicInteger();
+    // Sort task groups by start time to prioritize early termination of 
earlier groups, then iterate for processing
+    activelyReadingTaskGroups
+        .entrySet().stream().sorted(
+            Comparator.comparingLong(
+                (Entry<Integer, TaskGroup> entry) ->
+                    computeEarliestTaskStartTime(entry.getValue())
+                        .getMillis()))
+        .forEach(entry -> {
+          Integer groupId = entry.getKey();
+          TaskGroup group = entry.getValue();
+
+          if (stopTasksEarly) {
+            log.info(
+                "Stopping task group [%d] early. It has run for [%s]",
+                groupId,
+                ioConfig.getTaskDuration()
+            );
             futureGroupIds.add(groupId);
             futures.add(checkpointTaskGroup(group, true));
-            stoppedTasks++;
+          } else {
+            DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
+
+            if 
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+              // if this task has run longer than the configured duration
+              // as long as the pending task groups are less than the 
configured stop task count.
+              if (pendingCompletionTaskGroups.values()
+                                             .stream()
+                                             
.mapToInt(CopyOnWriteArrayList::size)
+                                             .sum() + stoppedTasks.get()
+                  < ioConfig.getMaxAllowedStops()) {
+                log.info(
+                    "Task group [%d] has run for [%s]. Stopping.",
+                    groupId,
+                    ioConfig.getTaskDuration()
+                );
+                futureGroupIds.add(groupId);
+                futures.add(checkpointTaskGroup(group, true));
+                stoppedTasks.getAndIncrement();
+              }
+            }
           }
-        }
-      }
-    }
+        });
 
     List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> results 
= coalesceAndAwait(futures);
     for (int j = 0; j < results.size(); j++) {
@@ -3200,6 +3214,15 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  private DateTime computeEarliestTaskStartTime(TaskGroup group)
+  {
+    return group.tasks.values().stream()
+                      .filter(taskData -> taskData.startTime != null)
+                      .map(taskData -> taskData.startTime)
+                      .min(DateTime::compareTo)
+                      .orElse(DateTimes.nowUtc());
+  }
+
   private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> 
checkpointTaskGroup(
       final TaskGroup taskGroup,
       final boolean finalize
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index d49ceaa260c..90ba05e56cc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -49,8 +49,7 @@ public abstract class SeekableStreamSupervisorIOConfig
   private final Optional<DateTime> lateMessageRejectionStartDateTime;
   @Nullable private final AutoScalerConfig autoScalerConfig;
   @Nullable private final IdleConfig idleConfig;
-
-  private final int stopTaskCount;
+  @Nullable private final Integer stopTaskCount;
 
   public SeekableStreamSupervisorIOConfig(
       String stream,
@@ -81,8 +80,9 @@ public abstract class SeekableStreamSupervisorIOConfig
     } else {
       this.taskCount = taskCount != null ? taskCount : 1;
     }
-    this.stopTaskCount = stopTaskCount == null ? this.taskCount : 
stopTaskCount;
-    Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be 
greater than 0");
+    Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
+                                "stopTaskCount must be greater than 0");
+    this.stopTaskCount = stopTaskCount;
     this.taskDuration = defaultDuration(taskDuration, "PT1H");
     this.startDelay = defaultDuration(startDelay, "PT5S");
     this.period = defaultDuration(period, "PT30S");
@@ -205,9 +205,15 @@ public abstract class SeekableStreamSupervisorIOConfig
     return idleConfig;
   }
 
+  @Nullable
   @JsonProperty
-  public int getStopTaskCount()
+  public Integer getStopTaskCount()
   {
     return stopTaskCount;
   }
+
+  public int getMaxAllowedStops()
+  {
+    return stopTaskCount == null ? taskCount : stopTaskCount;
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 2a69ff064cb..840b4e9f69a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -979,6 +979,228 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
   }
 
+  @Test(timeout = 60_000L)
+  public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws 
InterruptedException, JsonProcessingException
+  {
+    // Assuming tasks have surpassed their duration limit at test execution
+    DateTime startTime = DateTimes.nowUtc().minusHours(2);
+    // Configure supervisor to stop only one task at a time
+    int stopTaskCount = 1;
+    SeekableStreamSupervisorIOConfig ioConfig = new 
SeekableStreamSupervisorIOConfig(
+        STREAM,
+        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+        1,
+        3,
+        new Period("PT1H"),
+        new Period("PT1S"),
+        new Period("PT30S"),
+        false,
+        new Period("PT30M"),
+        null,
+        null,
+        null,
+        null,
+        new IdleConfig(true, 200L),
+        stopTaskCount
+    )
+    {
+    };
+
+    EasyMock.reset(spec);
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new 
DruidMonitorSchedulerConfig()
+    {
+      @Override
+      public Duration getEmissionDuration()
+      {
+        return new Period("PT2S").toStandardDuration();
+      }
+    }).anyTimes();
+    EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
+    
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    SeekableStreamIndexTaskTuningConfig taskTuningConfig = 
getTuningConfig().convertToTaskTuningConfig();
+
+    TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
+    sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));
+
+    Map<String, Object> context = new HashMap<>();
+    context.put("checkpoints", new 
ObjectMapper().writeValueAsString(sequenceOffsets));
+
+    TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+        "id1",
+        null,
+        getDataSchema(),
+        taskTuningConfig,
+        createTaskIoConfigExt(
+            0,
+            Collections.singletonMap("0", "10"),
+            Collections.singletonMap("0", "20"),
+            "test",
+            startTime,
+            null,
+            Collections.emptySet(),
+            ioConfig
+        ),
+        context,
+        "0"
+    );
+
+    TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
+        "id2",
+        null,
+        getDataSchema(),
+        taskTuningConfig,
+        createTaskIoConfigExt(
+            1,
+            Collections.singletonMap("1", "10"),
+            Collections.singletonMap("1", "20"),
+            "test",
+            startTime,
+            null,
+            Collections.emptySet(),
+            ioConfig
+        ),
+        context,
+        "1"
+    );
+
+    TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
+        "id3",
+        null,
+        getDataSchema(),
+        taskTuningConfig,
+        createTaskIoConfigExt(
+            2,
+            Collections.singletonMap("2", "10"),
+            Collections.singletonMap("2", "20"),
+            "test",
+            startTime,
+            null,
+            Collections.emptySet(),
+            ioConfig
+        ),
+        context,
+        "2"
+    );
+
+    final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+    final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
+    final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+    workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
+
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2, id3))
+            .anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();
+
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+            .andReturn(new 
TestSeekableStreamDataSourceMetadata(null)).anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
+            
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
+            
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
+            
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+            .anyTimes();
+
+    EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
+            .andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
+            .anyTimes();
+    // Mocking to return the earliest start time for task id2, indicating it's 
the first group to start
+    EasyMock.expect(indexTaskClient.getStartTimeAsync("id2"))
+            .andReturn(Futures.immediateFuture(startTime)).anyTimes();
+    EasyMock.expect(indexTaskClient.getStartTimeAsync("id3"))
+            .andReturn(Futures.immediateFuture(startTime.plusSeconds(2)))
+            .anyTimes();
+
+    ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
+    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, partitionOffset);
+
+    
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .anyTimes();
+    
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .anyTimes();
+    
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, 
false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, 
false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset, 
false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.resumeAsync("id1"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.resumeAsync("id2"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.resumeAsync("id3"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.pauseAsync("id1"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.pauseAsync("id2"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.pauseAsync("id3"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+
+    // Expect the earliest-started task (id2) to transition to publishing first
+    taskQueue.shutdown("id2", "All tasks in group[%s] failed to transition to 
publishing state", 1);
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    supervisor.checkpoint(
+        0,
+        new TestSeekableStreamDataSourceMetadata(
+            new SeekableStreamStartSequenceNumbers<>(STREAM, 
checkpoints.get(0), ImmutableSet.of())
+        )
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
+  }
+
   @Test
   public void testEmitBothLag() throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to