This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50598191dcd MINOR: Add tests on TxnOffsetCommit and EndTxnMarker 
protection against invalid producer epoch when TV2 is used (#20024)
50598191dcd is described below

commit 50598191dcd4763384ef1798fc07db899f731f3d
Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com>
AuthorDate: Sun Jul 20 18:34:29 2025 -0400

    MINOR: Add tests on TxnOffsetCommit and EndTxnMarker protection against 
invalid producer epoch when TV2 is used (#20024)
    
    This patch adds API-level integration tests for producer epoch
    verification when processing transactional offset commits and end txn
    markers.
    
    Reviewers: PoAn Yang <pay...@apache.org>, TengYao Chi
     <kiting...@gmail.com>, Sean Quah <sq...@confluent.io>, Chia-Ping Tsai
     <chia7...@gmail.com>
---
 .../server/GroupCoordinatorBaseRequestTest.scala   |  56 ++++++-
 .../kafka/server/TxnOffsetCommitRequestTest.scala  | 141 +++++++++++++----
 .../kafka/server/WriteTxnMarkersRequestTest.scala  | 172 +++++++++++++++++++++
 3 files changed, 338 insertions(+), 31 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 67e4cb8df53..431e431504f 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -19,14 +19,16 @@ package kafka.server
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, 
RecordMetadata}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.{TopicCollection, TopicIdPartition, 
TopicPartition, Uuid}
 import 
org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, 
DeletableGroupResultCollection}
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, 
DeleteGroupsResponseData, DescribeGroupsRequestData, 
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, 
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, 
JoinGroupResponseData, LeaveGroupRes [...]
+import 
org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, 
WritableTxnMarkerTopic}
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, 
AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, 
ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, 
DeleteGroupsResponseData, DescribeGroupsRequestData, 
DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, 
HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, 
JoinGroupResponseData, LeaveGroupRes [...]
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, 
ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, 
ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, 
DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, 
HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, 
InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...]
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, 
ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, 
ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, 
DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, 
HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, 
InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, L [...]
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
@@ -352,6 +354,35 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     assertEquals(expectedError, 
connectAndReceive[EndTxnResponse](request).error)
   }
 
+  protected def writeTxnMarkers(
+    producerId: Long,
+    producerEpoch: Short,
+    committed: Boolean,
+    expectedError: Errors = Errors.NONE,
+    version: Short = 
ApiKeys.WRITE_TXN_MARKERS.latestVersion(isUnstableApiEnabled)
+  ): Unit = {
+    val request = new WriteTxnMarkersRequest.Builder(
+      new WriteTxnMarkersRequestData()
+        .setMarkers(List(
+          new WritableTxnMarker()
+            .setProducerId(producerId)
+            .setProducerEpoch(producerEpoch)
+            .setTransactionResult(committed)
+            .setTopics(List(
+              new WritableTxnMarkerTopic()
+                .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+                .setPartitionIndexes(List[Integer](0).asJava)
+            ).asJava)
+            .setCoordinatorEpoch(0)
+        ).asJava)
+    ).build(version)
+
+    assertEquals(
+      expectedError.code,
+      
connectAndReceive[WriteTxnMarkersResponse](request).data.markers.get(0).topics.get(0).partitions.get(0).errorCode
+    )
+  }
+
   protected def fetchOffsets(
     groups: List[OffsetFetchRequestData.OffsetFetchRequestGroup],
     requireStable: Boolean,
@@ -422,6 +453,27 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     groupResponse
   }
 
+  protected def fetchOffset(
+    groupId: String,
+    topic: String,
+    partition: Int
+  ): Long = {
+    val groupIdRecord = fetchOffsets(
+      group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
+        .setGroupId(groupId)
+        .setTopics(List(
+          new OffsetFetchRequestData.OffsetFetchRequestTopics()
+            .setName(topic)
+            .setPartitionIndexes(List[Integer](partition).asJava)
+        ).asJava),
+      requireStable = true,
+      version = 9
+    )
+    val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
+    val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex 
== partition).head
+    partitionRecord.committedOffset
+  }
+
   protected def deleteOffset(
     groupId: String,
     topic: String,
diff --git 
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index b0f1bee2333..aef40390d85 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -16,19 +16,16 @@
  */
 package kafka.server
 
-import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.message.OffsetFetchRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
-
-import scala.jdk.CollectionConverters._
+import org.junit.jupiter.api.Assertions.{assertNotEquals, assertThrows}
 
 @ClusterTestDefaults(
   types = Array(Type.KRAFT),
@@ -51,6 +48,16 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
     testTxnOffsetCommit(false)
   }
 
+  @ClusterTest
+  def 
testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithNewConsumerGroupProtocol():
 Unit = {
+    testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(true)
+  }
+
+  @ClusterTest
+  def 
testDelayedTxnOffsetCommitWithBumpedEpochIsRejectedWithOldConsumerGroupProtocol():
 Unit = {
+    testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(false)
+  }
+
   private def testTxnOffsetCommit(useNewProtocol: Boolean): Unit = {
     val topic = "topic"
     val partition = 0
@@ -65,8 +72,8 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
     // Join the consumer group. Note that we don't heartbeat here so we must 
use
     // a session long enough for the duration of the test.
     val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, 
useNewProtocol)
-    assertTrue(memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID)
-    assertTrue(memberEpoch != JoinGroupRequest.UNKNOWN_GENERATION_ID)
+    assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
+    assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
 
     createTopic(topic, 1)
 
@@ -178,7 +185,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
       transactionalId = transactionalId
     )
 
