This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e65a82bec64 MINOR: use known number of records in 
testComplexShareConsumer to prevent timeout issue (#21235)
e65a82bec64 is described below

commit e65a82bec646e963a6ac2791e0fb7c8ee8e1b070
Author: Bolin Lin <[email protected]>
AuthorDate: Thu Jan 8 05:19:18 2026 -0500

    MINOR: use known number of records in testComplexShareConsumer to prevent 
timeout issue (#21235)
    
    In extreme situations, the existing throttling mechanism in the share
    consumer limits the consumer to processing only a single record at a
    time, which can intermittently cause `testComplexShareConsumer` to time
    out. This change uses a known number of records to make the test reliable.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../org/apache/kafka/clients/consumer/ShareConsumerTest.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 32b05351e98..b67202e29fb 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2318,15 +2318,16 @@ public class ShareConsumerTest {
 
         ClientState prodState = new ClientState();
 
-        // Produce messages until we want.
+        // Produce a fixed number of messages for deterministic testing.
+        int targetRecordCount = 2000;
         service.execute(() -> {
             try (Producer<byte[], byte[]> producer = createProducer()) {
-                while (!prodState.done().get()) {
+                do {
                     ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), 
"value".getBytes());
                     producer.send(record);
                     producer.flush();
-                    prodState.count().incrementAndGet();
-                }
+                } while (prodState.count().incrementAndGet() < 
targetRecordCount);
+                prodState.done().set(true);
             }
         });
 
@@ -2345,9 +2346,6 @@ public class ShareConsumerTest {
             TimeUnit.MILLISECONDS
         );
 
-        // Let the complex consumer read the messages.
-        service.schedule(() -> prodState.done().set(true), 5L, 
TimeUnit.SECONDS);
-
         // All messages which can be read are read, some would be redelivered 
(roughly 2 times the records produced).
         TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did 
not close!");
         int delta = complexCons1.recordsRead() - (int) 
(prodState.count().get() * 2 * 0.95);    // 2 times with margin of error (5%).

Reply via email to