This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 5d3e691e47b KAFKA-16171: Fix ZK migration controller race #15238
5d3e691e47b is described below

commit 5d3e691e47ba35b1cac481bcf158890413f19e3c
Author: David Arthur <[email protected]>
AuthorDate: Fri Jan 19 13:38:37 2024 -0500

    KAFKA-16171: Fix ZK migration controller race #15238
    
    This patch causes the active KRaftMigrationDriver to reload the /migration ZK state after electing
    itself as the leader in ZK. This closes a race condition where the previous active controller could
    make an update to /migration after the new leader was elected. The update race was not actually a
    problem regarding the data since both controllers would be syncing the same state from KRaft to ZK,
    but the change to the znode causes the new controller to fail on the zk version check on
    /migration.
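
    A minimal standalone sketch of the race (not Kafka code; the class and method names below are
    invented for illustration): a conditional write against a versioned znode fails once the cached
    version goes stale, and succeeds again after the version is re-read post-election.

        import java.util.concurrent.atomic.AtomicInteger;

        public class MigrationZnodeRaceSketch {
            // Model the /migration znode as just a version; every successful write bumps it.
            static final AtomicInteger znodeVersion = new AtomicInteger(0);

            // Like a conditional ZK setData: succeeds only if the caller's cached version is current.
            static int conditionalWrite(int cachedVersion) {
                if (!znodeVersion.compareAndSet(cachedVersion, cachedVersion + 1))
                    throw new IllegalStateException("zkVersion check failed");
                return cachedVersion + 1;
            }

            public static void main(String[] args) {
                int newLeader = znodeVersion.get();      // new controller loads /migration state
                int oldLeader = newLeader;

                oldLeader = conditionalWrite(oldLeader); // old controller syncs once more; version bumps

                try {
                    conditionalWrite(newLeader);         // without the fix: stale cached version, write fails
                } catch (IllegalStateException e) {
                    System.out.println("stale write rejected: " + e.getMessage());
                }

                newLeader = znodeVersion.get();          // the fix: re-read the znode after winning election
                conditionalWrite(newLeader);             // now the write succeeds
            }
        }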
    
    This patch also fixes an as-yet-unseen bug where an active controller failing to elect itself via
    claimControllerLeadership would not retry.
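
    A rough sketch of the intended retry loop (again illustrative only; these names are not the actual
    KRaftMigrationDriver API): keep retrying the ZK leadership claim until it succeeds or a different
    KRaft leader is learned of.

        import java.util.function.BooleanSupplier;

        public class ClaimRetrySketch {
            // Retry claim() until it succeeds or a different KRaft leader shows up.
            static boolean claimWithRetry(BooleanSupplier claim, BooleanSupplier leaderChanged)
                    throws InterruptedException {
                while (!leaderChanged.getAsBoolean()) {
                    if (claim.getAsBoolean())
                        return true;       // leadership claimed in ZK
                    Thread.sleep(100);     // back off and retry instead of giving up (the fixed behavior)
                }
                return false;              // a different KRaft leader took over; stop trying
            }

            public static void main(String[] args) throws InterruptedException {
                int[] attempts = {0};
                // Simulate a claim that fails twice (e.g., a stale /controller_epoch) and then succeeds.
                boolean claimed = claimWithRetry(() -> ++attempts[0] >= 3, () -> false);
                System.out.println("claimed=" + claimed + " after " + attempts[0] + " attempts");
            }
        }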
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../kafka/zk/ZkMigrationFailoverTest.scala         | 284 +++++++++++++++++++++
 .../metadata/migration/KRaftMigrationDriver.java   |  14 +-
 .../migration/ZkMigrationLeadershipState.java      |   9 +-
 .../migration/CapturingMigrationClient.java        |  12 +-
 4 files changed, 312 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
new file mode 100644
index 00000000000..1b297f4fc9b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk
+
+import kafka.utils.{Logging, PasswordEncoder, TestUtils}
+import org.apache.kafka.clients.ApiVersions
+import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.metadata.{FeatureLevelRecord, TopicRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.controller.QuorumFeatures
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics
+import org.apache.kafka.image.loader.LogDeltaManifest
+import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
+import org.apache.kafka.metadata.KafkaConfigSchema
+import org.apache.kafka.metadata.migration._
+import org.apache.kafka.raft.{LeaderAndEpoch, OffsetAndEpoch}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.FaultHandler
+import org.apache.zookeeper.client.ZKClientConfig
+import org.junit.jupiter.api.Assertions.{assertTrue, fail}
+import org.junit.jupiter.api.Test
+
+import java.util
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.{Optional, OptionalInt}
+import scala.collection.mutable
+
+class ZkMigrationFailoverTest extends Logging {
+
+  class CapturingFaultHandler(nodeId: Int) extends FaultHandler {
+    val faults = mutable.Buffer[Throwable]()
+    var future: CompletableFuture[Throwable] = CompletableFuture.completedFuture(new RuntimeException())
+    var waitingForMsg = ""
+
+    override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = {
+      error(s"Fault handled on node $nodeId", cause)
+      faults.append(cause)
+      if (!future.isDone && cause.getMessage.contains(waitingForMsg)) {
+        future.complete(cause)
+      }
+      new RuntimeException(cause)
+    }
+
+    def checkAndClear(verifier: (Seq[Throwable]) => Unit): Unit = {
+      val faultsSoFar = faults.toSeq
+      try {
+        verifier.apply(faultsSoFar)
+      } catch {
+        case ae: AssertionError => fail(s"Assertion failed. Faults on $nodeId were: $faultsSoFar", ae)
+      }
+    }
+
+    def waitForError(message: String): CompletableFuture[Throwable] = {
+      future = new CompletableFuture[Throwable]()
+      waitingForMsg = message
+      future
+    }
+  }
+
+  def buildMigrationDriver(nodeId: Int, zkMigrationClient: ZkMigrationClient): (KRaftMigrationDriver, CapturingFaultHandler) = {
+    val faultHandler = new CapturingFaultHandler(nodeId)
+    val driver = KRaftMigrationDriver.newBuilder
+      .setNodeId(nodeId)
+      .setZkRecordConsumer(new ZkRecordConsumer {
+        override def beginMigration(): CompletableFuture[_] = ???
+
+        override def acceptBatch(recordBatch: util.List[ApiMessageAndVersion]): CompletableFuture[_] = ???
+
+        override def completeMigration(): CompletableFuture[OffsetAndEpoch] = ???
+
+        override def abortMigration(): Unit = ???
+      })
+      .setInitialZkLoadHandler((_: MetadataPublisher) => {})
+      .setZkMigrationClient(zkMigrationClient)
+      .setFaultHandler(faultHandler)
+      .setQuorumFeatures(QuorumFeatures.create(nodeId,
+        new ApiVersions(),
+        QuorumFeatures.defaultFeatureMap(),
+        util.Arrays.asList(
+          new Node(3000, "localhost", 3000),
+          new Node(3001, "localhost", 3001),
+          new Node(3002, "localhost", 3002)
+        )))
+      .setConfigSchema(KafkaConfigSchema.EMPTY)
+      .setControllerMetrics(new QuorumControllerMetrics(Optional.empty(), Time.SYSTEM, true))
+      .setTime(Time.SYSTEM)
+      .setPropagator(new LegacyPropagator() {
+        override def startup(): Unit = ???
+
+        override def shutdown(): Unit = ???
+
+        override def publishMetadata(image: MetadataImage): Unit = ???
+
+        override def sendRPCsToBrokersFromMetadataDelta(delta: MetadataDelta, image: MetadataImage, zkControllerEpoch: Int): Unit = {
+
+        }
+
+        override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = {
+
+        }
+
+        override def clear(): Unit = ???
+      })
+      .build()
+    (driver, faultHandler)
+  }
+
+  def readMigrationZNode(zkMigrationClient: ZkMigrationClient): ZkMigrationLeadershipState = {
+    zkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY)
+  }
+
+  def safeGet[T](future: CompletableFuture[T]): T = {
+    future.get(10, TimeUnit.SECONDS)
+  }
+
+  @Test
+  def testControllerFailoverZkRace(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    // Safe to reuse these since they don't keep any state
+    val zkMigrationClient = ZkMigrationClient(zkClient, PasswordEncoder.noop())
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val newLeader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image1 = MetadataImage.EMPTY
+      val delta1 = new MetadataDelta(image1)
+      delta1.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta1.replay(ZkMigrationState.MIGRATION.toRecord.message)
+      delta1.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName("topic-to-sync"))
+
+      val provenance1 = new MetadataProvenance(210, 11, 1)
+      image1 = delta1.apply(provenance1)
+
+      val manifest1 = LogDeltaManifest.newBuilder()
+        .provenance(provenance1)
+        .leaderAndEpoch(newLeader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image and a leader event into 3000; this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta1, image1, manifest1)
+      driver1.onControllerChange(newLeader1)
+
+      // Hold off on loading the image to 3001. This lets us artificially keep it from claiming leadership in ZK
+      driver2.onControllerChange(newLeader1)
+
+      // Wait for driver 1 to become leader in ZK
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
+        case Some(nodeId) => nodeId == 3000
+        case None => false
+      }, "waiting for 3000 to claim ZK leadership")
+
+      // Now 3001 becomes leader, and loads migration recovery state from ZK.
+      // Since an image hasn't been published to it yet, it will stay in WAIT_FOR_CONTROLLER_QUORUM
+      val newLeader2 = new LeaderAndEpoch(OptionalInt.of(3001), 3)
+      driver2.onControllerChange(newLeader2)
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver2.migrationState()).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
+        "waiting for node 3001 to enter WAIT_FOR_CONTROLLER_QUORUM")
+
+      // Node 3000 still thinks that it's the leader, so do a delta update
+      val delta2 = new MetadataDelta(image1)
+      delta2.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName("another-topic-to-sync"))
+      val provenance2 = new MetadataProvenance(211, 11, 1)
+      val image2 = delta2.apply(provenance2)
+      val manifest2 = LogDeltaManifest.newBuilder()
+        .provenance(provenance2)
+        .leaderAndEpoch(newLeader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+      val migrationZkVersion = readMigrationZNode(zkMigrationClient).migrationZkVersion()
+      driver1.onMetadataUpdate(delta2, image2, manifest2)
+
+      // Wait for a /migration znode update from 3000's SYNC_KRAFT_TO_ZK state
+      TestUtils.waitUntilTrue(() => readMigrationZNode(zkMigrationClient).migrationZkVersion() > migrationZkVersion,
+        "waiting for /migration znode to change")
+
+      // Now unblock 3001 from claiming ZK. This will let it move to BECOME_CONTROLLER
+      val delta3 = new MetadataDelta(image1)
+      delta3.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName("another-topic-to-sync"))
+      val provenance3 = new MetadataProvenance(211, 11, 1)
+      val image3 = delta3.apply(provenance3)
+      val manifest3 = LogDeltaManifest.newBuilder()
+        .provenance(provenance3)
+        .leaderAndEpoch(newLeader2)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+      driver2.onMetadataUpdate(delta3, image3, manifest3)
+
+      // Now wait for 3001 to become leader in ZK
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
+        case Some(nodeId) => nodeId == 3001
+        case None => false
+      }, "waiting for 3001 to claim ZK leadership")
+
+      // Now, 3001 will reload the /migration state and should not see any errors
+      faultHandler2.checkAndClear(faults => assertTrue(faults.isEmpty))
+
+      // 3000 should not be able to make any more ZK updates now
+      driver1.onMetadataUpdate(delta3, image3, manifest3)
+      safeGet(faultHandler1.waitForError("Controller epoch zkVersion check fails"))
+
+      // 3000 finally processes new leader event
+      driver1.onControllerChange(newLeader2)
+
+      // 3001 should still not have any errors
+      faultHandler2.checkAndClear(faults => assertTrue(faults.isEmpty))
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver2.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Ensure we still don't have errors on the new leader
+      faultHandler2.checkAndClear(faults => assertTrue(faults.isEmpty))
+
+    } finally {
+      driver1.close()
+      driver2.close()
+      Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+      zookeeper.shutdown()
+      if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+    }
+  }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 172e5f2f601..4eb82d05d76 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -636,12 +636,24 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         public void run() throws Exception {
             if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) {
                 applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
-                if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
+                if (migrationLeadershipState.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION) {
                     log.info("Unable to claim leadership, will retry until we 
learn of a different KRaft leader");
+
                 } else {
                     if (!migrationLeadershipState.initialZkMigrationComplete()) {
                         transitionTo(MigrationDriverState.ZK_MIGRATION);
                     } else {
+                        // KAFKA-16171: after loading the migration state in KRaftLeaderEvent, the previous controller
+                        // could have modified the /migration ZNode. Re-read it here after claiming the controller ZNode.
+                        applyMigrationOperation("Re-reading migration state", state -> {
+                            ZkMigrationLeadershipState reloadedState =
+                                zkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY);
+                            return KRaftMigrationDriver.this.migrationLeadershipState
+                                .withMigrationZkVersion(reloadedState.migrationZkVersion())
+                                .withKRaftMetadataOffsetAndEpoch(
+                                    reloadedState.kraftMetadataOffset(),
+                                    reloadedState.kraftMetadataEpoch());
+                        });
                         transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
                     }
                 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
index ad3af89acd2..15b8b789ae7 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java
@@ -26,10 +26,15 @@ import java.util.Objects;
  * that no migration has been started.
  */
 public class ZkMigrationLeadershipState {
+    /**
+     * A Kafka-internal constant used to indicate that the znode version is unknown. See ZkVersion.UnknownVersion.
+     */
+    public static final int UNKNOWN_ZK_VERSION = -2;
 
     // Use -2 as sentinel for "unknown version" for ZK versions to avoid sending an actual -1 "any version"
     // when doing ZK writes
-    public static final ZkMigrationLeadershipState EMPTY = new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, -2);
+    public static final ZkMigrationLeadershipState EMPTY =
+        new ZkMigrationLeadershipState(-1, -1, -1, -1, -1, -2, -1, UNKNOWN_ZK_VERSION);
 
     private final int kraftControllerId;
 
@@ -110,7 +115,7 @@ public class ZkMigrationLeadershipState {
         return kraftMetadataOffset;
     }
 
-    public long kraftMetadataEpoch() {
+    public int kraftMetadataEpoch() {
         return kraftMetadataEpoch;
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
index 503236cf266..03579056a58 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingMigrationClient.java
@@ -139,14 +139,18 @@ class CapturingMigrationClient implements MigrationClient {
 
     @Override
     public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
-        this.state = state;
-        return state;
+        if (state.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION) {
+            this.state = state.withZkController(0, 0);
+        } else {
+            this.state = state.withZkController(state.zkControllerEpoch() + 1, state.zkControllerEpochZkVersion() + 1);
+        }
+        return this.state;
     }
 
     @Override
     public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
-        this.state = state;
-        return state;
+        this.state = state.withUnknownZkController();
+        return this.state;
     }
 
 
