This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 3ff56ebfc7a KAFKA-14417: Address incompatible error code returned by
broker from `InitProducerId` (#12968)
3ff56ebfc7a is described below
commit 3ff56ebfc7ac06bb8b0b604ff9780ddcf6da6f2c
Author: Justine Olshan <[email protected]>
AuthorDate: Mon Dec 19 09:33:11 2022 -0800
KAFKA-14417: Address incompatible error code returned by broker from
`InitProducerId` (#12968)
Older clients cannot handle the `REQUEST_TIMED_OUT` error that is returned
from `InitProducerId` when the next producerId block cannot be fetched from the
controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead,
which is retriable.
Reviewers: Jason Gustafson <[email protected]>
---
.../kafka/coordinator/transaction/ProducerIdManager.scala | 5 +++--
.../coordinator/transaction/ProducerIdManagerTest.scala | 13 +++++++++++--
2 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index e1f46eb3712..f16785a7b6c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -167,7 +167,9 @@ class RPCProducerIdManager(brokerId: Int,
if (nextProducerId > currentProducerIdBlock.lastProducerId) {
val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
if (block == null) {
- throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next
producer ID block")
+ // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT
since older clients treat the error as fatal
+ // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+ throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out
waiting for next producer ID block")
} else {
block match {
case Success(nextBlock) =>
@@ -236,7 +238,6 @@ class RPCProducerIdManager(brokerId: Int,
private[transaction] def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
- nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
maybeRequestNextBlock()
}
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index eefe61d17d6..666a3c363ff 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -19,6 +19,7 @@ package kafka.coordinator.transaction
import kafka.server.BrokerToControllerChannelManager
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
@@ -30,7 +31,6 @@ import org.junit.jupiter.params.provider.{EnumSource,
ValueSource}
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mock, when}
-
import java.util.stream.IntStream
class ProducerIdManagerTest {
@@ -39,10 +39,13 @@ class ProducerIdManagerTest {
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
// Mutable test implementation that lets us easily set the idStart and error
- class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen:
Int, var error: Errors = Errors.NONE)
+ class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen:
Int, var error: Errors = Errors.NONE, timeout: Boolean = false)
extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) {
override private[transaction] def sendRequest(): Unit = {
+ if (timeout)
+ return
+
if (error == Errors.NONE) {
handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
new
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
@@ -93,6 +96,12 @@ class ProducerIdManagerTest {
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2,
manager2.generateProducerId())
}
+ @Test
+ def testRPCProducerIdManagerThrowsConcurrentTransactions(): Unit = {
+ val manager1 = new MockProducerIdManager(0, 0, 0, timeout = true)
+ assertThrows(classOf[CoordinatorLoadInProgressException], () =>
manager1.generateProducerId())
+ }
+
@Test
def testExceedProducerIdLimitZk(): Unit = {
when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => {