This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e32fb9772b2 KAFKA-19825 [1/2]: Add coordinator effective batch linger time metric (#20753)
e32fb9772b2 is described below

commit e32fb9772b24ebd0402722ff10e56a901c376477
Author: Sean Quah <[email protected]>
AuthorDate: Fri Nov 14 16:03:11 2025 +0000

    KAFKA-19825 [1/2]: Add coordinator effective batch linger time metric (#20753)
    
    Add an effective batch linger time metric for the group coordinator and
    share coordinator. The effective batch linger time may exceed the
    configured group.coordinator.append.linger.ms or
    share.coordinator.append.linger.ms due to queuing delays in the
    coordinator runtime.
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |   1 +
 .../common/runtime/CoordinatorRuntimeMetrics.java  |   7 ++
 .../runtime/CoordinatorRuntimeMetricsImpl.java     |  25 +++++
 .../runtime/CoordinatorRuntimeMetricsImplTest.java |  10 ++
 .../common/runtime/CoordinatorRuntimeTest.java     | 113 +++++++++++++++++++++
 docs/ops.html                                      |   5 +
 6 files changed, 161 insertions(+)

diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 911a167ebf5..7bd8fd99233 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -794,6 +794,7 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                     }
 
                     long flushStartMs = time.milliseconds();
+                    runtimeMetrics.recordLingerTime(flushStartMs - 
currentBatch.appendTimeMs);
                     // Write the records to the log and update the last 
written offset.
                     long offset = partitionWriter.append(
                         tp,
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
index 5693e3ea994..5b9b9254230 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
@@ -60,6 +60,13 @@ public interface CoordinatorRuntimeMetrics extends 
AutoCloseable {
      */
     void recordEventPurgatoryTime(long durationMs);
 
+    /**
+     * Record the effective batch linger time.
+     *
+     * @param durationMs The linger time in milliseconds.
+     */
+    void recordLingerTime(long durationMs);
+
     /**
      * Record the flush time.
      *
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index af775c7c451..966d4c5c625 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -58,6 +58,11 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
      */
     public static final String EVENT_PURGATORY_TIME_METRIC_NAME = 
"event-purgatory-time-ms";
 
+    /**
+     * The effective batch linger time metric name.
+     */
+    public static final String BATCH_LINGER_TIME_METRIC_NAME = 
"batch-linger-time-ms";
+
     /**
      * The flush time metric name.
      */
@@ -116,6 +121,11 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
      */
     private final Sensor eventPurgatoryTimeSensor;
 
+    /**
+     * Sensor to measure the effective batch linger time.
+     */
+    private final Sensor lingerTimeSensor;
+
     /**
      * Sensor to measure the flush time.
      */
@@ -199,6 +209,15 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
         this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + 
"-EventPurgatoryTime");
         this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram);
 
+        KafkaMetricHistogram lingerTimeHistogram = 
KafkaMetricHistogram.newLatencyHistogram(
+            suffix -> kafkaMetricName(
+                BATCH_LINGER_TIME_METRIC_NAME + "-" + suffix,
+                "The " + suffix + " effective linger time in milliseconds"
+            )
+        );
+        this.lingerTimeSensor = metrics.sensor(this.metricsGroup + 
"-LingerTime");
+        this.lingerTimeSensor.add(lingerTimeHistogram);
+
         KafkaMetricHistogram flushTimeHistogram = 
KafkaMetricHistogram.newLatencyHistogram(
             suffix -> kafkaMetricName(
                 BATCH_FLUSH_TIME_METRIC_NAME + "-" + suffix,
@@ -234,6 +253,7 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
         metrics.removeSensor(eventQueueTimeSensor.name());
         metrics.removeSensor(eventProcessingTimeSensor.name());
         metrics.removeSensor(eventPurgatoryTimeSensor.name());
+        metrics.removeSensor(lingerTimeSensor.name());
         metrics.removeSensor(flushTimeSensor.name());
     }
 
@@ -294,6 +314,11 @@ public class CoordinatorRuntimeMetricsImpl implements 
CoordinatorRuntimeMetrics
         eventPurgatoryTimeSensor.record(purgatoryTimeMs);
     }
 
+    @Override
+    public void recordLingerTime(long durationMs) {
+        lingerTimeSensor.record(durationMs);
+    }
+
     @Override
     public void recordFlushTime(long durationMs) {
         flushTimeSensor.record(durationMs);
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index 68f152f2bea..42c3505c52e 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.stream.IntStream;
 
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
+import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME;
@@ -74,6 +75,11 @@ public class CoordinatorRuntimeMetricsImplTest {
             kafkaMetricName(metrics, "event-purgatory-time-ms-p95"),
             kafkaMetricName(metrics, "event-purgatory-time-ms-p99"),
             kafkaMetricName(metrics, "event-purgatory-time-ms-p999"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-max"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p50"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p95"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p99"),
+            kafkaMetricName(metrics, "batch-linger-time-ms-p999"),
             kafkaMetricName(metrics, "batch-flush-time-ms-max"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p50"),
             kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
@@ -236,6 +242,7 @@ public class CoordinatorRuntimeMetricsImplTest {
         EVENT_QUEUE_TIME_METRIC_NAME,
         EVENT_PROCESSING_TIME_METRIC_NAME,
         EVENT_PURGATORY_TIME_METRIC_NAME,
+        BATCH_LINGER_TIME_METRIC_NAME,
         BATCH_FLUSH_TIME_METRIC_NAME
     })
     public void testHistogramMetrics(String metricNamePrefix) {
@@ -255,6 +262,9 @@ public class CoordinatorRuntimeMetricsImplTest {
                 case EVENT_PURGATORY_TIME_METRIC_NAME:
                     runtimeMetrics.recordEventPurgatoryTime(i);
                     break;
+                case BATCH_LINGER_TIME_METRIC_NAME:
+                    runtimeMetrics.recordLingerTime(i);
+                    break;
                 case BATCH_FLUSH_TIME_METRIC_NAME:
                     runtimeMetrics.recordFlushTime(i);
             }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 494b8f34d51..80c3db57252 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4233,6 +4233,119 @@ public class CoordinatorRuntimeTest {
         assertNull(complete1.get(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testRecordAppendLingerTime() throws Exception {
+        MockTimer timer = new MockTimer();
+
+        // Writer sleeps for 10ms before appending records.
+        MockPartitionWriter writer = new MockPartitionWriter(timer.time(), 
Integer.MAX_VALUE, false);
+        CoordinatorRuntimeMetrics runtimeMetrics = 
mock(CoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in 
mind that
+        // each batch has a header so it is not possible to have those four 
records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        long firstBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(List.of(), writer.entries(TP));
+        verify(runtimeMetrics, times(0)).recordFlushTime(10);
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be 
created.
+        long secondBatchTimestamp = timer.time().milliseconds();
+        CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
+            state -> new CoordinatorResult<>(records.subList(3, 4), 
"response3")
+        );
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        // The new batch's timestamp comes from before the flush.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(firstBatchTimestamp, records.subList(0, 3))
+        ), writer.entries(TP));
+        verify(runtimeMetrics, times(1)).recordLingerTime(0);
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            records(secondBatchTimestamp, records.subList(0, 3)),
+            records(secondBatchTimestamp, records.subList(3, 4))
+        ), writer.entries(TP));
+        verify(runtimeMetrics, times(1)).recordLingerTime(21);
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals(4L, ctx.coordinator.lastCommittedOffset());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
     @Test
     public void testRecordFlushTime() throws Exception {
         MockTimer timer = new MockTimer();
diff --git a/docs/ops.html b/docs/ops.html
index b1dd7d615a9..a9a5ac54370 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1920,6 +1920,11 @@ The following set of metrics are available for 
monitoring the group coordinator:
       
<td>kafka.server:type=group-coordinator-metrics,name=event-purgatory-time-ms-[max|p50|p95|p99|p999]</td>
       <td>The time that an event waited in the purgatory before being 
completed</td>
     </tr>
+    <tr>
+      <td>Batch Linger Time (Ms)</td>
+      
<td>kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-[max|p50|p95|p99|p999]</td>
+      <td>The effective linger time of a batch before being flushed to the 
local partition</td>
+    </tr>
     <tr>
       <td>Batch Flush Time (Ms)</td>
       
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-time-ms-[max|p50|p95|p99|p999]</td>

Reply via email to