This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2af0ab2425 Metric to report time spent fetching and analyzing segments
(#14752)
2af0ab2425 is described below
commit 2af0ab24252e80cb3c2b9615500cffbfed595791
Author: Suneet Saldanha <[email protected]>
AuthorDate: Mon Aug 7 18:32:48 2023 -0700
Metric to report time spent fetching and analyzing segments (#14752)
* Metric to report time spent fetching and analyzing segments
* fix test
* spell check
* fix tests
* checkstyle
* remove unused variable
* Update docs/operations/metrics.md
Co-authored-by: Katya Macedo <[email protected]>
* Update docs/operations/metrics.md
Co-authored-by: Katya Macedo <[email protected]>
* Update docs/operations/metrics.md
Co-authored-by: Katya Macedo <[email protected]>
---------
Co-authored-by: Katya Macedo <[email protected]>
---
docs/operations/metrics.md | 12 ++-
.../druid/indexing/common/task/CompactionTask.java | 32 +++++++-
.../indexing/common/task/CompactionTaskTest.java | 90 +++++++++++++++++-----
.../AbstractParallelIndexSupervisorTaskTest.java | 3 +
4 files changed, 113 insertions(+), 24 deletions(-)
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index c8918e3517..6383a4057c 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -154,7 +154,7 @@ If SQL is enabled, the Broker will emit the following
metrics for SQL.
## Ingestion metrics
-## General native ingestion metrics
+### General native ingestion metrics
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
@@ -203,6 +203,14 @@ These metrics apply to the [Kinesis indexing
service](../development/extensions-
|`ingest/kinesis/avgLag/time`|Average lag time in milliseconds between the
current message sequence number consumed by the Kinesis indexing tasks and
latest sequence number in Kinesis across all shards. Minimum emission period
for this metric is a minute.|`dataSource`, `stream`, `tags`|Greater than 0, up
to max Kinesis retention period in milliseconds. |
|`ingest/kinesis/partitionLag/time`|Partition-wise lag time in milliseconds
between the current message sequence number consumed by the Kinesis indexing
tasks and latest sequence number in Kinesis. Minimum emission period for this
metric is a minute.|`dataSource`, `stream`, `partition`, `tags`|Greater than 0,
up to max Kinesis retention period in milliseconds. |
+### Compaction metrics
+
+[Compaction tasks](../data-management/compaction.md) emit the following
metrics.
+
+|Metric|Description|Dimensions|Normal value|
+|------|-----------|----------|------------|
+|`compact/segmentAnalyzer/fetchAndProcessMillis`|Time taken, in milliseconds,
to fetch and process segments to infer the schema for the compaction task to
run.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Varies. A high value
indicates that compaction tasks would run faster if the data schema were set
explicitly.|
+
### Other ingestion metrics
Streaming ingestion tasks and certain types of
@@ -232,7 +240,7 @@ batch ingestion emit the following metrics. These metrics
are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the
supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without
ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the
coordinator cycle time.|
-
+|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of
segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the
coordinator cycle time.|
If the JVM does not support CPU time measurement for the current thread,
`ingest/merge/cpu` and `ingest/persists/cpu` will be 0.
## Indexing service
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 108d186b75..c068aa1433 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -75,6 +75,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
@@ -103,6 +104,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -126,6 +128,8 @@ import java.util.stream.IntStream;
public class CompactionTask extends AbstractBatchIndexTask
{
private static final Logger log = new Logger(CompactionTask.class);
+ private static final Clock UTC_CLOCK = Clock.systemUTC();
+
/**
* The CompactionTask creates and runs multiple IndexTask instances. When
the {@link AppenderatorsManager}
@@ -455,6 +459,7 @@ public class CompactionTask extends AbstractBatchIndexTask
emitCompactIngestionModeMetrics(toolbox.getEmitter(),
ioConfig.isDropExisting());
final List<ParallelIndexIngestionSpec> ingestionSpecs =
createIngestionSchema(
+ UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
ioConfig,
@@ -465,7 +470,8 @@ public class CompactionTask extends AbstractBatchIndexTask
metricsSpec,
granularitySpec,
toolbox.getCoordinatorClient(),
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ getMetricBuilder()
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
@@ -562,6 +568,7 @@ public class CompactionTask extends AbstractBatchIndexTask
*/
@VisibleForTesting
static List<ParallelIndexIngestionSpec> createIngestionSchema(
+ final Clock clock,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final CompactionIOConfig ioConfig,
@@ -572,7 +579,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
- final SegmentCacheManagerFactory segmentCacheManagerFactory
+ final SegmentCacheManagerFactory segmentCacheManagerFactory,
+ final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
retrieveRelevantTimelineHolders(
@@ -628,6 +636,9 @@ public class CompactionTask extends AbstractBatchIndexTask
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse =
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema(
+ clock,
+ toolbox.getEmitter(),
+ metricBuilder,
segmentProvider.dataSource,
interval,
lazyFetchSegments(segmentsToCompact,
toolbox.getSegmentCacheManager(), toolbox.getIndexIO()),
@@ -659,6 +670,9 @@ public class CompactionTask extends AbstractBatchIndexTask
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
+ clock,
+ toolbox.getEmitter(),
+ metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
@@ -756,6 +770,9 @@ public class CompactionTask extends AbstractBatchIndexTask
}
private static DataSchema createDataSchema(
+ Clock clock,
+ ServiceEmitter emitter,
+ ServiceMetricEvent.Builder metricBuilder,
String dataSource,
Interval totalInterval,
Iterable<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>>
segments,
@@ -773,8 +790,15 @@ public class CompactionTask extends AbstractBatchIndexTask
dimensionsSpec == null,
metricsSpec == null
);
-
- existingSegmentAnalyzer.fetchAndProcessIfNeeded();
+ long start = clock.millis();
+ try {
+ existingSegmentAnalyzer.fetchAndProcessIfNeeded();
+ }
+ finally {
+ if (emitter != null) {
+
emitter.emit(metricBuilder.build("compact/segmentAnalyzer/fetchAndProcessMillis",
clock.millis() - start));
+ }
+ }
final Granularity queryGranularityToUse;
if (granularitySpec.getQueryGranularity() == null) {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 76ea03a817..c7f6168d85 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -87,6 +87,8 @@ import
org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.CachingEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -144,11 +146,16 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -162,6 +169,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+@RunWith(MockitoJUnitRunner.class)
public class CompactionTaskTest
{
private static final long SEGMENT_SIZE_BYTES = 100;
@@ -194,6 +202,8 @@ public class CompactionTaskTest
"Conflicting segment granularities found %s(segmentGranularity) and
%s(granularitySpec.segmentGranularity).\n"
+ "Remove `segmentGranularity` and set the
`granularitySpec.segmentGranularity` to the expected granularity";
+ private static final ServiceMetricEvent.Builder METRIC_BUILDER = new
ServiceMetricEvent.Builder();
+
private static Map<String, DimensionSchema> DIMENSIONS;
private static List<AggregatorFactory> AGGREGATORS;
private static List<DataSegment> SEGMENTS;
@@ -363,15 +373,22 @@ public class CompactionTaskTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
+ @Mock
+ private Clock clock;
+ private CachingEmitter emitter;
+
@Before
public void setup()
{
final IndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
+ emitter = new CachingEmitter();
toolbox = makeTaskToolbox(
new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
testIndexIO,
- SEGMENT_MAP
+ SEGMENT_MAP,
+ emitter
);
+ Mockito.when(clock.millis()).thenReturn(0L, 10_000L);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(OBJECT_MAPPER);
}
@@ -931,6 +948,7 @@ public class CompactionTaskTest
public void testCreateIngestionSchema() throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -941,7 +959,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1004,6 +1023,7 @@ public class CompactionTaskTest
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1014,7 +1034,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1078,6 +1099,7 @@ public class CompactionTaskTest
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1088,7 +1110,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1152,6 +1175,7 @@ public class CompactionTaskTest
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1162,7 +1186,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1216,6 +1241,7 @@ public class CompactionTaskTest
);
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1226,7 +1252,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
ingestionSpecs.sort(
@@ -1260,6 +1287,7 @@ public class CompactionTaskTest
};
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1270,7 +1298,8 @@ public class CompactionTaskTest
customMetricsSpec,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1297,6 +1326,7 @@ public class CompactionTaskTest
public void testCreateIngestionSchemaWithCustomSegments() throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1307,7 +1337,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1340,6 +1371,7 @@ public class CompactionTaskTest
// Remove one segment in the middle
segments.remove(segments.size() / 2);
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1350,7 +1382,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
}
@@ -1364,6 +1397,7 @@ public class CompactionTaskTest
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(),
null));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1374,7 +1408,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
}
@@ -1400,6 +1435,7 @@ public class CompactionTaskTest
public void testSegmentGranularityAndNullQueryGranularity() throws
IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1410,7 +1446,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(new
PeriodGranularity(Period.months(3), null, null), null, null),
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
@@ -1438,6 +1475,7 @@ public class CompactionTaskTest
public void testQueryGranularityAndNullSegmentGranularity() throws
IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1448,7 +1486,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(null, new
PeriodGranularity(Period.months(3), null, null), null),
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1474,6 +1513,7 @@ public class CompactionTaskTest
public void testQueryGranularityAndSegmentGranularityNonNull() throws
IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1488,7 +1528,8 @@ public class CompactionTaskTest
null
),
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
@@ -1510,12 +1551,16 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
BatchIOConfig.DEFAULT_DROP_EXISTING
);
+ Assert.assertEquals(10_000L,
emitter.getLastEmittedEvent().toMap().get("value"));
+ Assert.assertEquals("compact/segmentAnalyzer/fetchAndProcessMillis",
emitter.getLastEmittedEvent().toMap().get("metric"));
+ Assert.assertEquals("metrics", emitter.getLastEmittedEvent().getFeed());
}
@Test
public void testNullGranularitySpec() throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1526,7 +1571,8 @@ public class CompactionTaskTest
null,
null,
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1553,6 +1599,7 @@ public class CompactionTaskTest
throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1563,7 +1610,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1590,6 +1638,7 @@ public class CompactionTaskTest
throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1600,7 +1649,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(null, null, true),
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
Assert.assertEquals(6, ingestionSpecs.size());
@@ -1614,6 +1664,7 @@ public class CompactionTaskTest
throws IOException
{
final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ clock,
toolbox,
LockGranularity.TIME_CHUNK,
new CompactionIOConfig(null, false, null),
@@ -1624,7 +1675,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(null, null, null),
COORDINATOR_CLIENT,
- segmentCacheManagerFactory
+ segmentCacheManagerFactory,
+ METRIC_BUILDER
);
Assert.assertEquals(6, ingestionSpecs.size());
for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
@@ -1880,7 +1932,8 @@ public class CompactionTaskTest
private static TaskToolbox makeTaskToolbox(
TaskActionClient taskActionClient,
IndexIO indexIO,
- Map<DataSegment, File> segments
+ Map<DataSegment, File> segments,
+ CachingEmitter emitter
)
{
final SegmentCacheManager segmentCacheManager = new
NoopSegmentCacheManager()
@@ -1921,6 +1974,7 @@ public class CompactionTaskTest
.segmentCacheManager(segmentCacheManager)
.taskLogPusher(null)
.attemptId("1")
+ .emitter(new ServiceEmitter("service", "host", emitter))
.build();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 94d286ec2b..a5c77b33bb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -76,8 +76,10 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.query.CachingEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
@@ -708,6 +710,7 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
.shuffleClient(new LocalShuffleClient(intermediaryDataManager))
.taskLogPusher(null)
.attemptId("1")
+ .emitter(new ServiceEmitter("service", "host", new CachingEmitter()))
.build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]