This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 97696dec04e Fix recurring bug "Inconsistency between stored metadata" during auto-scaling (#19034)
97696dec04e is described below
commit 97696dec04e4b4b7b31eb0c68cfcf0a1b84f8659
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Feb 20 12:26:30 2026 +0530
Fix recurring bug "Inconsistency between stored metadata" during auto-scaling (#19034)
Changes
---------
- Do not clear `partitionOffsets` before auto-scaling so that subsequent tasks know where the previous tasks had left off (see the sketch below).
- Simplify the condition in `IndexerSQLMetadataStorageCoordinator`
- Add some comments and javadocs
- Update error message to be more user-friendly
- Fix `KafkaSupervisorTest` to not use reflection and update the test to verify that `partitionOffsets` are not cleared
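
The gist of the fix, as a minimal sketch (hypothetical class and field names modelled on the diff below, not the actual Druid code):

import java.util.HashMap;
import java.util.Map;

class ScalingStateSketch
{
  // partition id -> last published offset
  final Map<Integer, Long> partitionOffsets = new HashMap<>();
  final Map<Integer, Object> activelyReadingTaskGroups = new HashMap<>();
  final Map<Integer, Object> partitionGroups = new HashMap<>();

  // Before the fix: offsets were wiped along with the assignments, so the
  // post-scaling tasks started from offsets that no longer matched the
  // metadata stored in the metadata store.
  void clearAllocationInfoOld()
  {
    activelyReadingTaskGroups.clear();
    partitionGroups.clear();
    partitionOffsets.clear(); // the problematic call
  }

  // After the fix: offsets survive the scaling action, so the new task
  // groups resume from where the previous tasks had left off.
  void clearPartitionAssignmentsForScaling()
  {
    activelyReadingTaskGroups.clear();
    partitionGroups.clear();
    // partitionOffsets intentionally preserved
  }
}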
---
.../kafka/supervisor/KafkaSupervisorTest.java | 20 +++++----------
.../supervisor/SeekableStreamSupervisor.java | 30 ++++++++++++++++++----
.../IndexerSQLMetadataStorageCoordinator.java | 19 +++++++++-----
.../IndexerSQLMetadataStorageCoordinatorTest.java | 12 ++++++---
4 files changed, 51 insertions(+), 30 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f6a026e6aa7..4f6a93ce3a1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -71,7 +71,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
-import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -123,7 +122,6 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -5218,12 +5216,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
hasPendingTasks
);
- // Now call clearAllocationInfo() - this is where the bug was
- // The bug was that this method cleared pendingCompletionTaskGroups
- supervisor.testClearAllocationInfo();
+ // Clear the partition assignments (this is called when task count has changed)
+ supervisor.clearPartitionAssignmentsForScaling();
- // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo
- // This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups
+ // Verify that pendingCompletionTaskGroups has not been cleared
boolean stillHasPendingTasks = false;
for (int groupId = 0; groupId < 3; groupId++) {
if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
@@ -5243,6 +5239,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
0,
supervisor.getActivelyReadingTaskGroupsCount()
);
+
+ // Verify that partitionOffsets have not been cleared either
+ Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty());
}
private void addSomeEvents(int numEventsPerPartition) throws Exception
@@ -5962,13 +5961,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
CopyOnWriteArrayList<?> groups = getPendingCompletionTaskGroups(groupId);
return groups != null ? groups.size() : 0;
}
-
- public void testClearAllocationInfo() throws Exception
- {
- Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
- method.setAccessible(true);
- method.invoke(this);
- }
}
private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
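
For reference, the pattern used above replaces reflection with a test-visible method. A self-contained sketch of that pattern (names are illustrative, not the exact Druid test wiring):

import com.google.common.annotations.VisibleForTesting;

class SupervisorSketch
{
  // Widened to public and annotated so that tests can call it directly,
  // instead of unlocking a private method via reflection.
  @VisibleForTesting
  public void clearPartitionAssignmentsForScaling()
  {
    // clearing logic elided
  }
}

class SupervisorSketchTest
{
  void testScalingPreservesState()
  {
    SupervisorSketch supervisor = new SupervisorSketch();
    // No getDeclaredMethod/setAccessible dance required.
    supervisor.clearPartitionAssignmentsForScaling();
  }
}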
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 40049967b2b..62c439e12b3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -585,7 +585,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
gracefulShutdownInternal();
changeTaskCountInIOConfig(desiredActiveTaskCount);
- clearAllocationInfo();
+ clearPartitionAssignmentsForScaling();
emitter.emit(ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
@@ -621,18 +621,30 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
/**
- * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs.
+ * Clears previous partition assignments in preparation for an upcoming scaling event.
* <p>
* Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these
* tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks
* complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to
* {@link #activelyReadingTaskGroups}.
+ * <p>
+ * Also does not clear {@link #partitionOffsets} so that the new tasks remember
+ * where the previous tasks had left off.
+ * <p>
+ * Since both of these are in-memory structures, a change in Overlord leadership
+ * might cause duplicate scaling actions and/or intermittent task failures if the new generation tasks start with stale offsets.
*/
- private void clearAllocationInfo()
+ @VisibleForTesting
+ public void clearPartitionAssignmentsForScaling()
{
+ // All previous tasks should now be publishing and not actively reading anymore
activelyReadingTaskGroups.clear();
+
+ // partitionGroups will change as taskCount has changed due to scaling
partitionGroups.clear();
- partitionOffsets.clear();
+
+ // partitionIds will be rediscovered from the stream and assigned to respective taskGroups
partitionIds.clear();
}
@@ -3337,6 +3349,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+ * Checks the duration of {@link #activelyReadingTaskGroups}, requests them
+ * to checkpoint themselves if they have exceeded the specified run duration
+ * or if early stop has been requested. If checkpoint is successful, the
+ * {@link #partitionOffsets} are updated and checkpointed tasks are moved to
+ * {@link #pendingCompletionTaskGroups}.
+ */
private void checkTaskDuration() throws ExecutionException, InterruptedException
{
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
@@ -3463,7 +3482,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
changeTaskCountInIOConfig(rolloverTaskCount);
// Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
// This seems the best way to inject task amount recalculation during the rollover.
- clearAllocationInfo();
+ clearPartitionAssignmentsForScaling();
ServiceMetricEvent.Builder event = ServiceMetricEvent
.builder()
@@ -3976,6 +3995,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.collect(Collectors.toSet());
}
+ log.info("Initializing taskGroup[%d] with startingOffsets[%s].", groupId, simpleStartingOffsets);
activelyReadingTaskGroups.put(
groupId,
new TaskGroup(
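
Taken together, the hunks above make the dynamic-scaling path roughly the following sequence (a condensed sketch with stubbed methods, not the actual supervisor class):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ScalingSequenceSketch
{
  final Map<Integer, Long> partitionOffsets = new ConcurrentHashMap<>();

  void scaleTo(int desiredActiveTaskCount)
  {
    gracefulShutdownInternal();                        // previous tasks move to publishing
    changeTaskCountInIOConfig(desiredActiveTaskCount); // persist the new task count
    clearPartitionAssignmentsForScaling();             // clears assignments, keeps partitionOffsets
    // On the next iteration of the supervisor loop, partitions are
    // rediscovered and new task groups start from the preserved offsets.
  }

  void gracefulShutdownInternal() { /* stop active tasks gracefully */ }
  void changeTaskCountInIOConfig(int n) { /* update the supervisor spec */ }
  void clearPartitionAssignmentsForScaling() { /* see the diff above */ }
}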
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 4a72e01e7c4..dc0db5b5683 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2310,8 +2310,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
}
- if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
- // Offsets stored in startMetadata is greater than the last commited metadata.
+ if (startMetadataMatchesExisting) {
+ // Proceed with the commit
+ } else if (startMetadataGreaterThanExisting) {
+ // Offsets stored in startMetadata is greater than the last committed metadata.
// This can happen because the previous task is still publishing its segments and can resolve once
// the previous task finishes publishing.
return SegmentPublishResult.retryableFailure(
@@ -2319,12 +2321,15 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
+ " end state[%s]. Try resetting the supervisor.",
startMetadata, oldCommitMetadataFromDb
);
- }
-
- if (!startMetadataMatchesExisting) {
- // Not in the desired start state.
+ } else {
+ // startMetadata is older than committed metadata
+ // The task trying to publish is probably a replica trying to commit offsets already published by another task.
+ // OR the metadata has been updated manually
return SegmentPublishResult.fail(
- "Inconsistency between stored metadata state[%s] and target
state[%s]. Try resetting the supervisor.",
+ "Stored metadata state[%s] has already been updated by other tasks
and"
+ + " has diverged from the expected start metadata state[%s]."
+ + " This task will be replaced by the supervisor with a new task
using updated start offsets."
+ + " Try resetting the supervisor if the issue persists.",
oldCommitMetadataFromDb, startMetadata
);
}
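
The simplified condition above is essentially a three-way decision on how the task's start metadata compares to the stored metadata. A standalone sketch of the same control flow (PublishOutcome and the boolean inputs are simplified stand-ins for Druid's SegmentPublishResult and comparison logic):

enum PublishOutcome { COMMIT, RETRYABLE_FAILURE, FAILURE }

class MetadataCheckSketch
{
  static PublishOutcome check(boolean startMatchesExisting, boolean startGreaterThanExisting)
  {
    if (startMatchesExisting) {
      // Task starts exactly where the stored metadata ends: proceed with the commit.
      return PublishOutcome.COMMIT;
    } else if (startGreaterThanExisting) {
      // A previous task is likely still publishing; retrying can resolve this.
      return PublishOutcome.RETRYABLE_FAILURE;
    } else {
      // Start metadata is older than the stored metadata: likely a replica
      // whose offsets were already committed by another task, or a manual edit.
      return PublishOutcome.FAILURE;
    }
  }
}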
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0beecb31895..9add89949c8 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -945,8 +945,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
Assert.assertEquals(
SegmentPublishResult.fail(
- "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}]"
- + " and target state[ObjectMetadata{theObject=null}]. Try
resetting the supervisor."
+ "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has
already been updated by other tasks"
+ + " and has diverged from the expected start metadata
state[ObjectMetadata{theObject=null}]."
+ + " This task will be replaced by the supervisor with a new task
using updated start offsets."
+ + " Try resetting the supervisor if the issue persists."
),
result2
);
@@ -1093,8 +1095,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
Assert.assertEquals(
SegmentPublishResult.fail(
- "Inconsistency between stored metadata
state[ObjectMetadata{theObject={foo=baz}}] and "
- + "target state[ObjectMetadata{theObject={foo=qux}}]. Try
resetting the supervisor."
+ "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has
already been updated by other tasks"
+ + " and has diverged from the expected start metadata
state[ObjectMetadata{theObject={foo=qux}}]."
+ + " This task will be replaced by the supervisor with a new task
using updated start offsets."
+ + " Try resetting the supervisor if the issue persists."
),
result2
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]