This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 2f510997588 KAFKA-17507; WriteTxnMarkers API must not return until 
markers are written and materialized in group coordinator's cache (#18168) 
(#18206)
2f510997588 is described below

commit 2f510997588921077d62b8f2f757e7b7e066e3f3
Author: David Jacot <[email protected]>
AuthorDate: Tue Dec 17 08:37:31 2024 +0100

    KAFKA-17507; WriteTxnMarkers API must not return until markers are written 
and materialized in group coordinator's cache (#18168) (#18206)
    
    We have observed the following error in some clusters:
    
    Uncaught exception in scheduled task 'handleTxnCompletion-902667' 
exception.message:Trying to complete a transactional offset commit for 
producerId *** and groupId *** even though the offset commit record itself 
hasn't been appended to the log.
    
    When a transaction is completed, the transaction coordinator sends a 
WriteTxnMarkers request to all the partitions involved in the transaction to 
write the markers to them. When the broker receives it, it writes the markers 
and if markers are written to the __consumer_offsets partitions, it informs the 
group coordinator that it can materialize the pending transactional offsets in 
its main cache. The group coordinator does this asynchronously since Apache 
Kafka 2.0, see this patch.
    
    The above error happens when the asynchronous operation is executed by the 
scheduler and the operation finds that there are pending transactional offsets 
that were not written yet. How come?
    
    There is actually an issue in the steps described above. The group 
coordinator does not wait until the asynchronous operation completes before 
returning to the API layer. Hence the WriteTxnMarkers response may be sent back 
to the transaction coordinator before the async operation has actually 
completed. It is therefore possible for the next transactional produce to start 
before the operation has completed as well. This could explain why the group 
coordinator has pending transactional offset [...]
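    
    The general shape of the fix can be sketched as follows. This is only an 
illustration under stated assumptions, not the code from this commit: the class 
TxnCompletionSketch and the helper scheduleCompletion are hypothetical names, 
and a plain java.util.concurrent executor stands in for the broker's scheduler. 
The point is that the scheduled work reports its outcome through a future, and 
the response is only sent from that future's completion callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; none of these names come from the Kafka code base.
class TxnCompletionSketch {

    // Runs the work on the scheduler thread and exposes its outcome as a
    // future, instead of returning nothing and letting the caller move on.
    static CompletableFuture<Void> scheduleCompletion(
            ScheduledExecutorService scheduler, Runnable materialize) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        scheduler.schedule(() -> {
            try {
                materialize.run();
                future.complete(null);
            } catch (Throwable t) {
                // Failures are reported through the future as well.
                future.completeExceptionally(t);
            }
        }, 0, TimeUnit.MILLISECONDS);
        return future;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean materialized = new AtomicBoolean(false);
        AtomicBoolean seenWhenResponding = new AtomicBoolean(false);

        // The "response" is produced in the completion callback, so it
        // cannot be sent before the scheduled work has run.
        CompletableFuture<Void> responded =
            scheduleCompletion(scheduler, () -> materialized.set(true))
                .whenComplete((ignored, error) ->
                    seenWhenResponding.set(materialized.get()));

        responded.get(5, TimeUnit.SECONDS);
        System.out.println(
            "materialized before response: " + seenWhenResponding.get());
        scheduler.shutdown();
    }
}
```

    With a fire-and-forget variant that returns nothing, the caller has no way 
to order its response after the scheduled work, which is the race described 
above.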
    
    There is a similar issue when the transaction is aborted. However on this 
path, we don't have any checks to verify whether all the pending transactional 
offsets have been written or not so we don't see any errors in our logs. Due to 
the same race condition, it is possible to actually remove the wrong pending 
transactional offsets.
    
    PS: The new group coordinator is not impacted by this bug.
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |   3 +-
 .../group/GroupCoordinatorAdapter.scala            |  16 ++-
 .../coordinator/group/GroupMetadataManager.scala   |  16 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  51 +++++----
 .../group/GroupCoordinatorAdapterTest.scala        |  25 +++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 126 +++++++++++++++++++++
 .../kafka/coordinator/group/GroupCoordinator.java  |   9 +-
 .../coordinator/group/GroupCoordinatorService.java |   2 +-
 8 files changed, 215 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 94117d802ca..0dc1462cf76 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.coordinator.group.{Group, 
OffsetConfig}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.storage.internals.log.VerificationGuard
 
+import java.util.concurrent.CompletableFuture
 import scala.annotation.nowarn
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.math.max
@@ -980,7 +981,7 @@ private[group] class GroupCoordinator(
 
   def scheduleHandleTxnCompletion(producerId: Long,
                                   offsetsPartitions: Iterable[TopicPartition],
-                                  transactionResult: TransactionResult): Unit 
= {
+                                  transactionResult: TransactionResult): 
CompletableFuture[Void] = {
     require(offsetsPartitions.forall(_.topic == 
Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
     groupManager.scheduleHandleTxnCompletion(producerId, 
offsetsPartitions.map(_.partition).toSet, isCommit)
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index a53d4485fb6..716b2acd431 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -580,12 +580,16 @@ private[group] class GroupCoordinatorAdapter(
     producerId: Long,
     partitions: java.lang.Iterable[TopicPartition],
     transactionResult: TransactionResult
-  ): Unit = {
-    coordinator.scheduleHandleTxnCompletion(
-      producerId,
-      partitions.asScala,
-      transactionResult
-    )
+  ): CompletableFuture[Void] = {
+    try {
+      coordinator.scheduleHandleTxnCompletion(
+        producerId,
+        partitions.asScala,
+        transactionResult
+      )
+    } catch {
+      case e: Throwable => FutureUtils.failedFuture(e)
+    }
   }
 
   override def onPartitionsDeleted(
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 252ff063209..eaf56560714 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.{Optional, OptionalInt}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.function.Supplier
 import com.yammer.metrics.core.Gauge
 import kafka.common.OffsetAndMetadata
@@ -939,9 +939,17 @@ class GroupMetadataManager(brokerId: Int,
    * more group metadata locks to handle transaction completion, this 
operation is scheduled on
    * the scheduler thread to avoid deadlocks.
    */
-  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: 
Set[Int], isCommit: Boolean): Unit = {
-    scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () =>
-      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: 
Set[Int], isCommit: Boolean): CompletableFuture[Void] = {
+    val future = new CompletableFuture[Void]()
+    scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () => {
+      try {
+        handleTxnCompletion(producerId, completedPartitions, isCommit)
+        future.complete(null)
+      } catch {
+        case e: Throwable => future.completeExceptionally(e)
+      }
+    })
+    future
   }
 
   private[group] def handleTxnCompletion(producerId: Long, 
completedPartitions: Set[Int], isCommit: Boolean): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f68fec56951..1a813b90942 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2427,28 +2427,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"End transaction marker append for producer id $producerId 
completed with status: $currentErrors")
       updateErrors(producerId, currentErrors)
 
-      if (!config.isNewGroupCoordinatorEnabled) {
-        val successfulOffsetsPartitions = currentErrors.asScala.filter { case 
(topicPartition, error) =>
-          topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == 
Errors.NONE
-        }.keys
-
-        if (successfulOffsetsPartitions.nonEmpty) {
-          // as soon as the end transaction marker has been written for a 
transactional offset commit,
-          // call to the group coordinator to materialize the offsets into the 
cache
-          try {
-            groupCoordinator.onTransactionCompleted(producerId, 
successfulOffsetsPartitions.asJava, result)
-          } catch {
-            case e: Exception =>
-              error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", e)
-              val updatedErrors = new ConcurrentHashMap[TopicPartition, 
Errors]()
-              successfulOffsetsPartitions.foreach(updatedErrors.put(_, 
Errors.UNKNOWN_SERVER_ERROR))
-              updateErrors(producerId, updatedErrors)
-          }
+      def maybeSendResponse(): Unit = {
+        if (numAppends.decrementAndGet() == 0) {
+          requestHelper.sendResponseExemptThrottle(request, new 
WriteTxnMarkersResponse(errors))
         }
       }
 
-      if (numAppends.decrementAndGet() == 0)
-        requestHelper.sendResponseExemptThrottle(request, new 
WriteTxnMarkersResponse(errors))
+      // The new group coordinator uses GroupCoordinator#completeTransaction 
so we do
+      // not need to call GroupCoordinator#onTransactionCompleted here.
+      if (config.isNewGroupCoordinatorEnabled) {
+        maybeSendResponse()
+        return
+      }
+
+      val successfulOffsetsPartitions = currentErrors.asScala.filter { case 
(topicPartition, error) =>
+        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == 
Errors.NONE
+      }.keys
+
+      // If no end transaction marker has been written to a __consumer_offsets 
partition, we do not
+      // need to call GroupCoordinator#onTransactionCompleted.
+      if (successfulOffsetsPartitions.isEmpty) {
+        maybeSendResponse()
+        return
+      }
+
+      // Otherwise, we call GroupCoordinator#onTransactionCompleted to 
materialize the offsets
+      // into the cache and we wait until the materialization is completed.
+      groupCoordinator.onTransactionCompleted(producerId, 
successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
+        if (exception != null) {
+          error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", exception)
+          val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+          successfulOffsetsPartitions.foreach(updatedErrors.put(_, 
Errors.UNKNOWN_SERVER_ERROR))
+          updateErrors(producerId, updatedErrors)
+        }
+        maybeSendResponse()
+      }
     }
 
     // TODO: The current append API makes doing separate writes per producerId 
a little easier, but it would
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index db4f0136af2..02ccb232ce3 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -28,7 +28,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
 OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, 
OffsetDeleteResponseTopicCollection}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, 
RequestHeader}
+import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, 
RequestHeader, TransactionResult}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
@@ -37,6 +37,7 @@ import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.ArgumentMatchers.any
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
 
@@ -930,4 +931,26 @@ class GroupCoordinatorAdapterTest {
     assertTrue(future.isCompletedExceptionally)
     assertFutureThrows(future, classOf[UnsupportedVersionException])
   }
+
+  @Test
+  def testOnTransactionCompletedWithUnexpectedException(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    when(groupCoordinator.scheduleHandleTxnCompletion(
+      any(),
+      any(),
+      any()
+    )).thenThrow(new IllegalStateException("Oh no!"))
+
+    val future = adapter.onTransactionCompleted(
+      10,
+      Seq.empty[TopicPartition].asJava,
+      TransactionResult.COMMIT
+    )
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[Exception])
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7487d05edee..25a06b71035 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3156,6 +3156,132 @@ class KafkaApisTest extends Logging {
       any())
   }
 
+  @Test
+  def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
+    val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+    val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
+    val foo0 = new TopicPartition("foo", 0)
+    val foo1 = new TopicPartition("foo", 1)
+
+    val allPartitions = List(
+      offset0,
+      offset1,
+      foo0,
+      foo1
+    )
+
+    val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
+      ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+      List(
+        new TxnMarkerEntry(
+          1L,
+          1.toShort,
+          0,
+          TransactionResult.COMMIT,
+          List(offset0, foo0).asJava
+        ),
+        new TxnMarkerEntry(
+          2L,
+          1.toShort,
+          0,
+          TransactionResult.ABORT,
+          List(offset1, foo1).asJava
+        )
+      ).asJava
+    ).build()
+
+    val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
+
+    allPartitions.foreach { tp =>
+      when(replicaManager.getMagic(tp))
+        .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    }
+
+    when(groupCoordinator.onTransactionCompleted(
+      ArgumentMatchers.eq(1L),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.eq(TransactionResult.COMMIT)
+    )).thenReturn(CompletableFuture.completedFuture[Void](null))
+
+    when(groupCoordinator.onTransactionCompleted(
+      ArgumentMatchers.eq(2L),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.eq(TransactionResult.ABORT)
+    
)).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))
+
+    val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, 
MemoryRecords]] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+    val responseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] 
=> Unit])
+
+    when(replicaManager.appendRecords(
+      ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
+      ArgumentMatchers.eq(-1),
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+      entriesPerPartition.capture(),
+      responseCallback.capture(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(RequestLocal.NoCaching),
+      any(),
+      any()
+    )).thenAnswer { _ =>
+      responseCallback.getValue.apply(
+        entriesPerPartition.getValue.keySet.map { tp =>
+          tp -> new PartitionResponse(Errors.NONE)
+        }.toMap
+      )
+    }
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
+    ))
+    kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, 
RequestLocal.NoCaching)
+
+    val expectedResponse = new WriteTxnMarkersResponseData()
+      .setMarkers(List(
+        new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+          .setProducerId(1L)
+          .setTopics(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+              .setPartitions(List(
+                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(0)
+                  .setErrorCode(Errors.NONE.code)
+              ).asJava),
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName("foo")
+              .setPartitions(List(
+                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(0)
+                  .setErrorCode(Errors.NONE.code)
+              ).asJava)
+          ).asJava),
+        new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+          .setProducerId(2L)
+          .setTopics(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+              .setPartitions(List(
+                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(1)
+                  .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
+              ).asJava),
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName("foo")
+              .setPartitions(List(
+                new 
WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(1)
+                  .setErrorCode(Errors.NONE.code)
+              ).asJava)
+          ).asJava)
+      ).asJava)
+
+    val response = 
verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
+    assertEquals(normalize(expectedResponse), normalize(response.data))
+  }
+
   @Test
   def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
     val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 2df6f1136b7..41a6f77b6d5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -333,11 +333,18 @@ public interface GroupCoordinator {
     /**
      * Commit or abort the pending transactional offsets for the given 
partitions.
      *
+     * This method is only used by the old group coordinator. Internally, the 
old
+     * group coordinator completes the transaction asynchronously in order to
+     * avoid deadlocks. Hence, this method returns a future that the caller
+     * can wait on.
+     *
      * @param producerId        The producer id.
      * @param partitions        The partitions.
      * @param transactionResult The result of the transaction.
+     *
+     * @return A future yielding the result.
      */
-    void onTransactionCompleted(
+    CompletableFuture<Void> onTransactionCompleted(
         long producerId,
         Iterable<TopicPartition> partitions,
         TransactionResult transactionResult
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e132e85ec21..a7f7375f089 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -993,7 +993,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
      * See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, 
TransactionResult)}.
      */
     @Override
-    public void onTransactionCompleted(
+    public CompletableFuture<Void> onTransactionCompleted(
         long producerId,
         Iterable<TopicPartition> partitions,
         TransactionResult transactionResult
