This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1dbcb7da9e3 KAFKA-14694: RPCProducerIdManager should not wait on new
block (#13267)
1dbcb7da9e3 is described below
commit 1dbcb7da9e3625ec2078a82f84542a3127730fef
Author: Jeff Kim <[email protected]>
AuthorDate: Thu Jun 22 13:19:39 2023 -0400
KAFKA-14694: RPCProducerIdManager should not wait on new block (#13267)
RPCProducerIdManager initiates an async request to the controller to grab a
block of producer IDs and then blocks waiting for a response from the
controller.
This is done in the request handler threads while holding a global lock.
This means that if many producers are requesting producer IDs and the
controller is slow to respond, many threads can get stuck waiting for the lock.
This patch aims to:
* resolve the deadlock scenario mentioned above by not waiting for a new
block and returning an error immediately
* remove synchronization usages in RpcProducerIdManager.generateProducerId()
* handle errors returned from generateProducerId() so that KafkaApis does
not log unexpected errors
* confirm producers backoff before retrying
* introduce backoff if manager fails to process AllocateProducerIdsResponse
Reviewers: Artem Livshits <[email protected]>, Jason Gustafson
<[email protected]>
---
.../clients/producer/internals/SenderTest.java | 24 ++-
.../transaction/ProducerIdManager.scala | 167 +++++++++++--------
.../transaction/TransactionCoordinator.scala | 47 ++++--
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-
.../transaction/ProducerIdsIntegrationTest.scala | 66 ++++++--
.../transaction/ProducerIdManagerTest.scala | 180 ++++++++++++++++-----
.../TransactionCoordinatorConcurrencyTest.scala | 7 +-
.../transaction/TransactionCoordinatorTest.scala | 11 +-
.../AddPartitionsToTxnRequestServerTest.scala | 9 +-
.../kafka/server/common/ProducerIdsBlock.java | 16 ++
.../kafka/server/common/ProducerIdsBlockTest.java | 25 +++
12 files changed, 414 insertions(+), 146 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index f6c91659356..b80817465a5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -3035,6 +3035,28 @@ public class SenderTest {
verifyErrorMessage(produceResponse(tp0, 0L, Errors.INVALID_REQUEST, 0,
-1, errorMessage), errorMessage);
}
+ @Test
+ public void testSenderShouldRetryWithBackoffOnRetriableError() {
+ final long producerId = 343434L;
+ TransactionManager transactionManager = createTransactionManager();
+ setupWithTransactionState(transactionManager);
+ long start = time.milliseconds();
+
+ // first request is sent immediately
+ prepareAndReceiveInitProducerId(producerId, (short) -1,
Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ long request1 = time.milliseconds();
+ assertEquals(start, request1);
+
+ // backoff before sending second request
+ prepareAndReceiveInitProducerId(producerId, (short) -1,
Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ long request2 = time.milliseconds();
+ assertEquals(RETRY_BACKOFF_MS, request2 - request1);
+
+ // third request should also backoff
+ prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+ assertEquals(RETRY_BACKOFF_MS, time.milliseconds() - request2);
+ }
+
private void verifyErrorMessage(ProduceResponse response, String
expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key",
"value");
sender.runOnce(); // connect
@@ -3191,7 +3213,7 @@ public class SenderTest {
}
private TransactionManager createTransactionManager() {
- return new TransactionManager(new LogContext(), null, 0, 100L, new
ApiVersions());
+ return new TransactionManager(new LogContext(), null, 0,
RETRY_BACKOFF_MS, new ApiVersions());
}
private void setupWithTransactionState(TransactionManager
transactionManager) {
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index f16785a7b6c..1e2b6ffac5a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -16,6 +16,7 @@
*/
package kafka.coordinator.transaction
+import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit,
NoRetry, RetryBackoffMs}
import kafka.server.{BrokerToControllerChannelManager,
ControllerRequestCompletionHandler}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
@@ -24,10 +25,11 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AllocateProducerIdsRequest,
AllocateProducerIdsResponse}
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.common.ProducerIdsBlock
-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
+import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.util.{Failure, Success, Try}
/**
@@ -41,6 +43,9 @@ import scala.util.{Failure, Success, Try}
object ProducerIdManager {
// Once we reach this percentage of PIDs consumed from the current block,
trigger a fetch of the next block
val PidPrefetchThreshold = 0.90
+ val IterationLimit = 3
+ val RetryBackoffMs = 50
+ val NoRetry = -1L
// Creates a ProducerIdGenerate that directly interfaces with ZooKeeper, IBP
< 3.0-IV0
def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = {
@@ -49,16 +54,20 @@ object ProducerIdManager {
// Creates a ProducerIdGenerate that uses AllocateProducerIds RPC, IBP >=
3.0-IV0
def rpc(brokerId: Int,
- brokerEpochSupplier: () => Long,
- controllerChannel: BrokerToControllerChannelManager,
- maxWaitMs: Int): RPCProducerIdManager = {
- new RPCProducerIdManager(brokerId, brokerEpochSupplier, controllerChannel,
maxWaitMs)
+ time: Time,
+ brokerEpochSupplier: () => Long,
+ controllerChannel: BrokerToControllerChannelManager):
RPCProducerIdManager = {
+
+ new RPCProducerIdManager(brokerId, time, brokerEpochSupplier,
controllerChannel)
}
}
trait ProducerIdManager {
- def generateProducerId(): Long
+ def generateProducerId(): Try[Long]
def shutdown() : Unit = {}
+
+ // For testing purposes
+ def hasValidBlock: Boolean
}
object ZkProducerIdManager {
@@ -103,8 +112,7 @@ object ZkProducerIdManager {
}
}
-class ZkProducerIdManager(brokerId: Int,
- zkClient: KafkaZkClient) extends ProducerIdManager
with Logging {
+class ZkProducerIdManager(brokerId: Int, zkClient: KafkaZkClient) extends
ProducerIdManager with Logging {
this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: "
@@ -123,73 +131,103 @@ class ZkProducerIdManager(brokerId: Int,
}
}
- def generateProducerId(): Long = {
+ def generateProducerId(): Try[Long] = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
- allocateNewProducerIdBlock()
+ try {
+ allocateNewProducerIdBlock()
+ } catch {
+ case t: Throwable =>
+ return Failure(t)
+ }
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
- nextProducerId - 1
+ Success(nextProducerId - 1)
+ }
+ }
+
+ override def hasValidBlock: Boolean = {
+ this synchronized {
+ !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
}
}
}
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is
waiting on a new block.
+ */
class RPCProducerIdManager(brokerId: Int,
+ time: Time,
brokerEpochSupplier: () => Long,
- controllerChannel: BrokerToControllerChannelManager,
- maxWaitMs: Int) extends ProducerIdManager with
Logging {
+ controllerChannel:
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
- private val nextProducerIdBlock = new
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+ // Visible for testing
+ private[transaction] var nextProducerIdBlock = new
AtomicReference[ProducerIdsBlock](null)
+ private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new
AtomicReference(ProducerIdsBlock.EMPTY)
private val requestInFlight = new AtomicBoolean(false)
- private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
- private var nextProducerId: Long = -1L
+ private val backoffDeadlineMs = new AtomicLong(NoRetry)
- override def generateProducerId(): Long = {
- this synchronized {
- if (nextProducerId == -1L) {
- // Send an initial request to get the first block
- maybeRequestNextBlock()
- nextProducerId = 0L
- } else {
- nextProducerId += 1
-
- // Check if we need to fetch the next block
- if (nextProducerId >= (currentProducerIdBlock.firstProducerId +
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
- maybeRequestNextBlock()
- }
- }
+ override def hasValidBlock: Boolean = {
+ nextProducerIdBlock.get != null
+ }
- // If we've exhausted the current block, grab the next block (waiting if
necessary)
- if (nextProducerId > currentProducerIdBlock.lastProducerId) {
- val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
- if (block == null) {
- // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT
since older clients treat the error as fatal
- // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
- throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out
waiting for next producer ID block")
- } else {
- block match {
- case Success(nextBlock) =>
- currentProducerIdBlock = nextBlock
- nextProducerId = currentProducerIdBlock.firstProducerId
- case Failure(t) => throw t
+ override def generateProducerId(): Try[Long] = {
+ var result: Try[Long] = null
+ var iteration = 0
+ while (result == null) {
+ currentProducerIdBlock.get.claimNextId().asScala match {
+ case None =>
+ // Check the next block if current block is full
+ val block = nextProducerIdBlock.getAndSet(null)
+ if (block == null) {
+ // Return COORDINATOR_LOAD_IN_PROGRESS rather than
REQUEST_TIMED_OUT since older clients treat the error as fatal
+ // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+ maybeRequestNextBlock()
+ result =
Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is
full. Waiting for next block"))
+ } else {
+ currentProducerIdBlock.set(block)
+ requestInFlight.set(false)
+ iteration = iteration + 1
}
- }
+
+ case Some(nextProducerId) =>
+ // Check if we need to prefetch the next block
+ val prefetchTarget = currentProducerIdBlock.get.firstProducerId +
(currentProducerIdBlock.get.size *
ProducerIdManager.PidPrefetchThreshold).toLong
+ if (nextProducerId == prefetchTarget) {
+ maybeRequestNextBlock()
+ }
+ result = Success(nextProducerId)
+ }
+ if (iteration == IterationLimit) {
+ result =
Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is
full. Waiting for next block"))
}
- nextProducerId
}
+ result
}
- private def maybeRequestNextBlock(): Unit = {
- if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false,
true)) {
- sendRequest()
+ // Visible for testing
+ private[transaction] def maybeRequestNextBlock(): Unit = {
+ val retryTimestamp = backoffDeadlineMs.get()
+ if (retryTimestamp == NoRetry || time.milliseconds() >= retryTimestamp) {
+ // Send a request only if we reached the retry deadline, or if no
deadline was set.
+
+ if (nextProducerIdBlock.get == null &&
+ requestInFlight.compareAndSet(false, true) ) {
+
+ sendRequest()
+ // Reset backoff after a successful send.
+ backoffDeadlineMs.set(NoRetry)
+ }
}
}
+ // Visible for testing
private[transaction] def sendRequest(): Unit = {
val message = new AllocateProducerIdsRequestData()
.setBrokerEpoch(brokerEpochSupplier.apply())
@@ -207,37 +245,40 @@ class RPCProducerIdManager(brokerId: Int,
})
}
+ // Visible for testing
private[transaction] def handleAllocateProducerIdsResponse(response:
AllocateProducerIdsResponse): Unit = {
- requestInFlight.set(false)
val data = response.data
+ var successfulResponse = false
Errors.forCode(data.errorCode()) match {
case Errors.NONE =>
debug(s"Got next producer ID block from controller $data")
// Do some sanity checks on the response
- if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
- nextProducerIdBlock.put(Failure(new KafkaException(
- s"Producer ID block is not monotonic with current block:
current=$currentProducerIdBlock response=$data")))
+ if (data.producerIdStart() <
currentProducerIdBlock.get.lastProducerId) {
+ error(s"Producer ID block is not monotonic with current block:
current=$currentProducerIdBlock response=$data")
} else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 ||
data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
- nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID
block includes invalid ID range: $data")))
+ error(s"Producer ID block includes invalid ID range: $data")
} else {
- nextProducerIdBlock.put(
- Success(new ProducerIdsBlock(brokerId, data.producerIdStart(),
data.producerIdLen())))
+ nextProducerIdBlock.set(new ProducerIdsBlock(brokerId,
data.producerIdStart(), data.producerIdLen()))
+ successfulResponse = true
}
case Errors.STALE_BROKER_EPOCH =>
- warn("Our broker epoch was stale, trying again.")
- maybeRequestNextBlock()
+ warn("Our broker currentBlockCount was stale, trying again.")
case Errors.BROKER_ID_NOT_REGISTERED =>
warn("Our broker ID is not yet known by the controller, trying again.")
- maybeRequestNextBlock()
case e: Errors =>
- warn("Had an unknown error from the controller, giving up.")
- nextProducerIdBlock.put(Failure(e.exception()))
+ error(s"Received an unexpected error code from the controller: $e")
+ }
+
+ if (!successfulResponse) {
+ // There is no need to compare and set because only one thread
+ // handles the AllocateProducerIds response.
+ backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
+ requestInFlight.set(false)
}
}
- private[transaction] def handleTimeout(): Unit = {
+ private def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
- maybeRequestNextBlock()
}
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index bb1b3792c83..7eda8f3b1f2 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.{LogContext,
ProducerIdAndEpoch, Time}
import org.apache.kafka.server.util.Scheduler
import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success}
object TransactionCoordinator {
@@ -113,8 +114,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
if (transactionalId == null) {
// if the transactional id is null, then always blindly accept the
request
// and return a new producerId from the producerId manager
- val producerId = producerIdManager.generateProducerId()
- responseCallback(InitProducerIdResult(producerId, producerEpoch = 0,
Errors.NONE))
+ producerIdManager.generateProducerId() match {
+ case Success(producerId) =>
+ responseCallback(InitProducerIdResult(producerId, producerEpoch = 0,
Errors.NONE))
+ case Failure(exception) =>
+
responseCallback(initTransactionError(Errors.forException(exception)))
+ }
} else if (transactionalId.isEmpty) {
// if transactional id is empty then return error as invalid request.
This is
// to make TransactionCoordinator's behavior consistent with producer
client
@@ -125,17 +130,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
} else {
val coordinatorEpochAndMetadata =
txnManager.getTransactionState(transactionalId).flatMap {
case None =>
- val producerId = producerIdManager.generateProducerId()
- val createdMetadata = new TransactionMetadata(transactionalId =
transactionalId,
- producerId = producerId,
- lastProducerId = RecordBatch.NO_PRODUCER_ID,
- producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
- lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
- txnTimeoutMs = transactionTimeoutMs,
- state = Empty,
- topicPartitions = collection.mutable.Set.empty[TopicPartition],
- txnLastUpdateTimestamp = time.milliseconds())
- txnManager.putTransactionStateIfNotExists(createdMetadata)
+ producerIdManager.generateProducerId() match {
+ case Success(producerId) =>
+ val createdMetadata = new TransactionMetadata(transactionalId =
transactionalId,
+ producerId = producerId,
+ lastProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = transactionTimeoutMs,
+ state = Empty,
+ topicPartitions = collection.mutable.Set.empty[TopicPartition],
+ txnLastUpdateTimestamp = time.milliseconds())
+ txnManager.putTransactionStateIfNotExists(createdMetadata)
+
+ case Failure(exception) =>
+ Left(Errors.forException(exception))
+ }
case Some(epochAndTxnMetadata) => Right(epochAndTxnMetadata)
}
@@ -231,9 +241,14 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// If the epoch is exhausted and the expected epoch (if provided)
matches it, generate a new producer ID
if (txnMetadata.isProducerEpochExhausted &&
expectedProducerIdAndEpoch.forall(_.epoch ==
txnMetadata.producerEpoch)) {
- val newProducerId = producerIdManager.generateProducerId()
- Right(txnMetadata.prepareProducerIdRotation(newProducerId,
transactionTimeoutMs, time.milliseconds(),
- expectedProducerIdAndEpoch.isDefined))
+
+ producerIdManager.generateProducerId() match {
+ case Success(producerId) =>
+ Right(txnMetadata.prepareProducerIdRotation(producerId,
transactionTimeoutMs, time.milliseconds(),
+ expectedProducerIdAndEpoch.isDefined))
+ case Failure(exception) =>
+ Left(Errors.forException(exception))
+ }
} else {
txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs,
expectedProducerIdAndEpoch.map(_.epoch),
time.milliseconds())
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 2bf29c32d97..edb645f561c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -295,9 +295,9 @@ class BrokerServer(
val producerIdManagerSupplier = () => ProducerIdManager.rpc(
config.brokerId,
+ time,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
- clientToControllerChannelManager,
- config.requestTimeoutMs
+ clientToControllerChannelManager
)
// Create transaction coordinator, but don't start it until we've
started replica manager.
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 10acd74241c..28c07840973 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -461,9 +461,9 @@ class KafkaServer(
val producerIdManager = if
(config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
ProducerIdManager.rpc(
config.brokerId,
+ time,
brokerEpochSupplier = brokerEpochSupplier,
- clientToControllerChannelManager,
- config.requestTimeoutMs
+ clientToControllerChannelManager
)
} else {
ProducerIdManager.zk(config.brokerId, zkClient)
diff --git
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 558f0041e0a..519c2bcf088 100644
---
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -24,14 +24,16 @@ import kafka.test.junit.ClusterTestExtensions
import kafka.test.{ClusterConfig, ClusterInstance}
import org.apache.kafka.common.message.InitProducerIdRequestData
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{InitProducerIdRequest,
InitProducerIdResponse}
import org.apache.kafka.server.common.MetadataVersion
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Disabled, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
import java.util.stream.{Collectors, IntStream}
+import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
clusterInstance.stop()
}
+ @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+ @Timeout(20)
+ def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance:
ClusterInstance): Unit = {
+
clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp,
"1")
+ clusterInstance.start()
+ verifyUniqueIds(clusterInstance)
+ clusterInstance.stop()
+ }
+
+ @Disabled // TODO: Enable once producer id block size is configurable
(KAFKA-15029)
+ @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+ def testMultipleAllocateProducerIdsRequest(clusterInstance:
ClusterInstance): Unit = {
+
clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp,
"2")
+ clusterInstance.start()
+ verifyUniqueIds(clusterInstance)
+ clusterInstance.stop()
+ }
+
private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
- // Request enough PIDs from each broker to ensure each broker generates
two PID blocks
+ // Request enough PIDs from each broker to ensure each broker generates
two blocks
val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker
=> {
- IntStream.range(0, 1001).parallel().mapToObj( _ =>
nextProducerId(broker, clusterInstance.clientListener()))
- }).collect(Collectors.toList[Long]).asScala.toSeq
+ IntStream.range(0, 1001).parallel().mapToObj( _ =>
+ nextProducerId(broker, clusterInstance.clientListener())
+ )}).collect(Collectors.toList[Long]).asScala.toSeq
- assertEquals(3003, ids.size, "Expected exactly 3003 IDs")
- assertEquals(ids.size, ids.distinct.size, "Found duplicate producer IDs")
+ val brokerCount = clusterInstance.brokerIds.size
+ val expectedTotalCount = 1001 * brokerCount
+ assertEquals(expectedTotalCount, ids.size, s"Expected exactly
$expectedTotalCount IDs")
+ assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate
producer IDs")
}
private def nextProducerId(broker: SocketServer, listener: ListenerName):
Long = {
- val data = new InitProducerIdRequestData()
- .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
- .setProducerId(RecordBatch.NO_PRODUCER_ID)
- .setTransactionalId(null)
- .setTransactionTimeoutMs(10)
- val request = new InitProducerIdRequest.Builder(data).build()
+ // Generating producer ids may fail while waiting for the initial block
and also
+ // when the current block is full and waiting for the prefetched block.
+ val deadline = 5.seconds.fromNow
+ var shouldRetry = true
+ var response: InitProducerIdResponse = null
+ while(shouldRetry && deadline.hasTimeLeft()) {
+ val data = new InitProducerIdRequestData()
+ .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
+ .setProducerId(RecordBatch.NO_PRODUCER_ID)
+ .setTransactionalId(null)
+ .setTransactionTimeoutMs(10)
+ val request = new InitProducerIdRequest.Builder(data).build()
+
+ response =
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
+ destination = broker,
+ listenerName = listener)
- val response =
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
- destination = broker,
- listenerName = listener)
+ shouldRetry = response.data.errorCode ==
Errors.COORDINATOR_LOAD_IN_PROGRESS.code
+ }
+ assertTrue(deadline.hasTimeLeft())
+ assertEquals(Errors.NONE.code, response.data.errorCode)
response.data().producerId()
}
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 666a3c363ff..73b208196e6 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -16,13 +16,16 @@
*/
package kafka.coordinator.transaction
+import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
import kafka.server.BrokerToControllerChannelManager
+import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
+import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -31,7 +34,11 @@ import org.junit.jupiter.params.provider.{EnumSource,
ValueSource}
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, when}
-import java.util.stream.IntStream
+
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable
+import scala.util.{Failure, Success}
class ProducerIdManagerTest {
@@ -39,20 +46,48 @@ class ProducerIdManagerTest {
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
// Mutable test implementation that lets us easily set the idStart and error
- class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen:
Int, var error: Errors = Errors.NONE, timeout: Boolean = false)
- extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) {
+ class MockProducerIdManager(
+ val brokerId: Int,
+ var idStart: Long,
+ val idLen: Int,
+ var error: Errors = Errors.NONE,
+ val isErroneousBlock: Boolean = false,
+ val time: Time = Time.SYSTEM,
+ var remainingRetries: Int = 1
+ ) extends RPCProducerIdManager(brokerId, time, () => 1, brokerToController) {
+
+ private val brokerToControllerRequestExecutor =
Executors.newSingleThreadExecutor()
+ val capturedFailure: AtomicBoolean = new AtomicBoolean(false)
override private[transaction] def sendRequest(): Unit = {
- if (timeout)
- return
- if (error == Errors.NONE) {
- handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
- new
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
- idStart += idLen
+ brokerToControllerRequestExecutor.submit(() => {
+ if (error == Errors.NONE) {
+ handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
+ new
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
+ if (!isErroneousBlock) {
+ idStart += idLen
+ }
+ } else {
+ handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
+ new AllocateProducerIdsResponseData().setErrorCode(error.code)))
+ }
+ }, 0)
+ }
+
+ override private[transaction] def
handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit
= {
+ super.handleAllocateProducerIdsResponse(response)
+ capturedFailure.set(nextProducerIdBlock.get == null)
+ }
+
+ override private[transaction] def maybeRequestNextBlock(): Unit = {
+ if (error == Errors.NONE && !isErroneousBlock) {
+ super.maybeRequestNextBlock()
} else {
- handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
- new AllocateProducerIdsResponseData().setErrorCode(error.code)))
+ if (remainingRetries > 0) {
+ super.maybeRequestNextBlock()
+ remainingRetries -= 1
+ }
}
}
}
@@ -80,26 +115,20 @@ class ProducerIdManagerTest {
val manager1 = new ZkProducerIdManager(0, zkClient)
val manager2 = new ZkProducerIdManager(1, zkClient)
- val pid1 = manager1.generateProducerId()
- val pid2 = manager2.generateProducerId()
+ val pid1 = manager1.generateProducerId().get
+ val pid2 = manager2.generateProducerId().get
assertEquals(0, pid1)
assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, pid2)
for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
- assertEquals(pid1 + i, manager1.generateProducerId())
+ assertEquals(pid1 + i, manager1.generateProducerId().get)
for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
- assertEquals(pid2 + i, manager2.generateProducerId())
-
- assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE,
manager1.generateProducerId())
- assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2,
manager2.generateProducerId())
- }
+ assertEquals(pid2 + i, manager2.generateProducerId().get)
- @Test
- def testRPCProducerIdManagerThrowsConcurrentTransactions(): Unit = {
- val manager1 = new MockProducerIdManager(0, 0, 0, timeout = true)
- assertThrows(classOf[CoordinatorLoadInProgressException], () =>
manager1.generateProducerId())
+ assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE,
manager1.generateProducerId().get)
+ assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2,
manager2.generateProducerId().get)
}
@Test
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
}
@ParameterizedTest
- @ValueSource(ints = Array(1, 2, 10))
- def testContiguousIds(idBlockLen: Int): Unit = {
+ @ValueSource(ints = Array(1, 2, 10, 100))
+ def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+ // Send concurrent generateProducerId requests. Ensure that the generated
producer id is unique.
+ // For each block (total 3 blocks), only "idBlockLen" number of requests
should go through.
+ // All other requests should fail immediately.
+
+ val numThreads = 5
+ val latch = new CountDownLatch(idBlockLen * 3)
val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
- IntStream.range(0, idBlockLen * 3).forEach { i =>
- assertEquals(i, manager.generateProducerId())
+ val pidMap = mutable.Map[Long, Int]()
+ val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+ for ( _ <- 0 until numThreads) {
+ requestHandlerThreadPool.submit(() => {
+ while(latch.getCount > 0) {
+ val result = manager.generateProducerId()
+ result match {
+ case Success(pid) =>
+ pidMap synchronized {
+ if (latch.getCount != 0) {
+ val counter = pidMap.getOrElse(pid, 0)
+ pidMap += pid -> (counter + 1)
+ latch.countDown()
+ }
+ }
+
+ case Failure(exception) =>
+ assertEquals(classOf[CoordinatorLoadInProgressException],
exception.getClass)
+ }
+ Thread.sleep(100)
+ }
+ }, 0)
+ }
+ assertTrue(latch.await(12000, TimeUnit.MILLISECONDS))
+ requestHandlerThreadPool.shutdown()
+
+ assertEquals(idBlockLen * 3, pidMap.size)
+ pidMap.foreach { case (pid, count) =>
+ assertEquals(1, count)
+ assertTrue(pid < (3 * idBlockLen) + numThreads, s"Unexpected pid $pid; "
+
+ s"non-contiguous blocks generated or did not fully exhaust blocks.")
}
}
@ParameterizedTest
@EnumSource(value = classOf[Errors], names = Array("UNKNOWN_SERVER_ERROR", "INVALID_REQUEST"))
def testUnrecoverableErrors(error: Errors): Unit = {
  // A failing controller response must surface as a retriable failure, and a
  // fresh block must become obtainable again once the error clears and the
  // retry backoff has elapsed.
  val mockTime = new MockTime()
  val idManager = new MockProducerIdManager(0, 0, 1, time = mockTime)

  verifyNewBlockAndProducerId(idManager, new ProducerIdsBlock(0, 0, 1), 0)

  idManager.error = error
  verifyFailure(idManager)

  idManager.error = Errors.NONE
  mockTime.sleep(RetryBackoffMs)
  verifyNewBlockAndProducerId(idManager, new ProducerIdsBlock(0, 1, 1), 1)
}
@Test
def testInvalidRanges(): Unit = {
  // Blocks with a negative start, a negative length, or a range that would
  // overflow past Long.MaxValue must all be rejected as failures.
  def assertInvalidBlock(firstId: Long, blockLen: Int): Unit = {
    val manager = new MockProducerIdManager(0, firstId, blockLen, isErroneousBlock = true)
    verifyFailure(manager)
  }

  assertInvalidBlock(-1, 10)
  assertInvalidBlock(0, -1)
  assertInvalidBlock(Long.MaxValue - 1, 10)
}
+
@Test
def testRetryBackoff(): Unit = {
  // After exhausting its retries against a failing controller, the manager
  // must not request a new block again until the retry backoff has elapsed.
  val mockTime = new MockTime()
  val idManager = new MockProducerIdManager(0, 0, 1,
    error = Errors.UNKNOWN_SERVER_ERROR, time = mockTime, remainingRetries = 2)

  verifyFailure(idManager)
  idManager.error = Errors.NONE

  // Still inside the backoff window: the request fails fast with a retriable error.
  assertEquals(classOf[CoordinatorLoadInProgressException], idManager.generateProducerId().failed.get.getClass)
  mockTime.sleep(RetryBackoffMs)
  verifyNewBlockAndProducerId(idManager, new ProducerIdsBlock(0, 0, 1), 0)
}
+
private def verifyFailure(manager: MockProducerIdManager): Unit = {
  // The immediate result is a retriable CoordinatorLoadInProgressException...
  val failure = manager.generateProducerId().failed.get
  assertEquals(classOf[CoordinatorLoadInProgressException], failure.getClass)
  // ...and the asynchronous block fetch eventually records the failure flag,
  // which we reset so later verifications start clean.
  TestUtils.waitUntilTrue(() => {
    manager synchronized {
      manager.capturedFailure.get
    }
  }, "Expected failure")
  manager.capturedFailure.set(false)
}
- manager = new MockProducerIdManager(0, 0, -1)
- assertThrows(classOf[KafkaException], () => manager.generateProducerId())
private def verifyNewBlockAndProducerId(manager: MockProducerIdManager,
                                        expectedBlock: ProducerIdsBlock,
                                        expectedPid: Long): Unit = {
  // The first call triggers an asynchronous block fetch and fails fast with a
  // retriable error.
  assertEquals(classOf[CoordinatorLoadInProgressException], manager.generateProducerId().failed.get.getClass)
  // Wait until the expected block has been installed...
  TestUtils.waitUntilTrue(() => {
    val installedBlock = manager.nextProducerIdBlock.get
    installedBlock != null && installedBlock.equals(expectedBlock)
  }, "failed to generate block")
  // ...after which the next call hands out the first id of that block.
  assertEquals(expectedPid, manager.generateProducerId().get)
}
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 9a0d8143766..d57a8e974c6 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -43,6 +43,7 @@ import org.mockito.Mockito.{mock, when}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, mutable}
+import scala.util.Success
class TransactionCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest[Transaction] {
private val nTransactions = nThreads * 10
@@ -82,7 +83,11 @@ class TransactionCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurren
val pidGenerator: ProducerIdManager = mock(classOf[ProducerIdManager])
when(pidGenerator.generateProducerId())
- .thenAnswer(_ => if (bumpProducerId) producerId + 1 else producerId)
+ .thenAnswer(_ => if (bumpProducerId) {
+ Success(producerId + 1)
+ } else {
+ Success(producerId)
+ })
val brokerNode = new Node(0, "host", 10)
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
when(metadataCache.getPartitionLeaderEndpoint(
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index ab8e1052f93..a5e2d57d87f 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -31,6 +31,7 @@ import org.mockito.Mockito.{mock, times, verify, when}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
+import scala.util.Success
class TransactionCoordinatorTest {
@@ -46,7 +47,7 @@ class TransactionCoordinatorTest {
val brokerId = 0
val coordinatorEpoch = 0
private val transactionalId = "known"
- private val producerId = 10
+ private val producerId = 10L
private val producerEpoch: Short = 1
private val txnTimeoutMs = 1
@@ -68,7 +69,7 @@ class TransactionCoordinatorTest {
private def mockPidGenerator(): Unit = {
  // Hand out monotonically increasing producer ids, wrapped in Success to
  // match the Try-based generateProducerId() contract.
  when(pidGenerator.generateProducerId()).thenAnswer { _ =>
    val pid = nextPid
    nextPid += 1
    Success(pid)
  }
}
@@ -908,7 +909,7 @@ class TransactionCoordinatorTest {
(Short.MaxValue - 2).toShort, txnTimeoutMs, Empty, partitions,
time.milliseconds, time.milliseconds)
when(pidGenerator.generateProducerId())
- .thenReturn(producerId + 1)
+ .thenReturn(Success(producerId + 1))
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
.thenReturn(true)
@@ -949,7 +950,7 @@ class TransactionCoordinatorTest {
(Short.MaxValue - 2).toShort, txnTimeoutMs, Empty, partitions,
time.milliseconds, time.milliseconds)
when(pidGenerator.generateProducerId())
- .thenReturn(producerId + 1)
+ .thenReturn(Success(producerId + 1))
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
.thenReturn(true)
@@ -1208,7 +1209,7 @@ class TransactionCoordinatorTest {
private def validateIncrementEpochAndUpdateMetadata(state:
TransactionState): Unit = {
when(pidGenerator.generateProducerId())
- .thenReturn(producerId)
+ .thenReturn(Success(producerId))
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
.thenReturn(true)
diff --git
a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index e59ed821c21..6e296c2892b 100644
---
a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++
b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -174,8 +174,13 @@ class AddPartitionsToTxnRequestServerTest extends
BaseRequestTest {
val findCoordinatorResponse =
connectAndReceive[FindCoordinatorResponse](findCoordinatorRequest,
brokerSocketServer(brokers.head.config.brokerId))
val coordinatorId =
findCoordinatorResponse.data().coordinators().get(0).nodeId()
- val initPidRequest = new InitProducerIdRequest.Builder(new
InitProducerIdRequestData().setTransactionalId(transactionalId).setTransactionTimeoutMs(10000)).build()
- val initPidResponse =
connectAndReceive[InitProducerIdResponse](initPidRequest,
brokerSocketServer(coordinatorId))
+ var initPidResponse: InitProducerIdResponse = null
+
+ TestUtils.waitUntilTrue(() => {
+ val initPidRequest = new InitProducerIdRequest.Builder(new
InitProducerIdRequestData().setTransactionalId(transactionalId).setTransactionTimeoutMs(10000)).build()
+ initPidResponse =
connectAndReceive[InitProducerIdResponse](initPidRequest,
brokerSocketServer(coordinatorId))
+ initPidResponse.error() != Errors.COORDINATOR_LOAD_IN_PROGRESS
+ }, "Failed to get a valid InitProducerIdResponse.")
val producerId1 = initPidResponse.data().producerId()
val producerEpoch1 = initPidResponse.data().producerEpoch()
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
index b2633bf7034..c4240018f9b 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java
@@ -18,6 +18,8 @@
package org.apache.kafka.server.common;
import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Holds a range of Producer IDs used for Transactional and EOS producers.
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
private final int assignedBrokerId;
private final long firstProducerId;
private final int blockSize;
+ private final AtomicLong producerIdCounter;
/**
 * Creates a block of producer ids assigned to a single broker.
 *
 * @param assignedBrokerId broker that owns this block
 * @param firstProducerId  first id in the block (inclusive)
 * @param blockSize        number of ids in the block
 */
public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
    this.assignedBrokerId = assignedBrokerId;
    this.firstProducerId = firstProducerId;
    this.blockSize = blockSize;
    // The counter starts at the first id of the block and is advanced by
    // claimNextId(); once it passes the end of the block, the block is exhausted.
    this.producerIdCounter = new AtomicLong(firstProducerId);
}
+
+ /**
+ * Claim the next available producer id from the block.
+ * Returns an empty result if there are no more available producer ids in
the block.
+ */
+ public Optional<Long> claimNextId() {
+ long nextId = producerIdCounter.getAndIncrement();
+ if (nextId > lastProducerId()) {
+ return Optional.empty();
+ }
+ return Optional.of(nextId);
}
/**
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
index f15c171a2c0..ea7973d7264 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/ProducerIdsBlockTest.java
@@ -18,7 +18,14 @@ package org.apache.kafka.server.common;
import org.junit.jupiter.api.Test;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class ProducerIdsBlockTest {
@@ -43,4 +50,22 @@ class ProducerIdsBlockTest {
assertEquals(brokerId, block.assignedBrokerId());
}
+ @Test
+ public void testClaimNextId() throws Exception {
+ for (int i = 0; i < 50; i++) {
+ ProducerIdsBlock block = new ProducerIdsBlock(0, 1, 1);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong counter = new AtomicLong(0);
+ CompletableFuture.runAsync(() -> {
+ Optional<Long> pid = block.claimNextId();
+ counter.addAndGet(pid.orElse(0L));
+ latch.countDown();
+ });
+ Optional<Long> pid = block.claimNextId();
+ counter.addAndGet(pid.orElse(0L));
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+ }
+ }
+
}
\ No newline at end of file