This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 388869fbfa1 Enhance message gap metric to include min/max/avg 
aggregations (#17847)
388869fbfa1 is described below

commit 388869fbfa1d3dacdd44602f2a7ded3a60bee5ba
Author: jtuglu-netflix <[email protected]>
AuthorDate: Sun May 18 20:19:46 2025 -0700

    Enhance message gap metric to include min/max/avg aggregations (#17847)
    
    Changes:
    - Add metrics ingest/events/minMessageGap, ingest/events/maxMessageGap, 
ingest/events/avgMessageGap
    - The current definition of ingest/events/messageGap remains as-is.
---
 .../metrics/SegmentGenerationMetricsBenchmark.java | 97 ++++++++++++++++++++++
 docs/operations/metrics.md                         |  3 +
 .../common/stats/TaskRealtimeMetricsMonitor.java   |  7 ++
 .../common/TaskRealtimeMetricsMonitorTest.java     | 23 ++++-
 .../segment/realtime/SegmentGenerationMetrics.java | 73 +++++++++++++++-
 .../realtime/appenderator/StreamAppenderator.java  |  7 +-
 .../realtime/SegmentGenerationMetricsTest.java     | 72 +++++++++++++---
 7 files changed, 261 insertions(+), 21 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
new file mode 100644
index 00000000000..787d519253e
--- /dev/null
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing.metrics;
+
+import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * JMH benchmark for the hot-path reporting methods of {@link SegmentGenerationMetrics}.
+ *
+ * <p>Every benchmark invocation pushes the same array of {@code NUM_ITERATIONS} random positive longs through one
+ * reporting method. A fresh {@link SegmentGenerationMetrics} and a fresh sample array are created before each
+ * measurement iteration (see {@link #setup()}), not before each invocation.
+ *
+ * <p>Because samples come from {@code [1, Long.MAX_VALUE)} and the metrics object lives for a whole iteration, the
+ * tracked maximum in {@code reportMaxSegmentHandoffTime} / {@code reportMessageMaxTimestamp} already equals the
+ * largest sample after the first invocation, so those two benchmarks mostly measure the "no update needed" case.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1, time = 1)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode({Mode.AverageTime})
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class SegmentGenerationMetricsBenchmark
+{
+  private static final int NUM_ITERATIONS = 10_000;
+  private long[] samples;
+  private SegmentGenerationMetrics metrics;
+
+  /**
+   * Creates a new metrics instance and a new batch of random samples before each measurement iteration.
+   */
+  @Setup(Level.Iteration)
+  public void setup()
+  {
+    metrics = new SegmentGenerationMetrics();
+
+    final long[] generated = new long[NUM_ITERATIONS];
+    final ThreadLocalRandom rng = ThreadLocalRandom.current();
+    for (int idx = 0; idx < generated.length; idx++) {
+      generated[idx] = rng.nextLong(1, Long.MAX_VALUE);
+    }
+    samples = generated;
+  }
+
+  /**
+   * Measures {@code reportMessageGap}, called once per sample in a tight loop.
+   */
+  @Benchmark
+  public void benchmarkMultipleReportMessageGap()
+  {
+    for (final long sample : samples) {
+      metrics.reportMessageGap(sample);
+    }
+  }
+
+  /**
+   * Measures {@code reportMaxSegmentHandoffTime}, called once per sample in a tight loop.
+   */
+  @Benchmark
+  public void benchmarkMultipleReportMaxSegmentHandoffTime()
+  {
+    for (final long sample : samples) {
+      metrics.reportMaxSegmentHandoffTime(sample);
+    }
+  }
+
+  /**
+   * Measures {@code reportMessageMaxTimestamp}, called once per sample in a tight loop.
+   */
+  @Benchmark
+  public void benchmarkMultipleReportMessageMaxTimestamp()
+  {
+    for (final long sample : samples) {
+      metrics.reportMessageMaxTimestamp(sample);
+    }
+  }
+}
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index fba26456e01..f8a5cf4d4ed 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -279,6 +279,9 @@ batch ingestion emit the following metrics. These metrics 
are deltas for each em
 |`ingest/handoff/count`|Number of handoffs that happened.|`dataSource`, 
`taskId`, `taskType`, `groupId`, `tags`|Varies. Generally greater than 0 once 
every segment granular period if cluster operating normally.|
 |`ingest/sink/count`|Number of sinks not handed off.|`dataSource`, `taskId`, 
`taskType`, `groupId`, `tags`|1~3|
 |`ingest/events/messageGap`|Time gap in milliseconds between the latest 
ingested event timestamp and the current system timestamp of metrics emission. 
If the value is increasing but lag is low, Druid may not be receiving new data. 
This metric is reset as new tasks spawn up.|`dataSource`, `taskId`, `taskType`, 
`groupId`, `tags`|Greater than 0, depends on the time carried in event.|
+|`ingest/events/maxMessageGap`|Maximum time gap in milliseconds between an ingested event's timestamp and the system time at which that event was ingested, over all events ingested since the last emission. This metric is reset every emission period and is emitted only if at least one event was ingested during the period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event.|
+|`ingest/events/minMessageGap`|Minimum time gap in milliseconds between an ingested event's timestamp and the system time at which that event was ingested, over all events ingested since the last emission. This metric is reset every emission period and is emitted only if at least one event was ingested during the period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event.|
+|`ingest/events/avgMessageGap`|Average time gap in milliseconds between an ingested event's timestamp and the system time at which that event was ingested, over all events ingested since the last emission. This metric is reset every emission period and is emitted only if at least one event was ingested during the period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Greater than 0, depends on the time carried in event.|
 |`ingest/notices/queueSize`|Number of pending notices to be processed by the 
coordinator.|`dataSource`, `tags`|Typically 0 and occasionally in lower single 
digits. Should not be a very high number. |
 |`ingest/notices/time`|Milliseconds taken to process a notice by the 
supervisor.|`dataSource`, `tags`| < 1s |
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without 
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index 2fb11cd7a12..602b0101308 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -132,6 +132,13 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
       emitter.emit(builder.setMetric("ingest/events/messageGap", messageGap));
     }
 
+    final SegmentGenerationMetrics.MessageGapStats messageGapStats = 
metrics.getMessageGapStats();
+    if (messageGapStats.count() > 0) {
+      emitter.emit(builder.setMetric("ingest/events/minMessageGap", 
messageGapStats.min()));
+      emitter.emit(builder.setMetric("ingest/events/maxMessageGap", 
messageGapStats.max()));
+      emitter.emit(builder.setMetric("ingest/events/avgMessageGap", 
messageGapStats.avg()));
+    }
+
     long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
     if (maxSegmentHandoffTime >= 0) {
       emitter.emit(builder.setMetric("ingest/handoff/time", 
maxSegmentHandoffTime));
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
index f6bea0917f0..d74384e4b29 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
@@ -52,9 +52,8 @@ public class TaskRealtimeMetricsMonitorTest
   );
 
   private static final Map<String, Object> TAGS = ImmutableMap.of("author", 
"Author Name", "version", 10);
-
-  @Mock(answer = Answers.RETURNS_MOCKS)
   private SegmentGenerationMetrics segmentGenerationMetrics;
+
   @Mock(answer = Answers.RETURNS_MOCKS)
   private RowIngestionMeters rowIngestionMeters;
   @Mock
@@ -66,6 +65,7 @@ public class TaskRealtimeMetricsMonitorTest
   public void setUp()
   {
     emittedEvents = new HashMap<>();
+    segmentGenerationMetrics = new SegmentGenerationMetrics();
     
Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class));
     Mockito
         .doAnswer(invocation -> {
@@ -94,4 +94,23 @@ public class TaskRealtimeMetricsMonitorTest
       Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS));
     }
   }
