This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 385b70d2f8d KAFKA-18475 Flaky
PlaintextProducerSendTest.testCloseWithZeroTimeoutFromCallerThread (#20791)
385b70d2f8d is described below
commit 385b70d2f8d17be2e235ed029deb2e5acff57795
Author: Chang-Yu Huang <[email protected]>
AuthorDate: Tue Nov 25 21:06:51 2025 -0500
KAFKA-18475 Flaky
PlaintextProducerSendTest.testCloseWithZeroTimeoutFromCallerThread (#20791)
# Description
The test `testCloseWithZeroTimeoutFromCallerThread` is flaky. The
consumer may gets all of the messages after the producer is force
closed, while futures in the producer are completed exceptionally.
The bug comes from a race condition introduced by
`RecordAccumulator#close` and `RecordAccumulator#batchReady`.
`RecordAccumulator#close` sets the closed flag to true, and
`RecordAccumulator#batchReady` thinks the batch is sendable. As a result
those batches are sent in the same `Sender#runOnce` call because
`runOnce` doesn't check the `forceClose` flag.
# Change
The race condition can not be fixed entirely without using a lock, which
is expensive. The current force close method tries its best to close
without sending more data and we can make no further guarantee.
The test already asserts that after force close at most one more
`Sender#runOnce` would be executed. Hence the assertion in consumer side
is removed.
500 runs:
<img width="925" height="98" alt="Screenshot 2025-11-22 at 7 48 08 PM"
src="https://github.com/user-attachments/assets/3cfecd36-e4de-4cff-a7dd-d37636be69c8"
/>
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala | 2 --
1 file changed, 2 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index add18b260cd..77e8fdf2214 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -508,7 +508,6 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
def testCloseWithZeroTimeoutFromCallerThread(groupProtocol: String): Unit = {
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers,
2, 2)
val partition = 0
- consumer.assign(java.util.List.of(new TopicPartition(topic, partition)))
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic,
partition, null,
"value".getBytes(StandardCharsets.UTF_8))
@@ -522,7 +521,6 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
val e = assertThrows(classOf[ExecutionException], () => future.get())
assertEquals(classOf[KafkaException], e.getCause.getClass)
}
- assertEquals(0, consumer.poll(Duration.ofMillis(50L)).count, "Fetch
response should have no message returned.")
}
}