This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 99c97432ff2 perf: batch reset invalid stream offsets in stream 
supervisor (#19431)
99c97432ff2 is described below

commit 99c97432ff265eb4892e2cb9eeeac7d73bdd748d
Author: jtuglu1 <[email protected]>
AuthorDate: Mon May 11 15:16:24 2026 -0700

    perf: batch reset invalid stream offsets in stream supervisor (#19431)
    
    I've noticed that recent Druid can be extremely slow to recover from a 
lagging supervisor (where offsets are invalid and 
tuningConfig.resetOffsetAutomatically=true). For each taskGroup, identify and 
reset all invalid partition offsets in one go. Once we have established the set 
of invalid offsets, perform an internal reset, then throw the exception. This 
reduces the time-to-recovery of a fatally-lagged supervisor from N 
runInternal() calls to a single runInternal() call, where N is the number of in [...]
---
 .../kafka/supervisor/KafkaSupervisorTest.java      |   7 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   5 +-
 .../supervisor/SeekableStreamSupervisor.java       |  66 ++++++---
 .../SeekableStreamSupervisorStateTest.java         | 163 +++++++++++++++++++++
 4 files changed, 215 insertions(+), 26 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index ddd4e6053a8..c629aa46192 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -3749,14 +3749,13 @@ public class KafkaSupervisorTest extends EasyMockSupport
                     new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 1, -100L, 2, 200L))
                 )
             ).times(3);
