This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 82a1935b7e8 MINOR: Fix time comparison with appendLingerMs in CoordinatorRuntime#maybeFlushCurrentBatch (#20739)
82a1935b7e8 is described below
commit 82a1935b7e813bfe6b181e7ea784e7ffdc5b9b98
Author: majialong <[email protected]>
AuthorDate: Sat Oct 25 02:08:09 2025 +0800
MINOR: Fix time comparison with appendLingerMs in CoordinatorRuntime#maybeFlushCurrentBatch (#20739)
This PR fixes the time comparison logic in
`CoordinatorRuntime#maybeFlushCurrentBatch` to ensure that the batch is
flushed once the time elapsed since `appendTimeMs` reaches the
`appendLingerMs` threshold.
This issue is also mentioned [here](https://github.com/apache/kafka/pull/20653/files#r2442452104).
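
To illustrate the sign error, here is a minimal, self-contained sketch
(the timestamps are hypothetical; only the comparison mirrors the patch):

    public class LingerCheckSketch {
        public static void main(String[] args) {
            long appendTimeMs = 1_000L;   // when the current batch was started
            long currentTimeMs = 1_015L;  // 15 ms later
            long appendLingerMs = 10L;

            // Before the fix the operands were reversed. The difference is
            // -15 ms here, so the condition can never hold for a positive linger.
            boolean buggy = (appendTimeMs - currentTimeMs) >= appendLingerMs;

            // After the fix the elapsed time (15 ms) is compared against the
            // linger (10 ms), so the batch is flushed as intended.
            boolean fixed = (currentTimeMs - appendTimeMs) >= appendLingerMs;

            System.out.println("buggy=" + buggy + ", fixed=" + fixed); // buggy=false, fixed=true
        }
    }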
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 2 +-
.../common/runtime/CoordinatorRuntimeTest.java | 76 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 1 deletion(-)
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 965f8074f80..552e7e72eff 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -833,7 +833,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
      */
     private void maybeFlushCurrentBatch(long currentTimeMs) {
         if (currentBatch != null) {
-            if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
+            if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
                 flushCurrentBatch();
             }
         }
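
The test added below drives the runtime with two separate MockTimers: one
supplies the clock that the linger comparison above reads, and the other
holds the scheduled linger and write-timeout tasks. Advancing only the
clock therefore exercises the comparison without letting a timer task
flush the batch. A minimal sketch of that pattern, using only MockTimer
calls that also appear in the test (import path assumed):

    import org.apache.kafka.server.util.timer.MockTimer;

    public class SplitClockSketch {
        public static void main(String[] args) throws InterruptedException {
            MockTimer clockTimer = new MockTimer();      // supplies Time to the runtime
            MockTimer schedulerTimer = new MockTimer();  // holds the scheduled tasks

            clockTimer.advanceClock(11);                 // the runtime clock moves forward...
            System.out.println(schedulerTimer.size());   // ...but no scheduled task fires (prints 0)
        }
    }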
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index dfbbdf048bc..de1aa21f3f1 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4779,6 +4779,7 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isCompletedExceptionally());
         verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
     }
+
     @Test
     public void testCoordinatorExecutor() {
         Duration writeTimeout = Duration.ofMillis(1000);
@@ -4866,6 +4867,81 @@ public class CoordinatorRuntimeTest {
         assertTrue(write1.isDone());
     }

+    @Test
+    public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception {
+        // Provides the runtime clock; we will advance it.
+        MockTimer clockTimer = new MockTimer();
+        // Used for scheduling timer tasks; we won't advance it, to avoid a timer-triggered batch flush.
+        MockTimer schedulerTimer = new MockTimer();
+
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(clockTimer.time())
+                .withTimer(schedulerTimer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(ACTIVE, ctx.state);
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+        assertEquals(0, writer.entries(TP).size());
+
+        // Verify that the linger timeout task is created; there will also be a default write timeout task.
+        assertEquals(2, schedulerTimer.size());
+
+        // Advance past the linger time.
+        clockTimer.advanceClock(11);
+
+        // At this point, there are still two scheduled tasks; the linger task has not fired
+        // because we did not advance the schedulerTimer.
+        assertEquals(2, schedulerTimer.size());
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record2"), "response2")
+        );
+
+        // The batch should have been flushed.
+        assertEquals(1, writer.entries(TP).size());
+
+        // Because flushing the batch cancels the linger task, there should now be two write timeout tasks.
+        assertEquals(2, schedulerTimer.size());
+
+        // Verify that the batch contains both records.
+        MemoryRecords batch = writer.entries(TP).get(0);
+        RecordBatch recordBatch = batch.firstBatch();
+        assertEquals(2, recordBatch.countOrNull());
+
+        // Commit and verify that the writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        // Now that all scheduled tasks have been cancelled, the scheduler queue should be empty.
+        assertEquals(0, schedulerTimer.size());
+    }
+
     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp