This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 97696dec04e Fix recurring bug "Inconsistency between stored metadata" 
during auto-scaling (#19034)
97696dec04e is described below

commit 97696dec04e4b4b7b31eb0c68cfcf0a1b84f8659
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Feb 20 12:26:30 2026 +0530

    Fix recurring bug "Inconsistency between stored metadata" during 
auto-scaling (#19034)
    
    Changes
    ---------
    - Do not clear `partitionOffsets` before auto-scaling so that subsequent 
tasks
    know where the previous tasks had left off.
    - Simplify the condition in `IndexerSQLMetadataStorageCoordinator`
    - Add some comments and javadocs
    - Update error message to be more user-friendly
    - Fixed `KafkaSupervisorTest` to not use reflection and updated test to 
verify
    clearing of `partitionOffsets`
---
 .../kafka/supervisor/KafkaSupervisorTest.java      | 20 +++++----------
 .../supervisor/SeekableStreamSupervisor.java       | 30 ++++++++++++++++++----
 .../IndexerSQLMetadataStorageCoordinator.java      | 19 +++++++++-----
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 12 ++++++---
 4 files changed, 51 insertions(+), 30 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f6a026e6aa7..4f6a93ce3a1 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -71,7 +71,6 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
-import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -123,7 +122,6 @@ import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
@@ -5218,12 +5216,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
         hasPendingTasks
     );
 
-    // Now call clearAllocationInfo() - this is where the bug was
-    // The bug was that this method cleared pendingCompletionTaskGroups
-    supervisor.testClearAllocationInfo();
+    // Clear the partition assignments (this is called when task count has 
changed)
+    supervisor.clearPartitionAssignmentsForScaling();
 
-    // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT 
empty after clearAllocationInfo
-    // This is the fix - clearAllocationInfo should preserve 
pendingCompletionTaskGroups
+    // Verify that pendingCompletionTaskGroups has not been cleared
     boolean stillHasPendingTasks = false;
     for (int groupId = 0; groupId < 3; groupId++) {
       if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
@@ -5243,6 +5239,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
         0,
         supervisor.getActivelyReadingTaskGroupsCount()
     );
+
+    // Verify that partitionOffsets have not been cleared either
+    Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty());
   }
 
   private void addSomeEvents(int numEventsPerPartition) throws Exception
@@ -5962,13 +5961,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
       CopyOnWriteArrayList<?> groups = getPendingCompletionTaskGroups(groupId);
       return groups != null ? groups.size() : 0;
     }
-
-    public void testClearAllocationInfo() throws Exception
-    {
-      Method method = 
SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
-      method.setAccessible(true);
-      method.invoke(this);
-    }
   }
 
   private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends 
TestableKafkaSupervisor
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 40049967b2b..62c439e12b3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -585,7 +585,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
       gracefulShutdownInternal();
       changeTaskCountInIOConfig(desiredActiveTaskCount);
-      clearAllocationInfo();
+      clearPartitionAssignmentsForScaling();
       emitter.emit(ServiceMetricEvent.builder()
                                      .setDimension(DruidMetrics.SUPERVISOR_ID, 
supervisorId)
                                      .setDimension(DruidMetrics.DATASOURCE, 
dataSource)
@@ -621,18 +621,30 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   }
 
   /**
-   * Clears allocation information including active task groups, partition 
groups, partition offsets, and partition IDs.
+   * Clears previous partition assignments in preparation for an upcoming 
scaling event.
    * <p>
    * Note: Does not clear {@link #pendingCompletionTaskGroups} so that the 
supervisor remembers that these
    * tasks are publishing and auto-scaler does not repeatedly attempt a scale 
down until these tasks
    * complete. If this is cleared, the next {@link #discoverTasks()} might add 
these tasks to
    * {@link #activelyReadingTaskGroups}.
+   * <p>
+   * Also does not clear {@link #partitionOffsets} so that the new tasks 
remember
+   * where the previous tasks had left off.
+   * <p>
+   * Since both of these are in-memory structures, a change in Overlord 
leadership
+   * might cause duplicate scaling actions and/or intermittent task failures if
+   * the new generation tasks start with stale offsets.
    */
