This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 761fe9f Add new metric that quantifies how long batch ingest jobs
waited for segment availability and whether or not that wait was successful
(#12002)
761fe9f is described below
commit 761fe9f144d56e8c7c2f0a8e4838dc3f2c2d5d31
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri Dec 10 11:40:52 2021 -0600
Add new metric that quantifies how long batch ingest jobs waited for
segment availability and whether or not that wait was successful (#12002)
* add a unit test that tests that new metric is emitted
* remove unused import
* clarify in doc that this is for batch tasks
* fix IndexTaskTest
---
docs/operations/metrics.md | 1 +
.../common/task/AbstractBatchIndexTask.java | 9 +++
.../druid/indexing/common/task/IndexTaskTest.java | 90 ++++++++++++++++++++++
website/.spelling | 1 +
4 files changed, 101 insertions(+)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index dc93947..b525424 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -213,6 +213,7 @@ Note: If the JVM does not support CPU time measurement for
the current thread, i
|`taskSlot/used/count`|Number of busy task slots per emission period. This
metric is only available if the TaskSlotCountStatsMonitor module is
included.|category.|Varies.|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked
MiddleManagers and Indexers per emission period. This metric is only available
if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted
MiddleManagers and Indexers per emission period. This metric is only available
if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
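+|`task/segmentAvailability/wait/time`|The number of milliseconds a batch indexing task waited for newly created segments to become available for querying. The `segmentAvailabilityConfirmed` dimension reports whether the segments were confirmed available during that wait.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|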
## Shuffle metrics (Native parallel task)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 7a6b8df..df479a5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -53,6 +53,7 @@ import
org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -674,6 +675,14 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
}
finally {
segmentAvailabilityWaitTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ toolbox.getEmitter().emit(
+ new ServiceMetricEvent.Builder()
+ .setDimension("dataSource", getDataSource())
+ .setDimension("taskType", getType())
+ .setDimension("taskId", getId())
+ .setDimension("segmentAvailabilityConfirmed",
segmentAvailabilityConfirmationCompleted)
+ .build("task/segmentAvailability/wait/time",
segmentAvailabilityWaitTimeMs)
+ );
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 4cce707..b714947 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -61,6 +61,8 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -90,6 +92,7 @@ import
org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import
org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -124,6 +127,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
@@ -1086,6 +1091,7 @@ public class IndexTaskTest extends IngestionTestBase
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
+ EasyMock.expect(mockToolbox.getEmitter()).andReturn(new
NoopServiceEmitter()).anyTimes();
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
mockNotifier.start();
@@ -1150,12 +1156,75 @@ public class IndexTaskTest extends IngestionTestBase
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
+ EasyMock.expect(mockToolbox.getEmitter()).andReturn(new
NoopServiceEmitter()).anyTimes();
+
+
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
+
+ EasyMock.replay(mockToolbox);
+ EasyMock.replay(mockDataSegment1, mockDataSegment2);
+
+ Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox,
segmentsToWaitFor, 30000));
+ EasyMock.verify(mockToolbox);
+ EasyMock.verify(mockDataSegment1, mockDataSegment2);
+ }
+
+  /**
+   * Verifies that waitForSegmentAvailability emits the task/segmentAvailability/wait/time metric
+   * through the emitter returned by the task toolbox.
+   */
+  @Test
+  public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+
+    LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
+    latchEmitter.latch = new CountDownLatch(1);
+
+    TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
+
+    DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
+    DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class);
+    List<DataSegment> segmentsToWaitFor = new ArrayList<>();
+    segmentsToWaitFor.add(mockDataSegment1);
+    segmentsToWaitFor.add(mockDataSegment2);
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        createDefaultIngestionSpec(
+            jsonMapper,
+            tmpDir,
+            new UniformGranularitySpec(
+                Granularities.HOUR,
+                Granularities.MINUTE,
+                null
+            ),
+            null,
+            createTuningConfigWithMaxRowsPerSegment(2, true),
+            false,
+            false
+        ),
+        null
+    );
+
+    EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once();
+    EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once();
+    EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
+    EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
+    EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once();
+    EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once();
+    EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
+    EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
+
+    EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
+            .andReturn(new NoopSegmentHandoffNotifierFactory())
+            .once();
+    // Route emitted metrics to the latching emitter so the test can observe them.
+    EasyMock.expect(mockToolbox.getEmitter())
+            .andReturn(latchEmitter).anyTimes();
+
     EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
     EasyMock.replay(mockToolbox);
     EasyMock.replay(mockDataSegment1, mockDataSegment2);
     Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
+    // The metric is emitted in waitForSegmentAvailability's finally block, before it returns, so the latch
+    // should already be at zero. Asserting the await result makes the test fail if the metric is missing.
+    Assert.assertTrue(latchEmitter.latch.await(30000, TimeUnit.MILLISECONDS));
     EasyMock.verify(mockToolbox);
     EasyMock.verify(mockDataSegment1, mockDataSegment2);
   }
@@ -2709,6 +2778,27 @@ public class IndexTaskTest extends IngestionTestBase
}
}
+  /**
+   * Test-only {@link ServiceEmitter} that discards every event it is given. Once {@link #latch} has been
+   * assigned, each event whose "metric" entry equals {@code task/segmentAvailability/wait/time} counts the
+   * latch down. This lets a test confirm that AbstractBatchIndexTask#waitForSegmentAvailability emitted
+   * that metric.
+   */
+  private static class LatchableServiceEmitter extends ServiceEmitter
+  {
+    // Assigned directly by the test. While it is null, every event is ignored.
+    private CountDownLatch latch;
+
+    private LatchableServiceEmitter()
+    {
+      // No underlying emitter is supplied. emit(Event) is overridden below and never forwards to it.
+      // NOTE(review): other inherited methods are not exercised by these tests; confirm before reusing.
+      super("", "", null);
+    }
+
+    @Override
+    public void emit(Event event)
+    {
+      if (latch == null) {
+        return;
+      }
+      final Object emittedMetric = event.toMap().get("metric");
+      if ("task/segmentAvailability/wait/time".equals(emittedMetric)) {
+        latch.countDown();
+      }
+    }
+  }
+
@Test
public void testEqualsAndHashCode()
{
diff --git a/website/.spelling b/website/.spelling
index e093cbc..d1d736a 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1360,6 +1360,7 @@ numMetrics
poolKind
poolName
remoteAddress
+segmentAvailabilityConfirmed
serviceName
taskStatus
taskType
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]