This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ee5a2ac26a KAFKA-19446: Add Transaction Version to request builder [2/3] (#20868)
7ee5a2ac26a is described below
commit 7ee5a2ac26a9149fbbddc2d62f5fd7f6228beda3
Author: Ritika Reddy <[email protected]>
AuthorDate: Thu Nov 13 18:21:55 2025 -0800
KAFKA-19446: Add Transaction Version to request builder [2/3] (#20868)
In this patch for KIP-1228, we add a new WriteTxnMarkersRequest builder
constructor that includes the transaction version for each marker.
The transaction version is already stored in the metadata on the
transaction coordinator. We now extract this version information and
pass it to the builder via the TxnMarkerEntry, which propagates it to
partition leaders.
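For illustration, a minimal sketch of how coordinator-side code can construct a marker entry with its transaction version and hand it to the builder. The values are hypothetical; the constructor and builder signatures match the diff below:

    import java.util.List;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.TransactionResult;
    import org.apache.kafka.common.requests.WriteTxnMarkersRequest;

    // Hypothetical values; transactionVersion would come from the coordinator's transaction metadata.
    WriteTxnMarkersRequest.TxnMarkerEntry entry = new WriteTxnMarkersRequest.TxnMarkerEntry(
        21L,                                      // producerId
        (short) 42,                               // producerEpoch
        73,                                       // coordinatorEpoch
        TransactionResult.COMMIT,                 // result
        List.of(new TopicPartition("topic", 0)),  // partitions
        (short) 2);                               // transactionVersion (new in this patch)

    // The builder copies the version into each WritableTxnMarker in the request data.
    WriteTxnMarkersRequest request =
        new WriteTxnMarkersRequest.Builder(List.of(entry)).build((short) 2);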
Since the TransactionVersion field is marked as ignorable, we always
include it when building the request. The field is automatically
omitted, without any errors, during serialization when the request
version is < 2, ensuring backward compatibility with brokers that only
support version 1.
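A sketch of that backward-compatibility behaviour, continuing the snippet above and mirroring the new unit test in this commit (it assumes it runs with access to the request helpers the test uses, such as RequestAndSize):

    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.ByteBufferAccessor;
    import org.apache.kafka.common.requests.AbstractRequest;
    import org.apache.kafka.common.requests.WriteTxnMarkersRequest;

    // Build the same marker entry at request version 1.
    WriteTxnMarkersRequest v1Request =
        new WriteTxnMarkersRequest.Builder(List.of(entry)).build((short) 1);

    // The builder still records the version in the request data ...
    assert v1Request.data().markers().get(0).transactionVersion() == (byte) 2;

    // ... but a version-1 serialization round trip drops the ignorable field,
    // so the re-parsed request falls back to the default value 0.
    ByteBufferAccessor bytes = v1Request.serialize();
    bytes.buffer().rewind();
    WriteTxnMarkersRequest reparsed = (WriteTxnMarkersRequest) AbstractRequest.parseRequest(
        ApiKeys.WRITE_TXN_MARKERS, (short) 1, bytes).request;
    assert reparsed.markers().get(0).transactionVersion() == (short) 0;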
Reviewers: Justine Olshan <[email protected]>
---
.../common/requests/WriteTxnMarkersRequest.java | 43 ++++--
.../kafka/common/requests/RequestResponseTest.java | 2 +-
.../requests/WriteTxnMarkersRequestTest.java | 161 +++++++++++++++++++--
.../TransactionMarkerChannelManager.scala | 13 +-
.../TransactionMarkerChannelManagerTest.scala | 24 ++-
...sactionMarkerRequestCompletionHandlerTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 15 +-
7 files changed, 224 insertions(+), 36 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index 38f416654f2..cf7e771c550 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -38,17 +38,20 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
private final int coordinatorEpoch;
private final TransactionResult result;
private final List<TopicPartition> partitions;
+ private final short transactionVersion;
public TxnMarkerEntry(long producerId,
short producerEpoch,
int coordinatorEpoch,
TransactionResult result,
- List<TopicPartition> partitions) {
+ List<TopicPartition> partitions,
+ short transactionVersion) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.coordinatorEpoch = coordinatorEpoch;
this.result = result;
this.partitions = partitions;
+ this.transactionVersion = transactionVersion;
}
public long producerId() {
@@ -71,6 +74,10 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
return partitions;
}
+ public short transactionVersion() {
+ return transactionVersion;
+ }
+
@Override
public String toString() {
return "TxnMarkerEntry{" +
@@ -79,6 +86,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
", coordinatorEpoch=" + coordinatorEpoch +
", result=" + result +
", partitions=" + partitions +
+ ", transactionVersion=" + transactionVersion +
'}';
}
@@ -91,12 +99,13 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
producerEpoch == that.producerEpoch &&
coordinatorEpoch == that.coordinatorEpoch &&
result == that.result &&
+ transactionVersion == that.transactionVersion &&
Objects.equals(partitions, that.partitions);
}
@Override
public int hashCode() {
- return Objects.hash(producerId, producerEpoch, coordinatorEpoch, result, partitions);
+ return Objects.hash(producerId, producerEpoch, coordinatorEpoch, result, partitions, transactionVersion);
}
}
@@ -109,6 +118,11 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
this.data = data;
}
+ /**
+ * Creates a builder with the given markers. Transaction versions are read from each marker entry.
+ *
+ * @param markers the list of transaction marker entries
+ */
public Builder(final List<TxnMarkerEntry> markers) {
// version will be determined at build time based on broker capabilities
super(ApiKeys.WRITE_TXN_MARKERS);
@@ -123,12 +137,18 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
topicMap.put(topicPartition.topic(), topic);
}
- dataMarkers.add(new WritableTxnMarker()
- .setProducerId(marker.producerId)
- .setProducerEpoch(marker.producerEpoch)
- .setCoordinatorEpoch(marker.coordinatorEpoch)
- .setTransactionResult(marker.transactionResult().id)
- .setTopics(new ArrayList<>(topicMap.values())));
+ WritableTxnMarker writableMarker = new WritableTxnMarker()
+ .setProducerId(marker.producerId)
+ .setProducerEpoch(marker.producerEpoch)
+ .setCoordinatorEpoch(marker.coordinatorEpoch)
+ .setTransactionResult(marker.transactionResult().id)
+ .setTopics(new ArrayList<>(topicMap.values()));
+
+ // Set transaction version from the marker entry (KIP-1228).
+ // Serialization will automatically omit TransactionVersion field in version 1 since it's ignorable.
+ writableMarker.setTransactionVersion((byte) marker.transactionVersion);
+
+ dataMarkers.add(writableMarker);
}
this.data = new WriteTxnMarkersRequestData().setMarkers(dataMarkers);
}
@@ -178,12 +198,17 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
topicPartitions.add(new TopicPartition(topic.name(), partitionIdx));
}
}
+ // Read transactionVersion from raw marker data.
+ // For request version 1, this field is set to 0 during deserialization since it's ignorable.
+ short transactionVersion = markerEntry.transactionVersion();
+
markers.add(new TxnMarkerEntry(
markerEntry.producerId(),
markerEntry.producerEpoch(),
markerEntry.coordinatorEpoch(),
TransactionResult.forId(markerEntry.transactionResult()),
- topicPartitions)
+ topicPartitions,
+ transactionVersion)
);
}
return markers;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7834db2e5fa..9b0a75506da 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2826,7 +2826,7 @@ public class RequestResponseTest {
private WriteTxnMarkersRequest createWriteTxnMarkersRequest(short version) {
List<TopicPartition> partitions = singletonList(new TopicPartition("topic", 73));
- WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, TransactionResult.ABORT, partitions);
+ WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, TransactionResult.ABORT, partitions, (short) 0);
return new WriteTxnMarkersRequest.Builder(singletonList(txnMarkerEntry)).build(version);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
index 045ac932cf9..178f834e07a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java
@@ -17,7 +17,9 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.BeforeEach;
@@ -27,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class WriteTxnMarkersRequestTest {
@@ -45,23 +48,51 @@ public class WriteTxnMarkersRequestTest {
markers = Collections.singletonList(
new WriteTxnMarkersRequest.TxnMarkerEntry(
PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
- RESULT, Collections.singletonList(TOPIC_PARTITION))
+ RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 0)
);
}
@Test
public void testConstructor() {
- WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markers);
- for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) {
- WriteTxnMarkersRequest request = builder.build(version);
- assertEquals(1, request.markers().size());
- WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0);
- assertEquals(PRODUCER_ID, marker.producerId());
- assertEquals(PRODUCER_EPOCH, marker.producerEpoch());
- assertEquals(COORDINATOR_EPOCH, marker.coordinatorEpoch());
- assertEquals(RESULT, marker.transactionResult());
- assertEquals(Collections.singletonList(TOPIC_PARTITION), marker.partitions());
- }
+ // We always set the transaction version in the request data using the arguments provided to the builder.
+ // If the version doesn't support it, it will be omitted during serialization.
+
+ // Test constructor with transactionVersion = 2
+ List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion = Collections.singletonList(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(
+ PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+ RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
+ );
+
+ // Build with request version 1.
+ WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markersWithVersion);
+ WriteTxnMarkersRequest request = builder.build((short) 1);
+ assertEquals(1, request.data().markers().size());
+ WriteTxnMarkersRequestData.WritableTxnMarker dataMarker = request.data().markers().get(0);
+ assertEquals(PRODUCER_ID, dataMarker.producerId());
+ assertEquals(PRODUCER_EPOCH, dataMarker.producerEpoch());
+ assertEquals(COORDINATOR_EPOCH, dataMarker.coordinatorEpoch());
+ assertEquals(RESULT.id, dataMarker.transactionResult());
+ assertEquals(1, dataMarker.topics().size());
+ assertEquals(TOPIC_PARTITION.topic(), dataMarker.topics().get(0).name());
+ assertEquals(Collections.singletonList(TOPIC_PARTITION.partition()), dataMarker.topics().get(0).partitionIndexes());
+ // Verify TransactionVersion is set to 2 in the data irrespective of the request version
+ assertEquals((byte) 2, dataMarker.transactionVersion());
+
+ // Build with request version 2
+ WriteTxnMarkersRequest.Builder builderWithVersions = new WriteTxnMarkersRequest.Builder(markersWithVersion);
+ WriteTxnMarkersRequest requestWithVersion = builderWithVersions.build((short) 2);
+ assertEquals(1, requestWithVersion.data().markers().size());
+ WriteTxnMarkersRequestData.WritableTxnMarker dataMarkerWithVersion = requestWithVersion.data().markers().get(0);
+ assertEquals(PRODUCER_ID, dataMarkerWithVersion.producerId());
+ assertEquals(PRODUCER_EPOCH, dataMarkerWithVersion.producerEpoch());
+ assertEquals(COORDINATOR_EPOCH, dataMarkerWithVersion.coordinatorEpoch());
+ assertEquals(RESULT.id, dataMarkerWithVersion.transactionResult());
+ assertEquals(1, dataMarkerWithVersion.topics().size());
+ assertEquals(TOPIC_PARTITION.topic(), dataMarkerWithVersion.topics().get(0).name());
+ assertEquals(Collections.singletonList(TOPIC_PARTITION.partition()), dataMarkerWithVersion.topics().get(0).partitionIndexes());
+ // Verify TransactionVersion is set to 2 in the data
+ assertEquals((byte) 2, dataMarkerWithVersion.transactionVersion());
}
@Test
@@ -79,4 +110,110 @@ public class WriteTxnMarkersRequestTest {
assertEquals(0, errorResponse.throttleTimeMs());
}
}
+
+ @Test
+ public void testTransactionVersion() {
+ // Test that TransactionVersion is set correctly and serialization handles it properly.
+ List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion = Collections.singletonList(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(
+ PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+ RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
+ );
+ WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markersWithVersion);
+
+ // Test request version 2 - TransactionVersion should be included.
+ WriteTxnMarkersRequest requestV2 = builder.build((short) 2);
+ assertNotNull(requestV2);
+ assertEquals(1, requestV2.markers().size());
+
+ // Verify TransactionVersion is set to 2 in the request data.
+ assertEquals((byte) 2, requestV2.data().markers().get(0).transactionVersion());
+ // Verify the request can be serialized for version 2 (TransactionVersion field included).
+ // This should not throw an exception.
+ ByteBufferAccessor serializedV2 = requestV2.serialize();
+ assertNotNull(serializedV2, "Serialization should succeed without error for version 2");
+ // Test deserialization for version 2 - verify TransactionVersion field was included during serialization.
+ // Use the already serialized request and parse it back to verify the field is present.
+ serializedV2.buffer().rewind();
+ RequestAndSize requestAndSizeV2 = AbstractRequest.parseRequest(
+ ApiKeys.WRITE_TXN_MARKERS, (short) 2, serializedV2);
+ WriteTxnMarkersRequest parsedRequestV2 = (WriteTxnMarkersRequest) requestAndSizeV2.request;
+ assertNotNull(parsedRequestV2);
+ assertEquals(1, parsedRequestV2.markers().size());
+ // After deserialization, TransactionVersion should be 2 because it was included during serialization.
+ assertEquals((short) 2, parsedRequestV2.markers().get(0).transactionVersion());
+ // Verify the data also shows 2 (since it was read from serialized bytes with the field).
+ assertEquals((byte) 2, parsedRequestV2.data().markers().get(0).transactionVersion());
+
+ // Test request version 1 - TransactionVersion should be omitted (ignorable field).
+ WriteTxnMarkersRequest requestV1 = builder.build((short) 1);
+ assertNotNull(requestV1);
+ assertEquals(1, requestV1.markers().size());
+
+ // Verify TransactionVersion is still set to 2 in the request data (even for version 1).
+ // This is what the coordinator has when building the request - data() is used before serialization.
+ // The field value is preserved in the data, but will be omitted during serialization.
+ assertEquals((byte) 2, requestV1.data().markers().get(0).transactionVersion());
+ // Verify the request can be serialized for version 1 (TransactionVersion field omitted).
+ // This should not throw an exception even though TransactionVersion is set to 2
+ // because the field is marked as ignorable.
+ ByteBufferAccessor serializedV1 = requestV1.serialize();
+ assertNotNull(serializedV1, "Serialization should succeed without error for version 1 even with TransactionVersion set");
+ // Test deserialization for version 1 - verify TransactionVersion field was omitted during serialization.
+ // Use the already serialized request and parse it back to verify the field is not present.
+ serializedV1.buffer().rewind();
+ RequestAndSize requestAndSizeV1 = AbstractRequest.parseRequest(
+ ApiKeys.WRITE_TXN_MARKERS, (short) 1, serializedV1);
+ WriteTxnMarkersRequest parsedRequestV1 = (WriteTxnMarkersRequest) requestAndSizeV1.request;
+ assertNotNull(parsedRequestV1);
+ assertEquals(1, parsedRequestV1.markers().size());
+ // After deserialization, TransactionVersion should be 0 because it was omitted during serialization.
+ // The field is not present in the serialized bytes for version 1, so it defaults to 0.
+ assertEquals((short) 0, parsedRequestV1.markers().get(0).transactionVersion());
+ // Verify the data also shows 0 (since it was read from serialized bytes without the field).
+ assertEquals((byte) 0, parsedRequestV1.data().markers().get(0).transactionVersion());
+ }
+
+ @Test
+ public void testRequestWithMultipleMarkersDifferentTransactionVersions() {
+ // Test building a request with two markers - one with tv1 and one with tv2
+ // and verify that the right transaction versions are updated in the request data
+ TopicPartition topicPartition1 = new TopicPartition("topic1", 0);
+ TopicPartition topicPartition2 = new TopicPartition("topic2", 1);
+ long producerId1 = 100L;
+ long producerId2 = 200L;
+
+ List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithDifferentVersions = List.of(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(
+ producerId1, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+ RESULT, Collections.singletonList(topicPartition1), (short) 1), // tv1
+ new WriteTxnMarkersRequest.TxnMarkerEntry(
+ producerId2, PRODUCER_EPOCH, COORDINATOR_EPOCH,
+ RESULT, Collections.singletonList(topicPartition2), (short) 2) // tv2
+ );
+
+ WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markersWithDifferentVersions);
+ WriteTxnMarkersRequest request = builder.build((short) 2);
+
+ assertNotNull(request);
+ assertEquals(2, request.data().markers().size());
+
+ // Verify first marker has tv1 (transactionVersion = 1) in the request data
+ WriteTxnMarkersRequestData.WritableTxnMarker dataMarker1 = request.data().markers().get(0);
+ assertEquals(producerId1, dataMarker1.producerId());
+ assertEquals((byte) 1, dataMarker1.transactionVersion());
+
+ // Verify second marker has tv2 (transactionVersion = 2) in the request data
+ WriteTxnMarkersRequestData.WritableTxnMarker dataMarker2 = request.data().markers().get(1);
+ assertEquals(producerId2, dataMarker2.producerId());
+ assertEquals((byte) 2, dataMarker2.transactionVersion());
+
+ // Verify markers() method also returns correct transaction versions
+ List<WriteTxnMarkersRequest.TxnMarkerEntry> markers = request.markers();
+ assertEquals(2, markers.size());
+ assertEquals((short) 1, markers.get(0).transactionVersion());
+ assertEquals(producerId1, markers.get(0).producerId());
+ assertEquals((short) 2, markers.get(1).transactionVersion());
+ assertEquals(producerId2, markers.get(1).producerId());
+ }
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 227eb9881a2..21d2876d824 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -391,10 +391,21 @@ class TransactionMarkerChannelManager(
}
val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
+ // Extract transaction version from metadata. In practice, clientTransactionVersion should never be null
+ // (it's always set when loading from log or creating new metadata), but we check defensively.
+ val transactionVersion = {
+ val clientTransactionVersion = pendingCompleteTxn.txnMetadata.clientTransactionVersion()
+ if (clientTransactionVersion != null) {
+ clientTransactionVersion.featureLevel()
+ } else {
+ 0.toShort
+ }
+ }
+
for ((broker: Option[Node], topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
broker match {
case Some(brokerNode) =>
- val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
+ val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava, transactionVersion)
val pendingCompleteTxnAndMarker = PendingCompleteTxnAndMarkerEntry(pendingCompleteTxn, marker)
if (brokerNode == Node.noNode) {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 7699d643a3e..56da07509eb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -299,10 +299,16 @@ class TransactionMarkerChannelManagerTest {
assertEquals(0, channelManager.queueForBroker(broker2.id).get.totalNumMarkers(txnTopicPartition2))
val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
- util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)),
- new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build()
+ util.List.of(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1), TransactionVersion.TV_2.featureLevel()),
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1), TransactionVersion.TV_2.featureLevel())
+ )
+ ).build()
val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
- util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build()
+ util.List.of(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2), TransactionVersion.TV_2.featureLevel())
+ )
+ ).build()
val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
@@ -369,10 +375,16 @@ class TransactionMarkerChannelManagerTest {
assertEquals(1, channelManager.queueForUnknownBroker.totalNumMarkers(txnTopicPartition2))
val expectedBroker1Request = new WriteTxnMarkersRequest.Builder(
- util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)),
- new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1)))).build()
+ util.List.of(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1), TransactionVersion.TV_2.featureLevel()),
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId2, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition1), TransactionVersion.TV_2.featureLevel())
+ )
+ ).build()
val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
- util.List.of(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2)))).build()
+ util.List.of(
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, util.List.of(partition2), TransactionVersion.TV_2.featureLevel())
+ )
+ ).build()
val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().asScala.map { handler =>
(handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index e955a9009ce..b34953fce33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -46,7 +46,7 @@ class TransactionMarkerRequestCompletionHandlerTest {
private val pendingCompleteTxnAndMarkers = util.List.of(
PendingCompleteTxnAndMarkerEntry(
PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, txnMetadata.prepareComplete(42)),
- new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, util.List.of(topicPartition))))
+ new WriteTxnMarkersRequest.TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, txnResult, util.List.of(topicPartition), 0)))
private val markerChannelManager: TransactionMarkerChannelManager = mock(classOf[TransactionMarkerChannelManager])
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 96ea2ae8858..6fe6e0fb44f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3109,8 +3109,8 @@ class KafkaApisTest extends Logging {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
util.List.of(
- new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, util.List.of(topicPartition)),
- new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, util.List.of(topicPartition)),
+ new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, util.List.of(topicPartition), TransactionVersion.TV_2.featureLevel()),
+ new TxnMarkerEntry(2, 1.toShort, 0, TransactionResult.COMMIT, util.List.of(topicPartition), TransactionVersion.TV_2.featureLevel())
)).build()
val request = buildRequest(writeTxnMarkersRequest)
val capturedResponse: ArgumentCaptor[WriteTxnMarkersResponse] = ArgumentCaptor.forClass(classOf[WriteTxnMarkersResponse])
@@ -3245,14 +3245,16 @@ class KafkaApisTest extends Logging {
1.toShort,
0,
TransactionResult.COMMIT,
- util.List.of(offset0, foo0)
+ util.List.of(offset0, foo0),
+ TransactionVersion.TV_2.featureLevel()
),
new TxnMarkerEntry(
2L,
1.toShort,
0,
TransactionResult.ABORT,
- util.List.of(offset1, foo1)
+ util.List.of(offset1, foo1),
+ TransactionVersion.TV_2.featureLevel()
)
)
).build()
@@ -3368,7 +3370,8 @@ class KafkaApisTest extends Logging {
1.toShort,
0,
TransactionResult.COMMIT,
- util.List.of(offset0)
+ util.List.of(offset0),
+ TransactionVersion.TV_2.featureLevel()
)
)
).build()
@@ -10094,7 +10097,7 @@ class KafkaApisTest extends Logging {
private def createWriteTxnMarkersRequest(partitions: util.List[TopicPartition]) = {
val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
- util.List.of(new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions))).build()
+ util.List.of(new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, partitions, TransactionVersion.TV_2.featureLevel()))).build()
(writeTxnMarkersRequest, buildRequest(writeTxnMarkersRequest))
}