hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1170301813


##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -207,35 +234,38 @@ class RPCProducerIdManager(brokerId: Int,
     })
   }
 
+  // Visible for testing
+  private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = {
-    requestInFlight.set(false)
     val data = response.data
+    var successfulResponse = false
     Errors.forCode(data.errorCode()) match {
       case Errors.NONE =>
         debug(s"Got next producer ID block from controller $data")
         // Do some sanity checks on the response
-        if (data.producerIdStart() < currentProducerIdBlock.lastProducerId) {
-          nextProducerIdBlock.put(Failure(new KafkaException(
-            s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")))
+        if (data.producerIdStart() < currentProducerIdBlock.get.lastProducerId) {
+          error(s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data")
         } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) {
-          nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data")))
+          error(s"Producer ID block includes invalid ID range: $data")
         } else {
-          nextProducerIdBlock.put(
-            Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen())))
+          nextProducerIdBlock.set(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))
+          successfulResponse = true
         }
       case Errors.STALE_BROKER_EPOCH =>
-        warn("Our broker epoch was stale, trying again.")
-        maybeRequestNextBlock()
+        warn("Our broker currentBlockCount was stale, trying again.")
       case Errors.BROKER_ID_NOT_REGISTERED =>
         warn("Our broker ID is not yet known by the controller, trying again.")
-        maybeRequestNextBlock()
       case e: Errors =>
-        warn("Had an unknown error from the controller, giving up.")
-        nextProducerIdBlock.put(Failure(e.exception()))
+        error(s"Had an unknown error from the controller: ${e.exception}")

Review Comment:
   nit: maybe we can rephrase this message a little for clarity
   ```scala
   error(s"Received an unexpected error code from the controller: $e")
   ```



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {

Review Comment:
   It's probably unlikely for this loop to continue indefinitely, but I wonder if we should bound the retries to a fixed small number (say 3). If we break out of the loop without a result, we can return LOAD_IN_PROGRESS. What do you think?
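   
   As a rough sketch of what I have in mind (untested; `IterationLimit` is a hypothetical constant, and I've left the prefetch check out for brevity):
   ```scala
   var result: Try[Long] = null
   var iteration = 0
   while (result == null && iteration < IterationLimit) {
     iteration += 1
     currentProducerIdBlock.get.claimNextId().asScala match {
       case None =>
         // Current block is exhausted; install the prefetched block if one is ready.
         val block = nextProducerIdBlock.getAndSet(null)
         if (block == null) {
           maybeRequestNextBlock()
           result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
         } else {
           currentProducerIdBlock.set(block)
           requestInFlight.set(false)
         }
       case Some(nextProducerId) =>
         result = Success(nextProducerId)
     }
   }
   if (result == null) {
     // Retry bound reached without claiming an ID; return a retriable error so the client tries again.
     result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Could not claim a producer ID after several attempts"))
   }
   result
   ```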



##########
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##########
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
     }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        allocateNewProducerIdBlock()
+        try {
+          allocateNewProducerIdBlock()
+        } catch {
+          case t: Throwable =>
+            return Failure(t)
+        }
         nextProducerId = currentProducerIdBlock.firstProducerId
       }
       nextProducerId += 1
-      nextProducerId - 1
+      Success(nextProducerId - 1)
+    }
+  }
+
+  override def hasValidBlock: Boolean = {
+    this synchronized {
+      !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
     }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests
+ * for producers to retry if it does not have an available producer id and is waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+                           time: Time,
                            brokerEpochSupplier: () => Long,
-                           controllerChannel: BrokerToControllerChannelManager,
-                           maxWaitMs: Int) extends ProducerIdManager with Logging {
+                           controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-    this synchronized {
-      if (nextProducerId == -1L) {
-        // Send an initial request to get the first block
-        maybeRequestNextBlock()
-        nextProducerId = 0L
-      } else {
-        nextProducerId += 1
-
-        // Check if we need to fetch the next block
-        if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-          maybeRequestNextBlock()
-        }
-      }
+  override def hasValidBlock: Boolean = {
+    nextProducerIdBlock.get != null
+  }
 
-      // If we've exhausted the current block, grab the next block (waiting if necessary)
-      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-        val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-        if (block == null) {
-          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
-          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block")
-        } else {
-          block match {
-            case Success(nextBlock) =>
-              currentProducerIdBlock = nextBlock
-              nextProducerId = currentProducerIdBlock.firstProducerId
-            case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+    var result: Try[Long] = null
+    while (result == null) {
+      currentProducerIdBlock.get.claimNextId().asScala match {
+        case None =>
+          // Check the next block if current block is full
+          val block = nextProducerIdBlock.getAndSet(null)
+          if (block == null) {
+            // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal
+            // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+            maybeRequestNextBlock()
+            result = Failure(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block"))
+          } else {
+            currentProducerIdBlock.set(block)
+            requestInFlight.set(false)
           }
-        }
+
+        case Some(nextProducerId) =>
+          // Check if we need to prefetch the next block
+          val prefetchTarget = currentProducerIdBlock.get.firstProducerId + (currentProducerIdBlock.get.size * ProducerIdManager.PidPrefetchThreshold).toLong
+          if (nextProducerId == prefetchTarget) {
+            maybeRequestNextBlock()
+          }
+          result = Success(nextProducerId)
       }
-      nextProducerId
     }
+    result
   }
 
 
-  private def maybeRequestNextBlock(): Unit = {
-    if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) {
+  // Visible for testing
+  private[transaction] def maybeRequestNextBlock(): Unit = {
+    if (nextProducerIdBlock.get == null &&
+      requestInFlight.compareAndSet(false, true) ) {
+
+      if (shouldBackoff.compareAndSet(true, false)) {
+        time.sleep(RetryBackoffMs)

Review Comment:
   Sleeping here will block the request thread. Instead of sleeping, I was thinking we could set a backoff deadline and then check whether it has been reached. If it hasn't, we break out of the loop early and return LOAD_IN_PROGRESS. Would that work?
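   
   Roughly what I was picturing (untested sketch; `backoffDeadlineMs` and `NoBackoffRequired` are hypothetical names, it would need a `java.util.concurrent.atomic.AtomicLong` import, and the error-handling path would set the deadline to `time.milliseconds() + RetryBackoffMs` instead of flipping `shouldBackoff`):
   ```scala
   private val backoffDeadlineMs = new AtomicLong(NoBackoffRequired)

   // Visible for testing
   private[transaction] def maybeRequestNextBlock(): Unit = {
     val retryTimestamp = backoffDeadlineMs.get()
     if (retryTimestamp == NoBackoffRequired || time.milliseconds() >= retryTimestamp) {
       if (nextProducerIdBlock.get == null && requestInFlight.compareAndSet(false, true)) {
         // Clear the deadline once we actually send, so a later error can set a fresh one.
         backoffDeadlineMs.set(NoBackoffRequired)
         // ... existing code that builds and sends the AllocateProducerIdsRequest ...
       }
     }
     // If the deadline hasn't been reached we simply don't send; generateProducerId then
     // fails fast with COORDINATOR_LOAD_IN_PROGRESS and the client retries.
   }
   ```
   That way no request thread ever sleeps, and the backoff is enforced by deciding when the next request is allowed to go out.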


