This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a3b34c13151 KAFKA-18662: Return CONCURRENT_TRANSACTIONS on produce request in TV2 (#18733)
a3b34c13151 is described below

commit a3b34c131517d5aa6c15660a977640802e070d54
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Jan 29 10:15:48 2025 -0800

    KAFKA-18662: Return CONCURRENT_TRANSACTIONS on produce request in TV2 (#18733)
    
    While testing, it was found that the NOT_ENOUGH_REPLICAS error was very common and easily confused with a genuine replication problem. Since we are already bumping the produce request version, we can signal that the request may return CONCURRENT_TRANSACTIONS, which new clients can handle.
    
    (Note: the Java client should already handle this as a retriable error, but other client libraries may need to implement this change.)
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../kafka/common/requests/ProduceResponse.java     |  1 +
 .../kafka/clients/producer/KafkaProducerTest.java  | 59 ++++++++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala   | 12 +++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 64 ++++++++++++++++++++--
 4 files changed, 129 insertions(+), 7 deletions(-)
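
For maintainers of non-Java clients, the note above implies one concrete change: treat CONCURRENT_TRANSACTIONS on a produce response as retriable and retry the batch, rather than failing it. Below is a minimal sketch of that classification, using the Java client's Callback and RetriableException types for illustration; it mirrors what the Java producer already does internally, and the class and method names are hypothetical.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.common.errors.RetriableException;

    public class ProduceErrorHandling {
        // Hypothetical sketch: classify a produce-response exception so that
        // retriable errors such as CONCURRENT_TRANSACTIONS are retried rather
        // than surfaced as fatal. Assumes the wire error code has already been
        // mapped to a Kafka exception type.
        static Callback loggingCallback() {
            return (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("appended at offset %d%n", metadata.offset());
                } else if (exception instanceof RetriableException) {
                    // CONCURRENT_TRANSACTIONS lands here once a client treats it
                    // as retriable; the Java producer retries the batch itself.
                    System.out.println("transient produce error, retrying: " + exception);
                } else {
                    throw new RuntimeException("fatal produce error", exception);
                }
            };
        }
    }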

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 186ad9b80a1..99f7f475ba5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
  * {@link Errors#INVALID_RECORD}
  * {@link Errors#INVALID_TXN_STATE}
  * {@link Errors#INVALID_PRODUCER_ID_MAPPING}
+ * {@link Errors#CONCURRENT_TRANSACTIONS}
  */
 public class ProduceResponse extends AbstractResponse {
     public static final long INVALID_OFFSET = -1L;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 0f497b7936d..09662c0a3d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -1401,6 +1401,65 @@ public class KafkaProducerTest {
         }
     }
 
+    @Test
+    public void testTransactionV2ProduceWithConcurrentTransactionError() throws Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
+
+        String topic = "foo";
+        TopicPartition topicPartition = new TopicPartition(topic, 0);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+
+        when(ctx.sender.isRunning()).thenReturn(true);
+        when(ctx.metadata.fetch()).thenReturn(cluster);
+
+        long timestamp = ctx.time.milliseconds();
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, timestamp, "key", "value");
+
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-txn");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        ProducerConfig config = new ProducerConfig(props);
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+        ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+        NodeApiVersions nodeApiVersions = new NodeApiVersions(NodeApiVersions.create().allSupportedApiVersions().values(),
+            Arrays.asList(new ApiVersionsResponseData.SupportedFeatureKey()
+                .setName("transaction.version")
+                .setMaxVersion((short) 2)
+                .setMinVersion((short) 0)),
+            Arrays.asList(new ApiVersionsResponseData.FinalizedFeatureKey()
+                .setName("transaction.version")
+                .setMaxVersionLevel((short) 2)
+                .setMinVersionLevel((short) 2)),
+            0);
+        client.setNodeApiVersions(nodeApiVersions);
+        ApiVersions apiVersions = new ApiVersions();
+        apiVersions.update(NODE.idString(), nodeApiVersions);
+
+        ProducerInterceptors<String, String> interceptor = new ProducerInterceptors<>(Collections.emptyList());
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
+        client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+        client.prepareResponse(produceResponse(topicPartition, 1L, Errors.CONCURRENT_TRANSACTIONS, 0, 1));
+        client.prepareResponse(produceResponse(topicPartition, 1L, Errors.NONE, 0, 1));
+        client.prepareResponse(endTxnResponse(Errors.NONE));
+
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
+            config, new StringSerializer(), new StringSerializer(), metadata, client, interceptor, apiVersions, time)
+        ) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(record).get();
+            producer.commitTransaction();
+        }
+    }
+
     @Test
     public void testMeasureAbortTransactionDuration() {
         Map<String, Object> configs = new HashMap<>();
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d8e415028ea..87d1ca927bd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -738,12 +738,20 @@ class ReplicaManager(val config: KafkaConfig,
              // retry correctly. Translate these to an error which will cause such clients to retry
              // the produce request. We pick `NOT_ENOUGH_REPLICAS` because it does not trigger a
              // metadata refresh.
-              case Errors.CONCURRENT_TRANSACTIONS |
-                   Errors.NETWORK_EXCEPTION |
+              case Errors.NETWORK_EXCEPTION |
                   Errors.COORDINATOR_LOAD_IN_PROGRESS |
                   Errors.COORDINATOR_NOT_AVAILABLE |
                   Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
                s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+              case Errors.CONCURRENT_TRANSACTIONS =>
+                if (transactionSupportedOperation != addPartition) {
+                  Some(new NotEnoughReplicasException(
+                    s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+                } else {
+                  // Don't convert the CONCURRENT_TRANSACTIONS exception for TV2, because the error is very common
+                  // during the transaction commit phase and returning it directly is less confusing to the client.
+                  None
+                }
               case _ => None
             }
           topicPartition -> LogAppendResult(
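
To make the branching easier to read outside the diff, here is a minimal Java sketch of the decision the Scala hunk above implements; maybeConvert and isAddPartition are hypothetical names standing in for the inline match and the transactionSupportedOperation != addPartition check.

    import java.util.Optional;

    import org.apache.kafka.common.errors.ApiException;
    import org.apache.kafka.common.errors.NotEnoughReplicasException;
    import org.apache.kafka.common.protocol.Errors;

    public class VerificationErrorTranslation {
        // Hypothetical sketch of the verification-error translation: errors that
        // old clients cannot retry are converted to NOT_ENOUGH_REPLICAS, while
        // CONCURRENT_TRANSACTIONS passes through unconverted for TV2 (addPartition)
        // so new clients can retry it directly.
        static Optional<ApiException> maybeConvert(Errors error, boolean isAddPartition) {
            switch (error) {
                case NETWORK_EXCEPTION:
                case COORDINATOR_LOAD_IN_PROGRESS:
                case COORDINATOR_NOT_AVAILABLE:
                case NOT_COORDINATOR:
                    return Optional.of(new NotEnoughReplicasException(
                        "Unable to verify the partition has been added to the transaction. Underlying error: " + error));
                case CONCURRENT_TRANSACTIONS:
                    // Only TV1-style verification converts this error; for TV2 the
                    // broker returns CONCURRENT_TRANSACTIONS to the producer as-is.
                    return isAddPartition
                        ? Optional.empty()
                        : Optional.of(new NotEnoughReplicasException(
                            "Unable to verify the partition has been added to the transaction. Underlying error: " + error));
                default:
                    return Optional.empty();
            }
        }
    }
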
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 12f4710fede..ba6b6f80a27 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -118,7 +118,6 @@ class ReplicaManagerTest {
   private var mockRemoteLogManager: RemoteLogManager = _
   private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _
   private var brokerTopicStats: BrokerTopicStats = _
-  private val transactionSupportedOperation = genericErrorSupported
   private val quotaExceededThrottleTime = 1000
   private val quotaAvailableThrottleTime = 0
 
@@ -2516,6 +2515,55 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(
+    value = classOf[Errors],
+    names = Array(
+      "NOT_COORDINATOR",
+      "NETWORK_EXCEPTION",
+      "COORDINATOR_LOAD_IN_PROGRESS",
+      "COORDINATOR_NOT_AVAILABLE"
+    )
+  )
+  def testVerificationErrorConversionsTV2(error: Errors): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
+        (_, _) => ())
+
+      val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      // Start verification and return the coordinator related errors.
+      val expectedMessage = s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"
+      val result = handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId, transactionSupportedOperation = addPartition)
+      val appendCallback = ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, times(1)).addOrVerifyTransaction(
+        ArgumentMatchers.eq(transactionalId),
+        ArgumentMatchers.eq(producerId),
+        ArgumentMatchers.eq(producerEpoch),
+        ArgumentMatchers.eq(Seq(tp0)),
+        appendCallback.capture(),
+        any()
+      )
+
+      // Confirm we did not write to the log and instead returned the converted error with the correct error message.
+      val callback: AddPartitionsToTxnManager.AppendCallback = appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedMessage, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(
     value = classOf[Errors],
@@ -2527,7 +2575,7 @@ class ReplicaManagerTest {
       "COORDINATOR_NOT_AVAILABLE"
     )
   )
-  def testVerificationErrorConversions(error: Errors): Unit = {
+  def testVerificationErrorConversionsTV1(error: Errors): Unit = {
     val tp0 = new TopicPartition(topic, 0)
     val producerId = 24L
     val producerEpoch = 0.toShort
@@ -2869,7 +2917,9 @@ class ReplicaManagerTest {
                                                  entriesToAppend: Map[TopicPartition, MemoryRecords],
                                                  transactionalId: String,
                                                  origin: AppendOrigin = AppendOrigin.CLIENT,
-                                                  requiredAcks: Short = -1): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
+                                                  requiredAcks: Short = -1,
+                                                  transactionSupportedOperation: TransactionSupportedOperation = genericErrorSupported
+                                                 ): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
     val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
       responses.foreach( response => assertTrue(responses.get(response._1).isDefined))
@@ -2894,7 +2944,9 @@ class ReplicaManagerTest {
                                   records: MemoryRecords,
                                   origin: AppendOrigin = AppendOrigin.CLIENT,
                                   requiredAcks: Short = -1,
-                                  transactionalId: String): CallbackResult[PartitionResponse] = {
+                                  transactionalId: String,
+                                  transactionSupportedOperation: TransactionSupportedOperation = genericErrorSupported
+                                 ): CallbackResult[PartitionResponse] = {
     val result = new CallbackResult[PartitionResponse]()
 
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
@@ -2922,7 +2974,9 @@ class ReplicaManagerTest {
                                                            transactionalId: String,
                                                            producerId: Long,
                                                            producerEpoch: Short,
-                                                            baseSequence: Int = 0): CallbackResult[Either[Errors, VerificationGuard]] = {
+                                                            baseSequence: Int = 0,
+                                                            transactionSupportedOperation: TransactionSupportedOperation = genericErrorSupported
+                                                           ): CallbackResult[Either[Errors, VerificationGuard]] = {
     val result = new CallbackResult[Either[Errors, VerificationGuard]]()
     def postVerificationCallback(errorAndGuard: (Errors, VerificationGuard)): Unit = {
       val (error, verificationGuard) = errorAndGuard
