This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 822b8ab3d75 KAFKA-18691: Flaky test testFencingOnTransactionExpiration
(#18793)
822b8ab3d75 is described below
commit 822b8ab3d75f54e56d0bf23fcd86f31c08e27629
Author: Justine Olshan <[email protected]>
AuthorDate: Tue Feb 4 08:45:34 2025 -0800
KAFKA-18691: Flaky test testFencingOnTransactionExpiration (#18793)
It appears this test was failing because the transaction was never aborting
and the concurrent transactions errors would not go away.
ccab9eb introduced the test failure because it requires the transaction to
complete, but I suspect the lack of completion was happening before the change.
The timeout for the write is based on the transactional timeout, and 100ms
seemed too small -- thus the requests to update the state would often
repeatedly time out.
Also removed the loop since it was not necessary.
Reviewers: Jeff Kim <[email protected]>, Calvin Liu <[email protected]>
---
.../integration/kafka/api/TransactionsTest.scala | 26 +++++++++-------------
1 file changed, 11 insertions(+), 15 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index fed2b313d06..27131e58023 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -605,7 +605,7 @@ class TransactionsTest extends IntegrationTestHarness {
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testFencingOnTransactionExpiration(quorum: String, groupProtocol:
String): Unit = {
- val producer = createTransactionalProducer("expiringProducer",
transactionTimeoutMs = 100)
+ val producer = createTransactionalProducer("expiringProducer",
transactionTimeoutMs = 300)
producer.initTransactions()
producer.beginTransaction()
@@ -617,20 +617,16 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)
- TestUtils.waitUntilTrue(() => {
- var foundException = false
- try {
- // Now that the transaction has expired, the second send should fail
with a InvalidProducerEpochException. We may see some
concurrentTransactionsExceptions.
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "2", "2", willBeCommitted = false)).get()
- fail("should have raised an error due to concurrent transactions or
invalid producer epoch")
- } catch {
- case _: ConcurrentTransactionsException =>
- case _: InvalidProducerEpochException =>
- case e: ExecutionException =>
- foundException =
e.getCause.isInstanceOf[InvalidProducerEpochException]
- }
- foundException
- }, "Never returned the expected InvalidProducerEpochException")
+ try {
+ // Now that the transaction has expired, the second send should fail
with a InvalidProducerEpochException. We may see some
concurrentTransactionsExceptions.
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "2", "2", willBeCommitted = false)).get()
+ fail("should have raised an error due to concurrent transactions or
invalid producer epoch")
+ } catch {
+ case _: ConcurrentTransactionsException =>
+ case _: InvalidProducerEpochException =>
+ case e: ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException],
"Error was " + e.getCause + " and not InvalidProducerEpochException")
+ }
// Verify that the first message was aborted and the second one was never
written at all.
val nonTransactionalConsumer = nonTransactionalConsumers.head