This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 19ed5c863f5 Enhance rolling Supervisor restarts at taskDuration (#15859)
19ed5c863f5 is described below
commit 19ed5c863f5ae660111c4ec6d595bb93d4154f95
Author: YongGang <[email protected]>
AuthorDate: Wed Feb 14 15:44:34 2024 -0800
Enhance rolling Supervisor restarts at taskDuration (#15859)
---
.../supervisor/KafkaSupervisorIOConfigTest.java | 2 +
.../supervisor/KinesisSupervisorIOConfigTest.java | 2 +
.../supervisor/SeekableStreamSupervisor.java | 85 +++++---
.../SeekableStreamSupervisorIOConfig.java | 16 +-
.../SeekableStreamSupervisorStateTest.java | 222 +++++++++++++++++++++
5 files changed, 291 insertions(+), 36 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 231705418f5..c2aee78b3cb 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -79,6 +79,8 @@ public class KafkaSupervisorIOConfigTest
Assert.assertNull(config.getTopicPattern());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
+ Assert.assertNull(config.getStopTaskCount());
+ Assert.assertEquals((int) config.getTaskCount(),
config.getMaxAllowedStops());
Assert.assertEquals(Duration.standardMinutes(60),
config.getTaskDuration());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers",
"localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(100, config.getPollTimeout());
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
index 4de35cf5e5d..9f5c0bf7504 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java
@@ -62,6 +62,8 @@ public class KinesisSupervisorIOConfigTest
Assert.assertEquals(KinesisRegion.US_EAST_1.getEndpoint(),
config.getEndpoint());
Assert.assertEquals(1, (int) config.getReplicas());
Assert.assertEquals(1, (int) config.getTaskCount());
+ Assert.assertNull(config.getStopTaskCount());
+ Assert.assertEquals((int) config.getTaskCount(),
config.getMaxAllowedStops());
Assert.assertEquals(Duration.standardMinutes(60),
config.getTaskDuration());
Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay());
Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod());
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 37596bffde9..5e937fe69eb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -3105,45 +3105,59 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>>
futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();
- boolean stopTasksEarly = false;
+ boolean stopTasksEarly;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() ||
earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");
earlyStopTime = null;
stopTasksEarly = true;
- }
-
- int stoppedTasks = 0;
- for (Entry<Integer, TaskGroup> entry :
activelyReadingTaskGroups.entrySet()) {
- Integer groupId = entry.getKey();
- TaskGroup group = entry.getValue();
-
- if (stopTasksEarly) {
- log.info("Stopping task group [%d] early. It has run for [%s]",
groupId, ioConfig.getTaskDuration());
- futureGroupIds.add(groupId);
- futures.add(checkpointTaskGroup(group, true));
- } else {
- // find the longest running task from this group
- DateTime earliestTaskStart = DateTimes.nowUtc();
- for (TaskData taskData : group.tasks.values()) {
- if (taskData.startTime != null &&
earliestTaskStart.isAfter(taskData.startTime)) {
- earliestTaskStart = taskData.startTime;
- }
- }
-
- if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
- // if this task has run longer than the configured duration
- // as long as the pending task groups are less than the configured
stop task count.
- if
(pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum()
+ stoppedTasks
- < ioConfig.getStopTaskCount()) {
- log.info("Task group [%d] has run for [%s]. Stopping.", groupId,
ioConfig.getTaskDuration());
+ } else {
+ stopTasksEarly = false;
+ }
+
+ AtomicInteger stoppedTasks = new AtomicInteger();
+ // Sort task groups by start time to prioritize early termination of
earlier groups, then iterate for processing
+ activelyReadingTaskGroups
+ .entrySet().stream().sorted(
+ Comparator.comparingLong(
+ (Entry<Integer, TaskGroup> entry) ->
+ computeEarliestTaskStartTime(entry.getValue())
+ .getMillis()))
+ .forEach(entry -> {
+ Integer groupId = entry.getKey();
+ TaskGroup group = entry.getValue();
+
+ if (stopTasksEarly) {
+ log.info(
+ "Stopping task group [%d] early. It has run for [%s]",
+ groupId,
+ ioConfig.getTaskDuration()
+ );
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
- stoppedTasks++;
+ } else {
+ DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
+
+ if
(earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+ // if this task has run longer than the configured duration
+ // as long as the pending task groups are less than the
configured stop task count.
+ if (pendingCompletionTaskGroups.values()
+ .stream()
+
.mapToInt(CopyOnWriteArrayList::size)
+ .sum() + stoppedTasks.get()
+ < ioConfig.getMaxAllowedStops()) {
+ log.info(
+ "Task group [%d] has run for [%s]. Stopping.",
+ groupId,
+ ioConfig.getTaskDuration()
+ );
+ futureGroupIds.add(groupId);
+ futures.add(checkpointTaskGroup(group, true));
+ stoppedTasks.getAndIncrement();
+ }
+ }
}
- }
- }
- }
+ });
List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> results
= coalesceAndAwait(futures);
for (int j = 0; j < results.size(); j++) {
@@ -3200,6 +3214,15 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ private DateTime computeEarliestTaskStartTime(TaskGroup group)
+ {
+ return group.tasks.values().stream()
+ .filter(taskData -> taskData.startTime != null)
+ .map(taskData -> taskData.startTime)
+ .min(DateTime::compareTo)
+ .orElse(DateTimes.nowUtc());
+ }
+
private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>
checkpointTaskGroup(
final TaskGroup taskGroup,
final boolean finalize
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index d49ceaa260c..90ba05e56cc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -49,8 +49,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<DateTime> lateMessageRejectionStartDateTime;
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;
-
- private final int stopTaskCount;
+ @Nullable private final Integer stopTaskCount;
public SeekableStreamSupervisorIOConfig(
String stream,
@@ -81,8 +80,9 @@ public abstract class SeekableStreamSupervisorIOConfig
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
- this.stopTaskCount = stopTaskCount == null ? this.taskCount :
stopTaskCount;
- Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be
greater than 0");
+ Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
+ "stopTaskCount must be greater than 0");
+ this.stopTaskCount = stopTaskCount;
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
@@ -205,9 +205,15 @@ public abstract class SeekableStreamSupervisorIOConfig
return idleConfig;
}
+ @Nullable
@JsonProperty
- public int getStopTaskCount()
+ public Integer getStopTaskCount()
{
return stopTaskCount;
}
+
+ public int getMaxAllowedStops()
+ {
+ return stopTaskCount == null ? taskCount : stopTaskCount;
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 2a69ff064cb..840b4e9f69a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -979,6 +979,228 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
}
+ @Test(timeout = 60_000L)
+ public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws
InterruptedException, JsonProcessingException
+ {
+ // Assuming tasks have surpassed their duration limit at test execution
+ DateTime startTime = DateTimes.nowUtc().minusHours(2);
+ // Configure supervisor to stop only one task at a time
+ int stopTaskCount = 1;
+ SeekableStreamSupervisorIOConfig ioConfig = new
SeekableStreamSupervisorIOConfig(
+ STREAM,
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
+ 1,
+ 3,
+ new Period("PT1H"),
+ new Period("PT1S"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ null,
+ new IdleConfig(true, 200L),
+ stopTaskCount
+ )
+ {
+ };
+
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new
DruidMonitorSchedulerConfig()
+ {
+ @Override
+ public Duration getEmissionDuration()
+ {
+ return new Period("PT2S").toStandardDuration();
+ }
+ }).anyTimes();
+ EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
+
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig =
getTuningConfig().convertToTaskTuningConfig();
+
+ TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
+ sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));
+
+ Map<String, Object> context = new HashMap<>();
+ context.put("checkpoints", new
ObjectMapper().writeValueAsString(sequenceOffsets));
+
+ TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+ "id1",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ createTaskIoConfigExt(
+ 0,
+ Collections.singletonMap("0", "10"),
+ Collections.singletonMap("0", "20"),
+ "test",
+ startTime,
+ null,
+ Collections.emptySet(),
+ ioConfig
+ ),
+ context,
+ "0"
+ );
+
+ TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
+ "id2",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ createTaskIoConfigExt(
+ 1,
+ Collections.singletonMap("1", "10"),
+ Collections.singletonMap("1", "20"),
+ "test",
+ startTime,
+ null,
+ Collections.emptySet(),
+ ioConfig
+ ),
+ context,
+ "1"
+ );
+
+ TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
+ "id3",
+ null,
+ getDataSchema(),
+ taskTuningConfig,
+ createTaskIoConfigExt(
+ 2,
+ Collections.singletonMap("2", "10"),
+ Collections.singletonMap("2", "20"),
+ "test",
+ startTime,
+ null,
+ Collections.emptySet(),
+ ioConfig
+ ),
+ context,
+ "2"
+ );
+
+ final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
+ final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);
+
+ Collection workItems = new ArrayList<>();
+ workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+ workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+ workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));
+
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+ .andReturn(ImmutableList.of(id1, id2, id3))
+ .anyTimes();
+
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();
+
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
+ .andReturn(new
TestSeekableStreamDataSourceMetadata(null)).anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+
+ EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
+ .andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
+ .anyTimes();
+ // Mocking to return the earliest start time for task id2, indicating it's
the first group to start
+ EasyMock.expect(indexTaskClient.getStartTimeAsync("id2"))
+ .andReturn(Futures.immediateFuture(startTime)).anyTimes();
+ EasyMock.expect(indexTaskClient.getStartTimeAsync("id3"))
+ .andReturn(Futures.immediateFuture(startTime.plusSeconds(2)))
+ .anyTimes();
+
+ ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
+ final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, partitionOffset);
+
+
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
+
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
+
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset,
false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset,
false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset,
false))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.resumeAsync("id1"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.resumeAsync("id2"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.resumeAsync("id3"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.pauseAsync("id1"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.pauseAsync("id2"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.pauseAsync("id3"))
+ .andReturn(Futures.immediateFuture(true))
+ .anyTimes();
+
+ // Expect the earliest-started task (id2) to transition to publishing first
+ taskQueue.shutdown("id2", "All tasks in group[%s] failed to transition to
publishing state", 1);
+
+ replayAll();
+
+ SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ supervisor.checkpoint(
+ 0,
+ new TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamStartSequenceNumbers<>(STREAM,
checkpoints.get(0), ImmutableSet.of())
+ )
+ );
+
+ while (supervisor.getNoticesQueueSize() > 0) {
+ Thread.sleep(100);
+ }
+
+ verifyAll();
+
+ Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
+ }
+
@Test
public void testEmitBothLag() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]