This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fb23e38aa7 Fix messageGap emission (#13346)
fb23e38aa7 is described below

commit fb23e38aa716d5f57c4f89352c3e3da5a10ac502
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Nov 10 17:50:19 2022 +0530

    Fix messageGap emission (#13346)
    
    * Fix messageGap emission
    
    * Do not emit messageGap after stopping reading events
    
    * Refactoring
    
    * Fix tests
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  6 ++--
 .../common/stats/TaskRealtimeMetricsMonitor.java   |  6 +++-
 .../SeekableStreamIndexTaskRunner.java             |  3 +-
 .../segment/realtime/FireDepartmentMetrics.java    | 41 ++++++++++------------
 .../segment/realtime/RealtimeMetricsMonitor.java   |  6 +++-
 .../realtime/FireDepartmentMetricsTest.java        | 33 +++--------------
 6 files changed, 38 insertions(+), 57 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4bce2c90fa..36ddea4c36 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -419,7 +419,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(3, 
task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getThrownAway());
-    Assert.assertNotEquals(-1, task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
+    Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -560,7 +560,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(3, 
task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getThrownAway());
-    Assert.assertNotEquals(-1, 
task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
+    
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
@@ -613,7 +613,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
     Assert.assertEquals(3, 
task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, 
task.getRunner().getRowIngestionMeters().getThrownAway());
-    Assert.assertNotEquals(-1, 
task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
+    
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
 
     // Check published metadata and segments in deep storage
     assertEqualsExceptVersion(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index e4e28844e9..674b6e77bf 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -119,7 +119,11 @@ public class TaskRealtimeMetricsMonitor extends 
AbstractMonitor
     emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - 
previousFireDepartmentMetrics.mergeCpuTime()));
     emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() 
- previousFireDepartmentMetrics.handOffCount()));
     emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
-    emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap()));
+
+    long messageGap = metrics.messageGap();
+    if (messageGap >= 0) {
+      emitter.emit(builder.build("ingest/events/messageGap", messageGap));
+    }
 
     long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
     if (maxSegmentHandoffTime >= 0) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 2d85eb4407..800dbcdeb9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -769,6 +769,8 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       }
       finally {
         try {
+          // To handle cases where tasks stop reading due to stop request or exceptions
+          fireDepartmentMetrics.markProcessingDone();
           driver.persist(committerSupplier.get()); // persist pending data
         }
         catch (Exception e) {
@@ -897,7 +899,6 @@ public abstract class 
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     }
     finally {
       try {
-
         if (driver != null) {
           driver.close();
         }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
index 952f4e2161..359951356e 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
@@ -21,15 +21,16 @@ package org.apache.druid.segment.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  */
 public class FireDepartmentMetrics
 {
-  private static final long DEFAULT_PROCESSING_COMPLETION_TIME = -1L;
+  private static final long NO_EMIT_SEGMENT_HANDOFF_TIME = -1L;
 
-  private static final long DEFAULT_SEGMENT_HANDOFF_TIME = -1L;
+  private static final long NO_EMIT_MESSAGE_GAP = -1L;
 
   private final AtomicLong processedCount = new AtomicLong(0);
   private final AtomicLong processedWithErrorsCount = new AtomicLong(0);
@@ -49,9 +50,9 @@ public class FireDepartmentMetrics
   private final AtomicLong sinkCount = new AtomicLong(0);
   private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
   private final AtomicLong messageGap = new AtomicLong(0);
-  private final AtomicLong messageProcessingCompletionTime = new AtomicLong(DEFAULT_PROCESSING_COMPLETION_TIME);
+  private final AtomicBoolean processingDone = new AtomicBoolean(false);
 
-  private final AtomicLong maxSegmentHandoffTime = new AtomicLong(DEFAULT_SEGMENT_HANDOFF_TIME);
+  private final AtomicLong maxSegmentHandoffTime = new AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
   public void incrementProcessed()
   {
@@ -145,19 +146,13 @@ public class FireDepartmentMetrics
 
   public void markProcessingDone()
   {
-    markProcessingDone(System.currentTimeMillis());
+    this.processingDone.set(true);
   }
 
   @VisibleForTesting
-  void markProcessingDone(long timestamp)
+  public boolean isProcessingDone()
   {
-    this.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME, timestamp);
-  }
-
-  @VisibleForTesting
-  public long processingCompletionTime()
-  {
-    return messageProcessingCompletionTime.get();
+    return processingDone.get();
   }
 
   public long processed()
@@ -240,11 +235,6 @@ public class FireDepartmentMetrics
     return sinkCount.get();
   }
 
-  public long messageMaxTimestamp()
-  {
-    return messageMaxTimestamp.get();
-  }
-
   public long messageGap()
   {
     return messageGap.get();
@@ -276,10 +266,15 @@ public class FireDepartmentMetrics
     retVal.sinkCount.set(sinkCount.get());
     retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
     retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
-    retVal.messageProcessingCompletionTime.set(messageProcessingCompletionTime.get());
-    retVal.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME, System.currentTimeMillis());
-    long maxTimestamp = retVal.messageMaxTimestamp.get();
-    retVal.messageGap.set(maxTimestamp > 0 ? retVal.messageProcessingCompletionTime.get() - maxTimestamp : 0L);
+
+    long messageGapSnapshot = 0;
+    final long maxTimestamp = retVal.messageMaxTimestamp.get();
+    if (processingDone.get()) {
+      messageGapSnapshot = NO_EMIT_MESSAGE_GAP;
+    } else if (maxTimestamp > 0) {
+      messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
+    }
+    retVal.messageGap.set(messageGapSnapshot);
 
     reset();
 
@@ -288,6 +283,6 @@ public class FireDepartmentMetrics
 
   private void reset()
   {
-    maxSegmentHandoffTime.set(DEFAULT_SEGMENT_HANDOFF_TIME);
+    maxSegmentHandoffTime.set(NO_EMIT_SEGMENT_HANDOFF_TIME);
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
index d6ca6770d5..43058851a9 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
@@ -113,7 +113,11 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
       emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - 
previous.mergeCpuTime()));
       emitter.emit(builder.build("ingest/handoff/count", 
metrics.handOffCount() - previous.handOffCount()));
       emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
-      emitter.emit(builder.build("ingest/events/messageGap", 
metrics.messageGap()));
+
+      long messageGap = metrics.messageGap();
+      if (messageGap >= 0) {
+        emitter.emit(builder.build("ingest/events/messageGap", messageGap));
+      }
 
       long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
       if (maxSegmentHandoffTime >= 0) {
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
index 04f7ed22a4..d9c1c461ab 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
@@ -45,23 +45,13 @@ public class FireDepartmentMetricsTest
   @Test
   public void testSnapshotAfterProcessingOver()
   {
-    metrics.reportMessageMaxTimestamp(10);
+    metrics.reportMessageMaxTimestamp(System.currentTimeMillis() - 20L);
     metrics.reportMaxSegmentHandoffTime(7L);
-    metrics.markProcessingDone(30L);
     FireDepartmentMetrics snapshot = metrics.snapshot();
-    Assert.assertEquals(20, snapshot.messageGap());
+    Assert.assertTrue(snapshot.messageGap() >= 20L);
     Assert.assertEquals(7, snapshot.maxSegmentHandoffTime());
   }
 
-  @Test
-  public void testSnapshotBeforeProcessingOver()
-  {
-    metrics.reportMessageMaxTimestamp(10);
-    long current = System.currentTimeMillis();
-    long messageGap = metrics.snapshot().messageGap();
-    Assert.assertTrue("Message gap: " + messageGap, messageGap >= (current - 10));
-  }
-
   @Test
   public void testProcessingOverAfterSnapshot()
   {
@@ -69,24 +59,11 @@ public class FireDepartmentMetricsTest
     metrics.reportMaxSegmentHandoffTime(7L);
     // Should reset to invalid value
     metrics.snapshot();
-    metrics.markProcessingDone(20);
+    metrics.markProcessingDone();
     FireDepartmentMetrics snapshot = metrics.snapshot();
-    Assert.assertEquals(10, snapshot.messageGap());
+    // Message gap must be invalid after processing is done
+    Assert.assertTrue(0 > snapshot.messageGap());
     // value must be invalid
     Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
   }
-
-  @Test
-  public void testProcessingOverWithSystemTime()
-  {
-    metrics.reportMessageMaxTimestamp(10);
-    long current = System.currentTimeMillis();
-    metrics.markProcessingDone();
-    long completionTime = metrics.processingCompletionTime();
-    Assert.assertTrue(
-        "Completion time: " + completionTime,
-        completionTime >= current && completionTime < (current + 10_000)
-    );
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to