This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 45327fd597b KAFKA-18035: Backport TransactionsTest testBumpTransactionalEpochWithTV2Disabled failed on trunk (#20102) 45327fd597b is described below commit 45327fd597b68ed4651361ecad913f7c719fffeb Author: Gaurav Narula <gaurav_naru...@apple.com> AuthorDate: Fri Jul 4 22:00:10 2025 +0100 KAFKA-18035: Backport TransactionsTest testBumpTransactionalEpochWithTV2Disabled failed on trunk (#20102) Backports the flakyness fix in #18451 to 4.0 branch > Sometimes we didn't get into abortable state before aborting, so the epoch didn't get bumped. Now we force abortable state with an attempt to send before aborting so the epoch bump occurs as expected. > > Reviewers: Jeff Kim <jeff....@confluent.io> Reviewers: Chia-Ping Tsai <chia7...@gmail.com> Co-authored-by: Justine Olshan <jols...@confluent.io> --- .../scala/integration/kafka/api/TransactionsTest.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 74737668127..c531f62595a 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -21,7 +21,7 @@ import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{ConcurrentTransactionsException, InvalidProducerEpochException, ProducerFencedException, TimeoutException} import org.apache.kafka.common.test.api.Flaky import org.apache.kafka.coordinator.group.GroupCoordinatorConfig @@ -738,6 +738,19 @@ class TransactionsTest extends IntegrationTestHarness { restartDeadBrokers() org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, classOf[TimeoutException]) + // Ensure the producer transitions to abortable_error state. + TestUtils.waitUntilTrue(() => { + var failed = false + try { + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false)) + } catch { + case e: Exception => + if (e.isInstanceOf[KafkaException]) + failed = true + } + failed + }, "The send request never failed as expected.") + assertThrows(classOf[KafkaException], () => producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(testTopic, 0, "3", "3", willBeCommitted = false))) producer.abortTransaction() producer.beginTransaction() @@ -760,7 +773,7 @@ class TransactionsTest extends IntegrationTestHarness { producerStateEntry = brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.get(producerId) assertNotNull(producerStateEntry) - assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch) + assertTrue(producerStateEntry.producerEpoch > initialProducerEpoch, "InitialProduceEpoch: " + initialProducerEpoch + " ProducerStateEntry: " + producerStateEntry) } finally { producer.close(Duration.ZERO) }