hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1199413525
##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
}
}
- def generateProducerId(): Long = {
+ def generateProducerId(): Try[Long] = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
- allocateNewProducerIdBlock()
+ try {
+ allocateNewProducerIdBlock()
+ } catch {
+ case t: Throwable =>
+ return Failure(t)
+ }
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
- nextProducerId - 1
+ Success(nextProducerId - 1)
+ }
+ }
+
+ override def hasValidBlock: Boolean = {
+ this synchronized {
+ !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
}
}
}
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is
waiting on a new block.
+ */
class RPCProducerIdManager(brokerId: Int,
+ time: Time,
brokerEpochSupplier: () => Long,
- controllerChannel: BrokerToControllerChannelManager,
- maxWaitMs: Int) extends ProducerIdManager with
Logging {
+ controllerChannel:
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
- private val nextProducerIdBlock = new
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+ private val IterationLimit = 3
Review Comment:
Since this is a constant, maybe we can move it to the companion object?
##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +130,107 @@ class ZkProducerIdManager(brokerId: Int,
}
}
- def generateProducerId(): Long = {
+ def generateProducerId(): Try[Long] = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
- allocateNewProducerIdBlock()
+ try {
+ allocateNewProducerIdBlock()
+ } catch {
+ case t: Throwable =>
+ return Failure(t)
+ }
nextProducerId = currentProducerIdBlock.firstProducerId
}
nextProducerId += 1
- nextProducerId - 1
+ Success(nextProducerId - 1)
+ }
+ }
+
+ override def hasValidBlock: Boolean = {
+ this synchronized {
+ !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
}
}
}
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is
waiting on a new block.
+ */
class RPCProducerIdManager(brokerId: Int,
+ time: Time,
brokerEpochSupplier: () => Long,
- controllerChannel: BrokerToControllerChannelManager,
- maxWaitMs: Int) extends ProducerIdManager with
Logging {
+ controllerChannel:
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
- private val nextProducerIdBlock = new
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+ private val IterationLimit = 3
+ // Visible for testing
+ private[transaction] var nextProducerIdBlock = new
AtomicReference[ProducerIdsBlock](null)
+ private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new
AtomicReference(ProducerIdsBlock.EMPTY)
private val requestInFlight = new AtomicBoolean(false)
- private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
- private var nextProducerId: Long = -1L
+ private val backoffDeadlineMs = new AtomicLong(NoRetry)
- override def generateProducerId(): Long = {
- this synchronized {
- if (nextProducerId == -1L) {
- // Send an initial request to get the first block
- maybeRequestNextBlock()
- nextProducerId = 0L
- } else {
- nextProducerId += 1
+ override def hasValidBlock: Boolean = {
+ nextProducerIdBlock.get != null
+ }
- // Check if we need to fetch the next block
- if (nextProducerId >= (currentProducerIdBlock.firstProducerId +
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
- maybeRequestNextBlock()
- }
- }
+ override def generateProducerId(): Try[Long] = {
+ var result: Try[Long] = null
+ var iteration = 0
+ while (result == null) {
+ currentProducerIdBlock.get.claimNextId().asScala match {
+ case None =>
+ // Check the next block if current block is full
+ val block = nextProducerIdBlock.getAndSet(null)
+ if (block == null) {
+ // Return COORDINATOR_LOAD_IN_PROGRESS rather than
REQUEST_TIMED_OUT since older clients treat the error as fatal
+ // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+ maybeRequestNextBlock()
+ result =
Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is
full. Waiting for next block"))
+ } else {
+ currentProducerIdBlock.set(block)
+ requestInFlight.set(false)
+ iteration = iteration + 1
+ }
- // If we've exhausted the current block, grab the next block (waiting if
necessary)
- if (nextProducerId > currentProducerIdBlock.lastProducerId) {
- val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
- if (block == null) {
- // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT
since older clients treat the error as fatal
- // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
- throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out
waiting for next producer ID block")
- } else {
- block match {
- case Success(nextBlock) =>
- currentProducerIdBlock = nextBlock
- nextProducerId = currentProducerIdBlock.firstProducerId
- case Failure(t) => throw t
+ case Some(nextProducerId) =>
+ // Check if we need to prefetch the next block
+ val prefetchTarget = currentProducerIdBlock.get.firstProducerId +
(currentProducerIdBlock.get.size *
ProducerIdManager.PidPrefetchThreshold).toLong
+ if (nextProducerId == prefetchTarget) {
+ maybeRequestNextBlock()
}
- }
+ result = Success(nextProducerId)
+ }
+ if (iteration == IterationLimit) {
+ result =
Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is
full. Waiting for next block"))
}
- nextProducerId
}
+ result
}
- private def maybeRequestNextBlock(): Unit = {
- if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false,
true)) {
- sendRequest()
+ // Visible for testing
+ private[transaction] def maybeRequestNextBlock(): Unit = {
+ if (nextProducerIdBlock.get == null &&
+ requestInFlight.compareAndSet(false, true) ) {
Review Comment:
Hmm, this check seems backwards to me. Could we check the backoff deadline
first and then use `requestInFlight` to gate the sending of the request? If
this thread sends the request successfully, then it can reset the backoff
deadline.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]