This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new aa96241acd9 MINOR: Fix time comparison with appendLingerMs in 
CoordinatorRuntime#maybeFlushCurrentBatch (#20773)
aa96241acd9 is described below

commit aa96241acd9207cc47c6c89f75d3e0e67a2bd5c0
Author: majialong <[email protected]>
AuthorDate: Tue Oct 28 21:50:31 2025 +0800

    MINOR: Fix time comparison with appendLingerMs in 
CoordinatorRuntime#maybeFlushCurrentBatch (#20773)
    
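    This PR fixes the time comparison in
    CoordinatorRuntime#maybeFlushCurrentBatch. The subtraction was reversed
    (appendTimeMs - currentTimeMs instead of currentTimeMs - appendTimeMs).
    With the operands in the right order, the batch is flushed once the
    time elapsed since appendTimeMs is at least appendLingerMs.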
    
    This issue is also mentioned
    [here](https://github.com/apache/kafka/pull/20653/files#r2442452104).
    
    The fix for this issue was originally in [this
    PR](https://github.com/apache/kafka/pull/20739) in the trunk branch,
    which was backported to the 4.0 branch.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +
 .../common/runtime/CoordinatorRuntime.java         |  2 +-
 .../common/runtime/CoordinatorRuntimeTest.java     | 74 +++++++++++++++++++++-
 3 files changed, 76 insertions(+), 2 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ea5fb9d3098..bc5921e803a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -337,6 +337,8 @@
     <!-- coordinator-common -->
     <suppress checks="NPathComplexity"
               files="CoordinatorRuntime.java"/>
+    <suppress checks="JavaNCSS"
+              files="CoordinatorRuntimeTest.java"/>
 
     <!-- share coordinator -->
     <suppress checks="NPathComplexity"
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index e61983658fa..51ce267da2e 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -835,7 +835,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || 
!currentBatch.builder.hasRoomFor(0)) {
+                if (currentBatch.builder.isTransactional() || (currentTimeMs - 
currentBatch.appendTimeMs) >= appendLingerMs || 
!currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 26e7e7b5f28..710266f0f71 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -107,7 +107,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings({"checkstyle:JavaNCSS", 
"checkstyle:ClassDataAbstractionCoupling"})
+@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
 public class CoordinatorRuntimeTest {
     private static final TopicPartition TP = new 
TopicPartition("__consumer_offsets", 0);
     private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -5280,6 +5280,7 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isCompletedExceptionally());
         verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
     }
+
     @Test
     public void testCoordinatorExecutor() {
         Duration writeTimeout = Duration.ofMillis(1000);
@@ -5367,6 +5368,77 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isDone());
     }
 
+    @Test
+    public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws 
Exception {
+        // Provides the runtime clock; we will advance it.
+        MockTimer clockTimer = new MockTimer();
+        // Used for scheduling timer tasks; we won't advance it to avoid a 
timer-triggered batch flush.
+        MockTimer schedulerTimer = new MockTimer();
+
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(clockTimer.time())
+                .withTimer(schedulerTimer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(ACTIVE, ctx.state);
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+        assertEquals(0, writer.entries(TP).size());
+
+        // Verify that the linger timeout task is created; there will also be 
a default write timeout task.
+        assertEquals(2, schedulerTimer.size());
+
+        // Advance past the linger time.
+        clockTimer.advanceClock(11);
+
+        // Verify that the linger task is not cancelled.
+        assertFalse(schedulerTimer.taskQueue().peek().cancelled());
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record2"), "response2")
+        );
+
+        // The batch should have been flushed.
+        assertEquals(1, writer.entries(TP).size());
+
+        // Verify that the linger timeout task is cancelled.
+        assertTrue(schedulerTimer.taskQueue().peek().cancelled());
+
+        // Verify batch contains both two records
+        MemoryRecords batch = writer.entries(TP).get(0);
+        assertEquals(2, batch.firstBatch().countOrNull());
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+    }
+
     private static <S extends CoordinatorShard<U>, U> 
ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp

Reply via email to