kfaraz commented on code in PR #17847:
URL: https://github.com/apache/druid/pull/17847#discussion_r2094356094


##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats
+  {
+    long minMessageGap = Long.MAX_VALUE;
+    long maxMessageGap = Long.MIN_VALUE;
+    long numMessageGap = 0;
+    double totalMessageGap = 0;

Review Comment:
   ```suggestion
       private long min = Long.MAX_VALUE;
       private long max = Long.MIN_VALUE;
       private long count = 0;
       double total = 0;
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats
+  {
+    long minMessageGap = Long.MAX_VALUE;
+    long maxMessageGap = Long.MIN_VALUE;
+    long numMessageGap = 0;
+    double totalMessageGap = 0;
+
+    public synchronized double avgMessageGap()
+    {
+      return totalMessageGap / numMessageGap;
+    }
+
+    public synchronized long getMinMessageGap()
+    {
+      return minMessageGap;
+    }
+
+    public synchronized long getMaxMessageGap()
+    {
+      return maxMessageGap;
+    }
+
+    public synchronized long getNumMessageGap()
+    {
+      return numMessageGap;
+    }
+
+    public synchronized void add(final long messageGap)
+    {
+      totalMessageGap += messageGap;
+      ++numMessageGap;
+      if (minMessageGap > messageGap) {
+        minMessageGap = messageGap;
+      }
+      if (maxMessageGap < messageGap) {
+        maxMessageGap = messageGap;
+      }
+    }
+
+    public MessageGapStats getAndReset()
+    {
+      final MessageGapStats copy = new MessageGapStats();
+      synchronized (this) {
+        copy.totalMessageGap = totalMessageGap;
+        copy.numMessageGap = numMessageGap;
+        copy.minMessageGap = minMessageGap;
+        copy.maxMessageGap = maxMessageGap;
+
+        totalMessageGap = 0;
+        numMessageGap = 0;
+        minMessageGap = Long.MAX_VALUE;
+        maxMessageGap = Long.MIN_VALUE;
+      }
+      return copy;
+    }
+  }
+
+  private final MessageGapStats messageGapStats = new MessageGapStats();

Review Comment:
   ```suggestion
     private final AtomicReference<MessageGapStats> messageGapStats = new 
AtomicReference<>(new MessageGapStats());
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats
+  {
+    long minMessageGap = Long.MAX_VALUE;
+    long maxMessageGap = Long.MIN_VALUE;
+    long numMessageGap = 0;
+    double totalMessageGap = 0;
+
+    public synchronized double avgMessageGap()

Review Comment:
   ```suggestion
       public double avg()
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -337,6 +337,9 @@ public AppenderatorAddResult add(
       throw e;
     }
 
+    final long currTs = System.currentTimeMillis();
+    metrics.reportMessageGap(currTs - row.getTimestampFromEpoch());

Review Comment:
   We should just send the message timestamp here. `SegmentGenerationMetrics` 
can decide what to do with it, the same way we currently do for the max message 
timestamp.
   ```suggestion
       metrics.reportMessageTimestamp(row.getTimestampFromEpoch());
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats
+  {
+    long minMessageGap = Long.MAX_VALUE;
+    long maxMessageGap = Long.MIN_VALUE;
+    long numMessageGap = 0;
+    double totalMessageGap = 0;
+
+    public synchronized double avgMessageGap()
+    {
+      return totalMessageGap / numMessageGap;

Review Comment:
   ```suggestion
         return total / count;
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats
+  {
+    long minMessageGap = Long.MAX_VALUE;
+    long maxMessageGap = Long.MIN_VALUE;
+    long numMessageGap = 0;
+    double totalMessageGap = 0;
+
+    public synchronized double avgMessageGap()
+    {
+      return totalMessageGap / numMessageGap;
+    }
+
+    public synchronized long getMinMessageGap()
+    {
+      return minMessageGap;
+    }
+
+    public synchronized long getMaxMessageGap()
+    {
+      return maxMessageGap;
+    }
+
+    public synchronized long getNumMessageGap()
+    {
+      return numMessageGap;
+    }
+
+    public synchronized void add(final long messageGap)
+    {
+      totalMessageGap += messageGap;
+      ++numMessageGap;
+      if (minMessageGap > messageGap) {
+        minMessageGap = messageGap;
+      }
+      if (maxMessageGap < messageGap) {
+        maxMessageGap = messageGap;
+      }
+    }
+
+    public MessageGapStats getAndReset()

Review Comment:
   This method should not be needed; use an `AtomicReference` in 
`SegmentGenerationMetrics` instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java:
##########
@@ -132,6 +132,21 @@ public boolean doMonitor(ServiceEmitter emitter)
       emitter.emit(builder.setMetric("ingest/events/messageGap", messageGap));
     }
 
+    final SegmentGenerationMetrics.MessageGapStats messageGapStats = 
metrics.getMessageGapStats();
+    final long minMessageGap = messageGapStats.getMinMessageGap();
+    final long maxMessageGap = messageGapStats.getMaxMessageGap();
+    final double avgMessageGap = messageGapStats.avgMessageGap();
+    // Best-effort way to ensure parity amongst emitted metrics
+    if (minMessageGap != Long.MAX_VALUE) {
+      emitter.emit(builder.setMetric("ingest/events/minMessageGap", 
minMessageGap));
+    }
+    if (maxMessageGap != Long.MIN_VALUE) {
+      emitter.emit(builder.setMetric("ingest/events/maxMessageGap", 
maxMessageGap));
+    }
+    if (!Double.isNaN(avgMessageGap)) {
+      emitter.emit(builder.setMetric("ingest/events/avgMessageGap", 
avgMessageGap));
+    }

Review Comment:
   ```suggestion
       if (messageGap.count() > 0) {
         emitter.emit(builder.setMetric("ingest/events/minMessageGap", 
messageGap.min()));
         emitter.emit(builder.setMetric("ingest/events/maxMessageGap", 
messageGap.max()));
         emitter.emit(builder.setMetric("ingest/events/avgMessageGap", 
messageGap.avg()));
       }
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats
+  {
+    long minMessageGap = Long.MAX_VALUE;
+    long maxMessageGap = Long.MIN_VALUE;
+    long numMessageGap = 0;
+    double totalMessageGap = 0;
+
+    public synchronized double avgMessageGap()
+    {
+      return totalMessageGap / numMessageGap;
+    }
+
+    public synchronized long getMinMessageGap()

Review Comment:
   Why do these methods need to be synchronized?
   Also, use the method names `min`, `max`, `avg`, and `count` instead for simplicity.



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -105,14 +164,23 @@ public void setSinkCount(long sinkCount)
     this.sinkCount.set(sinkCount);
   }
 
+  public void reportMessageGap(final long messageGap)
+  {
+    messageGapStats.add(messageGap);
+  }

Review Comment:
   ```suggestion
     public void reportMessageTimestamp(final long messageTimestamp)
     {
       messageGapStats.get().add(System.currentTimeMillis() - messageTimestamp);
     }
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -105,14 +164,23 @@ public void setSinkCount(long sinkCount)
     this.sinkCount.set(sinkCount);
   }
 
+  public void reportMessageGap(final long messageGap)
+  {
+    messageGapStats.add(messageGap);
+  }
+
   public void reportMessageMaxTimestamp(long messageMaxTimestamp)
   {
-    this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp, 
this.messageMaxTimestamp.get()));
+    if (this.messageMaxTimestamp.get() < messageMaxTimestamp) {
+      this.messageMaxTimestamp.getAndAccumulate(messageMaxTimestamp, 
Math::max);
+    }
   }
 
   public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
   {
-    this.maxSegmentHandoffTime.set(Math.max(maxSegmentHandoffTime, 
this.maxSegmentHandoffTime.get()));
+    if (this.maxSegmentHandoffTime.get() < maxSegmentHandoffTime) {
+      this.maxSegmentHandoffTime.getAndAccumulate(maxSegmentHandoffTime, 
Math::max);

Review Comment:
   Why is this change needed?



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
 
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
+  public static class MessageGapStats

Review Comment:
   Note: I feel this class can have other applications too, but for now we can 
just leave it here.



##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -220,20 +294,26 @@ public SegmentGenerationMetrics snapshot()
     retVal.persistCpuTime.set(persistCpuTime.get());
     retVal.handOffCount.set(handOffCount.get());
     retVal.sinkCount.set(sinkCount.get());
-    retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
     retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
     retVal.mergedRows.set(mergedRows.get());
     retVal.pushedRows.set(pushedRows.get());
 
     long messageGapSnapshot = 0;
-    final long maxTimestamp = retVal.messageMaxTimestamp.get();
+    final long maxTimestamp = messageMaxTimestamp.get();
+    retVal.messageMaxTimestamp.set(maxTimestamp);
     if (processingDone.get()) {
       messageGapSnapshot = NO_EMIT_MESSAGE_GAP;
     } else if (maxTimestamp > 0) {
       messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
     }
     retVal.messageGap.set(messageGapSnapshot);
 
+    final MessageGapStats gapStatsCopy = messageGapStats.getAndReset();
+    retVal.messageGapStats.totalMessageGap = gapStatsCopy.totalMessageGap;
+    retVal.messageGapStats.numMessageGap = gapStatsCopy.numMessageGap;
+    retVal.messageGapStats.maxMessageGap = gapStatsCopy.maxMessageGap;
+    retVal.messageGapStats.minMessageGap = gapStatsCopy.minMessageGap;

Review Comment:
   ```suggestion
       retVal.messageGapStats.set(messageGapStats.getAndSet(new 
MessageGapStats()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to