This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c054833acd KAFKA-15552 Fix Producer ID ZK migration (#14506)
3c054833acd is described below

commit 3c054833acd90a0df647a1f9a1407c2219983369
Author: David Arthur <[email protected]>
AuthorDate: Fri Oct 6 00:49:31 2023 -0400

    KAFKA-15552 Fix Producer ID ZK migration (#14506)
    
    This patch fixes a problem where we migrate the current producer ID batch 
to KRaft instead of the next producer ID batch. Since KRaft stores the next 
batch in the log, we end up serving up a duplicate batch to the first caller of 
AllocateProducerIds once the KRaft controller has taken over.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |  2 +-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 59 ++++++++++++----------
 .../kafka/zk/migration/ZkMigrationClientTest.scala | 44 ++++++++--------
 3 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala 
b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 92da03d2935..a11a84c017b 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -263,7 +263,7 @@ class ZkMigrationClient(
         recordConsumer.accept(List(new ApiMessageAndVersion(new 
ProducerIdsRecord()
           .setBrokerEpoch(-1)
           .setBrokerId(producerIdBlock.assignedBrokerId)
-          .setNextProducerId(producerIdBlock.firstProducerId()), 
0.toShort)).asJava)
+          .setNextProducerId(producerIdBlock.nextBlockFirstId()), 
0.toShort)).asJava)
       case None => // Nothing to migrate
     }
   }
diff --git 
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 040fa52d482..4f5a2b8cb43 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,40 +17,42 @@
 package kafka.zk
 
 import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
-import kafka.server.{ConfigType, KafkaConfig}
+import kafka.server.{ConfigType, ControllerRequestCompletionHandler, 
KafkaConfig}
 import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.test.annotation.{AutoStart, ClusterConfigProperty, 
ClusterTemplate, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.{PasswordEncoder, TestUtils}
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE}
 import org.apache.kafka.common.acl.AclPermissionType.ALLOW
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
+import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, 
AllocateProducerIdsResponse}
 import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.resource.ResourceType.TOPIC
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.authorizer.StandardAcl
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, 
ProducerIdsBlock}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.api.extension.ExtendWith
 import org.slf4j.LoggerFactory
 
 import java.util
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.{Properties, UUID}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
@@ -357,9 +359,8 @@ class ZkMigrationIntegrationTest {
       kraftCluster.startup()
       val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
 
-      // Allocate a transactional producer ID while in ZK mode
-      allocateProducerId(zkCluster.bootstrapServers())
-      val producerIdBlock = readProducerIdBlock(zkClient)
+      // Allocate a block of producer IDs while in ZK mode
+      val nextProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
@@ -388,8 +389,8 @@ class ZkMigrationIntegrationTest {
       log.info("Verifying metadata changes with ZK")
       verifyTopicConfigs(zkClient)
       verifyClientQuotas(zkClient)
-      allocateProducerId(zkCluster.bootstrapServers())
-      verifyProducerId(producerIdBlock, zkClient)
+      val nextKRaftProducerId = 
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30, 
TimeUnit.SECONDS)
+      assertNotEquals(nextProducerId, nextKRaftProducerId)
 
     } finally {
       shutdownInSequence(zkCluster, kraftCluster)
@@ -556,17 +557,26 @@ class ZkMigrationIntegrationTest {
     }
   }
 
