This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7ee5a2ac26a KAFKA-19446: Add Transaction Version to request builder 
[2/3] (#20868)
7ee5a2ac26a is described below

commit 7ee5a2ac26a9149fbbddc2d62f5fd7f6228beda3
Author: Ritika Reddy <[email protected]>
AuthorDate: Thu Nov 13 18:21:55 2025 -0800

    KAFKA-19446: Add Transaction Version to request builder [2/3] (#20868)
    
    In this patch for KIP-1228, we add a new WriteTxnMarkersRequest builder
    constructor that includes the transaction versions for each marker.
    The transaction version is already stored in the metadata on the
    transaction coordinator. We now extract this version information and
    pass it to the builder via the TxnMarkerEntry, which propagates it to
    partition leaders.
    
    Since the TransactionVersion field is marked as ignorable, we always
    include it when building the request. The field is automatically
    omitted during serialization, without any errors, if the request
    version is < 2, ensuring backward compatibility with brokers that only
    support version 1.
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../common/requests/WriteTxnMarkersRequest.java    |  43 ++++--
 .../kafka/common/requests/RequestResponseTest.java |   2 +-
 .../requests/WriteTxnMarkersRequestTest.java       | 161 +++++++++++++++++++--
 .../TransactionMarkerChannelManager.scala          |  13 +-
 .../TransactionMarkerChannelManagerTest.scala      |  24 ++-
 ...sactionMarkerRequestCompletionHandlerTest.scala |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  15 +-
 7 files changed, 224 insertions(+), 36 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 38f416654f2..cf7e771c550 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -38,17 +38,20 @@ public class WriteTxnMarkersRequest extends AbstractRequest 
{
         private final int coordinatorEpoch;
         private final TransactionResult result;
         private final List<TopicPartition> partitions;
+        private final short transactionVersion;
 
         public TxnMarkerEntry(long producerId,
                               short producerEpoch,
                               int coordinatorEpoch,
                               TransactionResult result,
-                              List<TopicPartition> partitions) {
+                              List<TopicPartition> partitions,
+                              short transactionVersion) {
             this.producerId = producerId;
             this.producerEpoch = producerEpoch;
             this.coordinatorEpoch = coordinatorEpoch;
             this.result = result;
             this.partitions = partitions;
+            this.transactionVersion = transactionVersion;
         }
 
         public long producerId() {
@@ -71,6 +74,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
             return partitions;
         }
 
+        public short transactionVersion() {
+            return transactionVersion;
+        }
+
         @Override
         public String toString() {
             return "TxnMarkerEntry{" +
@@ -79,6 +86,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
                        ", coordinatorEpoch=" + coordinatorEpoch +
                        ", result=" + result +
                        ", partitions=" + partitions +
+                       ", transactionVersion=" + transactionVersion +
                        '}';
         }
 
@@ -91,12 +99,13 @@ public class WriteTxnMarkersRequest extends AbstractRequest 
{
                        producerEpoch == that.producerEpoch &&
                        coordinatorEpoch == that.coordinatorEpoch &&
                        result == that.result &&
+                       transactionVersion == that.transactionVersion &&
                        Objects.equals(partitions, that.partitions);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(producerId, producerEpoch, coordinatorEpoch, 
result, partitions);
+            return Objects.hash(producerId, producerEpoch, coordinatorEpoch, 
result, partitions, transactionVersion);
         }
     }
 
@@ -109,6 +118,11 @@ public class WriteTxnMarkersRequest extends 
AbstractRequest {
             this.data = data;
         }
 
+        /**
+         * Creates a builder with the given markers. Transaction versions are 
read from each marker entry.
+         *
+         * @param markers the list of transaction marker entries
+         */
         public Builder(final List<TxnMarkerEntry> markers) {
             // version will be determined at build time based on broker 
capabilities
             super(ApiKeys.WRITE_TXN_MARKERS);
@@ -123,12 +137,18 @@ public class WriteTxnMarkersRequest extends 
AbstractRequest {
                     topicMap.put(topicPartition.topic(), topic);
                 }
 
-                dataMarkers.add(new WritableTxnMarker()
-                                    .setProducerId(marker.producerId)
-                                    .setProducerEpoch(marker.producerEpoch)
-                                    
.setCoordinatorEpoch(marker.coordinatorEpoch)
-                                    
.setTransactionResult(marker.transactionResult().id)
-                                    .setTopics(new 
ArrayList<>(topicMap.values())));
+                WritableTxnMarker writableMarker = new WritableTxnMarker()
+                    .setProducerId(marker.producerId)
+                    .setProducerEpoch(marker.producerEpoch)
+                    .setCoordinatorEpoch(marker.coordinatorEpoch)
+                    .setTransactionResult(marker.transactionResult().id)
+                    .setTopics(new ArrayList<>(topicMap.values()));
+
+                // Set transaction version from the marker entry (KIP-1228).
+                // Serialization will automatically omit TransactionVersion 
field in version 1 since it's ignorable.
+                writableMarker.setTransactionVersion((byte) 
marker.transactionVersion);
+
+                dataMarkers.add(writableMarker);
             }
             this.data = new 
WriteTxnMarkersRequestData().setMarkers(dataMarkers);
         }
@@ -178,12 +198,17 @@ public class WriteTxnMarkersRequest extends 
AbstractRequest {
                     topicPartitions.add(new TopicPartition(topic.name(), 
partitionIdx));
                 }
             }