+
+  /**
+   * The min/max/avg message-gap metrics must not be emitted before any gap is reported, and must all be emitted
+   * once a gap has been reported.
+   */
+  @Test
+  public void testMessageGapAggStats()
+  {
+    final String[] aggregateMetricNames = {
+        "ingest/events/minMessageGap",
+        "ingest/events/maxMessageGap",
+        "ingest/events/avgMessageGap"
+    };
+    target = new TaskRealtimeMetricsMonitor(segmentGenerationMetrics, rowIngestionMeters, DIMENSIONS, null);
+
+    // Nothing reported yet: none of the aggregates may be emitted.
+    target.doMonitor(emitter);
+    for (final String metricName : aggregateMetricNames) {
+      Assert.assertFalse(emittedEvents.containsKey(metricName));
+    }
+
+    // One reported gap: all three aggregates must be emitted.
+    emittedEvents.clear();
+    segmentGenerationMetrics.reportMessageGap(1);
+    target.doMonitor(emitter);
+    for (final String metricName : aggregateMetricNames) {
+      Assert.assertTrue(emittedEvents.containsKey(metricName));
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
index 564a3322bf9..66d9d497c96 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Metrics for segment generation.
@@ -55,6 +56,51 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  /**
+   * Aggregate (min, max, count and sum) of the message gaps, in milliseconds, reported through
+   * {@code reportMessageGap(long)} since this instance was created.
+   *
+   * <p>An instance is updated through {@code reportMessageGap} and is later handed to a snapshot, which swaps it out
+   * with {@code AtomicReference.getAndSet} in {@code snapshot()}; the updating and the reading code may therefore
+   * run on different threads. Every method is {@code synchronized} so that a reader sees fully written, mutually
+   * consistent values: plain {@code long}/{@code double} fields give no cross-thread visibility guarantee, and their
+   * writes are not even guaranteed to be atomic (JLS 17.7).
+   *
+   * <p>A reporter that fetched this instance just before the swap may still add one gap to it afterwards; that
+   * sample ends up in the already-taken snapshot and is missed if the snapshot has already been read.
+   *
+   * <p>While empty: {@code count() == 0}, {@code min() == Long.MAX_VALUE}, {@code max() == Long.MIN_VALUE} and
+   * {@code avg()} is {@code NaN}.
+   */
+  public static class MessageGapStats
+  {
+    private long min = Long.MAX_VALUE;
+    private long max = Long.MIN_VALUE;
+    private long count = 0;
+    // double: the running sum cannot overflow the way a long would, at the cost of precision for huge totals.
+    private double total = 0;
+
+    /**
+     * Returns the mean of the reported gaps, or {@code NaN} if none have been reported.
+     */
+    public synchronized double avg()
+    {
+      return total / count;
+    }
+
+    /**
+     * Returns the smallest reported gap, or {@code Long.MAX_VALUE} if none have been reported.
+     */
+    public synchronized long min()
+    {
+      return min;
+    }
+
+    /**
+     * Returns the largest reported gap, or {@code Long.MIN_VALUE} if none have been reported.
+     */
+    public synchronized long max()
+    {
+      return max;
+    }
+
+    /**
+     * Returns the number of reported gaps.
+     */
+    public synchronized long count()
+    {
+      return count;
+    }
+
+    /**
+     * Folds one message gap, in milliseconds, into the aggregate.
+     */
+    public synchronized void add(final long messageGap)
+    {
+      total += messageGap;
+      ++count;
+      min = Math.min(min, messageGap);
+      max = Math.max(max, messageGap);
+    }
+  }
+
+  private final AtomicReference<MessageGapStats> messageGapStats = new AtomicReference<>(new MessageGapStats());
+
   public void incrementRowOutputCount(long numRows)
   {
     rowOutputCount.addAndGet(numRows);
@@ -105,14 +151,23 @@ public class SegmentGenerationMetrics
     this.sinkCount.set(sinkCount);
   }
 
+  /**
+   * Records one message gap in milliseconds (e.g. StreamAppenderator passes the current time minus the row's
+   * timestamp). The value is added to the stats object that the next {@code snapshot()} will hand out.
+   */
+  public void reportMessageGap(final long messageGap)
+  {
+    final MessageGapStats currentStats = messageGapStats.get();
+    currentStats.add(messageGap);
+  }
+
+  /**
+   * Raises the tracked maximum message timestamp to {@code messageMaxTimestamp} if that is larger; never lowers it.
+   */
   public void reportMessageMaxTimestamp(long messageMaxTimestamp)
   {
-    this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp, this.messageMaxTimestamp.get()));
+    // Read first so the common "not a new maximum" case skips the atomic update entirely;
+    // getAndAccumulate(..., Math::max) is atomic, so a concurrently stored larger value is never overwritten.
+    if (this.messageMaxTimestamp.get() < messageMaxTimestamp) {
+      this.messageMaxTimestamp.getAndAccumulate(messageMaxTimestamp, Math::max);
+    }
   }
 
