This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e32fb9772b2 KAFKA-19825 [1/2]: Add coordinator effective batch linger time metric (#20753)
e32fb9772b2 is described below
commit e32fb9772b24ebd0402722ff10e56a901c376477
Author: Sean Quah <[email protected]>
AuthorDate: Fri Nov 14 16:03:11 2025 +0000
KAFKA-19825 [1/2]: Add coordinator effective batch linger time metric (#20753)
Add an effective batch linger time metric for the group coordinator and
share coordinator. The effective batch linger time is the time from when a
batch is created until its flush to the log starts. It may exceed the
configured group.coordinator.append.linger.ms or
share.coordinator.append.linger.ms due to queuing delays in the
coordinator runtime.
Reviewers: David Jacot <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 1 +
.../common/runtime/CoordinatorRuntimeMetrics.java | 7 ++
.../runtime/CoordinatorRuntimeMetricsImpl.java | 25 +++++
.../runtime/CoordinatorRuntimeMetricsImplTest.java | 10 ++
.../common/runtime/CoordinatorRuntimeTest.java | 113 +++++++++++++++++++++
docs/ops.html | 5 +
6 files changed, 161 insertions(+)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 911a167ebf5..7bd8fd99233 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -794,6 +794,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
}
long flushStartMs = time.milliseconds();
+ runtimeMetrics.recordLingerTime(flushStartMs -
currentBatch.appendTimeMs);
// Write the records to the log and update the last
written offset.
long offset = partitionWriter.append(
tp,
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
index 5693e3ea994..5b9b9254230 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java
@@ -60,6 +60,13 @@ public interface CoordinatorRuntimeMetrics extends
AutoCloseable {
*/
void recordEventPurgatoryTime(long durationMs);
+ /**
+ * Record the effective batch linger time, i.e. the time from a batch's creation until its flush starts; it may exceed the configured append linger time.
+ *
+ * @param durationMs The linger time in milliseconds.
+ */
+ void recordLingerTime(long durationMs);
+
/**
* Record the flush time.
*
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
index af775c7c451..966d4c5c625 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java
@@ -58,6 +58,11 @@ public class CoordinatorRuntimeMetricsImpl implements
CoordinatorRuntimeMetrics
*/
public static final String EVENT_PURGATORY_TIME_METRIC_NAME =
"event-purgatory-time-ms";
+ /**
+ * The effective batch linger time metric name.
+ */
+ public static final String BATCH_LINGER_TIME_METRIC_NAME =
"batch-linger-time-ms";
+
/**
* The flush time metric name.
*/
@@ -116,6 +121,11 @@ public class CoordinatorRuntimeMetricsImpl implements
CoordinatorRuntimeMetrics
*/
private final Sensor eventPurgatoryTimeSensor;
+ /**
+ * Sensor to measure the effective batch linger time.
+ */
+ private final Sensor lingerTimeSensor;
+
/**
* Sensor to measure the flush time.
*/
@@ -199,6 +209,15 @@ public class CoordinatorRuntimeMetricsImpl implements
CoordinatorRuntimeMetrics
this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup +
"-EventPurgatoryTime");
this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram);
+ KafkaMetricHistogram lingerTimeHistogram =
KafkaMetricHistogram.newLatencyHistogram(
+ suffix -> kafkaMetricName(
+ BATCH_LINGER_TIME_METRIC_NAME + "-" + suffix,
+ "The " + suffix + " effective linger time in milliseconds"
+ )
+ );
+ this.lingerTimeSensor = metrics.sensor(this.metricsGroup +
"-LingerTime");
+ this.lingerTimeSensor.add(lingerTimeHistogram);
+
KafkaMetricHistogram flushTimeHistogram =
KafkaMetricHistogram.newLatencyHistogram(
suffix -> kafkaMetricName(
BATCH_FLUSH_TIME_METRIC_NAME + "-" + suffix,
@@ -234,6 +253,7 @@ public class CoordinatorRuntimeMetricsImpl implements
CoordinatorRuntimeMetrics
metrics.removeSensor(eventQueueTimeSensor.name());
metrics.removeSensor(eventProcessingTimeSensor.name());
metrics.removeSensor(eventPurgatoryTimeSensor.name());
+ metrics.removeSensor(lingerTimeSensor.name());
metrics.removeSensor(flushTimeSensor.name());
}
@@ -294,6 +314,11 @@ public class CoordinatorRuntimeMetricsImpl implements
CoordinatorRuntimeMetrics
eventPurgatoryTimeSensor.record(purgatoryTimeMs);
}
+ @Override
+ public void recordLingerTime(final long durationMs) {
+ this.lingerTimeSensor.record(durationMs);
+ }
+
@Override
public void recordFlushTime(long durationMs) {
flushTimeSensor.record(durationMs);
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
index 68f152f2bea..42c3505c52e 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.stream.IntStream;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
+import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME;
@@ -74,6 +75,11 @@ public class CoordinatorRuntimeMetricsImplTest {
kafkaMetricName(metrics, "event-purgatory-time-ms-p95"),
kafkaMetricName(metrics, "event-purgatory-time-ms-p99"),
kafkaMetricName(metrics, "event-purgatory-time-ms-p999"),
+ kafkaMetricName(metrics, "batch-linger-time-ms-max"),
+ kafkaMetricName(metrics, "batch-linger-time-ms-p50"),
+ kafkaMetricName(metrics, "batch-linger-time-ms-p95"),
+ kafkaMetricName(metrics, "batch-linger-time-ms-p99"),
+ kafkaMetricName(metrics, "batch-linger-time-ms-p999"),
kafkaMetricName(metrics, "batch-flush-time-ms-max"),
kafkaMetricName(metrics, "batch-flush-time-ms-p50"),
kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
@@ -236,6 +242,7 @@ public class CoordinatorRuntimeMetricsImplTest {
EVENT_QUEUE_TIME_METRIC_NAME,
EVENT_PROCESSING_TIME_METRIC_NAME,
EVENT_PURGATORY_TIME_METRIC_NAME,
+ BATCH_LINGER_TIME_METRIC_NAME,
BATCH_FLUSH_TIME_METRIC_NAME
})
public void testHistogramMetrics(String metricNamePrefix) {
@@ -255,6 +262,9 @@ public class CoordinatorRuntimeMetricsImplTest {
case EVENT_PURGATORY_TIME_METRIC_NAME:
runtimeMetrics.recordEventPurgatoryTime(i);
break;
+ case BATCH_LINGER_TIME_METRIC_NAME:
+ runtimeMetrics.recordLingerTime(i);
+ break;
case BATCH_FLUSH_TIME_METRIC_NAME:
runtimeMetrics.recordFlushTime(i);
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 494b8f34d51..80c3db57252 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -4233,6 +4233,119 @@ public class CoordinatorRuntimeTest {
assertNull(complete1.get(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testRecordAppendLingerTime() throws Exception {
+ MockTimer timer = new MockTimer();
+
+ // Writer sleeps for 10ms before appending records.
+ MockPartitionWriter writer = new MockPartitionWriter(timer.time(),
Integer.MAX_VALUE, false);
+ CoordinatorRuntimeMetrics runtimeMetrics =
mock(CoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new
MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
+ .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .withExecutorService(mock(ExecutorService.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext
ctx = runtime.contextOrThrow(TP);
+ assertNull(ctx.currentBatch);
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in
mind that
+ // each batch has a header so it is not possible to have those four
records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1 with two records.
+ long firstBatchTimestamp = timer.time().milliseconds();
+ CompletableFuture<String> write1 =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(records.subList(0, 2),
"response1")
+ );
+
+ // A batch has been created.
+ assertNotNull(ctx.currentBatch);
+
+ // Write #2 with one record.
+ CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(records.subList(2, 3),
"response2")
+ );
+
+ // Verify the state. Records are replayed but no batch written.
+ assertEquals(List.of(), writer.entries(TP));
+ verify(runtimeMetrics, times(0)).recordLingerTime(0);
+
+ // Write #3 with one record. This one cannot go into the existing batch
+ // so the existing batch should be flushed and a new one should be
created.
+ long secondBatchTimestamp = timer.time().milliseconds();
+ CompletableFuture<String> write3 =
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
+ state -> new CoordinatorResult<>(records.subList(3, 4),
"response3")
+ );
+
+ // Verify the state. Records are replayed. The previous batch
+ // got flushed with all the records but the new one from #3.
+ // The new batch's timestamp comes from before the flush.
+ assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+ new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(
+ records(firstBatchTimestamp, records.subList(0, 3))
+ ), writer.entries(TP));
+ verify(runtimeMetrics, times(1)).recordLingerTime(0);
+
+ // Advance past the linger time; the second batch's 21ms linger includes the 10ms writer sleep during the first flush.
+ timer.advanceClock(11);
+
+ // Verify the state. The pending batch is flushed.
+ assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(List.of(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+ new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+ new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+ new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(
+ records(firstBatchTimestamp, records.subList(0, 3)),
+ records(secondBatchTimestamp, records.subList(3, 4))
+ ), writer.entries(TP));
+ verify(runtimeMetrics, times(1)).recordLingerTime(21);
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertTrue(write3.isDone());
+ assertEquals(4L, ctx.coordinator.lastCommittedOffset());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+ }
+
@Test
public void testRecordFlushTime() throws Exception {
MockTimer timer = new MockTimer();
diff --git a/docs/ops.html b/docs/ops.html
index b1dd7d615a9..a9a5ac54370 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1920,6 +1920,11 @@ The following set of metrics are available for
monitoring the group coordinator:
<td>kafka.server:type=group-coordinator-metrics,name=event-purgatory-time-ms-[max|p50|p95|p99|p999]</td>
<td>The time that an event waited in the purgatory before being
completed</td>
</tr>
+ <tr>
+ <td>Batch Linger Time (Ms)</td>
+
<td>kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-[max|p50|p95|p99|p999]</td>
+ <td>The effective linger time of a batch before being flushed to the
local partition</td>
+ </tr>
<tr>
<td>Batch Flush Time (Ms)</td>
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-time-ms-[max|p50|p95|p99|p999]</td>