+            // Read transactionVersion from raw marker data.
+            // For request version 1, this field is set to 0 during 
deserialization since it's ignorable.
+            short transactionVersion = markerEntry.transactionVersion();
+
             markers.add(new TxnMarkerEntry(
                 markerEntry.producerId(),
                 markerEntry.producerEpoch(),
                 markerEntry.coordinatorEpoch(),
                 TransactionResult.forId(markerEntry.transactionResult()),
-                topicPartitions)
+                topicPartitions,
+                transactionVersion)
             );
         }
         return markers;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7834db2e5fa..9b0a75506da 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2826,7 +2826,7 @@ public class RequestResponseTest {
 
     private WriteTxnMarkersRequest createWriteTxnMarkersRequest(short version) 
{
         List<TopicPartition> partitions = singletonList(new 
TopicPartition("topic", 73));
-        WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new 
WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, 
TransactionResult.ABORT, partitions);
+        WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new 
WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, 
TransactionResult.ABORT, partitions, (short) 0);
         return new 
WriteTxnMarkersRequest.Builder(singletonList(txnMarkerEntry)).build(version);
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
index 045ac932cf9..178f834e07a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -27,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class WriteTxnMarkersRequestTest {
 
@@ -45,23 +48,51 @@ public class WriteTxnMarkersRequestTest {
         markers = Collections.singletonList(
              new WriteTxnMarkersRequest.TxnMarkerEntry(
                  PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
-                 RESULT, Collections.singletonList(TOPIC_PARTITION))
+                 RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 0)
         );
     }
 
     @Test
     public void testConstructor() {
-        WriteTxnMarkersRequest.Builder builder = new 
WriteTxnMarkersRequest.Builder(markers);
-        for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) {
-            WriteTxnMarkersRequest request = builder.build(version);
-            assertEquals(1, request.markers().size());
-            WriteTxnMarkersRequest.TxnMarkerEntry marker = 
request.markers().get(0);
-            assertEquals(PRODUCER_ID, marker.producerId());
-            assertEquals(PRODUCER_EPOCH, marker.producerEpoch());
-            assertEquals(COORDINATOR_EPOCH, marker.coordinatorEpoch());
-            assertEquals(RESULT, marker.transactionResult());
-            assertEquals(Collections.singletonList(TOPIC_PARTITION), 
marker.partitions());
-        }
+        // We always set the transaction version in the request data using the 
arguments provided to the builder.
+        // If the version doesn't support it, it will be omitted during 
serialization.
+
+        // Test constructor with transactionVersion = 2
+        List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion = 
Collections.singletonList(
+            new WriteTxnMarkersRequest.TxnMarkerEntry(
+                PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+                RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
+        );
+
+        // Build with request version 1.
+        WriteTxnMarkersRequest.Builder builder = new 
WriteTxnMarkersRequest.Builder(markersWithVersion);
+        WriteTxnMarkersRequest request = builder.build((short) 1);
+        assertEquals(1, request.data().markers().size());
+        WriteTxnMarkersRequestData.WritableTxnMarker dataMarker = 
request.data().markers().get(0);
+        assertEquals(PRODUCER_ID, dataMarker.producerId());
+        assertEquals(PRODUCER_EPOCH, dataMarker.producerEpoch());
+        assertEquals(COORDINATOR_EPOCH, dataMarker.coordinatorEpoch());
+        assertEquals(RESULT.id, dataMarker.transactionResult());
+        assertEquals(1, dataMarker.topics().size());
+        assertEquals(TOPIC_PARTITION.topic(), 
dataMarker.topics().get(0).name());
+        assertEquals(Collections.singletonList(TOPIC_PARTITION.partition()), 
dataMarker.topics().get(0).partitionIndexes());
+        // Verify TransactionVersion is set to 2 in the data irrespective of 
the request version
+        assertEquals((byte) 2, dataMarker.transactionVersion());
+
+        // Build with request version 2
+        WriteTxnMarkersRequest.Builder builderWithVersions = new 
WriteTxnMarkersRequest.Builder(markersWithVersion);
+        WriteTxnMarkersRequest requestWithVersion = 
builderWithVersions.build((short) 2);
+        assertEquals(1, requestWithVersion.data().markers().size());
+        WriteTxnMarkersRequestData.WritableTxnMarker dataMarkerWithVersion = 
requestWithVersion.data().markers().get(0);
+        assertEquals(PRODUCER_ID, dataMarkerWithVersion.producerId());
+        assertEquals(PRODUCER_EPOCH, dataMarkerWithVersion.producerEpoch());
+        assertEquals(COORDINATOR_EPOCH, 
dataMarkerWithVersion.coordinatorEpoch());
+        assertEquals(RESULT.id, dataMarkerWithVersion.transactionResult());
+        assertEquals(1, dataMarkerWithVersion.topics().size());
+        assertEquals(TOPIC_PARTITION.topic(), 
dataMarkerWithVersion.topics().get(0).name());
+        assertEquals(Collections.singletonList(TOPIC_PARTITION.partition()), 
dataMarkerWithVersion.topics().get(0).partitionIndexes());
+        // Verify TransactionVersion is set to 2 in the data
+        assertEquals((byte) 2, dataMarkerWithVersion.transactionVersion());
     }
 
     @Test
