This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fb23e38aa7 Fix messageGap emission (#13346)
fb23e38aa7 is described below
commit fb23e38aa716d5f57c4f89352c3e3da5a10ac502
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Thu Nov 10 17:50:19 2022 +0530
Fix messageGap emission (#13346)
* Fix messageGap emission
* Do not emit messageGap after the task has stopped reading events
* Refactoring
* Fix tests
---
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 6 ++--
.../common/stats/TaskRealtimeMetricsMonitor.java | 6 +++-
.../SeekableStreamIndexTaskRunner.java | 3 +-
.../segment/realtime/FireDepartmentMetrics.java | 41 ++++++++++------------
.../segment/realtime/RealtimeMetricsMonitor.java | 6 +++-
.../realtime/FireDepartmentMetricsTest.java | 33 +++--------------
6 files changed, 38 insertions(+), 57 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4bce2c90fa..36ddea4c36 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -419,7 +419,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(3,
task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0,
task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0,
task.getRunner().getRowIngestionMeters().getThrownAway());
- Assert.assertNotEquals(-1,
task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
+
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -560,7 +560,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(3,
task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0,
task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0,
task.getRunner().getRowIngestionMeters().getThrownAway());
- Assert.assertNotEquals(-1,
task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
+
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
@@ -613,7 +613,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Assert.assertEquals(3,
task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0,
task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0,
task.getRunner().getRowIngestionMeters().getThrownAway());
- Assert.assertNotEquals(-1,
task.getRunner().getFireDepartmentMetrics().processingCompletionTime());
+
Assert.assertTrue(task.getRunner().getFireDepartmentMetrics().isProcessingDone());
// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
index e4e28844e9..674b6e77bf 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java
@@ -119,7 +119,11 @@ public class TaskRealtimeMetricsMonitor extends
AbstractMonitor
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() -
previousFireDepartmentMetrics.mergeCpuTime()));
emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount()
- previousFireDepartmentMetrics.handOffCount()));
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
- emitter.emit(builder.build("ingest/events/messageGap",
metrics.messageGap()));
+
+ long messageGap = metrics.messageGap();
+ if (messageGap >= 0) {
+ emitter.emit(builder.build("ingest/events/messageGap", messageGap));
+ }
long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
if (maxSegmentHandoffTime >= 0) {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 2d85eb4407..800dbcdeb9 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -769,6 +769,8 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
finally {
try {
+ // To handle cases where tasks stop reading due to stop request or
exceptions
+ fireDepartmentMetrics.markProcessingDone();
driver.persist(committerSupplier.get()); // persist pending data
}
catch (Exception e) {
@@ -897,7 +899,6 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
finally {
try {
-
if (driver != null) {
driver.close();
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
index 952f4e2161..359951356e 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java
@@ -21,15 +21,16 @@ package org.apache.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class FireDepartmentMetrics
{
- private static final long DEFAULT_PROCESSING_COMPLETION_TIME = -1L;
+ private static final long NO_EMIT_SEGMENT_HANDOFF_TIME = -1L;
- private static final long DEFAULT_SEGMENT_HANDOFF_TIME = -1L;
+ private static final long NO_EMIT_MESSAGE_GAP = -1L;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong processedWithErrorsCount = new AtomicLong(0);
@@ -49,9 +50,9 @@ public class FireDepartmentMetrics
private final AtomicLong sinkCount = new AtomicLong(0);
private final AtomicLong messageMaxTimestamp = new AtomicLong(0);
private final AtomicLong messageGap = new AtomicLong(0);
- private final AtomicLong messageProcessingCompletionTime = new
AtomicLong(DEFAULT_PROCESSING_COMPLETION_TIME);
+ private final AtomicBoolean processingDone = new AtomicBoolean(false);
- private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(DEFAULT_SEGMENT_HANDOFF_TIME);
+ private final AtomicLong maxSegmentHandoffTime = new
AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
public void incrementProcessed()
{
@@ -145,19 +146,13 @@ public class FireDepartmentMetrics
public void markProcessingDone()
{
- markProcessingDone(System.currentTimeMillis());
+ this.processingDone.set(true);
}
@VisibleForTesting
- void markProcessingDone(long timestamp)
+ public boolean isProcessingDone()
{
-
this.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME,
timestamp);
- }
-
- @VisibleForTesting
- public long processingCompletionTime()
- {
- return messageProcessingCompletionTime.get();
+ return processingDone.get();
}
public long processed()
@@ -240,11 +235,6 @@ public class FireDepartmentMetrics
return sinkCount.get();
}
- public long messageMaxTimestamp()
- {
- return messageMaxTimestamp.get();
- }
-
public long messageGap()
{
return messageGap.get();
@@ -276,10 +266,15 @@ public class FireDepartmentMetrics
retVal.sinkCount.set(sinkCount.get());
retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
-
retVal.messageProcessingCompletionTime.set(messageProcessingCompletionTime.get());
-
retVal.messageProcessingCompletionTime.compareAndSet(DEFAULT_PROCESSING_COMPLETION_TIME,
System.currentTimeMillis());
- long maxTimestamp = retVal.messageMaxTimestamp.get();
- retVal.messageGap.set(maxTimestamp > 0 ?
retVal.messageProcessingCompletionTime.get() - maxTimestamp : 0L);
+
+ long messageGapSnapshot = 0;
+ final long maxTimestamp = retVal.messageMaxTimestamp.get();
+ if (processingDone.get()) {
+ messageGapSnapshot = NO_EMIT_MESSAGE_GAP;
+ } else if (maxTimestamp > 0) {
+ messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
+ }
+ retVal.messageGap.set(messageGapSnapshot);
reset();
@@ -288,6 +283,6 @@ public class FireDepartmentMetrics
private void reset()
{
- maxSegmentHandoffTime.set(DEFAULT_SEGMENT_HANDOFF_TIME);
+ maxSegmentHandoffTime.set(NO_EMIT_SEGMENT_HANDOFF_TIME);
}
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
b/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
index d6ca6770d5..43058851a9 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitor.java
@@ -113,7 +113,11 @@ public class RealtimeMetricsMonitor extends AbstractMonitor
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() -
previous.mergeCpuTime()));
emitter.emit(builder.build("ingest/handoff/count",
metrics.handOffCount() - previous.handOffCount()));
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
- emitter.emit(builder.build("ingest/events/messageGap",
metrics.messageGap()));
+
+ long messageGap = metrics.messageGap();
+ if (messageGap >= 0) {
+ emitter.emit(builder.build("ingest/events/messageGap", messageGap));
+ }
long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
if (maxSegmentHandoffTime >= 0) {
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
index 04f7ed22a4..d9c1c461ab 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentMetricsTest.java
@@ -45,23 +45,13 @@ public class FireDepartmentMetricsTest
@Test
public void testSnapshotAfterProcessingOver()
{
- metrics.reportMessageMaxTimestamp(10);
+ metrics.reportMessageMaxTimestamp(System.currentTimeMillis() - 20L);
metrics.reportMaxSegmentHandoffTime(7L);
- metrics.markProcessingDone(30L);
FireDepartmentMetrics snapshot = metrics.snapshot();
- Assert.assertEquals(20, snapshot.messageGap());
+ Assert.assertTrue(snapshot.messageGap() >= 20L);
Assert.assertEquals(7, snapshot.maxSegmentHandoffTime());
}
- @Test
- public void testSnapshotBeforeProcessingOver()
- {
- metrics.reportMessageMaxTimestamp(10);
- long current = System.currentTimeMillis();
- long messageGap = metrics.snapshot().messageGap();
- Assert.assertTrue("Message gap: " + messageGap, messageGap >= (current -
10));
- }
-
@Test
public void testProcessingOverAfterSnapshot()
{
@@ -69,24 +59,11 @@ public class FireDepartmentMetricsTest
metrics.reportMaxSegmentHandoffTime(7L);
// Should reset to invalid value
metrics.snapshot();
- metrics.markProcessingDone(20);
+ metrics.markProcessingDone();
FireDepartmentMetrics snapshot = metrics.snapshot();
- Assert.assertEquals(10, snapshot.messageGap());
+ // Message gap must be invalid after processing is done
+ Assert.assertTrue(0 > snapshot.messageGap());
// value must be invalid
Assert.assertTrue(0 > snapshot.maxSegmentHandoffTime());
}
-
- @Test
- public void testProcessingOverWithSystemTime()
- {
- metrics.reportMessageMaxTimestamp(10);
- long current = System.currentTimeMillis();
- metrics.markProcessingDone();
- long completionTime = metrics.processingCompletionTime();
- Assert.assertTrue(
- "Completion time: " + completionTime,
- completionTime >= current && completionTime < (current + 10_000)
- );
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]