This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a40ebffc33b Make methods of SegmentGenerationMetrics.MessageGapStats 
thread safe (#18014)
a40ebffc33b is described below

commit a40ebffc33b76926ce23d8d536323df2b1265d1f
Author: jtuglu-netflix <[email protected]>
AuthorDate: Mon May 19 01:35:33 2025 -0700

    Make methods of SegmentGenerationMetrics.MessageGapStats thread safe 
(#18014)
    
    Follow up to #17847
---
 .../segment/realtime/SegmentGenerationMetrics.java | 43 ++++++++++++++++------
 1 file changed, 32 insertions(+), 11 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
index 66d9d497c96..2afc2b72831 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Metrics for segment generation.
@@ -57,7 +56,7 @@ public class SegmentGenerationMetrics
   private final AtomicLong maxSegmentHandoffTime = new 
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
   /**
-   * {@code MessageGapStats} tracks message gap statistics but is not 
thread-safe.
+   * {@code MessageGapStats} tracks message gap statistics and is thread-safe.
    */
   public static class MessageGapStats
   {
@@ -66,27 +65,27 @@ public class SegmentGenerationMetrics
     private long count = 0;
     private double total = 0;
 
-    public double avg()
+    public synchronized double avg()
     {
       return total / count;
     }
 
-    public long min()
+    public synchronized long min()
     {
       return min;
     }
 
-    public long max()
+    public synchronized long max()
     {
       return max;
     }
 
-    public long count()
+    public synchronized long count()
     {
       return count;
     }
 
-    public void add(final long messageGap)
+    public synchronized void add(final long messageGap)
     {
       total += messageGap;
       ++count;
@@ -97,9 +96,26 @@ public class SegmentGenerationMetrics
         max = messageGap;
       }
     }
+
+    public MessageGapStats copyAndReset()
+    {
+      final MessageGapStats copy = new MessageGapStats();
+      synchronized (this) {
+        copy.total = total;
+        copy.count = count;
+        copy.min = min;
+        copy.max = max;
+
+        total = 0;
+        count = 0;
+        min = Long.MAX_VALUE;
+        max = Long.MIN_VALUE;
+      }
+      return copy;
+    }
   }
 
-  private final AtomicReference<MessageGapStats> messageGapStats = new 
AtomicReference<>(new MessageGapStats());
+  private final MessageGapStats messageGapStats = new MessageGapStats();
 
   public void incrementRowOutputCount(long numRows)
   {
@@ -153,7 +169,7 @@ public class SegmentGenerationMetrics
 
   public void reportMessageGap(final long messageGap)
   {
-    messageGapStats.get().add(messageGap);
+    messageGapStats.add(messageGap);
   }
 
   public void reportMessageMaxTimestamp(long messageMaxTimestamp)
@@ -256,7 +272,7 @@ public class SegmentGenerationMetrics
    */
   public MessageGapStats getMessageGapStats()
   {
-    return messageGapStats.get();
+    return messageGapStats;
   }
 
   public long messageGap()
@@ -297,7 +313,12 @@ public class SegmentGenerationMetrics
       messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
     }
     retVal.messageGap.set(messageGapSnapshot);
-    retVal.messageGapStats.set(messageGapStats.getAndSet(new 
MessageGapStats()));
+
+    final MessageGapStats messageGapStatsSnapshot = 
messageGapStats.copyAndReset();
+    retVal.messageGapStats.total = messageGapStatsSnapshot.total;
+    retVal.messageGapStats.count = messageGapStatsSnapshot.count;
+    retVal.messageGapStats.max = messageGapStatsSnapshot.max;
+    retVal.messageGapStats.min = messageGapStatsSnapshot.min;
 
     reset();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to