This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 385b70d2f8d KAFKA-18475 Flaky PlaintextProducerSendTest.testCloseWithZeroTimeoutFromCallerThread (#20791)
385b70d2f8d is described below

commit 385b70d2f8d17be2e235ed029deb2e5acff57795
Author: Chang-Yu Huang <[email protected]>
AuthorDate: Tue Nov 25 21:06:51 2025 -0500

    KAFKA-18475 Flaky PlaintextProducerSendTest.testCloseWithZeroTimeoutFromCallerThread (#20791)
    
    # Description
    The test `testCloseWithZeroTimeoutFromCallerThread` is flaky. The
    consumer may receive all of the messages after the producer is force
    closed, even though the futures in the producer complete exceptionally.
    
    The bug comes from a race condition between
    `RecordAccumulator#close` and `RecordAccumulator#batchReady`.
    `RecordAccumulator#close` sets the closed flag to true, and
    `RecordAccumulator#batchReady` then treats the batch as sendable. As a
    result, those batches are sent in the same `Sender#runOnce` call because
    `runOnce` doesn't check the `forceClose` flag.
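    
    The interleaving described above can be replayed with a small,
    single-threaded toy model. This is only a sketch under stated
    assumptions: `ToyAccumulator`, `ToySender`, `initiateForceClose` and
    `replayRace` are hypothetical names invented for illustration, the
    exception type is a stand-in, and none of this is Kafka's actual code.
    It only mirrors the two facts stated above: a closed accumulator
    reports its batch as sendable, and `runOnce` does not consult
    `forceClose`.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    
    // Hypothetical toy model of the race; NOT the real Kafka classes.
    public class ForceCloseRaceSketch {
        // Stand-in for the accumulator: one pending batch plus a closed flag.
        static class ToyAccumulator {
            volatile boolean closed = false;
            final CompletableFuture<Void> pendingBatch = new CompletableFuture<>();
    
            void close() { closed = true; }
    
            // Assumption taken from the description: once closed, the batch counts as sendable.
            boolean batchReady() { return closed && !pendingBatch.isDone(); }
        }
    
        static class ToySender {
            final ToyAccumulator accumulator = new ToyAccumulator();
            final List<String> broker = new ArrayList<>(); // records that reached the "broker"
            volatile boolean forceClose = false;
    
            // One loop iteration; note that it never reads forceClose.
            void runOnce() {
                if (accumulator.batchReady()) {
                    broker.add("value");
                }
            }
    
            // Caller-thread side of a zero-timeout close.
            void initiateForceClose() {
                forceClose = true;
                accumulator.close();
            }
    
            // After the loop exits, a forced close fails whatever is still incomplete.
            void abortIncompleteBatches() {
                accumulator.pendingBatch.completeExceptionally(
                    new IllegalStateException("producer force closed"));
            }
        }
    
        // Replays the unlucky interleaving step by step on a single thread.
        static ToySender replayRace() {
            ToySender sender = new ToySender();
            sender.initiateForceClose();     // caller thread closes with zero timeout
            sender.runOnce();                // sender thread finishes one more iteration
            sender.abortIncompleteBatches(); // sender thread then aborts the batch
            return sender;
        }
    
        public static void main(String[] args) {
            ToySender sender = replayRace();
            System.out.println("records at broker: " + sender.broker.size());
            System.out.println("future failed: "
                + sender.accumulator.pendingBatch.isCompletedExceptionally());
        }
    }
    ```
    
    In this model the record reaches the broker while its future still
    fails, which is the combination the flaky assertion tripped over.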
    
    # Change
    
    The race condition cannot be fixed entirely without using a lock, which
    is expensive. The current force close method tries its best to close
    without sending more data, and we can make no further guarantee.
    
    The test already asserts that after force close at most one more
    `Sender#runOnce` call is executed. Hence the consumer-side assertion
    is removed.
    
    500 runs:
    <img width="925" height="98" alt="Screenshot 2025-11-22 at 7 48 08 PM" src="https://github.com/user-attachments/assets/3cfecd36-e4de-4cff-a7dd-d37636be69c8" />
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index add18b260cd..77e8fdf2214 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -508,7 +508,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   def testCloseWithZeroTimeoutFromCallerThread(groupProtocol: String): Unit = {
     TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)
     val partition = 0
-    consumer.assign(java.util.List.of(new TopicPartition(topic, partition)))
     val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
       "value".getBytes(StandardCharsets.UTF_8))
 
@@ -522,7 +521,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         val e = assertThrows(classOf[ExecutionException], () => future.get())
         assertEquals(classOf[KafkaException], e.getCause.getClass)
       }
-      assertEquals(0, consumer.poll(Duration.ofMillis(50L)).count, "Fetch response should have no message returned.")
     }
   }
 