+  /**
+   * Raises the tracked maximum segment handoff time to {@code maxSegmentHandoffTime} if that is larger; never
+   * lowers it.
+   */
   public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
   {
-    this.maxSegmentHandoffTime.set(Math.max(maxSegmentHandoffTime, this.maxSegmentHandoffTime.get()));
+    // Read first so the common "not a new maximum" case skips the atomic update entirely;
+    // getAndAccumulate(..., Math::max) is atomic, so a concurrently stored larger value is never overwritten.
+    if (this.maxSegmentHandoffTime.get() < maxSegmentHandoffTime) {
+      this.maxSegmentHandoffTime.getAndAccumulate(maxSegmentHandoffTime, Math::max);
+    }
   }
 
   public void markProcessingDone()
@@ -170,6 +225,7 @@ public class SegmentGenerationMetrics
   {
     return pushedRows.get();
   }
+
   public long mergeTimeMillis()
   {
     return mergeTimeMillis.get();
@@ -195,6 +251,14 @@ public class SegmentGenerationMetrics
     return sinkCount.get();
   }
 
+  /**
+   * Returns the message-gap stats currently being accumulated. This is the live object: later
+   * {@link #reportMessageGap(long)} calls keep updating it until {@link #snapshot()} swaps in a new one, after which
+   * the returned instance belongs to that snapshot. See {@code MessageGapStats} for the current guarantees on
+   * thread-safety.
+   */
+  public MessageGapStats getMessageGapStats()
+  {
+    return messageGapStats.get();
+  }
+
   public long messageGap()
   {
     return messageGap.get();
@@ -220,19 +284,20 @@ public class SegmentGenerationMetrics
     retVal.persistCpuTime.set(persistCpuTime.get());
     retVal.handOffCount.set(handOffCount.get());
     retVal.sinkCount.set(sinkCount.get());
-    retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
     retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
     retVal.mergedRows.set(mergedRows.get());
     retVal.pushedRows.set(pushedRows.get());
 
     long messageGapSnapshot = 0;
-    final long maxTimestamp = retVal.messageMaxTimestamp.get();
+    final long maxTimestamp = messageMaxTimestamp.get();
+    retVal.messageMaxTimestamp.set(maxTimestamp);
     if (processingDone.get()) {
       messageGapSnapshot = NO_EMIT_MESSAGE_GAP;
     } else if (maxTimestamp > 0) {
       messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
     }
     retVal.messageGap.set(messageGapSnapshot);
+    retVal.messageGapStats.set(messageGapStats.getAndSet(new 
MessageGapStats()));
 
     reset();
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 2b37481a8ed..2a1b0c67bc6 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -337,6 +337,9 @@ public class StreamAppenderator implements Appenderator
       throw e;
     }
 