-    // getOffsetFromStorageForPartition() throws an exception when the offsets 
are automatically reset.
-    // Since getOffsetFromStorageForPartition() is called per partition, all 
partitions can't be reset at the same time.
-    // Instead, subsequent partitions will be reset in the following 
supervisor runs.
+    // All unavailable partitions are collected in a single pass and reset 
together in one resetInternal() call.
+    // Only partition 1 (-100L) is unavailable (below earliest=0); partition 2 
(200L) is valid since
+    // Kafka only checks offset >= earliest.
     EasyMock.expect(
         indexerMetadataStorageCoordinator.resetDataSourceMetadata(
             DATASOURCE,
             new KafkaDataSourceMetadata(
-                // Only one partition is reset in a single supervisor run.
                 new SeekableStreamEndSequenceNumbers<>(topic, 
singlePartitionMap(topic, 2, 200L))
             )
         )
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 522aebad4cf..cf91c37e375 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -2847,15 +2847,12 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
         .andReturn(true)
         .times(1);
 
-    // getOffsetFromStorageForPartition() throws an exception when the offsets 
are automatically reset.
-    // Since getOffsetFromStorageForPartition() is called per partition, all 
partitions can't be reset at the same time.
-    // Instead, subsequent partitions will be reset in the following 
supervisor runs.
+    // All unavailable partitions are collected in a single pass and reset 
together in one resetInternal() call.
     EasyMock
         .expect(
             indexerMetadataStorageCoordinator.resetDataSourceMetadata(
                 DATASOURCE,
                 new KinesisDataSourceMetadata(
-                    // Only one partition is reset in a single supervisor run.
                     new SeekableStreamEndSequenceNumbers<>(STREAM, 
ImmutableMap.of())
                 )
             )
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 395b5da90ce..6a3207e6baa 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4099,6 +4099,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     );
 
     // check that there is a current task group for each group of partitions 
in [partitionGroups]
+    // Fetch metadata offsets once and collect all stale partitions across all 
groups before committing
+    // any state changes. New TaskGroups are staged locally and only written 
to activelyReadingTaskGroups
+    // if no reset is required, keeping state consistent: either all new 
groups are committed or none are.
+    final Map<PartitionIdType, SequenceOffsetType> metadataOffsets = 
getOffsetsFromMetadataStorage();
+    final Map<PartitionIdType, SequenceOffsetType> partitionsToReset = new 
HashMap<>();
+    final Map<Integer, TaskGroup> newTaskGroups = new HashMap<>();
     for (Integer groupId : partitionGroups.keySet()) {
       if (!activelyReadingTaskGroups.containsKey(groupId)) {
         log.info("Creating new taskGroup[%d] for partitions[%s].", groupId, 
partitionGroups.get(groupId));
@@ -4118,7 +4124,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                             : null;
 
         final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> 
unfilteredStartingOffsets =
-            generateStartingSequencesForPartitionGroup(groupId);
+            generateStartingSequencesForPartitionGroup(groupId, 
metadataOffsets, partitionsToReset);
 
         final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> 
startingOffsets;
         if (supportsPartitionExpiration()) {
@@ -4167,8 +4173,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               .collect(Collectors.toSet());
         }
 
-        log.info("Initializing taskGroup[%d] with startingOffsets[%s].", 
groupId, simpleStartingOffsets);
-        activelyReadingTaskGroups.put(
+        newTaskGroups.put(
             groupId,
             new TaskGroup(
                 groupId,
@@ -4182,6 +4187,22 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
     }
 
+    // If any partitions need a reset, issue a single batch reset.
+    if (!partitionsToReset.isEmpty()) {
+      resetInternal(createDataSourceMetaDataForReset(ioConfig.getStream(), 
partitionsToReset));
+      throw new StreamException(
+          new ISE(
+              "Previous sequenceNumbers %s are no longer available - 
automatically resetting sequences",
+              partitionsToReset
+          )
+      );
+    }
+
+    for (Entry<Integer, TaskGroup> entry : newTaskGroups.entrySet()) {
+      log.info("Initializing taskGroup[%d] with startingOffsets[%s].", 
entry.getKey(), entry.getValue().startingSequences);
+      activelyReadingTaskGroups.put(entry.getKey(), entry.getValue());
+    }
+
     // iterate through all the current task groups and make sure each one has 
the desired number of replica tasks
     boolean createdTask = false;
     for (Entry<Integer, TaskGroup> entry : 
activelyReadingTaskGroups.entrySet()) {
@@ -4233,12 +4254,23 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return notices.size();
   }
 
+  /**
+   * Builds the starting sequence map for one partition group.
+   *
+   * <p>Stale partitions (whose stored offsets are no longer available) are 
added to
+   * {@code partitionsToReset} instead of the result map. The caller is 
responsible for
+   * issuing the single batch reset after all groups have been processed.
+   *
+   * @param metadataOffsets pre-fetched metadata offsets shared across all 
groups in this run
+   * @param partitionsToReset accumulator for partitions that need to be 
reset; mutated in-place
+   */
   private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> 
generateStartingSequencesForPartitionGroup(
-      int groupId
+      int groupId,
+      Map<PartitionIdType, SequenceOffsetType> metadataOffsets,
+      Map<PartitionIdType, SequenceOffsetType> partitionsToReset
   )
   {
     ImmutableMap.Builder<PartitionIdType, 
OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
-    final Map<PartitionIdType, SequenceOffsetType> metadataOffsets = 
getOffsetsFromMetadataStorage();
     for (PartitionIdType partitionId : partitionGroups.get(groupId)) {
       SequenceOffsetType sequence = partitionOffsets.get(partitionId);
 
@@ -4252,7 +4284,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         // get the sequence from metadata storage (if available) or 
Kafka/Kinesis (otherwise)
         OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = 
getOffsetFromStorageForPartition(
             partitionId,
-            metadataOffsets
+            metadataOffsets,
+            partitionsToReset
         );
 
         if (offsetFromStorage != null) {
@@ -4267,10 +4300,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * Queries the dataSource metadata table to see if there is a previous 
ending sequence for this partition. If it
    * doesn't find any data, it will retrieve the latest or earliest 
Kafka/Kinesis sequence depending on the
    * {@link SeekableStreamSupervisorIOConfig#useEarliestSequenceNumber}.
+   *
+   * When {@code resetOffsetAutomatically} is enabled and the stored offset is 
no longer available, the partition is
+   * added to {@code partitionsToReset} and {@code null} is returned. The 
caller is responsible for performing the
+   * batch reset after iterating all partitions.
    */
+  @Nullable
   private OrderedSequenceNumber<SequenceOffsetType> 
getOffsetFromStorageForPartition(
       PartitionIdType partition,
-      final Map<PartitionIdType, SequenceOffsetType> metadataOffsets
+      final Map<PartitionIdType, SequenceOffsetType> metadataOffsets,
+      final Map<PartitionIdType, SequenceOffsetType> partitionsToReset
   )
   {
     SequenceOffsetType sequence = metadataOffsets.get(partition);
@@ -4279,17 +4318,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       if (!taskTuningConfig.isSkipSequenceNumberAvailabilityCheck()) {
         if (!checkOffsetAvailability(partition, sequence)) {
           if (taskTuningConfig.isResetOffsetAutomatically()) {
-            resetInternal(
-                createDataSourceMetaDataForReset(ioConfig.getStream(), 
ImmutableMap.of(partition, sequence))
-            );
-            throw new StreamException(
-                new ISE(
-                    "Previous sequenceNumber [%s] is no longer available for 
partition [%s] - automatically resetting"
-                    + " sequence",
-                    sequence,
-                    partition
-                )
-            );
+            partitionsToReset.put(partition, sequence);
+            return null;
           } else {
             throw new StreamException(
                 new ISE(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 4266a002b84..b0b7954bb12 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2886,6 +2886,169 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     verifyAll();
   }
 
+  /**
+   * Verifies that when N stored offsets are all outside stream retention, all 
N partitions are collected in a single
+   * pass and reset together in one {@code resetInternal()} call, producing 
exactly one exception event.
+   */
+  @Test
+  public void testBatchResetOfMultipleUnavailableOffsets() throws IOException
+  {
+    final StreamPartition<String> shard1Partition = StreamPartition.of(STREAM, 
"1");
+    final ImmutableMap<String, String> storedOffsets = ImmutableMap.of("0", 
"5", "1", "10");
+
+    final SeekableStreamSupervisorTuningConfig tuningConfigWithAutoReset = new 
SeekableStreamSupervisorTuningConfig()
+    {
+      @Override
+      public Integer getWorkerThreads()
+      {
+        return 1;
+      }
+
+      @Override
+      public Long getChatRetries()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Duration getHttpTimeout()
+      {
+        return new Period("PT1M").toStandardDuration();
+      }
+
+      @Override
+      public Duration getShutdownTimeout()
+      {
+        return new Period("PT1S").toStandardDuration();
+      }
+
+      @Override
+      public Duration getRepartitionTransitionDuration()
+      {
+        return new Period("PT2M").toStandardDuration();
+      }
+
+      @Override
+      public Duration getOffsetFetchPeriod()
+      {
+        return new Period("PT5M").toStandardDuration();
+      }
+
+      @Override
+      public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+      {
+        return new SeekableStreamIndexTaskTuningConfig(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        )
+        {
+          @Override
+          public SeekableStreamIndexTaskTuningConfig 
withBasePersistDirectory(File dir)
+          {
+            return null;
+          }
+
+          @Override
+          public String toString()
+          {
+            return null;
+          }
+        };
+      }
+    };
+
+    EasyMock.reset(spec);
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(createSupervisorIOConfig()).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(tuningConfigWithAutoReset).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+    final TestSeekableStreamDataSourceMetadata storedMetadata = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(STREAM, storedOffsets)
+    );
+    // After resetting both stale partitions, no offsets remain in the 
metadata store.
+    final TestSeekableStreamDataSourceMetadata expectedAfterReset = new 
TestSeekableStreamDataSourceMetadata(
+        new SeekableStreamEndSequenceNumbers<>(STREAM, ImmutableMap.of())
+    );
+
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    // Called once in getOffsetsFromMetadataStorage() and once inside 
resetInternal().
+    
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(SUPERVISOR_ID))
+            .andReturn(storedMetadata)
+            .anyTimes();
+    // Must be called exactly once — not once per unavailable partition.
+    EasyMock.expect(
+        
indexerMetadataStorageCoordinator.resetDataSourceMetadata(SUPERVISOR_ID, 
expectedAfterReset)
+    ).andReturn(true).times(1);
+
+    EasyMock.reset(recordSupplier);
+    EasyMock.expect(recordSupplier.getAssignment())
+            .andReturn(ImmutableSet.of(SHARD0_PARTITION, shard1Partition))
+            .anyTimes();
+    
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject()))
+            .andReturn("10")
+            .anyTimes();
+    EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
+            .andReturn(ImmutableSet.of(SHARD_ID, "1"))
+            .anyTimes();
+    // Both stored offsets are outside stream retention.
+    EasyMock.expect(recordSupplier.isOffsetAvailable(EasyMock.anyObject(), 
EasyMock.anyObject()))
+            .andReturn(false)
+            .anyTimes();
+
+    
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor()
+    {
+      @Override
+      protected SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetaDataForReset(
+          String stream,
+          Map<String, String> map
+      )
+      {
+        return new TestSeekableStreamDataSourceMetadata(new 
SeekableStreamEndSequenceNumbers<>(stream, map));
+      }
+    };
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    // The StreamException is recorded exactly once — not once per unavailable 
partition.
+    final List<SupervisorStateManager.ExceptionEvent> exceptionEvents = 
supervisor.stateManager.getExceptionEvents();
+    Assert.assertEquals(1, exceptionEvents.size());
+    Assert.assertTrue(((SeekableStreamExceptionEvent) 
exceptionEvents.get(0)).isStreamException());
+
+    // EasyMock validates that resetDataSourceMetadata was called exactly once 
(see .times(1) above).
+    verifyAll();
+  }
+
   private static DataSchema getDataSchema()
   {
     List<DimensionSchema> dimensions = new ArrayList<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to