@@ -79,4 +110,110 @@ public class WriteTxnMarkersRequestTest {
             assertEquals(0, errorResponse.throttleTimeMs());
         }
     }
+
+    @Test
+    public void testTransactionVersion() {
+        // Test that TransactionVersion is set correctly and serialization 
handles it properly.
+        List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion = 
Collections.singletonList(
+            new WriteTxnMarkersRequest.TxnMarkerEntry(
+                PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+                RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
+        );
+        WriteTxnMarkersRequest.Builder builder = new 
WriteTxnMarkersRequest.Builder(markersWithVersion);
+        
+        // Test request version 2 - TransactionVersion should be included.
+        WriteTxnMarkersRequest requestV2 = builder.build((short) 2);
+        assertNotNull(requestV2);
+        assertEquals(1, requestV2.markers().size());
+
+        // Verify TransactionVersion is set to 2 in the request data.
+        assertEquals((byte) 2, 
requestV2.data().markers().get(0).transactionVersion());
+        // Verify the request can be serialized for version 2 
(TransactionVersion field included).
+        // This should not throw an exception.
+        ByteBufferAccessor serializedV2 = requestV2.serialize();
+        assertNotNull(serializedV2, "Serialization should succeed without 
error for version 2");
+        // Test deserialization for version 2 - verify TransactionVersion 
field was included during serialization.
+        // Use the already serialized request and parse it back to verify the 
field is present.
+        serializedV2.buffer().rewind();
+        RequestAndSize requestAndSizeV2 = AbstractRequest.parseRequest(
+            ApiKeys.WRITE_TXN_MARKERS, (short) 2, serializedV2);
+        WriteTxnMarkersRequest parsedRequestV2 = (WriteTxnMarkersRequest) 
requestAndSizeV2.request;
+        assertNotNull(parsedRequestV2);
+        assertEquals(1, parsedRequestV2.markers().size());
+        // After deserialization, TransactionVersion should be 2 because it 
was included during serialization.
+        assertEquals((short) 2, 
parsedRequestV2.markers().get(0).transactionVersion());
+        // Verify the data also shows 2 (since it was read from serialized 
bytes with the field).
+        assertEquals((byte) 2, 
parsedRequestV2.data().markers().get(0).transactionVersion());
+
+        // Test request version 1 - TransactionVersion should be omitted 
(ignorable field).
+        WriteTxnMarkersRequest requestV1 = builder.build((short) 1);
+        assertNotNull(requestV1);
+        assertEquals(1, requestV1.markers().size());
+
+        // Verify TransactionVersion is still set to 2 in the request data 
(even for version 1).
+        // This is what the coordinator has when building the request - data() 
is used before serialization.
+        // The field value is preserved in the data, but will be omitted 
during serialization.
+        assertEquals((byte) 2, 
requestV1.data().markers().get(0).transactionVersion());
+        // Verify the request can be serialized for version 1 
(TransactionVersion field omitted).
+        // This should not throw an exception even though TransactionVersion 
is set to 2
+        // because the field is marked as ignorable.
+        ByteBufferAccessor serializedV1 = requestV1.serialize();
+        assertNotNull(serializedV1, "Serialization should succeed without 
error for version 1 even with TransactionVersion set");
+        // Test deserialization for version 1 - verify TransactionVersion 
field was omitted during serialization.
+        // Use the already serialized request and parse it back to verify the 
field is not present.
+        serializedV1.buffer().rewind();
+        RequestAndSize requestAndSizeV1 = AbstractRequest.parseRequest(
+            ApiKeys.WRITE_TXN_MARKERS, (short) 1, serializedV1);
+        WriteTxnMarkersRequest parsedRequestV1 = (WriteTxnMarkersRequest) 
requestAndSizeV1.request;
+        assertNotNull(parsedRequestV1);
+        assertEquals(1, parsedRequestV1.markers().size());
+        // After deserialization, TransactionVersion should be 0 because it 
was omitted during serialization.
+        // The field is not present in the serialized bytes for version 1, so 
it defaults to 0.
+        assertEquals((short) 0, 
parsedRequestV1.markers().get(0).transactionVersion());
+        // Verify the data also shows 0 (since it was read from serialized 
bytes without the field).
+        assertEquals((byte) 0, 
parsedRequestV1.data().markers().get(0).transactionVersion());
+    }
+
+    @Test
+    public void testRequestWithMultipleMarkersDifferentTransactionVersions() {
+        // Test building a request with two markers - one with tv1 and one 
with tv2
+        // and verify that the right transaction versions are updated in the 
request data
+        TopicPartition topicPartition1 = new TopicPartition("topic1", 0);
+        TopicPartition topicPartition2 = new TopicPartition("topic2", 1);
+        long producerId1 = 100L;
+        long producerId2 = 200L;
+        
+        List<WriteTxnMarkersRequest.TxnMarkerEntry> 
markersWithDifferentVersions = List.of(
+            new WriteTxnMarkersRequest.TxnMarkerEntry(
+                producerId1, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+                RESULT, Collections.singletonList(topicPartition1), (short) 
1), // tv1
+            new WriteTxnMarkersRequest.TxnMarkerEntry(
+                producerId2, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+                RESULT, Collections.singletonList(topicPartition2), (short) 2) 
 // tv2
+        );
+        
+        WriteTxnMarkersRequest.Builder builder = new 
WriteTxnMarkersRequest.Builder(markersWithDifferentVersions);
+        WriteTxnMarkersRequest request = builder.build((short) 2);
+        
+        assertNotNull(request);
+        assertEquals(2, request.data().markers().size());
+        
+        // Verify first marker has tv1 (transactionVersion = 1) in the request 
data
+        WriteTxnMarkersRequestData.WritableTxnMarker dataMarker1 = 
request.data().markers().get(0);
+        assertEquals(producerId1, dataMarker1.producerId());
+        assertEquals((byte) 1, dataMarker1.transactionVersion());
+        
+        // Verify second marker has tv2 (transactionVersion = 2) in the 
request data
+        WriteTxnMarkersRequestData.WritableTxnMarker dataMarker2 = 
request.data().markers().get(1);
+        assertEquals(producerId2, dataMarker2.producerId());
+        assertEquals((byte) 2, dataMarker2.transactionVersion());
+        
+        // Verify markers() method also returns correct transaction versions
+        List<WriteTxnMarkersRequest.TxnMarkerEntry> markers = 
request.markers();
+        assertEquals(2, markers.size());
+        assertEquals((short) 1, markers.get(0).transactionVersion());
+        assertEquals(producerId1, markers.get(0).producerId());
+        assertEquals((short) 2, markers.get(1).transactionVersion());
+        assertEquals(producerId2, markers.get(1).producerId());
+    }
 }
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 227eb9881a2..21d2876d824 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -391,10 +391,21 @@ class TransactionMarkerChannelManager(
     }
 
     val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