+    final long currTs = System.currentTimeMillis();
+    metrics.reportMessageGap(currTs - row.getTimestampFromEpoch());
+
     if (sinkRowsInMemoryAfterAdd < 0) {
       throw new SegmentNotWritableException("Attempt to add row to swapped-out 
sink for segment[%s].", identifier);
     }
@@ -360,11 +363,11 @@ public class StreamAppenderator implements Appenderator
       persist = true;
       persistReasons.add("No more rows can be appended to sink");
     }
-    if (System.currentTimeMillis() > nextFlush) {
+    if (currTs > nextFlush) {
       persist = true;
       persistReasons.add(StringUtils.format(
           "current time[%d] is greater than nextFlush[%d]",
-          System.currentTimeMillis(),
+          currTs,
           nextFlush
       ));
     }
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
index 1928c08b2ac..16dacf32110 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/SegmentGenerationMetricsTest.java
@@ -20,24 +20,17 @@
 package org.apache.druid.segment.realtime;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class SegmentGenerationMetricsTest
 {
-  private SegmentGenerationMetrics metrics;
-
-  @Before
-  public void setup()
-  {
-    metrics = new SegmentGenerationMetrics();
-  }
-
   @Test
   public void testSnapshotBeforeProcessing()
   {
+    final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+    assertMessageGapAggregateMetricsReset(metrics);
     SegmentGenerationMetrics snapshot = metrics.snapshot();
     Assert.assertEquals(0L, snapshot.messageGap());
+    // Nothing was reported: the snapshot carries empty stats and the live metrics were reset to empty stats.
+    assertMessageGapAggregateMetricsReset(snapshot);
+    assertMessageGapAggregateMetricsReset(metrics);
     // invalid value
     Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
   }
@@ -45,25 +38,78 @@ public class SegmentGenerationMetricsTest
   @Test
   public void testSnapshotAfterProcessingOver()
   {
+    final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+
+    metrics.reportMessageGap(20L);
     metrics.reportMessageMaxTimestamp(System.currentTimeMillis() - 20L);
     metrics.reportMaxSegmentHandoffTime(7L);
+
     SegmentGenerationMetrics snapshot = metrics.snapshot();
+
+    // The latest event is at least 20ms old, so the point-in-time gap must be at least 20ms.
     Assert.assertTrue(snapshot.messageGap() >= 20L);
+    final SegmentGenerationMetrics.MessageGapStats messageGapStats = snapshot.getMessageGapStats();
+    Assert.assertEquals(1, messageGapStats.count());
+    Assert.assertEquals(20L, messageGapStats.min());
+    Assert.assertEquals(20L, messageGapStats.max());
+    Assert.assertEquals(20L, messageGapStats.avg(), 0);
     Assert.assertEquals(7, snapshot.maxSegmentHandoffTime());
   }
 
   @Test
   public void testProcessingOverAfterSnapshot()
   {
+    final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+
     metrics.reportMessageMaxTimestamp(10);
+    metrics.reportMessageGap(1);
     metrics.reportMaxSegmentHandoffTime(7L);
     // Should reset to invalid value
     metrics.snapshot();
     metrics.markProcessingDone();
     SegmentGenerationMetrics snapshot = metrics.snapshot();
-    // Message gap must be invalid after processing is done
-    Assert.assertTrue(0 > snapshot.messageGap());
+
+    // Latest message gap must be invalid after processing is done
+    Assert.assertEquals(-1, snapshot.messageGap());
+
+    // The single reported gap was consumed by the first snapshot, so this snapshot's stats are empty.
+    assertMessageGapAggregateMetricsReset(snapshot);
     // value must be invalid
     Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
   }
+
+  @Test
+  public void testMessageGapAggregateMetrics()
+  {
+    final SegmentGenerationMetrics metrics = new SegmentGenerationMetrics();
+
+    // Report gaps 0, 30, 60, 90, 120 -> count 5, min 0, max 120, mean 60.
+    for (long gap = 0; gap <= 120; gap += 30) {
+      metrics.reportMessageGap(gap);
+    }
+    metrics.reportMessageMaxTimestamp(10L);
+    metrics.reportMaxSegmentHandoffTime(7L);
+    metrics.markProcessingDone();
+
+    final SegmentGenerationMetrics snapshot = metrics.snapshot();
+
+    // Latest message gap must be invalid after processing is done; the aggregates are still carried over.
+    Assert.assertEquals(-1, snapshot.messageGap());
+
+    final SegmentGenerationMetrics.MessageGapStats stats = snapshot.getMessageGapStats();
+    Assert.assertEquals(5, stats.count());
+    Assert.assertEquals(0, stats.min());
+    Assert.assertEquals(120, stats.max());
+    Assert.assertEquals(60.0, stats.avg(), 0);
+
+    Assert.assertEquals(7L, snapshot.maxSegmentHandoffTime());
+  }
+
+  /**
+   * Asserts that {@code metrics} currently holds empty message-gap stats: zero count, sentinel min/max and a
+   * {@code NaN} average.
+   */
+  private static void assertMessageGapAggregateMetricsReset(final SegmentGenerationMetrics metrics)
+  {
+    final SegmentGenerationMetrics.MessageGapStats stats = metrics.getMessageGapStats();
+    Assert.assertEquals(0, stats.count());
+    Assert.assertEquals(Long.MIN_VALUE, stats.max());
+    Assert.assertEquals(Long.MAX_VALUE, stats.min());
+    Assert.assertEquals(Double.NaN, stats.avg(), 0);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to