This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 2f510997588 KAFKA-17507; WriteTxnMarkers API must not return until markers are written and materialized in group coordinator's cache (#18168) (#18206)
2f510997588 is described below
commit 2f510997588921077d62b8f2f757e7b7e066e3f3
Author: David Jacot <[email protected]>
AuthorDate: Tue Dec 17 08:37:31 2024 +0100
KAFKA-17507; WriteTxnMarkers API must not return until markers are written and materialized in group coordinator's cache (#18168) (#18206)
We have observed the following errors in some clusters:
Uncaught exception in scheduled task 'handleTxnCompletion-902667'
exception.message:Trying to complete a transactional offset commit for producerId *** and groupId *** even though the offset commit record itself hasn't been appended to the log.
When a transaction is completed, the transaction coordinator sends a WriteTxnMarkers request to all the partitions involved in the transaction in order to write the markers to them. When the broker receives it, it writes the markers, and if markers were written to __consumer_offsets partitions, it informs the group coordinator that it can materialize the pending transactional offsets in its main cache. The group coordinator has done this asynchronously since Apache Kafka 2.0; see this patch.
The above error happens when the asynchronous operation is executed by the scheduler and the operation finds pending transactional offsets that have not been written yet. How come?
There is actually an issue in the steps described above. The group coordinator does not wait until the asynchronous operation completes before returning to the API layer. Hence the WriteTxnMarkers response may be sent back to the transaction coordinator before the async operation has actually completed, and it is therefore possible for the next transactional produce to start before the operation is completed as well. This could explain why the group coordinator has pending transactional offset [...]
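To make the race concrete, here is a minimal, self-contained Scala sketch of the fire-and-forget pattern described above. It is an illustration only, not the actual Kafka code: the executor, handleTxnCompletion and the response-sending println are simplified placeholders.

    import java.util.concurrent.Executors

    object FireAndForgetRace extends App {
      // Stand-in for the group coordinator's scheduler thread.
      val scheduler = Executors.newSingleThreadExecutor()

      def handleTxnCompletion(producerId: Long): Unit =
        println(s"materializing pending transactional offsets for producer $producerId")

      // Fire-and-forget: the caller has no way to know when the task has run.
      def scheduleHandleTxnCompletion(producerId: Long): Unit =
        scheduler.submit(new Runnable {
          override def run(): Unit = handleTxnCompletion(producerId)
        })

      // The WriteTxnMarkers response is "sent" right after scheduling, so the
      // next transactional produce can start before the offsets are materialized.
      scheduleHandleTxnCompletion(42L)
      println("sending WriteTxnMarkers response for producer 42")

      scheduler.shutdown()
    }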
There is a similar issue when the transaction is aborted. However, on this path we don't have any checks to verify whether all the pending transactional offsets have been written or not, so we don't see any errors in our logs. Due to the same race condition, it is possible to actually remove the wrong pending transactional offsets.
PS: The new group coordinator is not impacted by this bug.
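The patch below closes that window by having the scheduled operation report its completion through a CompletableFuture and by sending the response only once that future completes. Here is a minimal sketch of the same pattern, using the same simplified placeholders as above:

    import java.util.concurrent.{CompletableFuture, Executors}

    object WaitForMaterialization extends App {
      val scheduler = Executors.newSingleThreadExecutor()

      def handleTxnCompletion(producerId: Long): Unit =
        println(s"materializing pending transactional offsets for producer $producerId")

      // The task still runs on the scheduler, but the caller now gets a future
      // that completes only once the work has actually run (or failed).
      def scheduleHandleTxnCompletion(producerId: Long): CompletableFuture[Void] = {
        val future = new CompletableFuture[Void]()
        scheduler.submit(new Runnable {
          override def run(): Unit =
            try {
              handleTxnCompletion(producerId)
              future.complete(null)
            } catch {
              case e: Throwable => future.completeExceptionally(e)
            }
        })
        future
      }

      // The response is sent only after materialization has completed, so the
      // next transactional produce cannot observe a half-updated cache.
      scheduleHandleTxnCompletion(42L).whenComplete { (_, exception) =>
        if (exception != null) println(s"materialization failed: $exception")
        println("sending WriteTxnMarkers response for producer 42")
      }

      scheduler.shutdown()
    }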
Reviewers: Justine Olshan <[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 3 +-
.../group/GroupCoordinatorAdapter.scala | 16 ++-
.../coordinator/group/GroupMetadataManager.scala | 16 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 51 +++++----
.../group/GroupCoordinatorAdapterTest.scala | 25 +++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 126 +++++++++++++++++++++
.../kafka/coordinator/group/GroupCoordinator.java | 9 +-
.../coordinator/group/GroupCoordinatorService.java | 2 +-
8 files changed, 215 insertions(+), 33 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 94117d802ca..0dc1462cf76 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.coordinator.group.{Group, OffsetConfig}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard
+import java.util.concurrent.CompletableFuture
import scala.annotation.nowarn
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.math.max
@@ -980,7 +981,7 @@ private[group] class GroupCoordinator(
  def scheduleHandleTxnCompletion(producerId: Long,
                                  offsetsPartitions: Iterable[TopicPartition],
-                                 transactionResult: TransactionResult): Unit = {
+                                 transactionResult: TransactionResult): CompletableFuture[Void] = {
    require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
    val isCommit = transactionResult == TransactionResult.COMMIT
    groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index a53d4485fb6..716b2acd431 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -580,12 +580,16 @@ private[group] class GroupCoordinatorAdapter(
producerId: Long,
partitions: java.lang.Iterable[TopicPartition],
transactionResult: TransactionResult
- ): Unit = {
- coordinator.scheduleHandleTxnCompletion(
- producerId,
- partitions.asScala,
- transactionResult
- )
+ ): CompletableFuture[Void] = {
+ try {
+ coordinator.scheduleHandleTxnCompletion(
+ producerId,
+ partitions.asScala,
+ transactionResult
+ )
+ } catch {
+ case e: Throwable => FutureUtils.failedFuture(e)
+ }
}
override def onPartitionsDeleted(
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 252ff063209..eaf56560714 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.{Optional, OptionalInt}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.function.Supplier
import com.yammer.metrics.core.Gauge
import kafka.common.OffsetAndMetadata
@@ -939,9 +939,17 @@ class GroupMetadataManager(brokerId: Int,
   * more group metadata locks to handle transaction completion, this operation is scheduled on
* the scheduler thread to avoid deadlocks.
*/
-  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
-    scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () =>
-      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): CompletableFuture[Void] = {
+ val future = new CompletableFuture[Void]()
+ scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () => {
+ try {
+ handleTxnCompletion(producerId, completedPartitions, isCommit)
+ future.complete(null)
+ } catch {
+ case e: Throwable => future.completeExceptionally(e)
+ }
+ })
+ future
}
  private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f68fec56951..1a813b90942 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2427,28 +2427,41 @@ class KafkaApis(val requestChannel: RequestChannel,
      trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
updateErrors(producerId, currentErrors)
-      if (!config.isNewGroupCoordinatorEnabled) {
-        val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
-          topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
-        }.keys
-
-        if (successfulOffsetsPartitions.nonEmpty) {
-          // as soon as the end transaction marker has been written for a transactional offset commit,
-          // call to the group coordinator to materialize the offsets into the cache
-          try {
-            groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
-          } catch {
-            case e: Exception =>
-              error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
-              val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-              successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
-              updateErrors(producerId, updatedErrors)
-          }
+      def maybeSendResponse(): Unit = {
+        if (numAppends.decrementAndGet() == 0) {
+          requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
        }
      }
-      if (numAppends.decrementAndGet() == 0)
-        requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
+      // The new group coordinator uses GroupCoordinator#completeTransaction so we do
+      // not need to call GroupCoordinator#onTransactionCompleted here.
+      if (config.isNewGroupCoordinatorEnabled) {
+        maybeSendResponse()
+        return
+      }
+
+      val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
+        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
+      }.keys
+
+      // If no end transaction marker has been written to a __consumer_offsets partition, we do not
+      // need to call GroupCoordinator#onTransactionCompleted.
+      if (successfulOffsetsPartitions.isEmpty) {
+        maybeSendResponse()
+        return
+      }
+
+      // Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
+      // into the cache and we wait until the materialization is completed.
+      groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
+        if (exception != null) {
+          error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
+          val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+          successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
+          updateErrors(producerId, updatedErrors)
+        }
+        maybeSendResponse()
+      }
}
    // TODO: The current append API makes doing separate writes per producerId a little easier, but it would
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index db4f0136af2..02ccb232ce3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
 import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader, TransactionResult}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
@@ -37,6 +37,7 @@ import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
@@ -930,4 +931,26 @@ class GroupCoordinatorAdapterTest {
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[UnsupportedVersionException])
}
+
+ @Test
+ def testOnTransactionCompletedWithUnexpectedException(): Unit = {
+ val groupCoordinator = mock(classOf[GroupCoordinator])
+ val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+ when(groupCoordinator.scheduleHandleTxnCompletion(
+ any(),
+ any(),
+ any()
+ )).thenThrow(new IllegalStateException("Oh no!"))
+
+ val future = adapter.onTransactionCompleted(
+ 10,
+ Seq.empty[TopicPartition].asJava,
+ TransactionResult.COMMIT
+ )
+
+ assertTrue(future.isDone)
+ assertTrue(future.isCompletedExceptionally)
+ assertFutureThrows(future, classOf[Exception])
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 7487d05edee..25a06b71035 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3156,6 +3156,132 @@ class KafkaApisTest extends Logging {
any())
}
+ @Test
+ def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
+ val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+ val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
+ val foo0 = new TopicPartition("foo", 0)
+ val foo1 = new TopicPartition("foo", 1)
+
+ val allPartitions = List(
+ offset0,
+ offset1,
+ foo0,
+ foo1
+ )
+
+ val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
+ ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+ List(
+ new TxnMarkerEntry(
+ 1L,
+ 1.toShort,
+ 0,
+ TransactionResult.COMMIT,
+ List(offset0, foo0).asJava
+ ),
+ new TxnMarkerEntry(
+ 2L,
+ 1.toShort,
+ 0,
+ TransactionResult.ABORT,
+ List(offset1, foo1).asJava
+ )
+ ).asJava
+ ).build()
+
+ val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
+
+ allPartitions.foreach { tp =>
+ when(replicaManager.getMagic(tp))
+ .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+ }
+
+ when(groupCoordinator.onTransactionCompleted(
+ ArgumentMatchers.eq(1L),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.eq(TransactionResult.COMMIT)
+ )).thenReturn(CompletableFuture.completedFuture[Void](null))
+
+ when(groupCoordinator.onTransactionCompleted(
+ ArgumentMatchers.eq(2L),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.eq(TransactionResult.ABORT)
+    )).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))
+
+    val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+    val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+ when(replicaManager.appendRecords(
+ ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
+ ArgumentMatchers.eq(-1),
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+ entriesPerPartition.capture(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ ArgumentMatchers.eq(RequestLocal.NoCaching),
+ any(),
+ any()
+ )).thenAnswer { _ =>
+ responseCallback.getValue.apply(
+ entriesPerPartition.getValue.keySet.map { tp =>
+ tp -> new PartitionResponse(Errors.NONE)
+ }.toMap
+ )
+ }
+ kafkaApis = createKafkaApis(overrideProperties = Map(
+ GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
+ ))
+    kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.NoCaching)
+
+ val expectedResponse = new WriteTxnMarkersResponseData()
+ .setMarkers(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+ .setProducerId(1L)
+ .setTopics(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+ .setPartitions(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava),
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName("foo")
+ .setPartitions(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava)
+ ).asJava),
+ new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+ .setProducerId(2L)
+ .setTopics(List(
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+ .setPartitions(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
+ ).asJava),
+ new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+ .setName("foo")
+ .setPartitions(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+ .setPartitionIndex(1)
+ .setErrorCode(Errors.NONE.code)
+ ).asJava)
+ ).asJava)
+ ).asJava)
+
+    val response = verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
+ assertEquals(normalize(expectedResponse), normalize(response.data))
+ }
+
@Test
def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 2df6f1136b7..41a6f77b6d5 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -333,11 +333,18 @@ public interface GroupCoordinator {
/**
     * Commit or abort the pending transactional offsets for the given partitions.
*
+     * This method is only used by the old group coordinator. Internally, the old
+ * group coordinator completes the transaction asynchronously in order to
+ * avoid deadlocks. Hence, this method returns a future that the caller
+ * can wait on.
+ *
* @param producerId The producer id.
* @param partitions The partitions.
* @param transactionResult The result of the transaction.
+ *
+ * @return A future yielding the result.
*/
- void onTransactionCompleted(
+ CompletableFuture<Void> onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e132e85ec21..a7f7375f089 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -993,7 +993,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
     * See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, TransactionResult)}.
*/
@Override
- public void onTransactionCompleted(
+ public CompletableFuture<Void> onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult