This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new a1766e02b68 KAFKA-17506 KRaftMigrationDriver initialization race (#17147)
a1766e02b68 is described below
commit a1766e02b6844610f7e11699617d53a04fe8e516
Author: David Arthur <[email protected]>
AuthorDate: Wed Sep 11 10:41:49 2024 -0400
KAFKA-17506 KRaftMigrationDriver initialization race (#17147)
There is a race condition between KRaftMigrationDriver running its first
poll() and being notified by Raft about a leader change. If onControllerChange
is called before RecoverMigrationStateFromZKEvent is run, we will end up
getting stuck in the INACTIVE state.
This patch fixes the race by enqueuing a RecoverMigrationStateFromZKEvent
from onControllerChange if the driver has not yet initialized. If another
RecoverMigrationStateFromZKEvent was already enqueued, the second one to run
will just be ignored.
Reviewers: Luke Chen <[email protected]>
---
.../metadata/migration/KRaftMigrationDriver.java | 7 ++++--
.../migration/KRaftMigrationDriverTest.java | 26 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 7cf82b8762c..a85679ada64 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -331,6 +331,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
@Override
public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
+ if (migrationState.equals(MigrationDriverState.UNINITIALIZED)) { // Raft can report a leader before the first poll() has recovered migration state from ZK
+ eventQueue.append(new RecoverMigrationStateFromZKEvent()); // if a recovery event is already queued, the second one to run is ignored
+ } // NOTE(review): migrationState is read here from the Raft callback, outside the event queue; confirm the field is volatile
eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
}
@@ -473,8 +476,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";
- if (migrationState.equals(MigrationDriverState.INACTIVE)) {
- // No need to log anything if this node is not the active
controller
+ if (EnumSet.of(MigrationDriverState.UNINITIALIZED,
MigrationDriverState.INACTIVE).contains(migrationState)) {
+ // No need to log anything if this node is not the active
controller or the driver has not initialized
completionHandler.accept(null);
return;
}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index f7bc07871dc..9792537c5c1 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -231,6 +231,32 @@ public class KRaftMigrationDriverTest {
return future;
}
+ @Test
+ public void testOnControllerChangeWhenUninitialized() throws InterruptedException {
+     // KAFKA-17506: Raft may call onControllerChange before the driver's first poll; the driver must still finish initializing.
+     CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+     CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
+     MockFaultHandler faultHandler = new MockFaultHandler("testOnControllerChangeWhenUninitialized");
+     KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+         .setZkMigrationClient(migrationClient)
+         .setPropagator(metadataPropagator)
+         .setFaultHandler(faultHandler);
+     try (KRaftMigrationDriver driver = builder.build()) {
+         // Fake a complete migration with ZK client
+         migrationClient.setMigrationRecoveryState(
+             ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
+
+         // Simulate the Raft layer reporting a leader before the driver has fully started.
+         driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
+
+         // Start up the driver. This enqueues a poll event which, once run, enqueues a recovery event.
+         driver.start();
+
+         // Even though we contrived a race above, the driver still makes it past initialization.
+         TestUtils.waitForCondition(() -> driver.migrationState().get(30, TimeUnit.SECONDS).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
+             "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
+     }
+ }
/**
* Don't send RPCs to brokers for every metadata change, only when brokers
or topics change.
* This is a regression test for KAFKA-14668