This is an automated email from the ASF dual-hosted git repository.
jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 99c97432ff2 perf: batch reset invalid stream offsets in stream
supervisor (#19431)
99c97432ff2 is described below
commit 99c97432ff265eb4892e2cb9eeeac7d73bdd748d
Author: jtuglu1 <[email protected]>
AuthorDate: Mon May 11 15:16:24 2026 -0700
perf: batch reset invalid stream offsets in stream supervisor (#19431)
I've noticed that recent Druid versions can be extremely slow to recover from a
lagging supervisor (where offsets are invalid and
tuningConfig.resetOffsetAutomatically=true). For each taskGroup, identify and
reset all invalid partition offsets in one go. Once the set of invalid offsets
has been established, perform a single internal reset, then throw the exception.
This reduces the time-to-recovery of a fatally-lagged supervisor from N
runInternal() calls to 1 runInternal() call, where N is the number of in [...]
---
.../kafka/supervisor/KafkaSupervisorTest.java | 7 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 5 +-
.../supervisor/SeekableStreamSupervisor.java | 66 ++++++---
.../SeekableStreamSupervisorStateTest.java | 163 +++++++++++++++++++++
4 files changed, 215 insertions(+), 26 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index ddd4e6053a8..c629aa46192 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -3749,14 +3749,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 1, -100L, 2, 200L))
)
).times(3);
- // getOffsetFromStorageForPartition() throws an exception when the offsets
are automatically reset.
- // Since getOffsetFromStorageForPartition() is called per partition, all
partitions can't be reset at the same time.
- // Instead, subsequent partitions will be reset in the following
supervisor runs.
+ // All unavailable partitions are collected in a single pass and reset
together in one resetInternal() call.
+ // Only partition 1 (-100L) is unavailable (below earliest=0); partition 2
(200L) is valid since
+ // Kafka only checks offset >= earliest.
EasyMock.expect(
indexerMetadataStorageCoordinator.resetDataSourceMetadata(
DATASOURCE,
new KafkaDataSourceMetadata(
- // Only one partition is reset in a single supervisor run.
new SeekableStreamEndSequenceNumbers<>(topic,
singlePartitionMap(topic, 2, 200L))
)
)
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 522aebad4cf..cf91c37e375 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -2847,15 +2847,12 @@ public class KinesisSupervisorTest extends
EasyMockSupport
.andReturn(true)
.times(1);
- // getOffsetFromStorageForPartition() throws an exception when the offsets
are automatically reset.
- // Since getOffsetFromStorageForPartition() is called per partition, all
partitions can't be reset at the same time.
- // Instead, subsequent partitions will be reset in the following
supervisor runs.
+ // All unavailable partitions are collected in a single pass and reset
together in one resetInternal() call.
EasyMock
.expect(
indexerMetadataStorageCoordinator.resetDataSourceMetadata(
DATASOURCE,
new KinesisDataSourceMetadata(
- // Only one partition is reset in a single supervisor run.
new SeekableStreamEndSequenceNumbers<>(STREAM,
ImmutableMap.of())
)
)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 395b5da90ce..6a3207e6baa 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4099,6 +4099,12 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
// check that there is a current task group for each group of partitions
in [partitionGroups]
+ // Fetch metadata offsets once and collect all stale partitions across all
groups before committing
+ // any state changes. New TaskGroups are staged locally and only written
to activelyReadingTaskGroups
+ // if no reset is required, keeping state consistent: either all new
groups are committed or none are.
+ final Map<PartitionIdType, SequenceOffsetType> metadataOffsets =
getOffsetsFromMetadataStorage();
+ final Map<PartitionIdType, SequenceOffsetType> partitionsToReset = new
HashMap<>();
+ final Map<Integer, TaskGroup> newTaskGroups = new HashMap<>();
for (Integer groupId : partitionGroups.keySet()) {
if (!activelyReadingTaskGroups.containsKey(groupId)) {
log.info("Creating new taskGroup[%d] for partitions[%s].", groupId,
partitionGroups.get(groupId));
@@ -4118,7 +4124,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
: null;
final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>>
unfilteredStartingOffsets =
- generateStartingSequencesForPartitionGroup(groupId);
+ generateStartingSequencesForPartitionGroup(groupId,
metadataOffsets, partitionsToReset);
final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>>
startingOffsets;
if (supportsPartitionExpiration()) {
@@ -4167,8 +4173,7 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.collect(Collectors.toSet());
}
- log.info("Initializing taskGroup[%d] with startingOffsets[%s].",
groupId, simpleStartingOffsets);
- activelyReadingTaskGroups.put(
+ newTaskGroups.put(
groupId,
new TaskGroup(
groupId,
@@ -4182,6 +4187,22 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ // If any partitions need a reset, issue a single batch reset.
+ if (!partitionsToReset.isEmpty()) {
+ resetInternal(createDataSourceMetaDataForReset(ioConfig.getStream(),
partitionsToReset));
+ throw new StreamException(
+ new ISE(
+ "Previous sequenceNumbers %s are no longer available -
automatically resetting sequences",
+ partitionsToReset
+ )
+ );
+ }
+
+ for (Entry<Integer, TaskGroup> entry : newTaskGroups.entrySet()) {
+ log.info("Initializing taskGroup[%d] with startingOffsets[%s].",
entry.getKey(), entry.getValue().startingSequences);
+ activelyReadingTaskGroups.put(entry.getKey(), entry.getValue());
+ }
+
// iterate through all the current task groups and make sure each one has
the desired number of replica tasks
boolean createdTask = false;
for (Entry<Integer, TaskGroup> entry :
activelyReadingTaskGroups.entrySet()) {
@@ -4233,12 +4254,23 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return notices.size();
}
+ /**
+ * Builds the starting sequence map for one partition group.
+ *
+ * <p>Stale partitions (whose stored offsets are no longer available) are
added to
+ * {@code partitionsToReset} instead of the result map. The caller is
responsible for
+ * issuing the single batch reset after all groups have been processed.
+ *
+ * @param metadataOffsets pre-fetched metadata offsets shared across all
groups in this run
+ * @param partitionsToReset accumulator for partitions that need to be
reset; mutated in-place
+ */
private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>>
generateStartingSequencesForPartitionGroup(
- int groupId
+ int groupId,
+ Map<PartitionIdType, SequenceOffsetType> metadataOffsets,
+ Map<PartitionIdType, SequenceOffsetType> partitionsToReset
)
{
ImmutableMap.Builder<PartitionIdType,
OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
- final Map<PartitionIdType, SequenceOffsetType> metadataOffsets =
getOffsetsFromMetadataStorage();
for (PartitionIdType partitionId : partitionGroups.get(groupId)) {
SequenceOffsetType sequence = partitionOffsets.get(partitionId);
@@ -4252,7 +4284,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// get the sequence from metadata storage (if available) or
Kafka/Kinesis (otherwise)
OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage =
getOffsetFromStorageForPartition(
partitionId,
- metadataOffsets
+ metadataOffsets,
+ partitionsToReset
);
if (offsetFromStorage != null) {
@@ -4267,10 +4300,16 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* Queries the dataSource metadata table to see if there is a previous
ending sequence for this partition. If it
* doesn't find any data, it will retrieve the latest or earliest
Kafka/Kinesis sequence depending on the
* {@link SeekableStreamSupervisorIOConfig#useEarliestSequenceNumber}.
+ *
+ * When {@code resetOffsetAutomatically} is enabled and the stored offset is
no longer available, the partition is
+ * added to {@code partitionsToReset} and {@code null} is returned. The
caller is responsible for performing the
+ * batch reset after iterating all partitions.
*/
+ @Nullable
private OrderedSequenceNumber<SequenceOffsetType>
getOffsetFromStorageForPartition(
PartitionIdType partition,
- final Map<PartitionIdType, SequenceOffsetType> metadataOffsets
+ final Map<PartitionIdType, SequenceOffsetType> metadataOffsets,
+ final Map<PartitionIdType, SequenceOffsetType> partitionsToReset
)
{
SequenceOffsetType sequence = metadataOffsets.get(partition);
@@ -4279,17 +4318,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (!taskTuningConfig.isSkipSequenceNumberAvailabilityCheck()) {
if (!checkOffsetAvailability(partition, sequence)) {
if (taskTuningConfig.isResetOffsetAutomatically()) {
- resetInternal(
- createDataSourceMetaDataForReset(ioConfig.getStream(),
ImmutableMap.of(partition, sequence))
- );
- throw new StreamException(
- new ISE(
- "Previous sequenceNumber [%s] is no longer available for
partition [%s] - automatically resetting"
- + " sequence",
- sequence,
- partition
- )
- );
+ partitionsToReset.put(partition, sequence);
+ return null;
} else {
throw new StreamException(
new ISE(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 4266a002b84..b0b7954bb12 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2886,6 +2886,169 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
verifyAll();
}
+ /**
+ * Verifies that when N stored offsets are all outside stream retention, all
N partitions are collected in a single
+ * pass and reset together in one {@code resetInternal()} call, producing
exactly one exception event.
+ */
+ @Test
+ public void testBatchResetOfMultipleUnavailableOffsets() throws IOException
+ {
+ final StreamPartition<String> shard1Partition = StreamPartition.of(STREAM,
"1");
+ final ImmutableMap<String, String> storedOffsets = ImmutableMap.of("0",
"5", "1", "10");
+
+ final SeekableStreamSupervisorTuningConfig tuningConfigWithAutoReset = new
SeekableStreamSupervisorTuningConfig()
+ {
+ @Override
+ public Integer getWorkerThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Long getChatRetries()
+ {
+ return 1L;
+ }
+
+ @Override
+ public Duration getHttpTimeout()
+ {
+ return new Period("PT1M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getShutdownTimeout()
+ {
+ return new Period("PT1S").toStandardDuration();
+ }
+
+ @Override
+ public Duration getRepartitionTransitionDuration()
+ {
+ return new Period("PT2M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getOffsetFetchPeriod()
+ {
+ return new Period("PT5M").toStandardDuration();
+ }
+
+ @Override
+ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+ {
+ return new SeekableStreamIndexTaskTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ {
+ @Override
+ public SeekableStreamIndexTaskTuningConfig
withBasePersistDirectory(File dir)
+ {
+ return null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return null;
+ }
+ };
+ }
+ };
+
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(createSupervisorIOConfig()).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(tuningConfigWithAutoReset).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+ final TestSeekableStreamDataSourceMetadata storedMetadata = new
TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(STREAM, storedOffsets)
+ );
+ // After resetting both stale partitions, no offsets remain in the
metadata store.
+ final TestSeekableStreamDataSourceMetadata expectedAfterReset = new
TestSeekableStreamDataSourceMetadata(
+ new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of())
+ );
+
+ EasyMock.reset(indexerMetadataStorageCoordinator);
+ // Called once in getOffsetsFromMetadataStorage() and once inside
resetInternal().
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(SUPERVISOR_ID))
+ .andReturn(storedMetadata)
+ .anyTimes();
+ // Must be called exactly once — not once per unavailable partition.
+ EasyMock.expect(
+
indexerMetadataStorageCoordinator.resetDataSourceMetadata(SUPERVISOR_ID,
expectedAfterReset)
+ ).andReturn(true).times(1);
+
+ EasyMock.reset(recordSupplier);
+ EasyMock.expect(recordSupplier.getAssignment())
+ .andReturn(ImmutableSet.of(SHARD0_PARTITION, shard1Partition))
+ .anyTimes();
+
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
+ .andReturn("10")
+ .anyTimes();
+ EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
+ .andReturn(ImmutableSet.of(SHARD_ID, "1"))
+ .anyTimes();
+ // Both stored offsets are outside stream retention.
+ EasyMock.expect(recordSupplier.isOffsetAvailable(EasyMock.anyObject(),
EasyMock.anyObject()))
+ .andReturn(false)
+ .anyTimes();
+
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+
+ replayAll();
+
+ final TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor()
+ {
+ @Override
+ protected SeekableStreamDataSourceMetadata<String, String>
createDataSourceMetaDataForReset(
+ String stream,
+ Map<String, String> map
+ )
+ {
+ return new TestSeekableStreamDataSourceMetadata(new
SeekableStreamEndSequenceNumbers<>(stream, map));
+ }
+ };
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ // The StreamException is recorded exactly once — not once per unavailable
partition.
+ final List<SupervisorStateManager.ExceptionEvent> exceptionEvents =
supervisor.stateManager.getExceptionEvents();
+ Assert.assertEquals(1, exceptionEvents.size());
+ Assert.assertTrue(((SeekableStreamExceptionEvent)
exceptionEvents.get(0)).isStreamException());
+
+ // EasyMock validates that resetDataSourceMetadata was called exactly once
(see .times(1) above).
+ verifyAll();
+ }
+
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]