This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 761fe9f  Add new metric that quantifies how long batch ingest jobs 
waited for segment availability and whether or not that wait was successful 
(#12002)
761fe9f is described below

commit 761fe9f144d56e8c7c2f0a8e4838dc3f2c2d5d31
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri Dec 10 11:40:52 2021 -0600

    Add new metric that quantifies how long batch ingest jobs waited for 
segment availability and whether or not that wait was successful (#12002)
    
    * add a unit test that tests that new metric is emitted
    
    * remove unused import
    
    * clarify in doc that this is for batch tasks
    
    * fix IndexTaskTest
---
 docs/operations/metrics.md                         |  1 +
 .../common/task/AbstractBatchIndexTask.java        |  9 +++
 .../druid/indexing/common/task/IndexTaskTest.java  | 90 ++++++++++++++++++++++
 website/.spelling                                  |  1 +
 4 files changed, 101 insertions(+)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index dc93947..b525424 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -213,6 +213,7 @@ Note: If the JVM does not support CPU time measurement for 
the current thread, i
 |`taskSlot/used/count`|Number of busy task slots per emission period. This 
metric is only available if the TaskSlotCountStatsMonitor module is 
included.|category.|Varies.|
 |`taskSlot/lazy/count`|Number of total task slots in lazy marked 
MiddleManagers and Indexers per emission period. This metric is only available 
if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
 |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted 
MiddleManagers and Indexers per emission period. This metric is only available 
if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
+|`task/segmentAvailability/wait/time`|The number of milliseconds a batch 
indexing task waited for newly created segments to become available for 
querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|
 
 ## Shuffle metrics (Native parallel task)
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 7a6b8df..df479a5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -53,6 +53,7 @@ import 
org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -674,6 +675,14 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     }
     finally {
       segmentAvailabilityWaitTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+      toolbox.getEmitter().emit(
+          new ServiceMetricEvent.Builder()
+              .setDimension("dataSource", getDataSource())
+              .setDimension("taskType", getType())
+              .setDimension("taskId", getId())
+              .setDimension("segmentAvailabilityConfirmed", 
segmentAvailabilityConfirmationCompleted)
+              .build("task/segmentAvailability/wait/time", 
segmentAvailabilityWaitTimeMs)
+      );
     }
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 4cce707..b714947 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -61,6 +61,8 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -90,6 +92,7 @@ import 
org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import 
org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -124,6 +127,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
@@ -1086,6 +1091,7 @@ public class IndexTaskTest extends IngestionTestBase
     
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
 
     
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
+    EasyMock.expect(mockToolbox.getEmitter()).andReturn(new 
NoopServiceEmitter()).anyTimes();
     
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
     
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
     mockNotifier.start();
@@ -1150,12 +1156,75 @@ public class IndexTaskTest extends IngestionTestBase
     EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
             .andReturn(new NoopSegmentHandoffNotifierFactory())
             .once();
+    EasyMock.expect(mockToolbox.getEmitter()).andReturn(new 
NoopServiceEmitter()).anyTimes();
+
+    
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
+
+    EasyMock.replay(mockToolbox);
+    EasyMock.replay(mockDataSegment1, mockDataSegment2);
+
+    Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, 
segmentsToWaitFor, 30000));
+    EasyMock.verify(mockToolbox);
+    EasyMock.verify(mockDataSegment1, mockDataSegment2);
+  }
+
+  /**
+   * Verifies that {@code waitForSegmentAvailability} emits the
+   * "task/segmentAvailability/wait/time" metric. The mocked toolbox hands out
+   * a {@code LatchableServiceEmitter} whose latch is counted down when an
+   * event with that metric name is emitted, and the test fails if the latch
+   * has not been released by the time the wait below times out.
+   */
+  @Test
+  public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws 
IOException, InterruptedException
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+
+    LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
+    latchEmitter.latch = new CountDownLatch(1);
+
+    TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
+
+    DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
+    DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class);
+    List<DataSegment> segmentsToWaitFor = new ArrayList<>();
+    segmentsToWaitFor.add(mockDataSegment1);
+    segmentsToWaitFor.add(mockDataSegment2);
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        createDefaultIngestionSpec(
+            jsonMapper,
+            tmpDir,
+            new UniformGranularitySpec(
+                Granularities.HOUR,
+                Granularities.MINUTE,
+                null
+            ),
+            null,
+            createTuningConfigWithMaxRowsPerSegment(2, true),
+            false,
+            false
+        ),
+        null
+    );
+
+    
EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once();
+    
EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once();
+    
EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
+    
EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
+    
EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once();
+    
EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once();
+    
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
+    
EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
+
+    EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
+            .andReturn(new NoopSegmentHandoffNotifierFactory())
+            .once();
+    EasyMock.expect(mockToolbox.getEmitter())
+            .andReturn(latchEmitter).anyTimes();
+
     
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
 
     EasyMock.replay(mockToolbox);
     EasyMock.replay(mockDataSegment1, mockDataSegment2);
 
     Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, 
segmentsToWaitFor, 30000));
+    // await() returns false when the timeout elapses before the latch reaches
+    // zero, i.e. when the metric was never emitted. Assert on the result so
+    // that a missing metric fails the test instead of passing silently.
+    Assert.assertTrue(latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS));
     EasyMock.verify(mockToolbox);
     EasyMock.verify(mockDataSegment1, mockDataSegment2);
   }
@@ -2709,6 +2778,27 @@ public class IndexTaskTest extends IngestionTestBase
     }
   }
 
+  /**
+   * Test-only {@link ServiceEmitter} used to test that the expected metric is
+   * emitted by AbstractBatchIndexTask#waitForSegmentAvailability.
+   *
+   * When {@link #latch} is set and an emitted event's map has the value
+   * "task/segmentAvailability/wait/time" under the key "metric", the latch is
+   * counted down. This override does nothing else with any event.
+   */
+  private static class LatchableServiceEmitter extends ServiceEmitter
+  {
+    // Assigned directly by the test before the task runs; emit() tolerates
+    // a null latch.
+    private CountDownLatch latch;
+
+    private LatchableServiceEmitter()
+    {
+      // NOTE(review): the arguments are presumably (service, host, emitter);
+      // confirm that a null delegate is acceptable to the superclass.
+      super("", "", null);
+    }
+
+    /**
+     * Counts down {@link #latch} when the event is the segment availability
+     * wait time metric; otherwise does nothing.
+     *
+     * @param event the emitted event, inspected via its map form
+     */
+    @Override
+    public void emit(Event event)
+    {
+      if (latch != null && 
"task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) {
+        latch.countDown();
+      }
+    }
+  }
+
   @Test
   public void testEqualsAndHashCode()
   {
diff --git a/website/.spelling b/website/.spelling
index e093cbc..d1d736a 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1360,6 +1360,7 @@ numMetrics
 poolKind
 poolName
 remoteAddress
+segmentAvailabilityConfirmed
 serviceName
 taskStatus
 taskType

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to