This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 89a4735c35b KAFKA-14656: Send UMR first during ZK migration (#13159)
89a4735c35b is described below

commit 89a4735c35b80abcf5cd80f5c5ae80fc39107571
Author: David Arthur <[email protected]>
AuthorDate: Tue Jan 31 01:31:45 2023 -0500

    KAFKA-14656: Send UMR first during ZK migration (#13159)
    
    When in migration-from-ZK mode and sending RPCs to ZK-based brokers, the KRaft controller must send
    full UpdateMetadataRequests prior to sending full LeaderAndIsrRequests. If the controller sends the
    requests in the other order, and the ZK-based broker does not already know about some of the nodes
    referenced in the LeaderAndIsrRequest, it will reject the request.
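    
    A minimal sketch of the required ordering, using the request-batch calls visible in this
    patch; the LeaderAndIsr call, its arguments, and the variable names are placeholders for
    illustration only, not the actual MigrationPropagator code:
    
        // Batch 1: full UMR, so every ZK broker learns about all live brokers first.
        requestBatch.newBatch()
        requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, allPartitions)
        requestBatch.sendRequestsToBrokers(zkControllerEpoch)
    
        // Batch 2: only now send LeaderAndIsr, which may reference any of those brokers.
        requestBatch.newBatch()
        requestBatch.addLeaderAndIsrRequestForBrokers(replicas, tp, leaderIsrAndControllerEpoch, registration, false)
        requestBatch.sendRequestsToBrokers(zkControllerEpoch)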
    
    This PR includes an integration test, and a number of other small fixes for dual-write.
    
    Co-authored-by: Akhilesh C <[email protected]>
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../scala/kafka/common/InterBrokerSendThread.scala |   2 +-
 .../kafka/migration/MigrationPropagator.scala      |  30 ++--
 .../server/BrokerToControllerChannelManager.scala  |  20 ++-
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |  37 ++---
 .../test/junit/ZkClusterInvocationContext.java     |   2 +
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 164 ++++++++++++++++++++-
 .../kafka/integration/KafkaServerTestHarness.scala |   2 +-
 .../unit/kafka/zk/ZkMigrationClientTest.scala      |  16 +-
 .../kafka/controller/ProducerIdControlManager.java |   8 +-
 .../org/apache/kafka/image/ClientQuotaImage.java   |   4 +
 .../org/apache/kafka/image/ConfigurationImage.java |   4 +
 .../apache/kafka/image/ConfigurationsImage.java    |  13 ++
 .../metadata/migration/KRaftMigrationDriver.java   |  70 ++++++++-
 .../kafka/metadata/migration/MigrationClient.java  |  18 +++
 14 files changed, 334 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index e1bea72ddd0..2b77c29bc24 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
  */
 abstract class InterBrokerSendThread(
   name: String,
-  var networkClient: KafkaClient,
+  @volatile var networkClient: KafkaClient,
   requestTimeoutMs: Int,
   time: Time,
   isInterruptible: Boolean = true
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 308d8f06f15..355831166ee 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -92,14 +92,20 @@ class MigrationPropagator(
     val oldZkBrokers = zkBrokers -- changedZkBrokers
     val brokersChanged = !delta.clusterDelta().changedBrokers().isEmpty
 
+    // First send metadata about the live/dead brokers to all the zk brokers.
     if (changedZkBrokers.nonEmpty) {
       // Update new Zk brokers about all the metadata.
       requestBatch.addUpdateMetadataRequestForBrokers(changedZkBrokers.toSeq, image.topics().partitions().keySet().asScala)
-      // Send these requests first to make sure, we don't add all the partition metadata to the
-      // old brokers as well.
-      requestBatch.sendRequestsToBrokers(zkControllerEpoch)
-      requestBatch.newBatch()
+    }
+    if (brokersChanged) {
+      requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
+    }
+    requestBatch.sendRequestsToBrokers(zkControllerEpoch)
+    requestBatch.newBatch()
 
+    // Now send LISR, UMR and StopReplica requests for both new zk brokers and existing zk
+    // brokers based on the topic changes.
+    if (changedZkBrokers.nonEmpty) {
       // For new the brokers, check if there are partition assignments and add LISR appropriately.
       image.topics().partitions().asScala.foreach { case (tp, partitionRegistration) =>
         val replicas = partitionRegistration.replicas.toSet
@@ -118,9 +124,8 @@ class MigrationPropagator(
       }
     }
 
-    // If there are new brokers (including KRaft brokers) or if there are changes in topic
-    // metadata, let's send UMR about the changes to the old Zk brokers.
-    if (brokersChanged || !delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) {
+    // If there are changes in topic metadata, let's send UMR about the changes to the old Zk brokers.
+    if (!delta.topicsDelta().deletedTopicIds().isEmpty || !delta.topicsDelta().changedTopics().isEmpty) {
       requestBatch.addUpdateMetadataRequestForBrokers(oldZkBrokers.toSeq)
     }
 
@@ -175,14 +180,20 @@ class MigrationPropagator(
 
   override def sendRPCsToBrokersFromMetadataImage(image: MetadataImage, zkControllerEpoch: Int): Unit = {
     publishMetadata(image)
+
+    val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
+    val partitions = image.topics().partitions()
+    // First send all the metadata before sending any other requests to make sure subsequent
+    // requests are handled correctly.
     requestBatch.newBatch()
+    requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
+    requestBatch.sendRequestsToBrokers(zkControllerEpoch)
 
+    requestBatch.newBatch()
     // When we need to send RPCs from the image, we're sending 'full' requests meaning we let
     // every broker know about all the metadata and all the LISR requests it needs to handle.
     // Note that we cannot send StopReplica requests from the image. We don't have any state
     // about brokers that host a replica but are not part of the replica set known by the Controller.
-    val zkBrokers = image.cluster().zkBrokers().keySet().asScala.map(_.toInt).toSeq
-    val partitions = image.topics().partitions()
     partitions.asScala.foreach{ case (tp, partitionRegistration) =>
       val leaderIsrAndControllerEpochOpt = MigrationControllerChannelContext.partitionLeadershipInfo(image, tp)
       leaderIsrAndControllerEpochOpt match {
@@ -194,7 +205,6 @@ class MigrationPropagator(
         case None => None
       }
     }
-    requestBatch.addUpdateMetadataRequestForBrokers(zkBrokers, partitions.keySet().asScala)
     requestBatch.sendRequestsToBrokers(zkControllerEpoch)
   }
 
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index b1c27287560..3c2a6a2acbd 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -315,12 +315,12 @@ class BrokerToControllerRequestThread(
   private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
     if (isNetworkClientForZkController != controllerInformation.isZkController) {
       debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
-        "Resetting network client")
+        s"Resetting network client with new controller information : ${controllerInformation}")
       // Close existing network client.
-      if (networkClient != null) {
-        networkClient.initiateClose()
-        networkClient.close()
-      }
+      val oldClient = networkClient
+      oldClient.initiateClose()
+      oldClient.close()
+
       isNetworkClientForZkController = controllerInformation.isZkController
       updateControllerAddress(controllerInformation.node.orNull)
       controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
@@ -382,6 +382,7 @@ class BrokerToControllerRequestThread(
   }
 
   private[server] def handleResponse(queueItem: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+    debug(s"Request ${queueItem.request} received $response")
     if (response.authenticationException != null) {
       error(s"Request ${queueItem.request} failed due to authentication error with controller",
         response.authenticationException)
@@ -394,9 +395,16 @@ class BrokerToControllerRequestThread(
       updateControllerAddress(null)
       requestQueue.putFirst(queueItem)
     } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+      debug(s"Request ${queueItem.request} received NOT_CONTROLLER exception. Disconnecting the " +
+        s"connection to the stale controller ${activeControllerAddress().map(_.idString).getOrElse("null")}")
       // just close the controller connection and wait for metadata cache update in doWork
       activeControllerAddress().foreach { controllerAddress =>
-        networkClient.disconnect(controllerAddress.idString)
+        try {
+          // We don't care if disconnect has an error, just log it and get a new network client
+          networkClient.disconnect(controllerAddress.idString)
+        } catch {
+          case t: Throwable => error("Had an error while disconnecting from NetworkClient.", t)
+        }
         updateControllerAddress(null)
       }
 
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 1ab05d5a75d..61b8b2afb51 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -208,7 +208,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
         recordConsumer.accept(List(new ApiMessageAndVersion(new ProducerIdsRecord()
           .setBrokerEpoch(-1)
           .setBrokerId(producerIdBlock.assignedBrokerId)
-          .setNextProducerId(producerIdBlock.firstProducerId), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
+          .setNextProducerId(producerIdBlock.firstProducerId()), ProducerIdsRecord.HIGHEST_SUPPORTED_VERSION)).asJava)
       case None => // Nothing to migrate
     }
   }
@@ -364,24 +364,27 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     }
   }
 
-  def writeClientQuotas(entity: ClientQuotaEntity,
-                        quotas: util.Map[String, Double],
-                        state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
-    val entityMap = entity.entries().asScala
-    val hasUser = entityMap.contains(ConfigType.User)
-    val hasClient = entityMap.contains(ConfigType.Client)
-    val hasIp = entityMap.contains(ConfigType.Ip)
+  override def writeClientQuotas(
+    entity: util.Map[String, String],
+    quotas: util.Map[String, java.lang.Double],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+
+    val entityMap = entity.asScala
+    val hasUser = entityMap.contains(ClientQuotaEntity.USER)
+    val hasClient = entityMap.contains(ClientQuotaEntity.CLIENT_ID)
+    val hasIp = entityMap.contains(ClientQuotaEntity.IP)
     val props = new Properties()
     // We store client quota values as strings in the ZK JSON
     quotas.forEach { case (key, value) => props.put(key, value.toString) }
     val (configType, path) = if (hasUser && !hasClient) {
-      (Some(ConfigType.User), Some(entityMap(ConfigType.User)))
+      (Some(ConfigType.User), Some(entityMap(ClientQuotaEntity.USER)))
     } else if (hasUser && hasClient) {
-      (Some(ConfigType.User), Some(s"${entityMap(ConfigType.User)}/clients/${entityMap(ConfigType.Client)}"))
+      (Some(ConfigType.User), Some(s"${entityMap(ClientQuotaEntity.USER)}/clients/${entityMap(ClientQuotaEntity.CLIENT_ID)}"))
     } else if (hasClient) {
-      (Some(ConfigType.Client), Some(entityMap(ConfigType.Client)))
+      (Some(ConfigType.Client), Some(entityMap(ClientQuotaEntity.CLIENT_ID)))
     } else if (hasIp) {
-      (Some(ConfigType.Ip), Some(entityMap(ConfigType.Ip)))
+      (Some(ConfigType.Ip), Some(entityMap(ClientQuotaEntity.IP)))
     } else {
       (None, None)
     }
@@ -399,7 +402,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
         // If we didn't update the migration state, we failed to write the client quota. Try again
         // after recursively create its parent znodes
         val createPath = if (hasUser && hasClient) {
-          s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ConfigType.User)}/clients"
+          s"${ConfigEntityTypeZNode.path(configType.get)}/${entityMap(ClientQuotaEntity.USER)}/clients"
         } else {
           ConfigEntityTypeZNode.path(configType.get)
         }
@@ -414,7 +417,7 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     }
   }
 
-  def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+  override def writeProducerId(nextProducerId: Long, state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
     val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(
       new ProducerIdsBlock(-1, nextProducerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE))
 
@@ -423,9 +426,9 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo
     state.withMigrationZkVersion(migrationZkVersion)
   }
 
-  def writeConfigs(resource: ConfigResource,
-                   configs: util.Map[String, String],
-                   state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
+  override def writeConfigs(resource: ConfigResource,
+                            configs: util.Map[String, String],
+                            state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = {
     val configType = resource.`type`() match {
       case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
       case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 5154982e472..26c51d70197 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -27,6 +27,7 @@ import kafka.test.ClusterInstance;
 import kafka.utils.EmptyTestInfo;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
@@ -319,6 +320,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
                 clusterReference.get().killBroker(i);
             }
             clusterReference.get().restartDeadBrokers(true);
+            clusterReference.get().adminClientConfig().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
         }
 
         @Override
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index d9b2e286956..231bf01c917 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -16,26 +16,38 @@
  */
 package kafka.zk
 
+import kafka.server.{ConfigType, KafkaConfig}
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.common.config.TopicConfig
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasResult, 
AlterConfigOp, AlterConfigsResult, ConfigEntry, NewTopic}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
+import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, 
ProducerIdsBlock}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
 import org.junit.jupiter.api.extension.ExtendWith
+import org.slf4j.LoggerFactory
 
 import java.util
+import java.util.Properties
 import java.util.concurrent.TimeUnit
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class ZkMigrationIntegrationTest {
 
+  val log = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
+
   class MetadataDeltaVerifier {
     val metadataDelta = new MetadataDelta(MetadataImage.EMPTY)
     var offset = 0
@@ -102,4 +114,146 @@ class ZkMigrationIntegrationTest {
 
     migrationState = 
migrationClient.releaseControllerLeadership(migrationState)
   }
+
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testDualWrite(zkCluster: ClusterInstance): Unit = {
+    // Create a topic in ZK mode
+    var admin = zkCluster.createAdminClient()
+    val newTopics = new util.ArrayList[NewTopic]()
+    newTopics.add(new NewTopic("test", 2, 3.toShort)
+      .configs(Map(TopicConfig.SEGMENT_BYTES_CONFIG -> "102400", 
TopicConfig.SEGMENT_MS_CONFIG -> "300000").asJava))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get(60, TimeUnit.SECONDS)
+    admin.close()
+
+    // Verify the configs exist in ZK
+    val zkClient = 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+    val propsBefore = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+    assertEquals("102400", 
propsBefore.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
+    assertEquals("300000", 
propsBefore.getProperty(TopicConfig.SEGMENT_MS_CONFIG))
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, 
zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Allocate a transactional producer ID while in ZK mode
+      allocateProducerId(zkCluster.bootstrapServers())
+      val producerIdBlock = readProducerIdBlock(zkClient)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
+      
zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, 
voters)
+      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
+      
zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to begin")
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), 
"Timed out waiting for KRaft controller to take over")
+
+      // Alter the metadata
+      log.info("Updating metadata with AdminClient")
+      admin = zkCluster.createAdminClient()
+      alterTopicConfig(admin).all().get(60, TimeUnit.SECONDS)
+      alterClientQuotas(admin).all().get(60, TimeUnit.SECONDS)
+
+      // Verify the changes made to KRaft are seen in ZK
+      log.info("Verifying metadata changes with ZK")
+      verifyTopicConfigs(zkClient)
+      verifyClientQuotas(zkClient)
+      allocateProducerId(zkCluster.bootstrapServers())
+      verifyProducerId(producerIdBlock, zkClient)
+
+    } finally {
+      zkCluster.stop()
+      kraftCluster.close()
+    }
+  }
+
+  def allocateProducerId(bootstrapServers: String): Unit = {
+    val props = new Properties()
+    props.put("bootstrap.servers", bootstrapServers)
+    props.put("transactional.id", "some-transaction-id")
+    val producer = new KafkaProducer[String, String](props, new 
StringSerializer(), new StringSerializer())
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[String, String]("test", "", "one"))
+    producer.commitTransaction()
+    producer.flush()
+    producer.close()
+  }
+
+  def readProducerIdBlock(zkClient: KafkaZkClient): ProducerIdsBlock = {
+    val (dataOpt, _) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
+    dataOpt.map(ProducerIdBlockZNode.parseProducerIdBlockData).get
+  }
+
+  def alterTopicConfig(admin: Admin): AlterConfigsResult = {
+    val topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test")
+    val alterConfigs = Seq(
+      new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, 
"204800"), AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, null), 
AlterConfigOp.OpType.DELETE)
+    ).asJavaCollection
+    admin.incrementalAlterConfigs(Map(topicResource -> alterConfigs).asJava)
+  }
+
+  def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
+    val quotas = new util.ArrayList[ClientQuotaAlteration]()
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Map("user" -> "user1").asJava),
+      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> 
"clientA").asJava),
+      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new 
ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
+    quotas.add(new ClientQuotaAlteration(
+      new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
+      List(new ClientQuotaAlteration.Op("connection_creation_rate", 
10.0)).asJava))
+    admin.alterClientQuotas(quotas)
+  }
+
+  def verifyTopicConfigs(zkClient: KafkaZkClient): Unit = {
+    TestUtils.retry(10000) {
+      val propsAfter = zkClient.getEntityConfigs(ConfigType.Topic, "test")
+      assertEquals("204800", 
propsAfter.getProperty(TopicConfig.SEGMENT_BYTES_CONFIG))
+      assertFalse(propsAfter.containsKey(TopicConfig.SEGMENT_MS_CONFIG))
+    }
+  }
+
+  def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
+    TestUtils.retry(10000) {
+      assertEquals("1000.0", zkClient.getEntityConfigs(ConfigType.User, 
"user1").getProperty("consumer_byte_rate"))
+      assertEquals("800.0", zkClient.getEntityConfigs("users/user1/clients", 
"clientA").getProperty("consumer_byte_rate"))
+      assertEquals("100.0", zkClient.getEntityConfigs("users/user1/clients", 
"clientA").getProperty("producer_byte_rate"))
+      assertEquals("10.0", zkClient.getEntityConfigs(ConfigType.Ip, 
"8.8.8.8").getProperty("connection_creation_rate"))
+    }
+  }
+
+  def verifyProducerId(firstProducerIdBlock: ProducerIdsBlock, zkClient: 
KafkaZkClient): Unit = {
+    TestUtils.retry(10000) {
+      val producerIdBlock = readProducerIdBlock(zkClient)
+      assertTrue(firstProducerIdBlock.firstProducerId() < 
producerIdBlock.firstProducerId())
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 1ef4d47995a..cf52b5ce9b3 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -358,7 +358,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
         time = brokerTime(config.brokerId),
         threadNamePrefix = None,
         startup = false,
-        enableZkApiForwarding = isZkMigrationTest()
+        enableZkApiForwarding = isZkMigrationTest() || (config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled)
       )
     }
   }
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index b6d01787785..8694f10c14a 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -164,11 +164,11 @@ class ZkMigrationClientTest extends QuorumTestHarness {
                                         adminZkClient: AdminZkClient,
                                         migrationState: 
ZkMigrationLeadershipState,
                                         entity: Map[String, String],
-                                        quotas: Map[String, Double],
+                                        quotas: Map[String, java.lang.Double],
                                         zkEntityType: String,
                                         zkEntityName: String): 
ZkMigrationLeadershipState = {
     val nextMigrationState = migrationClient.writeClientQuotas(
-      new ClientQuotaEntity(entity.asJava),
+      entity.asJava,
       quotas.asJava,
       migrationState)
     val newProps = ZkAdminManager.clientQuotaPropsToDoubleMap(
@@ -187,25 +187,25 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
     assertEquals(0, migrationState.migrationZkVersion())
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, 
migrationState,
-      Map(ConfigType.User -> "user1"),
+      Map(ClientQuotaEntity.USER -> "user1"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0),
       ConfigType.User, "user1")
     assertEquals(1, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, 
migrationState,
-      Map(ConfigType.User -> "user1"),
+      Map(ClientQuotaEntity.USER -> "user1"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0),
       ConfigType.User, "user1")
     assertEquals(2, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, 
migrationState,
-      Map(ConfigType.User -> "user1"),
+      Map(ClientQuotaEntity.USER -> "user1"),
       Map.empty,
       ConfigType.User, "user1")
     assertEquals(3, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, 
migrationState,
-      Map(ConfigType.User -> "user1"),
+      Map(ClientQuotaEntity.USER -> "user1"),
       Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
       ConfigType.User, "user1")
     assertEquals(4, migrationState.migrationZkVersion())
@@ -215,14 +215,14 @@ class ZkMigrationClientTest extends QuorumTestHarness {
   def testWriteNewClientQuotas(): Unit = {
     assertEquals(0, migrationState.migrationZkVersion())
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, 
migrationState,
-      Map(ConfigType.User -> "user2"),
+      Map(ClientQuotaEntity.USER -> "user2"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 100.0),
       ConfigType.User, "user2")
 
     assertEquals(1, migrationState.migrationZkVersion())
 
     migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, 
migrationState,
-      Map(ConfigType.User -> "user2", ConfigType.Client -> "clientA"),
+      Map(ClientQuotaEntity.USER -> "user2", ClientQuotaEntity.CLIENT_ID -> 
"clientA"),
       Map(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
       ConfigType.User, "user2/clients/clientA")
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 47e4e1b430f..2c23bba9dd7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -64,10 +64,12 @@ public class ProducerIdControlManager {
     }
 
     void replay(ProducerIdsRecord record) {
-        long currentNextProducerId = nextProducerBlock.get().firstProducerId();
-        if (record.nextProducerId() <= currentNextProducerId) {
+        // During a migration, we may be calling replay() without ever having called generateNextProducerId(),
+        // so the next producer block could be EMPTY
+        ProducerIdsBlock nextBlock = nextProducerBlock.get();
+        if (nextBlock != ProducerIdsBlock.EMPTY && record.nextProducerId() <= nextBlock.firstProducerId()) {
             throw new RuntimeException("Next Producer ID from replayed record (" + record.nextProducerId() + ")" +
-                " is not greater than current next Producer ID (" + currentNextProducerId + ")");
+                " is not greater than current next Producer ID (" + nextBlock.firstProducerId() + ")");
         } else {
             nextProducerBlock.set(new ProducerIdsBlock(record.brokerId(), record.nextProducerId(), ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
             brokerEpoch.set(record.brokerEpoch());
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
index 51104c92ab2..6e0b84bbfbb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
@@ -52,6 +52,10 @@ public final class ClientQuotaImage {
         return quotas;
     }
 
+    public Map<String, Double> quotaMap() {
+        return Collections.unmodifiableMap(quotas);
+    }
+
     public void write(
         ClientQuotaEntity entity,
         ImageWriter writer,
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
index cac197a224d..bf74bb1aeb7 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -58,6 +58,10 @@ public final class ConfigurationImage {
         return properties;
     }
 
+    public Map<String, String> toMap() {
+        return Collections.unmodifiableMap(data);
+    }
+
     public void write(
         ConfigResource configResource,
         ImageWriter writer,
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
index e1ec11e059a..101594615fb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
@@ -61,6 +61,19 @@ public final class ConfigurationsImage {
         }
     }
 
+    /**
+     * Return the underlying config data for a given resource as an immutable map. This does not apply
+     * configuration overrides or include entity defaults for the resource type.
+     */
+    public Map<String, String> configMapForResource(ConfigResource configResource) {
+        ConfigurationImage configurationImage = data.get(configResource);
+        if (configurationImage != null) {
+            return configurationImage.toMap();
+        } else {
+            return Collections.emptyMap();
+        }
+    }
+
     public void write(ImageWriter writer, ImageWriterOptions options) {
         for (Entry<ConfigResource, ConfigurationImage> entry : data.entrySet()) {
             ConfigResource configResource = entry.getKey();
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index f3bdeb81507..fd2d85081c4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.metadata.migration;
 
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
@@ -28,18 +30,22 @@ import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -403,7 +409,11 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
                 AtomicInteger count = new AtomicInteger(0);
                 zkMigrationClient.readAllMetadata(batch -> {
                     try {
-                        log.info("Migrating {} records from ZK: {}", 
batch.size(), batch);
+                        if (log.isTraceEnabled()) {
+                            log.trace("Migrating {} records from ZK: {}", 
batch.size(), recordBatchToString(batch));
+                        } else {
+                            log.info("Migrating {} records from ZK", 
batch.size());
+                        }
                         CompletableFuture<?> future = 
zkRecordConsumer.acceptBatch(batch);
                         count.addAndGet(batch.size());
                         future.get();
@@ -446,9 +456,14 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
             // Ignore sending RPCs to the brokers since we're no longer in the 
state.
             if (migrationState == 
MigrationState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
                 if 
(image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch())
 >= 0) {
+                    log.trace("Sending RPCs to broker before moving to 
dual-write mode using " +
+                        "at offset and epoch {}", 
image.highestOffsetAndEpoch());
                     propagator.sendRPCsToBrokersFromMetadataImage(image, 
migrationLeadershipState.zkControllerEpoch());
                     // Migration leadership state doesn't change since we're 
not doing any Zk writes.
                     transitionTo(MigrationState.DUAL_WRITE);
+                } else {
+                    log.trace("Ignoring using metadata image since migration 
leadership state is at a greater offset and epoch {}",
+                        migrationLeadershipState.offsetAndEpoch());
                 }
             }
         }
@@ -470,10 +485,11 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
         @Override
         public void run() throws Exception {
             KRaftMigrationDriver.this.image = image;
+            String metadataType = isSnapshot ? "snapshot" : "delta";
 
             if (migrationState != MigrationState.DUAL_WRITE) {
-                log.trace("Received metadata change, but the controller is not 
in dual-write " +
-                        "mode. Ignoring the change to be replicated to 
Zookeeper");
+                log.trace("Received metadata {}, but the controller is not in 
dual-write " +
+                    "mode. Ignoring the change to be replicated to Zookeeper", 
metadataType);
                 return;
             }
             if (delta.featuresDelta() != null) {
@@ -499,16 +515,60 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
                     });
                 }
 
+                // For configs and client quotas, we need to send all of the 
data to the ZK client since we persist
+                // everything for a given entity in a single ZK node.
+                if (delta.configsDelta() != null) {
+                    delta.configsDelta().changes().forEach((configResource, 
configDelta) ->
+                        apply("Updating config resource " + configResource, 
migrationState ->
+                            zkMigrationClient.writeConfigs(configResource, 
image.configs().configMapForResource(configResource), migrationState)));
+                }
+
+                if (delta.clientQuotasDelta() != null) {
+                    
delta.clientQuotasDelta().changes().forEach((clientQuotaEntity, 
clientQuotaDelta) -> {
+                        Map<String, Double> quotaMap = 
image.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
+                        apply("Updating client quota " + clientQuotaEntity, 
migrationState ->
+                            
zkMigrationClient.writeClientQuotas(clientQuotaEntity.entries(), quotaMap, 
migrationState));
+                    });
+                }
+
+                if (delta.producerIdsDelta() != null) {
+                    apply("Updating next producer ID", migrationState ->
+                        
zkMigrationClient.writeProducerId(delta.producerIdsDelta().nextProducerId(), 
migrationState));
+                }
 
-                apply("Write MetadataDelta to Zk", state -> 
zkMigrationClient.writeMetadataDeltaToZookeeper(delta, image, state));
                 // TODO: Unhappy path: Probably relinquish leadership and let 
new controller
                 //  retry the write?
+                log.trace("Sending RPCs to brokers for metadata {}.", 
metadataType);
                 propagator.sendRPCsToBrokersFromMetadataDelta(delta, image,
                         migrationLeadershipState.zkControllerEpoch());
             } else {
-                String metadataType = isSnapshot ? "snapshot" : "delta";
                 log.info("Ignoring {} {} which contains metadata that has 
already been written to ZK.", metadataType, provenance);
             }
         }
     }
+
+    static String recordBatchToString(Collection<ApiMessageAndVersion> batch) {
+        String batchString = batch.stream().map(apiMessageAndVersion -> {
+            if (apiMessageAndVersion.message().apiKey() == 
MetadataRecordType.CONFIG_RECORD.id()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("ApiMessageAndVersion(");
+                ConfigRecord record = (ConfigRecord) 
apiMessageAndVersion.message();
+                sb.append("ConfigRecord(");
+                sb.append("resourceType=");
+                sb.append(record.resourceType());
+                sb.append(", resourceName=");
+                sb.append(record.resourceName());
+                sb.append(", name=");
+                sb.append(record.name());
+                sb.append(")");
+                sb.append(" at version ");
+                sb.append(apiMessageAndVersion.version());
+                sb.append(")");
+                return sb.toString();
+            } else {
+                return apiMessageAndVersion.toString();
+            }
+        }).collect(Collectors.joining(","));
+        return "[" + batchString + "]";
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
index 1710458cd57..11284b399c1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/MigrationClient.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.metadata.migration;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -85,6 +86,23 @@ public interface MigrationClient {
         ZkMigrationLeadershipState state
     );
 
+    ZkMigrationLeadershipState writeConfigs(
+        ConfigResource configResource,
+        Map<String, String> configMap,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState writeClientQuotas(
+        Map<String, String> clientQuotaEntity,
+        Map<String, Double> quotas,
+        ZkMigrationLeadershipState state
+    );
+
+    ZkMigrationLeadershipState writeProducerId(
+        long nextProducerId,
+        ZkMigrationLeadershipState state
+    );
+
     void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer);
 
     Set<Integer> readBrokerIds();

