kfaraz commented on code in PR #17847:
URL: https://github.com/apache/druid/pull/17847#discussion_r2094356094
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
+ {
+ long minMessageGap = Long.MAX_VALUE;
+ long maxMessageGap = Long.MIN_VALUE;
+ long numMessageGap = 0;
+ double totalMessageGap = 0;
Review Comment:
```suggestion
private long min = Long.MAX_VALUE;
private long max = Long.MIN_VALUE;
private long count = 0;
double total = 0;
```
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
+ {
+ long minMessageGap = Long.MAX_VALUE;
+ long maxMessageGap = Long.MIN_VALUE;
+ long numMessageGap = 0;
+ double totalMessageGap = 0;
+
+ public synchronized double avgMessageGap()
+ {
+ return totalMessageGap / numMessageGap;
+ }
+
+ public synchronized long getMinMessageGap()
+ {
+ return minMessageGap;
+ }
+
+ public synchronized long getMaxMessageGap()
+ {
+ return maxMessageGap;
+ }
+
+ public synchronized long getNumMessageGap()
+ {
+ return numMessageGap;
+ }
+
+ public synchronized void add(final long messageGap)
+ {
+ totalMessageGap += messageGap;
+ ++numMessageGap;
+ if (minMessageGap > messageGap) {
+ minMessageGap = messageGap;
+ }
+ if (maxMessageGap < messageGap) {
+ maxMessageGap = messageGap;
+ }
+ }
+
+ public MessageGapStats getAndReset()
+ {
+ final MessageGapStats copy = new MessageGapStats();
+ synchronized (this) {
+ copy.totalMessageGap = totalMessageGap;
+ copy.numMessageGap = numMessageGap;
+ copy.minMessageGap = minMessageGap;
+ copy.maxMessageGap = maxMessageGap;
+
+ totalMessageGap = 0;
+ numMessageGap = 0;
+ minMessageGap = Long.MAX_VALUE;
+ maxMessageGap = Long.MIN_VALUE;
+ }
+ return copy;
+ }
+ }
+
+ private final MessageGapStats messageGapStats = new MessageGapStats();
Review Comment:
```suggestion
private final AtomicReference<MessageGapStats> messageGapStats = new
AtomicReference<>(new MessageGapStats());
```
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
+ {
+ long minMessageGap = Long.MAX_VALUE;
+ long maxMessageGap = Long.MIN_VALUE;
+ long numMessageGap = 0;
+ double totalMessageGap = 0;
+
+ public synchronized double avgMessageGap()
Review Comment:
```suggestion
public double avg()
```
##########
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java:
##########
@@ -337,6 +337,9 @@ public AppenderatorAddResult add(
throw e;
}
+ final long currTs = System.currentTimeMillis();
+ metrics.reportMessageGap(currTs - row.getTimestampFromEpoch());
Review Comment:
We should just send the message timestamp here. `SegmentGenerationMetrics`
can then decide what to do with it, the same way it currently does for the max
message timestamp.
```suggestion
metrics.reportMessageTimestamp(row.getTimestampFromEpoch());
```
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
+ {
+ long minMessageGap = Long.MAX_VALUE;
+ long maxMessageGap = Long.MIN_VALUE;
+ long numMessageGap = 0;
+ double totalMessageGap = 0;
+
+ public synchronized double avgMessageGap()
+ {
+ return totalMessageGap / numMessageGap;
Review Comment:
```suggestion
return total / count;
```
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
+ {
+ long minMessageGap = Long.MAX_VALUE;
+ long maxMessageGap = Long.MIN_VALUE;
+ long numMessageGap = 0;
+ double totalMessageGap = 0;
+
+ public synchronized double avgMessageGap()
+ {
+ return totalMessageGap / numMessageGap;
+ }
+
+ public synchronized long getMinMessageGap()
+ {
+ return minMessageGap;
+ }
+
+ public synchronized long getMaxMessageGap()
+ {
+ return maxMessageGap;
+ }
+
+ public synchronized long getNumMessageGap()
+ {
+ return numMessageGap;
+ }
+
+ public synchronized void add(final long messageGap)
+ {
+ totalMessageGap += messageGap;
+ ++numMessageGap;
+ if (minMessageGap > messageGap) {
+ minMessageGap = messageGap;
+ }
+ if (maxMessageGap < messageGap) {
+ maxMessageGap = messageGap;
+ }
+ }
+
+ public MessageGapStats getAndReset()
Review Comment:
This method should not be needed; use an `AtomicReference` in
`SegmentGenerationMetrics` instead.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java:
##########
@@ -132,6 +132,21 @@ public boolean doMonitor(ServiceEmitter emitter)
emitter.emit(builder.setMetric("ingest/events/messageGap", messageGap));
}
+ final SegmentGenerationMetrics.MessageGapStats messageGapStats =
metrics.getMessageGapStats();
+ final long minMessageGap = messageGapStats.getMinMessageGap();
+ final long maxMessageGap = messageGapStats.getMaxMessageGap();
+ final double avgMessageGap = messageGapStats.avgMessageGap();
+ // Best-effort way to ensure parity amongst emitted metrics
+ if (minMessageGap != Long.MAX_VALUE) {
+ emitter.emit(builder.setMetric("ingest/events/minMessageGap",
minMessageGap));
+ }
+ if (maxMessageGap != Long.MIN_VALUE) {
+ emitter.emit(builder.setMetric("ingest/events/maxMessageGap",
maxMessageGap));
+ }
+ if (!Double.isNaN(avgMessageGap)) {
+ emitter.emit(builder.setMetric("ingest/events/avgMessageGap",
avgMessageGap));
+ }
Review Comment:
```suggestion
if (messageGap.count() > 0) {
emitter.emit(builder.setMetric("ingest/events/minMessageGap",
messageGap.min()));
emitter.emit(builder.setMetric("ingest/events/maxMessageGap",
messageGap.max()));
emitter.emit(builder.setMetric("ingest/events/avgMessageGap",
messageGap.avg()));
}
```
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
+ {
+ long minMessageGap = Long.MAX_VALUE;
+ long maxMessageGap = Long.MIN_VALUE;
+ long numMessageGap = 0;
+ double totalMessageGap = 0;
+
+ public synchronized double avgMessageGap()
+ {
+ return totalMessageGap / numMessageGap;
+ }
+
+ public synchronized long getMinMessageGap()
Review Comment:
Why do these methods need to be synchronized?
Also, use method names `min`, `max`, `avg`, `count` instead for simplicity.
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -105,14 +164,23 @@ public void setSinkCount(long sinkCount)
this.sinkCount.set(sinkCount);
}
+ public void reportMessageGap(final long messageGap)
+ {
+ messageGapStats.add(messageGap);
+ }
Review Comment:
```suggestion
public void reportMessageTimestamp(final long messageTimestamp)
{
messageGapStats.get().add(System.currentTimeMillis() - messageTimestamp);
}
```
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -105,14 +164,23 @@ public void setSinkCount(long sinkCount)
this.sinkCount.set(sinkCount);
}
+ public void reportMessageGap(final long messageGap)
+ {
+ messageGapStats.add(messageGap);
+ }
+
public void reportMessageMaxTimestamp(long messageMaxTimestamp)
{
- this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp,
this.messageMaxTimestamp.get()));
+ if (this.messageMaxTimestamp.get() < messageMaxTimestamp) {
+ this.messageMaxTimestamp.getAndAccumulate(messageMaxTimestamp,
Math::max);
+ }
}
public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
{
- this.maxSegmentHandoffTime.set(Math.max(maxSegmentHandoffTime,
this.maxSegmentHandoffTime.get()));
+ if (this.maxSegmentHandoffTime.get() < maxSegmentHandoffTime) {
+ this.maxSegmentHandoffTime.getAndAccumulate(maxSegmentHandoffTime,
Math::max);
Review Comment:
Why is this change needed?
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -55,6 +55,65 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
+ public static class MessageGapStats
Review Comment:
Note: this class could have other applications too, but for now it's fine to
leave it here.
##########
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java:
##########
@@ -220,20 +294,26 @@ public SegmentGenerationMetrics snapshot()
retVal.persistCpuTime.set(persistCpuTime.get());
retVal.handOffCount.set(handOffCount.get());
retVal.sinkCount.set(sinkCount.get());
- retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
retVal.mergedRows.set(mergedRows.get());
retVal.pushedRows.set(pushedRows.get());
long messageGapSnapshot = 0;
- final long maxTimestamp = retVal.messageMaxTimestamp.get();
+ final long maxTimestamp = messageMaxTimestamp.get();
+ retVal.messageMaxTimestamp.set(maxTimestamp);
if (processingDone.get()) {
messageGapSnapshot = NO_EMIT_MESSAGE_GAP;
} else if (maxTimestamp > 0) {
messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
}
retVal.messageGap.set(messageGapSnapshot);
+ final MessageGapStats gapStatsCopy = messageGapStats.getAndReset();
+ retVal.messageGapStats.totalMessageGap = gapStatsCopy.totalMessageGap;
+ retVal.messageGapStats.numMessageGap = gapStatsCopy.numMessageGap;
+ retVal.messageGapStats.maxMessageGap = gapStatsCopy.maxMessageGap;
+ retVal.messageGapStats.minMessageGap = gapStatsCopy.minMessageGap;
Review Comment:
```suggestion
retVal.messageGapStats.set(messageGapStats.getAndSet(new
MessageGapStats()));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]