This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 38ce4a191f4 KAFKA-17506 KRaftMigrationDriver initialization race (#17147)
38ce4a191f4 is described below

commit 38ce4a191f4770e5436130dc614c60b0c61cc18e
Author: David Arthur <[email protected]>
AuthorDate: Wed Sep 11 10:41:49 2024 -0400

    KAFKA-17506 KRaftMigrationDriver initialization race (#17147)
    
    There is a race condition between KRaftMigrationDriver running its first
    poll() and being notified by Raft about a leader change. If
    onControllerChange is called before RecoverMigrationStateFromZKEvent is
    run, we will end up getting stuck in the INACTIVE state.
    
    This patch fixes the race by enqueuing a RecoverMigrationStateFromZKEvent
    from onControllerChange if the driver has not yet initialized. If another
    RecoverMigrationStateFromZKEvent was already enqueued, the second one to
    run will just be ignored.
    
    Reviewers: Luke Chen <[email protected]>
---
 .../metadata/migration/KRaftMigrationDriver.java   |  7 ++++--
 .../migration/KRaftMigrationDriverTest.java        | 26 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 7cf82b8762c..a85679ada64 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -331,6 +331,9 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
 
     @Override
     public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
+        if (migrationState.equals(MigrationDriverState.UNINITIALIZED)) {
+            eventQueue.append(new RecoverMigrationStateFromZKEvent());
+        }
         eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
     }
 
@@ -473,8 +476,8 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
             KRaftMigrationDriver.this.image = image;
             String metadataType = isSnapshot ? "snapshot" : "delta";
 
-            if (migrationState.equals(MigrationDriverState.INACTIVE)) {
-                // No need to log anything if this node is not the active 
controller
+            if (EnumSet.of(MigrationDriverState.UNINITIALIZED, 
MigrationDriverState.INACTIVE).contains(migrationState)) {
+                // No need to log anything if this node is not the active 
controller or the driver has not initialized
                 completionHandler.accept(null);
                 return;
             }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 14f347ca1e1..6d3c7b078d8 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -231,6 +231,32 @@ public class KRaftMigrationDriverTest {
         return future;
     }
 
+    @Test
+    public void testOnControllerChangeWhenUninitialized() throws 
InterruptedException {
+        CountingMetadataPropagator metadataPropagator = new 
CountingMetadataPropagator();
+        CapturingMigrationClient.newBuilder().build();
+        CapturingMigrationClient migrationClient = 
CapturingMigrationClient.newBuilder().build();
+        MockFaultHandler faultHandler = new 
MockFaultHandler("testBecomeLeaderUninitialized");
+        KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+            .setZkMigrationClient(migrationClient)
+            .setPropagator(metadataPropagator)
+            .setFaultHandler(faultHandler);
+        try (KRaftMigrationDriver driver = builder.build()) {
+            // Fake a complete migration with ZK client
+            migrationClient.setMigrationRecoveryState(
+                    
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
+
+            // simulate the Raft layer running before the driver has fully 
started.
+            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 
1));
+
+            // start up the driver. this will enqueue a poll event. once run, 
this will enqueue a recovery event
+            driver.start();
+
+            // Even though we contrived a race above, the driver still makes 
it past initialization.
+            TestUtils.waitForCondition(() -> driver.migrationState().get(30, 
TimeUnit.SECONDS).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
+                "Waiting for KRaftMigrationDriver to enter 
WAIT_FOR_CONTROLLER_QUORUM state");
+        }
+    }
     /**
      * Don't send RPCs to brokers for every metadata change, only when brokers 
or topics change.
      * This is a regression test for KAFKA-14668

Reply via email to