This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 6ffc58bf8f7 MINOR: Fix time comparison with appendLingerMs in 
CoordinatorRuntime#maybeFlushCurrentBatch (#20739)
6ffc58bf8f7 is described below

commit 6ffc58bf8f74ef5a1d8905fe012cb6171248e762
Author: majialong <[email protected]>
AuthorDate: Sat Oct 25 02:08:09 2025 +0800

    MINOR: Fix time comparison with appendLingerMs in 
CoordinatorRuntime#maybeFlushCurrentBatch (#20739)
    
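    This PR fixes the time comparison in
    `CoordinatorRuntime#maybeFlushCurrentBatch`. The operands of the
    subtraction were reversed (`appendTimeMs - currentTimeMs`), so the
    linger check did not measure elapsed time. The batch is now flushed
    once the time elapsed since `appendTimeMs` is greater than or equal
    to `appendLingerMs`.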
    
    This issue is also mentioned [here](
    https://github.com/apache/kafka/pull/20653/files#r2442452104).
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  2 +-
 .../common/runtime/CoordinatorRuntimeTest.java     | 76 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index cb0c7bbe05f..901acd3d7fd 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -835,7 +835,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || 
(currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || 
!currentBatch.builder.hasRoomFor(0)) {
+                if (currentBatch.builder.isTransactional() || (currentTimeMs - 
currentBatch.appendTimeMs) >= appendLingerMs || 
!currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 96cccd795a8..57ea3fec953 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4780,6 +4780,7 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isCompletedExceptionally());
         verify(runtimeMetrics, 
times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
     }
+
     @Test
     public void testCoordinatorExecutor() {
         Duration writeTimeout = Duration.ofMillis(1000);
@@ -4867,6 +4868,81 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isDone());
     }
 
+    @Test
+    public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws 
Exception {
+        // Provides the runtime clock; we will advance it.
+        MockTimer clockTimer = new MockTimer();
+        // Used for scheduling timer tasks; we won't advance it to avoid a 
timer-triggered batch flush.
+        MockTimer schedulerTimer = new MockTimer();
+
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(clockTimer.time())
+                .withTimer(schedulerTimer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(ACTIVE, ctx.state);
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+        assertEquals(0, writer.entries(TP).size());
+
+        // Verify that the linger timeout task is created; there will also be 
a default write timeout task.
+        assertEquals(2, schedulerTimer.size());
+
+        // Advance past the linger time.
+        clockTimer.advanceClock(11);
+
+        // At this point, there are still two scheduled tasks; the linger task 
has not fired
+        // because we did not advance the schedulerTimer.
+        assertEquals(2, schedulerTimer.size());
+
+        // Write #2.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record2"), "response2")
+        );
+
+        // The batch should have been flushed.
+        assertEquals(1, writer.entries(TP).size());
+
+        // Because flushing the batch cancels the linger task, there should 
now be two write timeout tasks.
+        assertEquals(2, schedulerTimer.size());
+
+        // Verify batch contains both two records
+        MemoryRecords batch = writer.entries(TP).get(0);
+        RecordBatch recordBatch = batch.firstBatch();
+        assertEquals(2, recordBatch.countOrNull());
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        // Now that all scheduled tasks have been cancelled, the scheduler 
queue should be empty.
+        assertEquals(0, schedulerTimer.size());
+    }
+
     private static <S extends CoordinatorShard<U>, U> 
ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp

Reply via email to