This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c054833acd KAFKA-15552 Fix Producer ID ZK migration (#14506)
3c054833acd is described below
commit 3c054833acd90a0df647a1f9a1407c2219983369
Author: David Arthur <[email protected]>
AuthorDate: Fri Oct 6 00:49:31 2023 -0400
KAFKA-15552 Fix Producer ID ZK migration (#14506)
This patch fixes a problem where we migrated the current producer ID batch
to KRaft instead of the next producer ID batch. Since KRaft stores the next
batch in the log, we would end up serving a duplicate batch to the first caller
of AllocateProducerIds once the KRaft controller had taken over.
Reviewers: Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/zk/ZkMigrationClient.scala | 2 +-
.../kafka/zk/ZkMigrationIntegrationTest.scala | 59 ++++++++++++----------
.../kafka/zk/migration/ZkMigrationClientTest.scala | 44 ++++++++--------
3 files changed, 56 insertions(+), 49 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 92da03d2935..a11a84c017b 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -263,7 +263,7 @@ class ZkMigrationClient(
recordConsumer.accept(List(new ApiMessageAndVersion(new
ProducerIdsRecord()
.setBrokerEpoch(-1)
.setBrokerId(producerIdBlock.assignedBrokerId)
- .setNextProducerId(producerIdBlock.firstProducerId()),
0.toShort)).asJava)
+ .setNextProducerId(producerIdBlock.nextBlockFirstId()),
0.toShort)).asJava)
case None => // Nothing to migrate
}
}
diff --git
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 040fa52d482..4f5a2b8cb43 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,40 +17,42 @@
package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost,
WildcardPrincipalString}
-import kafka.server.{ConfigType, KafkaConfig}
+import kafka.server.{ConfigType, ControllerRequestCompletionHandler,
KafkaConfig}
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty,
ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.{PasswordEncoder, TestUtils}
+import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.acl.AclOperation.{DESCRIBE, READ, WRITE}
import org.apache.kafka.common.acl.AclPermissionType.ALLOW
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
+import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
+import org.apache.kafka.common.requests.{AllocateProducerIdsRequest,
AllocateProducerIdsResponse}
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.TOPIC
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
-import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion,
ProducerIdsBlock}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import org.slf4j.LoggerFactory
import java.util
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.{Properties, UUID}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -357,9 +359,8 @@ class ZkMigrationIntegrationTest {
kraftCluster.startup()
val readyFuture =
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
- // Allocate a transactional producer ID while in ZK mode
- allocateProducerId(zkCluster.bootstrapServers())
- val producerIdBlock = readProducerIdBlock(zkClient)
+ // Allocate a block of producer IDs while in ZK mode
+ val nextProducerId =
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30,
TimeUnit.SECONDS)
// Enable migration configs and restart brokers
log.info("Restart brokers in migration mode")
@@ -388,8 +389,8 @@ class ZkMigrationIntegrationTest {
log.info("Verifying metadata changes with ZK")
verifyTopicConfigs(zkClient)
verifyClientQuotas(zkClient)
- allocateProducerId(zkCluster.bootstrapServers())
- verifyProducerId(producerIdBlock, zkClient)
+ val nextKRaftProducerId =
sendAllocateProducerIds(zkCluster.asInstanceOf[ZkClusterInstance]).get(30,
TimeUnit.SECONDS)
+ assertNotEquals(nextProducerId, nextKRaftProducerId)
} finally {
shutdownInSequence(zkCluster, kraftCluster)
@@ -556,17 +557,26 @@ class ZkMigrationIntegrationTest {
}
}
- def allocateProducerId(bootstrapServers: String): Unit = {
- val props = new Properties()
- props.put("bootstrap.servers", bootstrapServers)
- props.put("transactional.id", "some-transaction-id")
- val producer = new KafkaProducer[String, String](props, new
StringSerializer(), new StringSerializer())
- producer.initTransactions()
- producer.beginTransaction()
- producer.send(new ProducerRecord[String, String]("test", "", "one"))
- producer.commitTransaction()
- producer.flush()
- producer.close()
+ def sendAllocateProducerIds(zkClusterInstance: ZkClusterInstance):
CompletableFuture[Long] = {
+ val channel =
zkClusterInstance.getUnderlying.brokers.head.clientToControllerChannelManager
+ val brokerId = zkClusterInstance.getUnderlying.brokers.head.config.brokerId
+ val brokerEpoch =
zkClusterInstance.getUnderlying.brokers.head.replicaManager.brokerEpochSupplier.apply()
+ val request = new AllocateProducerIdsRequest.Builder(new
AllocateProducerIdsRequestData()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(brokerEpoch))
+
+ val producerIdStart = new CompletableFuture[Long]()
+ channel.sendRequest(request, new ControllerRequestCompletionHandler() {
+ override def onTimeout(): Unit = {
+ producerIdStart.completeExceptionally(new TimeoutException("Request
timed out"))
+ }
+
+ override def onComplete(response: ClientResponse): Unit = {
+ val body =
response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
+ producerIdStart.complete(body.data().producerIdStart())
+ }
+ })
+ producerIdStart
}
def readProducerIdBlock(zkClient: KafkaZkClient): ProducerIdsBlock = {
@@ -643,13 +653,6 @@ class ZkMigrationIntegrationTest {
}
}
- def verifyProducerId(firstProducerIdBlock: ProducerIdsBlock, zkClient:
KafkaZkClient): Unit = {
- TestUtils.retry(10000) {
- val producerIdBlock = readProducerIdBlock(zkClient)
- assertTrue(firstProducerIdBlock.firstProducerId() <
producerIdBlock.firstProducerId())
- }
- }
-
def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster:
KafkaClusterTestKit): Unit = {
zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
kraftCluster.close()
diff --git
a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
index a821a6fb414..773c42a66e4 100644
--- a/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -18,7 +18,7 @@ package kafka.zk.migration
import kafka.api.LeaderAndIsr
import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
-import kafka.coordinator.transaction.ProducerIdManager
+import kafka.coordinator.transaction.{ProducerIdManager, ZkProducerIdManager}
import kafka.server.{ConfigType, KafkaConfig}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.ControllerMovedException
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test
import java.util.Properties
import scala.collection.Map
import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success}
/**
* ZooKeeper integration tests that verify the interoperability of
KafkaZkClient and ZkMigrationClient.
@@ -211,30 +212,33 @@ class ZkMigrationClientTest extends
ZkMigrationTestHarness {
}
@Test
- def testReadAndWriteProducerId(): Unit = {
- def generateNextProducerIdWithZkAndRead(): Long = {
- // Generate a producer ID in ZK
- val manager = ProducerIdManager.zk(1, zkClient)
- manager.generateProducerId()
-
- val records = new
java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
- migrationClient.migrateProducerId(batch => records.add(batch))
- assertEquals(1, records.size())
- assertEquals(1, records.get(0).size())
-
- val record =
records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
- record.nextProducerId()
- }
-
- // Initialize with ZK ProducerIdManager
- assertEquals(0, generateNextProducerIdWithZkAndRead())
+ def testReadMigrateAndWriteProducerId(): Unit = {
+ // allocate some producer id blocks
+ ZkProducerIdManager.getNewProducerIdBlock(1, zkClient, this)
+ ZkProducerIdManager.getNewProducerIdBlock(2, zkClient, this)
+ val block = ZkProducerIdManager.getNewProducerIdBlock(3, zkClient, this)
+
+ // Migrate the producer ID state to KRaft as a record
+ val records = new
java.util.ArrayList[java.util.List[ApiMessageAndVersion]]()
+ migrationClient.migrateProducerId(batch => records.add(batch))
+ assertEquals(1, records.size())
+ assertEquals(1, records.get(0).size())
+ val record =
records.get(0).get(0).message().asInstanceOf[ProducerIdsRecord]
+
+ // Ensure the block stored in KRaft is the _next_ block since that is what
will be served
+ // to the next ALLOCATE_PRODUCER_IDS caller
+ assertEquals(block.nextBlockFirstId(), record.nextProducerId())
// Update next producer ID via migration client
migrationState = migrationClient.writeProducerId(6000, migrationState)
assertEquals(1, migrationState.migrationZkVersion())
- // Switch back to ZK, it should provision the next block
- assertEquals(7000, generateNextProducerIdWithZkAndRead())
+ val manager = ProducerIdManager.zk(1, zkClient)
+ val producerId = manager.generateProducerId() match {
+ case Failure(e) => fail("Encountered error when generating producer id",
e)
+ case Success(value) => value
+ }
+ assertEquals(7000, producerId)
}
@Test