This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a40ebffc33b Make methods of SegmentGenerationMetrics.MessageGapStats thread safe (#18014)
a40ebffc33b is described below
commit a40ebffc33b76926ce23d8d536323df2b1265d1f
Author: jtuglu-netflix <[email protected]>
AuthorDate: Mon May 19 01:35:33 2025 -0700
Make methods of SegmentGenerationMetrics.MessageGapStats thread safe (#18014)
Follow up to #17847
---
.../segment/realtime/SegmentGenerationMetrics.java | 43 ++++++++++++++++------
1 file changed, 32 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
index 66d9d497c96..2afc2b72831 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Metrics for segment generation.
@@ -57,7 +56,7 @@ public class SegmentGenerationMetrics
private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
/**
- * {@code MessageGapStats} tracks message gap statistics but is not thread-safe.
+ * {@code MessageGapStats} tracks message gap statistics and is thread-safe.
*/
public static class MessageGapStats
{
@@ -66,27 +65,27 @@ public class SegmentGenerationMetrics
private long count = 0;
private double total = 0;
- public double avg()
+ public synchronized double avg()
{
return total / count;
}
- public long min()
+ public synchronized long min()
{
return min;
}
- public long max()
+ public synchronized long max()
{
return max;
}
- public long count()
+ public synchronized long count()
{
return count;
}
- public void add(final long messageGap)
+ public synchronized void add(final long messageGap)
{
total += messageGap;
++count;
@@ -97,9 +96,26 @@ public class SegmentGenerationMetrics
max = messageGap;
}
}
+
+ public MessageGapStats copyAndReset()
+ {
+ final MessageGapStats copy = new MessageGapStats();
+ synchronized (this) {
+ copy.total = total;
+ copy.count = count;
+ copy.min = min;
+ copy.max = max;
+
+ total = 0;
+ count = 0;
+ min = Long.MAX_VALUE;
+ max = Long.MIN_VALUE;
+ }
+ return copy;
+ }
}
- private final AtomicReference<MessageGapStats> messageGapStats = new
AtomicReference<>(new MessageGapStats());
+ private final MessageGapStats messageGapStats = new MessageGapStats();
public void incrementRowOutputCount(long numRows)
{
@@ -153,7 +169,7 @@ public class SegmentGenerationMetrics
public void reportMessageGap(final long messageGap)
{
- messageGapStats.get().add(messageGap);
+ messageGapStats.add(messageGap);
}
public void reportMessageMaxTimestamp(long messageMaxTimestamp)
@@ -256,7 +272,7 @@ public class SegmentGenerationMetrics
*/
public MessageGapStats getMessageGapStats()
{
- return messageGapStats.get();
+ return messageGapStats;
}
public long messageGap()
@@ -297,7 +313,12 @@ public class SegmentGenerationMetrics
messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
}
retVal.messageGap.set(messageGapSnapshot);
- retVal.messageGapStats.set(messageGapStats.getAndSet(new
MessageGapStats()));
+
+ final MessageGapStats messageGapStatsSnapshot =
messageGapStats.copyAndReset();
+ retVal.messageGapStats.total = messageGapStatsSnapshot.total;
+ retVal.messageGapStats.count = messageGapStatsSnapshot.count;
+ retVal.messageGapStats.max = messageGapStatsSnapshot.max;
+ retVal.messageGapStats.min = messageGapStatsSnapshot.min;
reset();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]