This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2af0ab2425 Metric to report time spent fetching and analyzing segments 
(#14752)
2af0ab2425 is described below

commit 2af0ab24252e80cb3c2b9615500cffbfed595791
Author: Suneet Saldanha <[email protected]>
AuthorDate: Mon Aug 7 18:32:48 2023 -0700

    Metric to report time spent fetching and analyzing segments (#14752)
    
    * Metric to report time spent fetching and analyzing segments
    
    * fix test
    
    * spell check
    
    * fix tests
    
    * checkstyle
    
    * remove unused variable
    
    * Update docs/operations/metrics.md
    
    Co-authored-by: Katya Macedo  <[email protected]>
    
    * Update docs/operations/metrics.md
    
    Co-authored-by: Katya Macedo  <[email protected]>
    
    * Update docs/operations/metrics.md
    
    Co-authored-by: Katya Macedo  <[email protected]>
    
    ---------
    
    Co-authored-by: Katya Macedo <[email protected]>
---
 docs/operations/metrics.md                         | 12 ++-
 .../druid/indexing/common/task/CompactionTask.java | 32 +++++++-
 .../indexing/common/task/CompactionTaskTest.java   | 90 +++++++++++++++++-----
 .../AbstractParallelIndexSupervisorTaskTest.java   |  3 +
 4 files changed, 113 insertions(+), 24 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index c8918e3517..6383a4057c 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -154,7 +154,7 @@ If SQL is enabled, the Broker will emit the following 
metrics for SQL.
 
 ## Ingestion metrics
 
-## General native ingestion metrics
+### General native ingestion metrics
 
 |Metric|Description|Dimensions|Normal value|
 |------|-----------|----------|------------|
@@ -203,6 +203,14 @@ These metrics apply to the [Kinesis indexing 
service](../development/extensions-
 |`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the 
current message sequence number consumed by the Kinesis indexing tasks and 
latest sequence number in Kinesis across all shards. Minimum emission period 
for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up 
to max Kinesis retention period in milliseconds. |
 |`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds 
between the current message sequence number consumed by the Kinesis indexing 
tasks and latest sequence number in Kinesis. Minimum emission period for this 
metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0, 
up to max Kinesis retention period in milliseconds. |
 
+### Compaction metrics
+
+[Compaction tasks](../data-management/compaction.md) emit the following 
metrics.
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+|`compact/segmentAnalyzer/fetchAndProcessMillis`|Time taken to fetch and 
process segments to infer the schema for the compaction task to 
run.|`dataSource`, `taskId`, `taskType`, `groupId`,`tags`| Varies. A high value 
indicates compaction tasks will speed up from explicitly setting the data 
schema. |
+
 ### Other ingestion metrics
 
 Streaming ingestion tasks and certain types of
@@ -232,7 +240,7 @@ batch ingestion emit the following metrics. These metrics 
are deltas for each em
 |`ingest/notices/time`|Milliseconds taken to process a notice by the 
supervisor.|`dataSource`, `tags`| < 1s |
 |`ingest/pause/time`|Milliseconds spent by a task in a paused state without 
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
 |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of 
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the 
coordinator cycle time.|
-
+|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of 
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the 
coordinator cycle time.|
 If the JVM does not support CPU time measurement for the current thread, 
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
 
 ## Indexing service
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 108d186b75..c068aa1433 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -75,6 +75,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.DimensionHandler;
 import org.apache.druid.segment.IndexIO;
@@ -103,6 +104,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -126,6 +128,8 @@ import java.util.stream.IntStream;
 public class CompactionTask extends AbstractBatchIndexTask
 {
   private static final Logger log = new Logger(CompactionTask.class);
+  private static final Clock UTC_CLOCK = Clock.systemUTC();
+
 
   /**
    * The CompactionTask creates and runs multiple IndexTask instances. When 
the {@link AppenderatorsManager}
@@ -455,6 +459,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSchema(
+        UTC_CLOCK,
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
         ioConfig,
@@ -465,7 +470,8 @@ public class CompactionTask extends AbstractBatchIndexTask
         metricsSpec,
         granularitySpec,
         toolbox.getCoordinatorClient(),
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        getMetricBuilder()
     );
     final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
         .range(0, ingestionSpecs.size())
@@ -562,6 +568,7 @@ public class CompactionTask extends AbstractBatchIndexTask
    */
   @VisibleForTesting
   static List<ParallelIndexIngestionSpec> createIngestionSchema(
+      final Clock clock,
       final TaskToolbox toolbox,
       final LockGranularity lockGranularityInUse,
       final CompactionIOConfig ioConfig,
@@ -572,7 +579,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       final CoordinatorClient coordinatorClient,
-      final SegmentCacheManagerFactory segmentCacheManagerFactory
+      final SegmentCacheManagerFactory segmentCacheManagerFactory,
+      final ServiceMetricEvent.Builder metricBuilder
   ) throws IOException
   {
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = 
retrieveRelevantTimelineHolders(
@@ -628,6 +636,9 @@ public class CompactionTask extends AbstractBatchIndexTask
         // creates new granularitySpec and set segmentGranularity
         Granularity segmentGranularityToUse = 
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
         final DataSchema dataSchema = createDataSchema(
+            clock,
+            toolbox.getEmitter(),
+            metricBuilder,
             segmentProvider.dataSource,
             interval,
             lazyFetchSegments(segmentsToCompact, 
toolbox.getSegmentCacheManager(), toolbox.getIndexIO()),
@@ -659,6 +670,9 @@ public class CompactionTask extends AbstractBatchIndexTask
     } else {
       // given segment granularity
       final DataSchema dataSchema = createDataSchema(
+          clock,
+          toolbox.getEmitter(),
+          metricBuilder,
           segmentProvider.dataSource,
           JodaUtils.umbrellaInterval(
               Iterables.transform(
@@ -756,6 +770,9 @@ public class CompactionTask extends AbstractBatchIndexTask
   }
 
   private static DataSchema createDataSchema(
+      Clock clock,
+      ServiceEmitter emitter,
+      ServiceMetricEvent.Builder metricBuilder,
       String dataSource,
       Interval totalInterval,
       Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> 
segments,
@@ -773,8 +790,15 @@ public class CompactionTask extends AbstractBatchIndexTask
         dimensionsSpec == null,
         metricsSpec == null
     );
-
-    existingSegmentAnalyzer.fetchAndProcessIfNeeded();
+    long start = clock.millis();
+    try {
+      existingSegmentAnalyzer.fetchAndProcessIfNeeded();
+    }
+    finally {
+      if (emitter != null) {
+        
emitter.emit(metricBuilder.build("compact/segmentAnalyzer/fetchAndProcessMillis",
 clock.millis() - start));
+      }
+    }
 
     final Granularity queryGranularityToUse;
     if (granularitySpec.getQueryGranularity() == null) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 76ea03a817..c7f6168d85 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -87,6 +87,8 @@ import 
org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.CachingEmitter;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -144,11 +146,16 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -162,6 +169,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+@RunWith(MockitoJUnitRunner.class)
 public class CompactionTaskTest
 {
   private static final long SEGMENT_SIZE_BYTES = 100;
@@ -194,6 +202,8 @@ public class CompactionTaskTest
       "Conflicting segment granularities found %s(segmentGranularity) and 
%s(granularitySpec.segmentGranularity).\n"
       + "Remove `segmentGranularity` and set the 
`granularitySpec.segmentGranularity` to the expected granularity";
 
+  private static final ServiceMetricEvent.Builder METRIC_BUILDER = new 
ServiceMetricEvent.Builder();
+
   private static Map<String, DimensionSchema> DIMENSIONS;
   private static List<AggregatorFactory> AGGREGATORS;
   private static List<DataSegment> SEGMENTS;
@@ -363,15 +373,22 @@ public class CompactionTaskTest
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
+  @Mock
+  private Clock clock;
+  private CachingEmitter emitter;
+
   @Before
   public void setup()
   {
     final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
+    emitter = new CachingEmitter();
     toolbox = makeTaskToolbox(
         new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
         testIndexIO,
-        SEGMENT_MAP
+        SEGMENT_MAP,
+        emitter
     );
+    Mockito.when(clock.millis()).thenReturn(0L, 10_000L);
     segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER);
   }
 
@@ -931,6 +948,7 @@ public class CompactionTaskTest
   public void testCreateIngestionSchema() throws IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -941,7 +959,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1004,6 +1023,7 @@ public class CompactionTaskTest
         null
     );
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1014,7 +1034,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1078,6 +1099,7 @@ public class CompactionTaskTest
         null
     );
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1088,7 +1110,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1152,6 +1175,7 @@ public class CompactionTaskTest
         null
     );
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1162,7 +1186,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1216,6 +1241,7 @@ public class CompactionTaskTest
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1226,7 +1252,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
 
     ingestionSpecs.sort(
@@ -1260,6 +1287,7 @@ public class CompactionTaskTest
     };
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1270,7 +1298,8 @@ public class CompactionTaskTest
         customMetricsSpec,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
 
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
@@ -1297,6 +1326,7 @@ public class CompactionTaskTest
   public void testCreateIngestionSchemaWithCustomSegments() throws IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1307,7 +1337,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1340,6 +1371,7 @@ public class CompactionTaskTest
     // Remove one segment in the middle
     segments.remove(segments.size() / 2);
     CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1350,7 +1382,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
   }
 
@@ -1364,6 +1397,7 @@ public class CompactionTaskTest
     
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(),
 null));
     final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
     CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1374,7 +1408,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
   }
 
@@ -1400,6 +1435,7 @@ public class CompactionTaskTest
   public void testSegmentGranularityAndNullQueryGranularity() throws 
IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1410,7 +1446,8 @@ public class CompactionTaskTest
         null,
         new ClientCompactionTaskGranularitySpec(new 
PeriodGranularity(Period.months(3), null, null), null, null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
@@ -1438,6 +1475,7 @@ public class CompactionTaskTest
   public void testQueryGranularityAndNullSegmentGranularity() throws 
IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1448,7 +1486,8 @@ public class CompactionTaskTest
         null,
         new ClientCompactionTaskGranularitySpec(null, new 
PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1474,6 +1513,7 @@ public class CompactionTaskTest
   public void testQueryGranularityAndSegmentGranularityNonNull() throws 
IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1488,7 +1528,8 @@ public class CompactionTaskTest
             null
         ),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
@@ -1510,12 +1551,16 @@ public class CompactionTaskTest
         new PeriodGranularity(Period.months(3), null, null),
         BatchIOConfig.DEFAULT_DROP_EXISTING
     );
+    Assert.assertEquals(10_000L, 
emitter.getLastEmittedEvent().toMap().get("value"));
+    Assert.assertEquals("compact/segmentAnalyzer/fetchAndProcessMillis", 
emitter.getLastEmittedEvent().toMap().get("metric"));
+    Assert.assertEquals("metrics", emitter.getLastEmittedEvent().getFeed());
   }
 
   @Test
   public void testNullGranularitySpec() throws IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1526,7 +1571,8 @@ public class CompactionTaskTest
         null,
         null,
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1553,6 +1599,7 @@ public class CompactionTaskTest
       throws IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1563,7 +1610,8 @@ public class CompactionTaskTest
         null,
         new ClientCompactionTaskGranularitySpec(null, null, null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1590,6 +1638,7 @@ public class CompactionTaskTest
       throws IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1600,7 +1649,8 @@ public class CompactionTaskTest
         null,
         new ClientCompactionTaskGranularitySpec(null, null, true),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
 
     Assert.assertEquals(6, ingestionSpecs.size());
@@ -1614,6 +1664,7 @@ public class CompactionTaskTest
       throws IOException
   {
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new CompactionIOConfig(null, false, null),
@@ -1624,7 +1675,8 @@ public class CompactionTaskTest
         null,
         new ClientCompactionTaskGranularitySpec(null, null, null),
         COORDINATOR_CLIENT,
-        segmentCacheManagerFactory
+        segmentCacheManagerFactory,
+        METRIC_BUILDER
     );
     Assert.assertEquals(6, ingestionSpecs.size());
     for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
@@ -1880,7 +1932,8 @@ public class CompactionTaskTest
   private static TaskToolbox makeTaskToolbox(
       TaskActionClient taskActionClient,
       IndexIO indexIO,
-      Map<DataSegment, File> segments
+      Map<DataSegment, File> segments,
+      CachingEmitter emitter
   )
   {
     final SegmentCacheManager segmentCacheManager = new 
NoopSegmentCacheManager()
@@ -1921,6 +1974,7 @@ public class CompactionTaskTest
         .segmentCacheManager(segmentCacheManager)
         .taskLogPusher(null)
         .attemptId("1")
+        .emitter(new ServiceEmitter("service", "host", emitter))
         .build();
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 94d286ec2b..a5c77b33bb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -76,8 +76,10 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.query.CachingEmitter;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
@@ -708,6 +710,7 @@ public class AbstractParallelIndexSupervisorTaskTest 
extends IngestionTestBase
         .shuffleClient(new LocalShuffleClient(intermediaryDataManager))
         .taskLogPusher(null)
         .attemptId("1")
+        .emitter(new ServiceEmitter("service", "host", new CachingEmitter()))
         .build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to