+    // Extract transaction version from metadata. In practice, 
clientTransactionVersion should never be null
+    // (it's always set when loading from log or creating new metadata), but 
we check defensively.
+    val transactionVersion = {
+      val clientTransactionVersion = 
pendingCompleteTxn.txnMetadata.clientTransactionVersion()
+      if (clientTransactionVersion != null) {
+        clientTransactionVersion.featureLevel()
+      } else {
+        0.toShort
+      }
+    }
+
     for ((broker: Option[Node], topicPartitions: 
immutable.Set[TopicPartition]) <- partitionsByDestination) {
       broker match {
         case Some(brokerNode) =>
-          val marker = new TxnMarkerEntry(producerId, producerEpoch, 
coordinatorEpoch, result, topicPartitions.toList.asJava)
+          val marker = new TxnMarkerEntry(producerId, producerEpoch, 
coordinatorEpoch, result, topicPartitions.toList.asJava, transactionVersion)
           val pendingCompleteTxnAndMarker = 
PendingCompleteTxnAndMarkerEntry(pendingCompleteTxn, marker)
 
           if (brokerNode == Node.noNode) {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 7699d643a3e..56da07509eb 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -299,10 +299,16 @@ class TransactionMarkerChannelManagerTest {
     assertEquals(0, 
channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
 
     val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, 
producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)),
-        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition1)))).build()
+      util.List.of(
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition1), 
TransactionVersion.TV_2.featureLevel()),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition1), 
TransactionVersion.TV_2.featureLevel())
+      )
+    ).build()
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
-      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, 
producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build()
+      util.List.of(
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition2), 
TransactionVersion.TV_2.featureLevel())
+      )
+    ).build()
 
     val requests: Map[Node, WriteTxnMarkersRequest] = 