-    val originalOffset = fetchOffset(topic, partition, groupId)
+    val originalOffset = fetchOffset(groupId, topic, partition)
 
     commitTxnOffset(
       groupId = groupId,
@@ -207,31 +214,107 @@ class 
TxnOffsetCommitRequestTest(cluster:ClusterInstance) extends GroupCoordinat
 
     TestUtils.waitUntilTrue(() =>
       try {
-        fetchOffset(topic, partition, groupId) == expectedOffset
+        fetchOffset(groupId, topic, partition) == expectedOffset
       } catch {
         case _: Throwable => false
       }, "txn commit offset validation failed"
     )
   }
 
-  private def fetchOffset(
-     topic: String,
-     partition: Int,
-     groupId: String
-  ): Long = {
-    val groupIdRecord = fetchOffsets(
-      group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
-        .setGroupId(groupId)
-        .setTopics(List(
-          new OffsetFetchRequestData.OffsetFetchRequestTopics()
-            .setName(topic)
-            .setPartitionIndexes(List[Integer](partition).asJava)
-        ).asJava),
-      requireStable = true,
-      version = 9
-    )
-    val topicRecord = groupIdRecord.topics.asScala.find(_.name == topic).head
-    val partitionRecord = topicRecord.partitions.asScala.find(_.partitionIndex 
== partition).head
-    partitionRecord.committedOffset
+  private def 
testDelayedTxnOffsetCommitWithBumpedEpochIsRejected(useNewProtocol: Boolean): 
Unit = {
+    val topic = "topic"
+    val partition = 0
+    val transactionalId = "txn"
+    val groupId = "group"
+    val offset = 100L
+
+    // Create the __consumer_offsets and __transaction_state topics explicitly because 
they won't be created automatically
+    // in this test, which does not use the FindCoordinator API.
+    createOffsetsTopic()
+    createTransactionStateTopic()
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, 
useNewProtocol)
+    assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
+    assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
+
+    createTopic(topic, 1)
+
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+      val useTV2 = version > 
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
+
+      // Initialize producer. Wait until the coordinator finishes loading.
+      var producerIdAndEpoch: ProducerIdAndEpoch = null
+      TestUtils.waitUntilTrue(() =>
+        try {
+          producerIdAndEpoch = initProducerId(
+            transactionalId = transactionalId,
+            producerIdAndEpoch = ProducerIdAndEpoch.NONE,
+            expectedError = Errors.NONE
+          )
+          true
+        } catch {
+          case _: Throwable => false
+        }, "initProducerId request failed"
+      )
+
+      addOffsetsToTxn(
+        groupId = groupId,
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = producerIdAndEpoch.epoch,
+        transactionalId = transactionalId
+      )
+
+      // Complete the transaction.
+      endTxn(
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = producerIdAndEpoch.epoch,
+        transactionalId = transactionalId,
+        isTransactionV2Enabled = useTV2,
+        committed = true,
+        expectedError = Errors.NONE
+      )
+
+      // Start a new transaction. Wait for the previous transaction to 
complete.
+      TestUtils.waitUntilTrue(() =>
+        try {
+          addOffsetsToTxn(
+            groupId = groupId,
+            producerId = producerIdAndEpoch.producerId,
+            producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
+            transactionalId = transactionalId
+          )
+          true
+        } catch {
+          case _: Throwable => false
+        }, "addOffsetsToTxn request failed"
+      )
+
+      // Committing offset with old epoch succeeds for TV1 and fails for TV2.
+      commitTxnOffset(
+        groupId = groupId,
+        memberId = if (version >= 3) memberId else 
JoinGroupRequest.UNKNOWN_MEMBER_ID,
+        generationId = if (version >= 3) 1 else 
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = producerIdAndEpoch.epoch,
+        transactionalId = transactionalId,
+        topic = topic,
+        partition = partition,
+        offset = offset,
+        expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else 
Errors.NONE,
+        version = version.toShort
+      )
+
+      // Complete the transaction.
+      endTxn(
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
+        transactionalId = transactionalId,
+        isTransactionV2Enabled = useTV2,
+        committed = true,
+        expectedError = Errors.NONE
+      )
+    }
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
new file mode 100644
index 00000000000..a68de4dacc0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{EndTxnRequest, JoinGroupRequest}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.common.utils.ProducerIdAndEpoch
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.junit.jupiter.api.Assertions.assertNotEquals
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+  )
+)
+class WriteTxnMarkersRequestTest(cluster:ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest
+  def 
testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithNewConsumerGroupProtocol():
 Unit = {
+    testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(true)
+  }
+
+  @ClusterTest
+  def 
testDelayedWriteTxnMarkersShouldNotCommitTxnOffsetWithOldConsumerGroupProtocol():
 Unit = {
+    testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(false)
+  }
+
+  private def 
testDelayedWriteTxnMarkersShouldNotCommitTxnOffset(useNewProtocol: Boolean): 
Unit = {
+    val topic = "topic"
+    val partition = 0
+    val transactionalId = "txn"
+    val groupId = "group"
+    val offset = 100L
+
+    // Create the __consumer_offsets and __transaction_state topics explicitly because 
they won't be created automatically
+    // in this test, which does not use the FindCoordinator API.
+    createOffsetsTopic()
+    createTransactionStateTopic()
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    val (memberId: String, memberEpoch: Int) = joinConsumerGroup(groupId, 
useNewProtocol)
+    assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, memberId)
+    assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, memberEpoch)
+
+    createTopic(topic, 1)
+
+    for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to 
ApiKeys.TXN_OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)) {
+      val useTV2 = version > 
EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2
+
+      // Initialize producer. Wait until the coordinator finishes loading.
+      var producerIdAndEpoch: ProducerIdAndEpoch = null
+      TestUtils.waitUntilTrue(() =>
+        try {
+          producerIdAndEpoch = initProducerId(
+            transactionalId = transactionalId,
+            producerIdAndEpoch = ProducerIdAndEpoch.NONE,
+            expectedError = Errors.NONE
+          )
+          true
+        } catch {
+          case _: Throwable => false
+        }, "initProducerId request failed"
+      )
+
+      addOffsetsToTxn(
+        groupId = groupId,
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = producerIdAndEpoch.epoch,
+        transactionalId = transactionalId
+      )
+
+      // Complete the transaction.
+      endTxn(
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = producerIdAndEpoch.epoch,
+        transactionalId = transactionalId,
+        isTransactionV2Enabled = useTV2,
+        committed = true,
+        expectedError = Errors.NONE
+      )
+
+      // Start a new transaction. Wait for the previous transaction to 
complete.
+      TestUtils.waitUntilTrue(() =>
+        try {
+          addOffsetsToTxn(
+            groupId = groupId,
+            producerId = producerIdAndEpoch.producerId,
+            producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
+            transactionalId = transactionalId
+          )
+          true
+        } catch {
+          case _: Throwable => false
+        }, "addOffsetsToTxn request failed"
+      )
+
+      commitTxnOffset(
+        groupId = groupId,
+        memberId = if (version >= 3) memberId else 
JoinGroupRequest.UNKNOWN_MEMBER_ID,
+        generationId = if (version >= 3) 1 else 
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
+        transactionalId = transactionalId,
+        topic = topic,
+        partition = partition,
+        offset = offset + version,
+        expectedError = Errors.NONE,
+        version = version.toShort
+      )
+
+      // Delayed txn marker should be accepted for TV1 and rejected for TV2.
+      // Note that, ideally, producer epoch + 1 should also be 
rejected for TV2,
+      // but that fix is still in progress.
+      writeTxnMarkers(
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = producerIdAndEpoch.epoch,
+        committed = true,
+        expectedError = if (useTV2) Errors.INVALID_PRODUCER_EPOCH else 
Errors.NONE
+      )
+
+      // The offset is committed for TV1 and not committed for TV2.
+      TestUtils.waitUntilTrue(() =>
+        try {
+          fetchOffset(groupId, topic, partition) == (if (useTV2) -1L else 
offset + version)
+        } catch {
+          case _: Throwable => false
+        }, "unexpected txn commit offset"
+      )
+
+      // Complete the transaction.
+      endTxn(
+        producerId = producerIdAndEpoch.producerId,
+        producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
+        transactionalId = transactionalId,
+        isTransactionV2Enabled = useTV2,
+        committed = true,
+        expectedError = Errors.NONE
+      )
+
+      // After the transaction completes, the offset is committed for both TV1 and TV2.
+      TestUtils.waitUntilTrue(() =>
+        try {
+          fetchOffset(groupId, topic, partition) == offset + version
+        } catch {
+          case _: Throwable => false
+        }, "txn commit offset validation failed"
+      )
+    }
+  }
+}

Reply via email to