-  def allocateProducerId(bootstrapServers: String): Unit = {
-    val props = new Properties()
-    props.put("bootstrap.servers", bootstrapServers)
-    props.put("transactional.id", "some-transaction-id")
-    val producer = new KafkaProducer[String, String](props, new 
StringSerializer(), new StringSerializer())
-    producer.initTransactions()
-    producer.beginTransaction()
-    producer.send(new ProducerRecord[String, String]("test", "", "one"))
-    producer.commitTransaction()
-    producer.flush()
-    producer.close()
+  def sendAllocateProducerIds(zkClusterInstance: ZkClusterInstance): 
CompletableFuture[Long] = {
+    val channel = 
zkClusterInstance.getUnderlying.brokers.head.clientToControllerChannelManager
+    val brokerId = zkClusterInstance.getUnderlying.brokers.head.config.brokerId
+    val brokerEpoch = 
zkClusterInstance.getUnderlying.brokers.head.replicaManager.brokerEpochSupplier.apply()
+    val request = new AllocateProducerIdsRequest.Builder(new 
AllocateProducerIdsRequestData()
+      .setBrokerId(brokerId)
+      .setBrokerEpoch(brokerEpoch))
+
+    val producerIdStart = new CompletableFuture[Long]()
+    channel.sendRequest(request, new ControllerRequestCompletionHandler() {
+        override def onTimeout(): Unit = {
+          producerIdStart.completeExceptionally(new TimeoutException("Request 
timed out"))
+        }
+
+        override def onComplete(response: ClientResponse): Unit = {
+          val body = 
response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
+          producerIdStart.complete(body.data().producerIdStart())
+        }
+      })
+    producerIdStart
   }
 
   def readProducerIdBlock(zkClient: KafkaZkClient): ProducerIdsBlock = {
@@ -643,13 +653,6 @@ class ZkMigrationIntegrationTest {
     }
   }
 
-  def verifyProducerId(firstProducerIdBlock: ProducerIdsBlock, zkClient: 
KafkaZkClient): Unit = {
-    TestUtils.retry(10000) {
-      val producerIdBlock = readProducerIdBlock(zkClient)
-      assertTrue(firstProducerIdBlock.firstProducerId() < 
producerIdBlock.firstProducerId())
-    }
-  }
-
   def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: 
KafkaClusterTestKit): Unit = {
     zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
     kraftCluster.close()
diff --git 
a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index a821a6fb414..773c42a66e4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -18,7 +18,7 @@ package kafka.zk.migration
 
 import kafka.api.LeaderAndIsr
 import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.coordinator.transaction.ProducerIdManager
+import kafka.coordinator.transaction.{ProducerIdManager, ZkProducerIdManager}
 import kafka.server.{ConfigType, KafkaConfig}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.ControllerMovedException
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test
 import java.util.Properties
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success}
 
 /**
  * ZooKeeper integration tests that verify the interoperability of 
KafkaZkClient and ZkMigrationClient.
@@ -211,30 +212,33 @@ class ZkMigrationClientTest extends 
ZkMigrationTestHarness {
   }
 
   @Test
-  def testReadAndWriteProducerId(): Unit = {
-    def generateNextProducerIdWithZkAndRead(): Long = {
-      // Generate a producer ID in ZK
-      val manager = ProducerIdManager.zk(1, zkClient)
-      manager.generateProducerId()
-
-      val records = new 
java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
-      migrationClient.migrateProducerId(batch => records.add(batch))
-      assertEquals(1, records.size())
-      assertEquals(1, records.get(0).size())
-
-      val record = 
records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
-      record.nextProducerId()
-    }
-
-    // Initialize with ZK ProducerIdManager
-    assertEquals(0, generateNextProducerIdWithZkAndRead())
+  def testReadMigrateAndWriteProducerId(): Unit = {
+    // allocate some producer id blocks
+    ZkProducerIdManager.getNewProducerIdBlock(1, zkClient, this)
+    ZkProducerIdManager.getNewProducerIdBlock(2, zkClient, this)
+    val block = ZkProducerIdManager.getNewProducerIdBlock(3, zkClient, this)
+
+    // Migrate the producer ID state to KRaft as a record
+    val records = new 
java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+    migrationClient.migrateProducerId(batch => records.add(batch))
+    assertEquals(1, records.size())
+    assertEquals(1, records.get(0).size())
+    val record = 
records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
+
+    // Ensure the block stored in KRaft is the _next_ block since that is what 
will be served
+    // to the next ALLOCATE_PRODUCER_IDS caller
+    assertEquals(block.nextBlockFirstId(), record.nextProducerId())
 
     // Update next producer ID via migration client
     migrationState = migrationClient.writeProducerId(6000, migrationState)
     assertEquals(1, migrationState.migrationZkVersion())
 
-    // Switch back to ZK, it should provision the next block
-    assertEquals(7000, generateNextProducerIdWithZkAndRead())
+    val manager = ProducerIdManager.zk(1, zkClient)
+    val producerId = manager.generateProducerId() match {
+      case Failure(e) => fail("Encountered error when generating producer id", 
e)
+      case Success(value) => value
+    }
+    assertEquals(7000, producerId)
   }
 
   @Test

Reply via email to the mailing list.