channelManager.generateRequests().asScala.map { handler =>
       (handler.destination, 
handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -369,10 +375,16 @@ class TransactionMarkerChannelManagerTest {
     assertEquals(1, 
channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
 
     val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
-      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, 
producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)),
-        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition1)))).build()
+      util.List.of(
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition1), 
TransactionVersion.TV_2.featureLevel()),
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition1), 
TransactionVersion.TV_2.featureLevel())
+      )
+    ).build()
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
-      util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, 
producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build()
+      util.List.of(
+        new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(partition2), 
TransactionVersion.TV_2.featureLevel())
+      )
+    ).build()
 
     val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = 
channelManager.generateRequests().asScala.map { handler =>
       (handler.destination, 
handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index e955a9009ce..b34953fce33 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -46,7 +46,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private val pendingCompleteTxnAndMarkers = util.List.of(
     PendingCompleteTxnAndMarkerEntry(
       PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, 
txnMetadata.prepareComplete(42)),
-      new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(topicPartition))))
+      new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, 
coordinatorEpoch, txnResult, util.List.of(topicPartition), 0)))
 
   private val markerChannelManager: TransactionMarkerChannelManager =
     mock(classOf[TransactionMarkerChannelManager])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 96ea2ae8858..6fe6e0fb44f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3109,8 +3109,8 @@ class KafkaApisTest extends Logging {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
     val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
       util.List.of(
-        new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, 
util.List.of(topicPartition)),
-        new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, 
util.List.of(topicPartition)),
+        new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, 
util.List.of(topicPartition), TransactionVersion.TV_2.featureLevel()),
+        new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, 
util.List.of(topicPartition), TransactionVersion.TV_2.featureLevel())
       )).build()
     val request = buildRequest(writeTxnMarkersRequest)
     val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = 
ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
@@ -3245,14 +3245,16 @@ class KafkaApisTest extends Logging {
           1.toShort,
           0,
           TransactionResult.COMMIT,
-          util.List.of(offset0, foo0)
+          util.List.of(offset0, foo0),
+          TransactionVersion.TV_2.featureLevel()
         ),
         new TxnMarkerEntry(
           2L,
           1.toShort,
           0,
           TransactionResult.ABORT,
-          util.List.of(offset1, foo1)
+          util.List.of(offset1, foo1),
+          TransactionVersion.TV_2.featureLevel()
         )
       )
     ).build()
@@ -3368,7 +3370,8 @@ class KafkaApisTest extends Logging {
           1.toShort,
           0,
           TransactionResult.COMMIT,
-          util.List.of(offset0)
+          util.List.of(offset0),
+          TransactionVersion.TV_2.featureLevel()
         )
       )
     ).build()
@@ -10094,7 +10097,7 @@ class KafkaApisTest extends Logging {
 
   private def createWriteTxnMarkersRequest(partitions: 
util.List[TopicPartition]) = {
     val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
-      util.List.of(new TxnMarkerEntry(1, 1.toShort, 0, 
TransactionResult.COMMIT, partitions))).build()
+      util.List.of(new TxnMarkerEntry(1, 1.toShort, 0, 
TransactionResult.COMMIT, partitions, 
TransactionVersion.TV_2.featureLevel()))).build()
     (writeTxnMarkersRequest, buildRequest(writeTxnMarkersRequest))
   }
 

Reply via email to