This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new aa96241acd9 MINOR: Fix time comparison with appendLingerMs in CoordinatorRuntime#maybeFlushCurrentBatch (#20773)
aa96241acd9 is described below
commit aa96241acd9207cc47c6c89f75d3e0e67a2bd5c0
Author: majialong <[email protected]>
AuthorDate: Tue Oct 28 21:50:31 2025 +0800
MINOR: Fix time comparison with appendLingerMs in CoordinatorRuntime#maybeFlushCurrentBatch (#20773)
This PR fixes the time comparison in
CoordinatorRuntime#maybeFlushCurrentBatch so that the batch is flushed
once the time elapsed since appendTimeMs reaches or exceeds
appendLingerMs. The previous check computed appendTimeMs - currentTimeMs,
which is never greater than zero as time advances, so a positive
appendLingerMs could never trigger the linger-based flush.
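For illustration, here is a minimal sketch of the corrected check
(simplified from the diff below; currentBatch, appendLingerMs, and
flushCurrentBatch are the names used in CoordinatorRuntime):

    // Flush the pending batch once it has lingered for at least appendLingerMs.
    long elapsedMs = currentTimeMs - currentBatch.appendTimeMs;
    if (elapsedMs >= appendLingerMs) {
        flushCurrentBatch();
    }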
This issue is also mentioned
[here](https://github.com/apache/kafka/pull/20653/files#r2442452104).
The fix originally landed on the trunk branch in [this
PR](https://github.com/apache/kafka/pull/20739); this commit backports
it to the 4.0 branch.
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/suppressions.xml | 2 +
.../common/runtime/CoordinatorRuntime.java | 2 +-
.../common/runtime/CoordinatorRuntimeTest.java | 74 +++++++++++++++++++++-
3 files changed, 76 insertions(+), 2 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ea5fb9d3098..bc5921e803a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -337,6 +337,8 @@
<!-- coordinator-common -->
<suppress checks="NPathComplexity"
files="CoordinatorRuntime.java"/>
+ <suppress checks="JavaNCSS"
+ files="CoordinatorRuntimeTest.java"/>
<!-- share coordinator -->
<suppress checks="NPathComplexity"
diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index e61983658fa..51ce267da2e 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -835,7 +835,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
- if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
+ if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
flushCurrentBatch();
}
}
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 26e7e7b5f28..710266f0f71 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -107,7 +107,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
+@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public class CoordinatorRuntimeTest {
private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -5280,6 +5280,7 @@ public class CoordinatorRuntimeTest {
assertTrue(write1.isCompletedExceptionally());
verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
}
+
@Test
public void testCoordinatorExecutor() {
Duration writeTimeout = Duration.ofMillis(1000);
@@ -5367,6 +5368,77 @@ public class CoordinatorRuntimeTest {
assertTrue(write1.isDone());
}
+ @Test
+ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception {
+ // Provides the runtime clock; we will advance it.
+ MockTimer clockTimer = new MockTimer();
+ // Used for scheduling timer tasks; we won't advance it to avoid a timer-triggered batch flush.
+ MockTimer schedulerTimer = new MockTimer();
+
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(clockTimer.time())
+ .withTimer(schedulerTimer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(ACTIVE, ctx.state);
+ assertNull(ctx.currentBatch);
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of("record1"), "response1")
+ );
+ assertFalse(write1.isDone());
+ assertNotNull(ctx.currentBatch);
+ assertEquals(0, writer.entries(TP).size());
+
+ // Verify that the linger timeout task is created; there will also be a default write timeout task.
+ assertEquals(2, schedulerTimer.size());
+
+ // Advance past the linger time.
+ clockTimer.advanceClock(11);
+
+ // Verify that the linger task is not cancelled.
+ assertFalse(schedulerTimer.taskQueue().peek().cancelled());
+
+ // Write #2.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(List.of("record2"), "response2")
+ );
+
+ // The batch should have been flushed.
+ assertEquals(1, writer.entries(TP).size());
+
+ // Verify that the linger timeout task is cancelled.
+ assertTrue(schedulerTimer.taskQueue().peek().cancelled());
+
+ // Verify that the batch contains both records.
+ MemoryRecords batch = writer.entries(TP).get(0);
+ assertEquals(2, batch.firstBatch().countOrNull());
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ }
+
private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp