This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository

The following commit(s) were added to refs/heads/trunk by this push:
     new d3b4c1bdf41 KAFKA-18401: Transaction version 2 does not support commit transaction without records (#18448)
d3b4c1bdf41 is described below

commit d3b4c1bdf41de2d9144e0c2adf41ad08a3fee2e6
Author: Kuan-Po Tseng <>
AuthorDate: Thu Jan 16 02:21:11 2025 +0800

    KAFKA-18401: Transaction version 2 does not support commit transaction without records (#18448)
    Fix the issue where producer.commitTransaction under transaction version 2
    throws an error if no partition or offset has been added to the transaction.
    The solution is to avoid sending the endTxnRequest unless producer.send or
    producer.sendOffsetsToTransaction has been triggered.
    Reviewers: Justine Olshan <>
 .../producer/internals/TransactionManager.java     |  13 +-
 .../transaction/ProducerIdsIntegrationTest.scala   |  88 --------
 .../transaction/ProducerIntegrationTest.scala      | 223 +++++++++++++++++++++
 3 files changed, 233 insertions(+), 91 deletions(-)

diff --git 
index ad73fa6ba37..4ddf8a13de0 100644
@@ -381,6 +381,7 @@ public class TransactionManager {
         if (isTransactionV2Enabled()) {
             log.debug("Begin adding offsets {} for consumer group {} to 
transaction with transaction protocol V2", offsets, groupMetadata);
             handler = txnOffsetCommitHandler(null, offsets, groupMetadata);
+            transactionStarted = true;
         } else {
             log.debug("Begin adding offsets {} for consumer group {} to 
transaction", offsets, groupMetadata);
             AddOffsetsToTxnRequest.Builder builder = new 
@@ -411,6 +412,7 @@ public class TransactionManager {
             } else if (isTransactionV2Enabled()) {
+                transactionStarted = true;
             } else if (transactionContainsPartition(topicPartition) || 
isPartitionPendingAdd(topicPartition)) {
             } else {
@@ -854,11 +856,16 @@ public class TransactionManager {
             return null;
-        if (nextRequestHandler.isEndTxn() && (!isTransactionV2Enabled() && 
!transactionStarted)) {
+        if (nextRequestHandler.isEndTxn() && !transactionStarted) {
             if (currentState != State.FATAL_ERROR) {
-                log.debug("Not sending EndTxn for completed transaction since 
no partitions " +
-                        "or offsets were successfully added");
+                if (isTransactionV2Enabled) {
+                    log.debug("Not sending EndTxn for completed transaction 
since no send " +
+                            "or sendOffsetsToTransaction were triggered");
+                } else {
+                    log.debug("Not sending EndTxn for completed transaction 
since no partitions " +
+                            "or offsets were successfully added");
+                }
             nextRequestHandler = pendingRequests.poll();
diff --git 
deleted file mode 100644
index d2b1358b369..00000000000
+++ /dev/null
@@ -1,88 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator.transaction
-import kafka.server.IntegrationTestUtils
-import org.apache.kafka.common.test.api.ClusterInstance
-import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, ClusterTests, Type}
-import org.apache.kafka.common.test.api.ClusterTestExtensions
-import org.apache.kafka.common.message.InitProducerIdRequestData
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{InitProducerIdRequest, 
-import org.apache.kafka.server.common.MetadataVersion
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.extension.ExtendWith
-import{Collectors, IntStream}
-import scala.concurrent.duration.DurationInt
-import scala.jdk.CollectionConverters._
-@ClusterTestDefaults(serverProperties = Array(
-  new ClusterConfigProperty(key = "transaction.state.log.num.partitions", 
value = "1")
-@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-class ProducerIdsIntegrationTest {
-  @ClusterTests(Array(
-    new ClusterTest(types = Array(Type.KRAFT), brokers = 3, metadataVersion = 
-  ))
-  def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
-    verifyUniqueIds(clusterInstance)
-  }
-  private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
-    // Request enough PIDs from each broker to ensure each broker generates 
two blocks
-    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker 
=> {
-      IntStream.range(0, 1001).parallel().mapToObj( _ =>
-        nextProducerId(broker, clusterInstance.clientListener())
-      )}).collect(Collectors.toList[Long]).asScala.toSeq
-    val brokerCount = clusterInstance.brokerIds.size
-    val expectedTotalCount = 1001 * brokerCount
-    assertEquals(expectedTotalCount, ids.size, s"Expected exactly 
$expectedTotalCount IDs")
-    assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate 
producer IDs")
-  }
-  private def nextProducerId(broker: SocketServer, listener: ListenerName): 
Long = {
-    // Generating producer ids may fail while waiting for the initial block 
and also
-    // when the current block is full and waiting for the prefetched block.
-    val deadline = 5.seconds.fromNow
-    var shouldRetry = true
-    var response: InitProducerIdResponse = null
-    while (shouldRetry && deadline.hasTimeLeft()) {
-      val data = new InitProducerIdRequestData()
-        .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
-        .setProducerId(RecordBatch.NO_PRODUCER_ID)
-        .setTransactionalId(null)
-        .setTransactionTimeoutMs(10)
-      val request = new InitProducerIdRequest.Builder(data).build()
-      response = 
-        destination = broker,
-        listenerName = listener)
-      shouldRetry = == 
-    }
-    assertTrue(deadline.hasTimeLeft())
-    assertEquals(Errors.NONE.code,
-  }
diff --git 
new file mode 100644
index 00000000000..5fe06748631
--- /dev/null
@@ -0,0 +1,223 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+import kafka.server.IntegrationTestUtils
+import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
ConsumerRecords, OffsetAndMetadata}
+import org.apache.kafka.clients.producer.{Producer, ProducerConfig, 
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.errors.RecordTooLargeException
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, 
ClusterFeature, ClusterInstance, ClusterTest, ClusterTestDefaults, 
ClusterTestExtensions, ClusterTests, Type}
+import org.apache.kafka.common.message.InitProducerIdRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.{InitProducerIdRequest, 
+import org.apache.kafka.common.test.TestUtils
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.server.common.{Feature, MetadataVersion}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertInstanceOf, 
assertThrows, assertTrue}
+import org.junit.jupiter.api.extension.ExtendWith
+import java.time.Duration
+import java.util
+import java.util.Collections
+import java.util.concurrent.ExecutionException
+import{Collectors, IntStream, StreamSupport}
+import scala.concurrent.duration.DurationInt
+import scala.jdk.CollectionConverters._
+@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
+  new ClusterConfigProperty(key = 
+  new ClusterConfigProperty(key = 
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+  new ClusterConfigProperty(key = 
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
+  new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+  new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class ProducerIntegrationTest {
+  @ClusterTests(Array(
+    new ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV0)
+  ))
+  def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
+    verifyUniqueIds(clusterInstance)
+  }
+  @ClusterTests(Array(
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
+  ))
+  def testTransactionWithAndWithoutSend(cluster: ClusterInstance): Unit = {
+    val properties = new util.HashMap[String, Object]
+    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "foobar")
+    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val producer: Producer[Array[Byte], Array[Byte]] = 
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", 
"key".getBytes, "value".getBytes))
+      producer.commitTransaction()
+      producer.beginTransaction()
+      producer.commitTransaction()
+    } finally if (producer != null) producer.close()
+  }
+  @ClusterTests(Array(
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
+  ))
+  def testTransactionWithInvalidSendAndEndTxnRequestSent(cluster: 
ClusterInstance): Unit = {
+    val topic = new NewTopic("foobar", 1, 
+    val txnId = "test-txn"
+    val properties = new util.HashMap[String, Object]
+    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
+    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+    properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val admin = cluster.admin()
+    val producer: Producer[Array[Byte], Array[Byte]] = 
+    try {
+      admin.createTopics(List(topic).asJava)
+      producer.initTransactions()
+      producer.beginTransaction()
+      assertInstanceOf(classOf[RecordTooLargeException],
+        assertThrows(classOf[ExecutionException],
+          () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
+  , Array.fill(100)(0: Byte), Array.fill(100)(0: 
+      producer.abortTransaction()
+    } finally {
+      if (admin != null) admin.close()
+      if (producer != null) producer.close()
+    }
+  }
+  @ClusterTests(Array(
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0))),
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1))),
+    new ClusterTest(features = Array(
+      new ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2))),
+  ))
+  def testTransactionWithSendOffset(cluster: ClusterInstance): Unit = {
+    val inputTopic: String = "my-input-topic"
+    var producer: Producer[Array[Byte], Array[Byte]] = cluster.producer
+    try {
+      for (i <- 0 until 5) {
+        val key: Array[Byte] = ("key-" + i).getBytes
+        val value: Array[Byte] = ("value-" + i).getBytes
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](inputTopic, 
key, value)).get
+      }
+    } finally if (producer != null) producer.close()
+    val txnId: String = "foobar"
+    val producerProperties: util.Map[String, Object] = new 
util.HashMap[String, Object]
+    producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId)
+    producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "test")
+    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    val consumerProperties: util.Map[String, Object] = new 
util.HashMap[String, Object]
+    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, 
+    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    producer = cluster.producer(producerProperties)
+    val consumer: Consumer[Array[Byte], Array[Byte]] = 
+    try {
+      producer.initTransactions()
+      producer.beginTransaction()
+      consumer.subscribe(util.List.of(inputTopic))
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = null
+      TestUtils.waitForCondition(() => {
+        records = consumer.poll(Duration.ZERO)
+        records.count == 5
+      }, "poll records size not match")
+      val lastRecord =, 
false).reduce((_, second) => second).orElse(null)
+      val offsets = Collections.singletonMap(
+        new TopicPartition(lastRecord.topic, lastRecord.partition), new 
OffsetAndMetadata(lastRecord.offset + 1))
+      producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata)
+      producer.commitTransaction()
+    } finally {
+      if (producer != null) producer.close()
+      if (consumer != null) consumer.close()
+    }
+    val admin: Admin = cluster.admin
+    try {
+      TestUtils.waitForCondition(() => {
+          .filter(txn => txn.transactionalId == txnId)
+          .anyMatch(txn => txn.state eq TransactionState.COMPLETE_COMMIT)
+      }, "transaction is not in COMPLETE_COMMIT state")
+    } finally if (admin != null) admin.close()
+  }
+  private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
+    // Request enough PIDs from each broker to ensure each broker generates 
two blocks
+    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker 
=> {
+      IntStream.range(0, 1001).parallel().mapToObj( _ =>
+        nextProducerId(broker, clusterInstance.clientListener())
+      )}).collect(Collectors.toList[Long]).asScala.toSeq
+    val brokerCount = clusterInstance.brokerIds.size
+    val expectedTotalCount = 1001 * brokerCount
+    assertEquals(expectedTotalCount, ids.size, s"Expected exactly 
$expectedTotalCount IDs")
+    assertEquals(expectedTotalCount, ids.distinct.size, "Found duplicate 
producer IDs")
+  }
+  private def nextProducerId(broker: SocketServer, listener: ListenerName): 
Long = {
+    // Generating producer ids may fail while waiting for the initial block 
and also
+    // when the current block is full and waiting for the prefetched block.
+    val deadline = 5.seconds.fromNow
+    var shouldRetry = true
+    var response: InitProducerIdResponse = null
+    while (shouldRetry && deadline.hasTimeLeft()) {
+      val data = new InitProducerIdRequestData()
+        .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
+        .setProducerId(RecordBatch.NO_PRODUCER_ID)
+        .setTransactionalId(null)
+        .setTransactionTimeoutMs(10)
+      val request = new InitProducerIdRequest.Builder(data).build()
+      response = 
+        destination = broker,
+        listenerName = listener)
+      shouldRetry = == 
+    }
+    assertTrue(deadline.hasTimeLeft())
+    assertEquals(Errors.NONE.code,
+  }

Reply via email to