-  private void clearAllocationInfo()
+  @VisibleForTesting
+  public void clearPartitionAssignmentsForScaling()
   {
+    // All previous tasks should now be publishing and not actively reading 
anymore
     activelyReadingTaskGroups.clear();
+
+    // partitionGroups will change as taskCount has changed due to scaling
     partitionGroups.clear();
-    partitionOffsets.clear();
+
+    // partitionIds will be rediscovered from the stream and assigned to 
respective taskGroups
     partitionIds.clear();
   }
 
@@ -3337,6 +3349,13 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  /**
+   * Checks the duration of {@link #activelyReadingTaskGroups}, requests them
+   * to checkpoint themselves if they have exceeded the specified run duration
+   * or if early stop has been requested. If checkpoint is successful, the
+   * {@link #partitionOffsets} are updated and checkpointed tasks are moved to
+   * {@link #pendingCompletionTaskGroups}.
+   */
   private void checkTaskDuration() throws ExecutionException, 
InterruptedException
   {
     final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> 
futures = new ArrayList<>();
@@ -3463,7 +3482,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         changeTaskCountInIOConfig(rolloverTaskCount);
         // Here force reset the supervisor state to be re-calculated on the 
next iteration of runInternal() call.
         // This seems the best way to inject task amount recalculation during 
the rollover.
-        clearAllocationInfo();
+        clearPartitionAssignmentsForScaling();
 
         ServiceMetricEvent.Builder event = ServiceMetricEvent
             .builder()
@@ -3976,6 +3995,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               .collect(Collectors.toSet());
         }
 
+        log.info("Initializing taskGroup[%d] with startingOffsets[%s].", 
groupId, simpleStartingOffsets);
         activelyReadingTaskGroups.put(
             groupId,
             new TaskGroup(
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 4a72e01e7c4..dc0db5b5683 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2310,8 +2310,10 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       startMetadataMatchesExisting = 
startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
     }
 
-    if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
-      // Offsets stored in startMetadata is greater than the last commited 
metadata.
+    if (startMetadataMatchesExisting) {
+      // Proceed with the commit
+    } else if (startMetadataGreaterThanExisting) {
+      // Offsets stored in startMetadata is greater than the last committed 
metadata.
       // This can happen because the previous task is still publishing its 
segments and can resolve once
       // the previous task finishes publishing.
       return SegmentPublishResult.retryableFailure(
@@ -2319,12 +2321,15 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           + " end state[%s]. Try resetting the supervisor.",
           startMetadata, oldCommitMetadataFromDb
       );
-    }
-
-    if (!startMetadataMatchesExisting) {
-      // Not in the desired start state.
+    } else {
+      // startMetadata is older than committed metadata
+      // The task trying to publish is probably a replica trying to commit 
offsets already published by another task.
+      // OR the metadata has been updated manually
       return SegmentPublishResult.fail(
-          "Inconsistency between stored metadata state[%s] and target 
state[%s]. Try resetting the supervisor.",
+          "Stored metadata state[%s] has already been updated by other tasks 
and"
+          + " has diverged from the expected start metadata state[%s]."
+          + " This task will be replaced by the supervisor with a new task 
using updated start offsets."
+          + " Try resetting the supervisor if the issue persists.",
           oldCommitMetadataFromDb, startMetadata
       );
     }
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0beecb31895..9add89949c8 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -945,8 +945,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     );
     Assert.assertEquals(
         SegmentPublishResult.fail(
-            "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}]"
-            + " and target state[ObjectMetadata{theObject=null}]. Try 
resetting the supervisor."
+            "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has 
already been updated by other tasks"
+            + " and has diverged from the expected start metadata 
state[ObjectMetadata{theObject=null}]."
+            + " This task will be replaced by the supervisor with a new task 
using updated start offsets."
+            + " Try resetting the supervisor if the issue persists."
         ),
         result2
     );
@@ -1093,8 +1095,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     );
     Assert.assertEquals(
         SegmentPublishResult.fail(
-            "Inconsistency between stored metadata 
state[ObjectMetadata{theObject={foo=baz}}] and "
-            + "target state[ObjectMetadata{theObject={foo=qux}}]. Try 
resetting the supervisor."
+            "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has 
already been updated by other tasks"
+            + " and has diverged from the expected start metadata 
state[ObjectMetadata{theObject={foo=qux}}]."
+            + " This task will be replaced by the supervisor with a new task 
using updated start offsets."
+            + " Try resetting the supervisor if the issue persists."
         ),
         result2
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to