This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 388869fbfa1 Enhance message gap metric to include min/max/avg
aggregations (#17847)
388869fbfa1 is described below
commit 388869fbfa1d3dacdd44602f2a7ded3a60bee5ba
Author: jtuglu-netflix <[email protected]>
AuthorDate: Sun May 18 20:19:46 2025 -0700
Enhance message gap metric to include min/max/avg aggregations (#17847)
Changes:
- Add metrics ingest/events/minMessageGap, ingest/events/maxMessageGap,
ingest/events/avgMessageGap
- The current definition of ingest/events/messageGap remains as-is.
---
.../metrics/SegmentGenerationMetricsBenchmark.java | 97 ++++++++++++++++++++++
docs/operations/metrics.md | 3 +
.../common/stats/TaskRealtimeMetricsMonitor.java | 7 ++
.../common/TaskRealtimeMetricsMonitorTest.java | 23 ++++-
.../segment/realtime/SegmentGenerationMetrics.java | 73 +++++++++++++++-
.../realtime/appenderator/StreamAppenderator.java | 7 +-
.../realtime/SegmentGenerationMetricsTest.java | 72 +++++++++++++---
7 files changed, 261 insertions(+), 21 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
new file mode 100644
index 00000000000..787d519253e
--- /dev/null
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing.metrics;
+
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark for SegmentGenerationMetrics
+ *
+ * Each benchmark method feeds the same array of {@code NUM_ITERATIONS} random values in
+ * {@code [1, Long.MAX_VALUE)} into one {@code report*} method of a {@link SegmentGenerationMetrics}.
+ * Both the metrics instance and the samples are recreated once per JMH iteration
+ * ({@link Level#Iteration}), not per invocation, so metric state accumulates across all
+ * invocations within one iteration.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1, time = 1)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode({Mode.AverageTime})
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class SegmentGenerationMetricsBenchmark
+{
+ private static final int NUM_ITERATIONS = 10_000; // report* calls per benchmark invocation
+ private long[] samples;
+ private SegmentGenerationMetrics metrics;
+
+ @Setup(Level.Iteration)
+ public void setup()
+ {
+ metrics = new SegmentGenerationMetrics();
+ samples = new long[NUM_ITERATIONS];
+
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0; i < NUM_ITERATIONS; ++i) {
+ samples[i] = random.nextLong(1, Long.MAX_VALUE); // positive, upper bound exclusive
+ }
+ }
+
+ /**
+ * Benchmark for reportMessageGap in hot loop.
+ *
+ * Every call adds to the same message-gap accumulator; nothing resets it between invocations
+ * because this benchmark never takes a snapshot.
+ */
+ @Benchmark
+ public void benchmarkMultipleReportMessageGap()
+ {
+ for (int i = 0; i < NUM_ITERATIONS; ++i) {
+ metrics.reportMessageGap(samples[i]);
+ }
+ }
+
+ /**
+ * Benchmark for reportMaxSegmentHandoffTime in hot loop.
+ *
+ * reportMaxSegmentHandoffTime only performs the atomic accumulate when the new value exceeds the
+ * stored maximum. After the first invocation in an iteration the stored value equals the largest
+ * sample, so steady-state numbers mostly reflect the read-and-compare fast path.
+ */
+ @Benchmark
+ public void benchmarkMultipleReportMaxSegmentHandoffTime()
+ {
+ for (int i = 0; i < NUM_ITERATIONS; ++i) {
+ metrics.reportMaxSegmentHandoffTime(samples[i]);
+ }
+ }
+
+ /**
+ * Benchmark for reportMessageMaxTimestamp in hot loop.
+ *
+ * Like the handoff-time benchmark, this mostly measures the read-and-compare fast path once the
+ * stored maximum has reached the largest sample.
+ */
+ @Benchmark
+ public void benchmarkMultipleReportMessageMaxTimestamp()
+ {
+ for (int i = 0; i < NUM_ITERATIONS; ++i) {
+ metrics.reportMessageMaxTimestamp(samples[i]);
+ }
+ }
+}
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index fba26456e01..f8a5cf4d4ed 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -279,6 +279,9 @@ batch ingestion emit the following metrics. These metrics
are deltas for each em
|`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`,
`taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once
every segment granular period if cluster operating normally.|
|`ingest/sink/count`|Number of sinks not handed off.|`dataSource`, `taskId`,
`taskType`, `groupId`, `tags`|1~3|
|`ingest/events/messageGap`|Time gap in milliseconds between the latest
ingested event timestamp and the current system timestamp of metrics emission.
If the value is increasing but lag is low, Druid may not be receiving new data.
This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`,
`groupId`, `tags`|Greater than 0, depends on the time carried in event.|
+|`ingest/events/maxMessageGap`|Maximum time gap in milliseconds between an ingested event's timestamp and the system time at which that event was ingested, across all events ingested since the previous emission. This metric is reset every emission period and is not emitted for periods in which no events were ingested.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Usually greater than 0; depends on the timestamps carried in events.|
+|`ingest/events/minMessageGap`|Minimum time gap in milliseconds between an ingested event's timestamp and the system time at which that event was ingested, across all events ingested since the previous emission. This metric is reset every emission period and is not emitted for periods in which no events were ingested.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Usually greater than 0; depends on the timestamps carried in events.|
+|`ingest/events/avgMessageGap`|Average time gap in milliseconds between an ingested event's timestamp and the system time at which that event was ingested, across all events ingested since the previous emission. This metric is reset every emission period and is not emitted for periods in which no events were ingested.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Usually greater than 0; depends on the timestamps carried in events.|
|`ingest/notices/queueSize`|Number of pending notices to be processed by the
coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single
digits. Should not be a very high number. |
|`ingest/notices/time`|Milliseconds taken to process a notice by the
supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index 2fb11cd7a12..602b0101308 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -132,6 +132,13 @@ public class TaskRealtimeMetricsMonitor extends
AbstractMonitor
emitter.emit(builder.setMetric("ingest/events/messageGap", messageGap));
}
+ final SegmentGenerationMetrics.MessageGapStats messageGapStats =
metrics.getMessageGapStats();
+ if (messageGapStats.count() > 0) {
+ emitter.emit(builder.setMetric("ingest/events/minMessageGap",
messageGapStats.min()));
+ emitter.emit(builder.setMetric("ingest/events/maxMessageGap",
messageGapStats.max()));
+ emitter.emit(builder.setMetric("ingest/events/avgMessageGap",
messageGapStats.avg()));
+ }
+
long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
if (maxSegmentHandoffTime >= 0) {
emitter.emit(builder.setMetric("ingest/handoff/time",
maxSegmentHandoffTime));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
index f6bea0917f0..d74384e4b29 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
@@ -52,9 +52,8 @@ public class TaskRealtimeMetricsMonitorTest
);
private static final Map<String, Object> TAGS = ImmutableMap.of("author",
"Author Name", "version", 10);
-
- @Mock(answer = Answers.RETURNS_MOCKS)
private SegmentGenerationMetrics segmentGenerationMetrics;
+
@Mock(answer = Answers.RETURNS_MOCKS)
private RowIngestionMeters rowIngestionMeters;
@Mock
@@ -66,6 +65,7 @@ public class TaskRealtimeMetricsMonitorTest
public void setUp()
{
emittedEvents = new HashMap<>();
+ segmentGenerationMetrics = new SegmentGenerationMetrics();
Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
Mockito
.doAnswer(invocation -> {
@@ -94,4 +94,23 @@ public class TaskRealtimeMetricsMonitorTest
Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
}
}
+
+ @Test // min/max/avg message-gap metrics must appear only once at least one gap has been reported
+ public void testMessageGapAggStats()
+ {
+ target = new TaskRealtimeMetricsMonitor(segmentGenerationMetrics,
rowIngestionMeters, DIMENSIONS, null);
+
+ target.doMonitor(emitter); // no gap reported yet
+
Assert.assertFalse(emittedEvents.containsKey("ingest/events/minMessageGap"));
+
Assert.assertFalse(emittedEvents.containsKey("ingest/events/maxMessageGap"));
+
Assert.assertFalse(emittedEvents.containsKey("ingest/events/avgMessageGap"));
+
+ emittedEvents.clear();
+ segmentGenerationMetrics.reportMessageGap(1); // a single sample makes count() > 0
+ target.doMonitor(emitter);
+
+
Assert.assertTrue(emittedEvents.containsKey("ingest/events/minMessageGap"));
+
Assert.assertTrue(emittedEvents.containsKey("ingest/events/maxMessageGap"));
+
Assert.assertTrue(emittedEvents.containsKey("ingest/events/avgMessageGap"));
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
index 564a3322bf9..66d9d497c96 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Metrics for segment generation.
@@ -55,6 +56,51 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ /**
+ * {@code MessageGapStats} tracks message gap statistics but is not thread-safe.
+ *
+ * <p>It keeps the count, sum, minimum and maximum of every value passed to {@link #add(long)}.
+ * While empty, {@link #min()} returns {@link Long#MAX_VALUE}, {@link #max()} returns
+ * {@link Long#MIN_VALUE} and {@link #avg()} returns {@code NaN} (0.0 / 0), so callers should
+ * check {@link #count()} before using the other values.
+ *
+ * <p>No field is volatile and no method is synchronized. NOTE(review): the enclosing metrics
+ * object adds to the current instance in {@code reportMessageGap()} and hands it off in
+ * {@code snapshot()}. If those run on different threads (presumably ingestion vs. monitor),
+ * an add that races with the hand-off can be lost, or can leave the four fields mutually
+ * inconsistent when read. Confirm this is acceptable for a best-effort metric.
+ */
+ public static class MessageGapStats
+ {
+ private long min = Long.MAX_VALUE;
+ private long max = Long.MIN_VALUE;
+ private long count = 0;
+ private double total = 0; // running sum of all added gaps; avg() divides it by count
+
+ public double avg()
+ {
+ return total / count; // NaN when count == 0
+ }
+
+ public long min()
+ {
+ return min;
+ }
+
+ public long max()
+ {
+ return max;
+ }
+
+ public long count()
+ {
+ return count;
+ }
+
+ public void add(final long messageGap)
+ {
+ total += messageGap;
+ ++count;
+ if (min > messageGap) {
+ min = messageGap;
+ }
+ if (max < messageGap) {
+ max = messageGap;
+ }
+ }
+ }
+
+ private final AtomicReference<MessageGapStats> messageGapStats = new
AtomicReference<>(new MessageGapStats()); // snapshot() swaps in a fresh instance
+
public void incrementRowOutputCount(long numRows)
{
rowOutputCount.addAndGet(numRows);
@@ -105,14 +151,23 @@ public class SegmentGenerationMetrics
this.sinkCount.set(sinkCount);
}
+ public void reportMessageGap(final long messageGap) // adds one sample to the current MessageGapStats
+ {
+ messageGapStats.get().add(messageGap); // unsynchronized: MessageGapStats.add is not thread-safe
+ }
+
public void reportMessageMaxTimestamp(long messageMaxTimestamp)
{
- this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp,
this.messageMaxTimestamp.get()));
+ if (this.messageMaxTimestamp.get() < messageMaxTimestamp) { // cheap read; skip the update when it cannot raise the max
+ this.messageMaxTimestamp.getAndAccumulate(messageMaxTimestamp,
Math::max); // atomic max, unlike the removed get-then-set that could drop a concurrent larger value
+ }
}
public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
{
- this.maxSegmentHandoffTime.set(Math.max(maxSegmentHandoffTime,
this.maxSegmentHandoffTime.get()));
+ if (this.maxSegmentHandoffTime.get() < maxSegmentHandoffTime) { // cheap read; skip the update when it cannot raise the max
+ this.maxSegmentHandoffTime.getAndAccumulate(maxSegmentHandoffTime,
Math::max); // atomic max, unlike the removed get-then-set that could drop a concurrent larger value
+ }
}
public void markProcessingDone()
@@ -170,6 +225,7 @@ public class SegmentGenerationMetrics
{
return pushedRows.get();
}
+
public long mergeTimeMillis()
{
return mergeTimeMillis.get();
@@ -195,6 +251,14 @@ public class SegmentGenerationMetrics
return sinkCount.get();
}
+ /**
+ * Returns the live message-gap accumulator, not a copy: {@link #reportMessageGap(long)} keeps
+ * adding to the returned instance until {@code snapshot()} replaces it with a new one. On an
+ * object returned by {@code snapshot()}, this is the instance that was current when the
+ * snapshot was taken.
+ * See {@code MessageGapStats} for current guarantees on thread-safety.
+ */
+ public MessageGapStats getMessageGapStats()
+ {
+ return messageGapStats.get();
+ }
+
public long messageGap()
{
return messageGap.get();
@@ -220,19 +284,20 @@ public class SegmentGenerationMetrics
retVal.persistCpuTime.set(persistCpuTime.get());
retVal.handOffCount.set(handOffCount.get());
retVal.sinkCount.set(sinkCount.get());
- retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
retVal.mergedRows.set(mergedRows.get());
retVal.pushedRows.set(pushedRows.get());
long messageGapSnapshot = 0;
- final long maxTimestamp = retVal.messageMaxTimestamp.get();
+ final long maxTimestamp = messageMaxTimestamp.get();
+ retVal.messageMaxTimestamp.set(maxTimestamp);
if (processingDone.get()) {
messageGapSnapshot = NO_EMIT_MESSAGE_GAP;
} else if (maxTimestamp > 0) {
messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
}
retVal.messageGap.set(messageGapSnapshot);
+ retVal.messageGapStats.set(messageGapStats.getAndSet(new
MessageGapStats()));
reset();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 2b37481a8ed..2a1b0c67bc6 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -337,6 +337,9 @@ public class StreamAppenderator implements Appenderator
throw e;
}
+ final long currTs = System.currentTimeMillis();
+ metrics.reportMessageGap(currTs - row.getTimestampFromEpoch());
+
if (sinkRowsInMemoryAfterAdd < 0) {
throw new SegmentNotWritableException("Attempt to add row to swapped-out
sink for segment[%s].", identifier);
}
@@ -360,11 +363,11 @@ public class StreamAppenderator implements Appenderator
persist = true;
persistReasons.add("No more rows can be appended to sink");
}
- if (System.currentTimeMillis() > nextFlush) {
+ if (currTs > nextFlush) {
persist = true;
persistReasons.add(StringUtils.format(
"current time[%d] is greater than nextFlush[%d]",
- System.currentTimeMillis(),
+ currTs,
nextFlush
));
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
index 1928c08b2ac..16dacf32110 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
@@ -20,24 +20,17 @@
package org.apache.druid.segment.realtime;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
public class SegmentGenerationMetricsTest
{
- private SegmentGenerationMetrics metrics;
-
- @Before
- public void setup()
- {
- metrics = new SegmentGenerationMetrics();
- }
-
@Test
public void testSnapshotBeforeProcessing()
{
+ final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+ assertMessageGapAggregateMetricsReset(metrics); // a fresh instance starts with empty gap stats
SegmentGenerationMetrics snapshot = metrics.snapshot();
- Assert.assertEquals(0L, snapshot.messageGap());
+ assertMessageGapAggregateMetricsReset(metrics); // NOTE(review): re-checks the live metrics, not snapshot; snapshot.messageGap() is no longer asserted after the removal above - confirm intent
// invalid value
Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
}
@@ -45,25 +38,78 @@ public class SegmentGenerationMetricsTest
@Test
public void testSnapshotAfterProcessingOver()
{
- metrics.reportMessageMaxTimestamp(System.currentTimeMillis() - 20L);
+ final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+
+ metrics.reportMessageGap(20L); // the only sample, so min == max == avg == 20
+ metrics.reportMessageMaxTimestamp(20L); // epoch millis 20, so snapshot's messageGap() is about currentTimeMillis() - 20
metrics.reportMaxSegmentHandoffTime(7L);
+
SegmentGenerationMetrics snapshot = metrics.snapshot();
+
Assert.assertTrue(snapshot.messageGap() >= 20L);
+ final SegmentGenerationMetrics.MessageGapStats messageGapStats =
snapshot.getMessageGapStats();
+ Assert.assertEquals(20L, messageGapStats.min());
+ Assert.assertEquals(20L, messageGapStats.max());
+ Assert.assertEquals(messageGapStats.min(), messageGapStats.max());
+ Assert.assertEquals(20L, messageGapStats.avg(), 0);
Assert.assertEquals(7, snapshot.maxSegmentHandoffTime());
}
@Test
public void testProcessingOverAfterSnapshot()
{
+ final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+
metrics.reportMessageMaxTimestamp(10);
+ metrics.reportMessageGap(1); // consumed by the first snapshot() below, which resets the gap stats
metrics.reportMaxSegmentHandoffTime(7L);
// Should reset to invalid value
metrics.snapshot();
metrics.markProcessingDone();
SegmentGenerationMetrics snapshot = metrics.snapshot();
- // Message gap must be invalid after processing is done
- Assert.assertTrue(0 > snapshot.messageGap());
+
+ // Latest message gap must be invalid after processing is done
+ Assert.assertEquals(-1, snapshot.messageGap());
+
+ final SegmentGenerationMetrics.MessageGapStats messageGapStats =
snapshot.getMessageGapStats();
+ Assert.assertEquals(0, messageGapStats.count()); // empty: the first snapshot() already took the reported gap
+ Assert.assertEquals(Long.MIN_VALUE, messageGapStats.max());
+ Assert.assertEquals(Long.MAX_VALUE, messageGapStats.min());
+ Assert.assertEquals(Double.NaN, messageGapStats.avg(), 0); // avg() of empty stats is 0.0 / 0
// value must be invalid
Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
}
+
+ @Test
+ public void testMessageGapAggregateMetrics()
+ {
+ final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+
+ for (int i = 0; i < 5; ++i) {
+ metrics.reportMessageGap(i * 30); // 0, 30, 60, 90, 120
+ }
+ metrics.reportMessageMaxTimestamp(10L);
+ metrics.reportMaxSegmentHandoffTime(7L);
+ metrics.markProcessingDone(); // only affects messageGap(); the aggregate stats are still handed to the snapshot
+ SegmentGenerationMetrics snapshot = metrics.snapshot();
+ // Latest message gap must be invalid after processing is done
+ Assert.assertEquals(-1, snapshot.messageGap());
+
+ final SegmentGenerationMetrics.MessageGapStats messageGapStats =
snapshot.getMessageGapStats();
+ Assert.assertEquals(5, messageGapStats.count());
+ Assert.assertEquals(0, messageGapStats.min());
+ Assert.assertEquals(120, messageGapStats.max());
+ Assert.assertEquals(60.0, messageGapStats.avg(), 0); // (0 + 30 + 60 + 90 + 120) / 5
+
+ Assert.assertEquals(7L, snapshot.maxSegmentHandoffTime());
+ }
+
+ private static void assertMessageGapAggregateMetricsReset(final
SegmentGenerationMetrics metrics) // asserts the empty-state sentinel values of MessageGapStats
+ {
+ final SegmentGenerationMetrics.MessageGapStats messageGapStats =
metrics.getMessageGapStats();
+ Assert.assertEquals(0, messageGapStats.count());
+ Assert.assertEquals(Long.MIN_VALUE, messageGapStats.max());
+ Assert.assertEquals(Long.MAX_VALUE, messageGapStats.min());
+ Assert.assertEquals(Double.NaN, messageGapStats.avg(), 0);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]