This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c91243a4b71 KAFKA-17877; Only call once maybeSendResponseCallback for each marker (#17619)
c91243a4b71 is described below

commit c91243a4b71fb7430b0987f0a2ddf37f3a922927
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Nov 5 02:06:32 2024 -0800

    KAFKA-17877; Only call once maybeSendResponseCallback for each marker (#17619)
    
    We should call `maybeSendResponseCallback` only once for each marker during the WriteTxnMarkersRequest handling.
    
    Consider the following 2 cases:
    
    First
    We have two markers to append, one for producer-0 and one for producer-1.
    When we first process producer-0, it appends a marker to __consumer_offsets.
    The __consumer_offsets append finishes very quickly because the group coordinator is no longer the leader, so the coordinator directly returns NOT_LEADER_OR_FOLLOWER. In its callback, it calls maybeComplete() for the first time, and because there is only one partition to append, it goes on to call maybeSendResponseCallback() and decrement numAppends.
    The handler then calls the replica manager append with nothing to write; in that callback, it calls maybeComplete() a second time, which decrements numAppends again.
    
    Second
    We have two markers to append, one for producer-0 and one for producer-1.
    When we first process producer-0, it appends a marker to __consumer_offsets and to a data topic foo.
    The two appends are handled by the group coordinator and the replica manager asynchronously.
    There is a race: both appends can finish at the same time, each filling `markerResults` and then calling `maybeComplete`. Because the `partitionsWithCompatibleMessageFormat.size == markerResults.size` condition is satisfied for both, both `maybeComplete` calls go on to decrement `numAppends`, causing a premature response.
    
    Note: the problem only happens with the KIP-848 coordinator enabled.
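    
    As a rough illustration (not the actual KafkaApis code; the object, field and
    method names below are simplified stand-ins), the difference between the old
    size-based completion check and the new atomic countdown can be sketched like this:
    
        import java.util.concurrent.ConcurrentHashMap
        import java.util.concurrent.atomic.AtomicInteger
        
        // Hypothetical standalone demo, not part of Kafka.
        object CompletionRaceSketch {
          def main(args: Array[String]): Unit = {
            val partitions = Seq("__consumer_offsets-0", "foo-0")
        
            // Size-based check (old pattern): every callback that observes a fully
            // populated result map believes it should send the response.
            val markerResults = new ConcurrentHashMap[String, String]()
            var sizeBasedCompletions = 0
            def maybeComplete(): Unit =
              if (markerResults.size == partitions.size) sizeBasedCompletions += 1
        
            partitions.foreach(p => markerResults.put(p, "NONE"))
            maybeComplete() // first callback (e.g. the group coordinator append)
            maybeComplete() // second callback (empty replica-manager append, or a racing append)
            println(s"size-based check completed $sizeBasedCompletions times") // prints 2
        
            // Countdown-based check (new pattern): each result decrements an atomic
            // counter, so only the callback that reaches zero sends the response.
            val numPartitions = new AtomicInteger(partitions.size)
            var countdownCompletions = 0
            def addResultAndMaybeComplete(partition: String, error: String): Unit = {
              markerResults.put(partition, error)
              if (numPartitions.decrementAndGet() == 0) countdownCompletions += 1
            }
        
            partitions.foreach(p => addResultAndMaybeComplete(p, "NONE"))
            println(s"countdown check completed $countdownCompletions times") // prints 1
          }
        }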
    
    Reviewers: Jeff Kim <[email protected]>, Justine Olshan <[email protected]>, David Jacot <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 38 ++++++++++++----------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 38 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f9a52fd50cc..0ef77fb6376 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2431,8 +2431,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
-        def maybeComplete(): Unit = {
-          if (partitionsWithCompatibleMessageFormat.size == markerResults.size) {
+        val numPartitions = new AtomicInteger(partitionsWithCompatibleMessageFormat.size)
+        def addResultAndMaybeComplete(partition: TopicPartition, error: Errors): Unit = {
+          markerResults.put(partition, error)
+          // We should only call maybeSendResponseCallback once per marker. Otherwise, it causes sending the response
+          // prematurely.
+          if (numPartitions.decrementAndGet() == 0) {
             maybeSendResponseCallback(producerId, marker.transactionResult, markerResults)
           }
         }
@@ -2462,8 +2466,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                     error
                 }
               }
-              markerResults.put(partition, error)
-              maybeComplete()
+              addResultAndMaybeComplete(partition, error)
             }
           } else {
            // Otherwise, the regular appendRecords path is used for all the non __consumer_offsets
@@ -2476,20 +2479,21 @@ class KafkaApis(val requestChannel: RequestChannel,
           }
         }
 
-        replicaManager.appendRecords(
-          timeout = config.requestTimeoutMs.toLong,
-          requiredAcks = -1,
-          internalTopicsAllowed = true,
-          origin = AppendOrigin.COORDINATOR,
-          entriesPerPartition = controlRecords,
-          requestLocal = requestLocal,
-          responseCallback = errors => {
-            errors.foreachEntry { (tp, partitionResponse) =>
-              markerResults.put(tp, partitionResponse.error)
+        if (controlRecords.nonEmpty) {
+          replicaManager.appendRecords(
+            timeout = config.requestTimeoutMs.toLong,
+            requiredAcks = -1,
+            internalTopicsAllowed = true,
+            origin = AppendOrigin.COORDINATOR,
+            entriesPerPartition = controlRecords,
+            requestLocal = requestLocal,
+            responseCallback = errors => {
+              errors.foreachEntry { (tp, partitionResponse) =>
+                addResultAndMaybeComplete(tp, partitionResponse.error)
+              }
             }
-            maybeComplete()
-          }
-        )
+          )
+        }
       }
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f86a7baea15..31fdc9efe81 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3070,6 +3070,44 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedErrors, markersResponse.errorsByProducerId.get(1L))
   }
 
+  @Test
+  def testWriteTxnMarkersShouldAllBeIncludedInTheResponse(): Unit = {
+    // This test verifies the response will not be sent prematurely because of calling replicaManager append
+    // with no records.
+    val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+    val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+      asList(
+        new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, asList(topicPartition)),
+        new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, asList(topicPartition)),
+      )).build()
+    val request = buildRequest(writeTxnMarkersRequest)
+    val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
+
+    when(replicaManager.getMagic(any()))
+      .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    when(groupCoordinator.isNewGroupCoordinator)
+      .thenReturn(true)
+    when(groupCoordinator.completeTransaction(
+      ArgumentMatchers.eq(topicPartition),
+      any(),
+      ArgumentMatchers.eq(1.toShort),
+      ArgumentMatchers.eq(0),
+      ArgumentMatchers.eq(TransactionResult.COMMIT),
+      any()
+    )).thenReturn(CompletableFuture.completedFuture[Void](null))
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleWriteTxnMarkersRequest(request, RequestLocal.withThreadConfinedCaching)
+
+    verify(requestChannel).sendResponse(
+      ArgumentMatchers.eq(request),
+      capturedResponse.capture(),
+      ArgumentMatchers.eq(None)
+    )
+    val markersResponse = capturedResponse.getValue
+    assertEquals(2, markersResponse.errorsByProducerId.size())
+  }
+
   @Test
  def shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition(): Unit = {
     val tp1 = new TopicPartition("t", 0)
