This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a618c5dd0de Refactor: Miscellaneous batch task cleanup (#16730)
a618c5dd0de is described below
commit a618c5dd0dea8917986166e1d4d988f7101ec88d
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 12 19:42:51 2024 -0700
Refactor: Miscellaneous batch task cleanup (#16730)
Changes
- No functional change
- Remove unused method `IndexTuningConfig.withPartitionsSpec()`
- Remove unused method `ParallelIndexTuningConfig.withPartitionsSpec()`
- Remove redundant method `CompactTask.emitIngestionModeMetrics()`
- Remove Clock argument from
`CompactionTask.createDataSchemasForInterval()` as it was only needed
for one test which was just verifying the value passed by the test itself.
The code now uses a `Stopwatch`
instead and test simply verifies that the metric has been emitted.
- Other minor cleanup changes
---
.../msq/indexing/MSQCompactionRunnerTest.java | 1 -
.../druid/indexing/common/task/CompactionTask.java | 70 ++++++----------
.../druid/indexing/common/task/IndexTask.java | 25 ------
.../common/task/NativeCompactionRunner.java | 14 ++--
.../batch/parallel/ParallelIndexTuningConfig.java | 39 ---------
.../task/ClientCompactionTaskQuerySerdeTest.java | 5 +-
.../common/task/CompactionTaskParallelRunTest.java | 45 ++++------
.../common/task/CompactionTaskRunTest.java | 66 +++++----------
.../indexing/common/task/CompactionTaskTest.java | 96 ++++------------------
.../AbstractParallelIndexSupervisorTaskTest.java | 20 -----
.../ParallelIndexSupervisorTaskKillTest.java | 35 ++------
.../ParallelIndexSupervisorTaskResourceTest.java | 20 +----
.../task/batch/parallel/PartialCompactionTest.java | 3 +-
13 files changed, 104 insertions(+), 335 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 35eca8cfcb4..6f1a4396ada 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -369,7 +369,6 @@ public class MSQCompactionRunnerTest
new ClientCompactionTaskTransformSpec(dimFilter);
final CompactionTask.Builder builder = new CompactionTask.Builder(
DATA_SOURCE,
- null,
null
);
IndexSpec indexSpec = createIndexSpec();
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index fe4d09d8481..8659eb0f397 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -48,7 +48,6 @@ import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -62,6 +61,7 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -100,7 +100,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -125,8 +124,6 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
{
public static final String TYPE = "compact";
private static final Logger log = new Logger(CompactionTask.class);
- private static final Clock UTC_CLOCK = Clock.systemUTC();
-
/**
* The CompactionTask creates and runs multiple IndexTask instances. When
the {@link AppenderatorsManager}
@@ -449,27 +446,12 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}
- @VisibleForTesting
- void emitCompactIngestionModeMetrics(
- ServiceEmitter emitter,
- boolean isDropExisting
- )
- {
-
- if (emitter == null) {
- return;
- }
- emitMetric(emitter, "ingest/count", 1);
- }
-
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
- // emit metric for compact ingestion mode:
- emitCompactIngestionModeMetrics(toolbox.getEmitter(),
ioConfig.isDropExisting());
+ emitMetric(toolbox.getEmitter(), "ingest/count", 1);
final Map<Interval, DataSchema> intervalDataSchemas =
createDataSchemasForIntervals(
- UTC_CLOCK,
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
@@ -489,13 +471,13 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
}
/**
- * Generate dataschema for segments in each interval
- * @return
- * @throws IOException
+ * Generate dataschema for segments in each interval.
+ *
+ * @throws IOException if an exception occurs whie retrieving used segments
to
+ * determine schemas.
*/
@VisibleForTesting
static Map<Interval, DataSchema> createDataSchemasForIntervals(
- final Clock clock,
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
@@ -506,13 +488,13 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
final ServiceMetricEvent.Builder metricBuilder
) throws IOException
{
- final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
retrieveRelevantTimelineHolders(
+ final Iterable<DataSegment> timelineSegments =
retrieveRelevantTimelineHolders(
toolbox,
segmentProvider,
lockGranularityInUse
);
- if (timelineSegments.isEmpty()) {
+ if (!timelineSegments.iterator().hasNext()) {
return Collections.emptyMap();
}
@@ -524,7 +506,7 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
Comparators.intervalsByStartThenEnd()
);
- for (final DataSegment dataSegment :
VersionedIntervalTimeline.getAllObjects(timelineSegments)) {
+ for (final DataSegment dataSegment : timelineSegments) {
intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new
ArrayList<>())
.add(dataSegment);
}
@@ -557,7 +539,6 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse =
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
final DataSchema dataSchema = createDataSchema(
- clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
@@ -576,18 +557,17 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
} else {
// given segment granularity
final DataSchema dataSchema = createDataSchema(
- clock,
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(
Iterables.transform(
- VersionedIntervalTimeline.getAllObjects(timelineSegments),
+ timelineSegments,
DataSegment::getInterval
)
),
lazyFetchSegments(
- VersionedIntervalTimeline.getAllObjects(timelineSegments),
+ timelineSegments,
toolbox.getSegmentCacheManager(),
toolbox.getIndexIO()
),
@@ -600,7 +580,7 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
}
}
- private static List<TimelineObjectHolder<String, DataSegment>>
retrieveRelevantTimelineHolders(
+ private static Iterable<DataSegment> retrieveRelevantTimelineHolders(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
LockGranularity lockGranularityInUse
@@ -612,11 +592,10 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments =
SegmentTimeline
.forSegments(usedSegments)
.lookup(segmentProvider.interval);
- return timelineSegments;
+ return VersionedIntervalTimeline.getAllObjects(timelineSegments);
}
private static DataSchema createDataSchema(
- Clock clock,
ServiceEmitter emitter,
ServiceMetricEvent.Builder metricBuilder,
String dataSource,
@@ -636,24 +615,30 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
dimensionsSpec == null,
metricsSpec == null
);
- long start = clock.millis();
+
+ final Stopwatch stopwatch = Stopwatch.createStarted();
try {
existingSegmentAnalyzer.fetchAndProcessIfNeeded();
}
finally {
if (emitter != null) {
-
emitter.emit(metricBuilder.setMetric("compact/segmentAnalyzer/fetchAndProcessMillis",
clock.millis() - start));
+ emitter.emit(
+ metricBuilder.setMetric(
+ "compact/segmentAnalyzer/fetchAndProcessMillis",
+ stopwatch.millisElapsed()
+ )
+ );
}
}
final Granularity queryGranularityToUse;
if (granularitySpec.getQueryGranularity() == null) {
queryGranularityToUse = existingSegmentAnalyzer.getQueryGranularity();
- log.info("Generate compaction task spec with segments original query
granularity [%s]", queryGranularityToUse);
+ log.info("Generate compaction task spec with segments original query
granularity[%s]", queryGranularityToUse);
} else {
queryGranularityToUse = granularitySpec.getQueryGranularity();
log.info(
- "Generate compaction task spec with new query granularity overrided
from input [%s]",
+ "Generate compaction task spec with new query granularity overrided
from input[%s].",
queryGranularityToUse
);
}
@@ -1033,7 +1018,6 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
{
private final String dataSource;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
- private final RetryPolicyFactory retryPolicyFactory;
private CompactionIOConfig ioConfig;
@Nullable
@@ -1054,13 +1038,11 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
public Builder(
String dataSource,
- SegmentCacheManagerFactory segmentCacheManagerFactory,
- RetryPolicyFactory retryPolicyFactory
+ SegmentCacheManagerFactory segmentCacheManagerFactory
)
{
this.dataSource = dataSource;
this.segmentCacheManagerFactory = segmentCacheManagerFactory;
- this.retryPolicyFactory = retryPolicyFactory;
}
public Builder interval(Interval interval)
@@ -1288,7 +1270,9 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
);
}
- @Override
+ /**
+ * Creates a copy of this tuning config with the partition spec changed.
+ */
public CompactionTuningConfig withPartitionsSpec(PartitionsSpec
partitionsSpec)
{
return new CompactionTuningConfig(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index cc253f46a52..dc6c07b6b83 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1471,31 +1471,6 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler, Pe
);
}
- public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
- {
- return new IndexTuningConfig(
- appendableIndexSpec,
- maxRowsInMemory,
- maxBytesInMemory,
- skipBytesInMemoryOverheadCheck,
- partitionsSpec,
- indexSpec,
- indexSpecForIntermediatePersists,
- maxPendingPersists,
- forceGuaranteedRollup,
- reportParseExceptions,
- pushTimeout,
- basePersistDirectory,
- segmentWriteOutMediumFactory,
- logParseExceptions,
- maxParseExceptions,
- maxSavedParseExceptions,
- maxColumnsToMerge,
- awaitSegmentAvailabilityTimeoutMillis,
- numPersistThreads
- );
- }
-
@JsonProperty
@Override
public AppendableIndexSpec getAppendableIndexSpec()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 722c6010b20..f2eacb8c1c6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -56,11 +56,14 @@ import java.util.stream.IntStream;
public class NativeCompactionRunner implements CompactionRunner
{
- private static final Logger log = new Logger(NativeCompactionRunner.class);
public static final String TYPE = "native";
+
+ private static final Logger log = new Logger(NativeCompactionRunner.class);
private static final boolean STORE_COMPACTION_STATE = true;
+
@JsonIgnore
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
+
@JsonIgnore
private final CurrentSubTaskHolder currentSubTaskHolder = new
CurrentSubTaskHolder(
(taskObject, config) -> {
@@ -183,7 +186,6 @@ public class NativeCompactionRunner implements
CompactionRunner
final PartitionConfigurationManager partitionConfigurationManager =
new
NativeCompactionRunner.PartitionConfigurationManager(compactionTask.getTuningConfig());
-
final List<ParallelIndexIngestionSpec> ingestionSpecs =
createIngestionSpecs(
intervalDataSchemaMap,
taskToolbox,
@@ -278,8 +280,11 @@ public class NativeCompactionRunner implements
CompactionRunner
return failCnt == 0 ? TaskStatus.success(compactionTaskId) :
TaskStatus.failure(compactionTaskId, msg);
}
- @VisibleForTesting
- ParallelIndexSupervisorTask newTask(CompactionTask compactionTask, String
baseSequenceName, ParallelIndexIngestionSpec ingestionSpec)
+ private ParallelIndexSupervisorTask newTask(
+ CompactionTask compactionTask,
+ String baseSequenceName,
+ ParallelIndexIngestionSpec ingestionSpec
+ )
{
return new ParallelIndexSupervisorTask(
compactionTask.getId(),
@@ -305,7 +310,6 @@ public class NativeCompactionRunner implements
CompactionRunner
@VisibleForTesting
static class PartitionConfigurationManager
{
- @Nullable
private final CompactionTask.CompactionTuningConfig tuningConfig;
PartitionConfigurationManager(@Nullable
CompactionTask.CompactionTuningConfig tuningConfig)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 7e33261ee7c..d97d9beff34 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -275,45 +275,6 @@ public class ParallelIndexTuningConfig extends
IndexTuningConfig
return maxAllowedLockCount;
}
- @Override
- public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec
partitionsSpec)
- {
- return new ParallelIndexTuningConfig(
- null,
- null,
- getAppendableIndexSpec(),
- getMaxRowsInMemory(),
- getMaxBytesInMemory(),
- isSkipBytesInMemoryOverheadCheck(),
- null,
- null,
- getSplitHintSpec(),
- partitionsSpec,
- getIndexSpec(),
- getIndexSpecForIntermediatePersists(),
- getMaxPendingPersists(),
- isForceGuaranteedRollup(),
- isReportParseExceptions(),
- getPushTimeout(),
- getSegmentWriteOutMediumFactory(),
- null,
- getMaxNumConcurrentSubTasks(),
- getMaxRetry(),
- getTaskStatusCheckPeriodMs(),
- getChatHandlerTimeout(),
- getChatHandlerNumRetries(),
- getMaxNumSegmentsToMerge(),
- getTotalNumMergeTasks(),
- isLogParseExceptions(),
- getMaxParseExceptions(),
- getMaxSavedParseExceptions(),
- getMaxColumnsToMerge(),
- getAwaitSegmentAvailabilityTimeoutMillis(),
- getMaxAllowedLockCount(),
- getNumPersistThreads()
- );
- }
-
@Override
public boolean equals(Object o)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 3d6c8085c98..fcded3ab9ee 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -43,8 +43,6 @@ import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TestUtils;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@@ -340,8 +338,7 @@ public class ClientCompactionTaskQuerySerdeTest
{
CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder(
"datasource",
- new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER),
- new RetryPolicyFactory(new RetryPolicyConfig())
+ new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER)
)
.inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"), true)
.tuningConfig(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index ba9a6e3e2be..188ea3cdd07 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -41,8 +41,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
@@ -114,7 +112,6 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
}
private static final String DATA_SOURCE = "test";
- private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new
RetryPolicyFactory(new RetryPolicyConfig());
private static final Interval INTERVAL_TO_INDEX =
Intervals.of("2014-01-01/2014-01-02");
private final LockGranularity lockGranularity;
@@ -160,8 +157,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -215,8 +211,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -281,8 +276,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -332,8 +326,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
@@ -395,8 +388,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -449,8 +441,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -500,8 +491,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -551,8 +541,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -582,8 +571,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -639,8 +627,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -702,8 +689,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -752,8 +738,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
runIndexTask(null, true);
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -844,8 +829,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
// Set the dropExisting flag to true in the IOConfig of the compaction
task
@@ -891,8 +875,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final Builder builder = new Builder(
DATA_SOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 599a24fac80..54902d5f7c6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -45,8 +45,6 @@ import
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
@@ -180,7 +178,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
private static final String DATA_SOURCE = "test";
- private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new
RetryPolicyFactory(new RetryPolicyConfig());
private final OverlordClient overlordClient;
private final CoordinatorClient coordinatorClient;
private final SegmentCacheManagerFactory segmentCacheManagerFactory;
@@ -284,8 +281,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -352,8 +348,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -452,8 +447,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask1 = builder
@@ -547,8 +541,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -659,8 +652,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day segmentGranularity
@@ -729,8 +721,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask1 = builder
@@ -764,8 +755,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day segmentGranularity
@@ -809,8 +799,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day segmentGranularity
@@ -869,8 +858,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day segmentGranularity
@@ -935,8 +923,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day segmentGranularity
@@ -1004,8 +991,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day queryGranularity
@@ -1058,8 +1044,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// day segmentGranularity and day queryGranularity
@@ -1097,8 +1082,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask1 = builder
@@ -1150,8 +1134,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -1212,8 +1195,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// Setup partial compaction:
@@ -1368,8 +1350,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
// Setup partial interval compaction:
@@ -1476,8 +1457,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final Interval partialInterval =
Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
@@ -1548,8 +1528,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -1603,8 +1582,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -1694,8 +1672,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
@@ -1826,8 +1803,7 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask compactionTask = builder
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 48a7932a241..b138a25469f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -57,8 +57,6 @@ import
org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
@@ -84,8 +82,6 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -148,15 +144,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -198,7 +191,6 @@ public class CompactionTaskTest
private static final Map<DataSegment, File> SEGMENT_MAP = new HashMap<>();
private static final CoordinatorClient COORDINATOR_CLIENT = new
TestCoordinatorClient(SEGMENT_MAP);
private static final ObjectMapper OBJECT_MAPPER =
setupInjectablesInObjectMapper(new DefaultObjectMapper());
- private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new
RetryPolicyFactory(new RetryPolicyConfig());
private static final String CONFLICTING_SEGMENT_GRANULARITY_FORMAT =
"Conflicting segment granularities found %s(segmentGranularity) and
%s(granularitySpec.segmentGranularity).\n"
+ "Remove `segmentGranularity` and set the
`granularitySpec.segmentGranularity` to the expected granularity";
@@ -375,8 +367,6 @@ public class CompactionTaskTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
- @Mock
- private Clock clock;
private StubServiceEmitter emitter;
@Before
@@ -389,7 +379,6 @@ public class CompactionTaskTest
testIndexIO,
SEGMENT_MAP
);
- Mockito.when(clock.millis()).thenReturn(0L, 10_000L);
segmentCacheManagerFactory = new
SegmentCacheManagerFactory(TestIndex.INDEX_IO, OBJECT_MAPPER);
}
@@ -398,8 +387,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
@@ -408,8 +396,7 @@ public class CompactionTaskTest
final Builder builder2 = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder2.tuningConfig(createTuningConfig());
@@ -421,33 +408,12 @@ public class CompactionTaskTest
);
}
- @Test
- public void testCompactionTaskEmitter()
- {
- final Builder builder = new Builder(
- DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
- );
- builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
- builder.tuningConfig(createTuningConfig());
- builder.segmentGranularity(Granularities.HOUR);
- final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
-
- // null emitter should work
- taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(null,
false);
- // non-null should also work
- ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new
NoopEmitter());
-
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter,
false);
-
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter,
true);
- }
-
@Test(expected = IAE.class)
public void
testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE()
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
@@ -477,7 +443,7 @@ public class CompactionTaskTest
new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1",
"foo", null));
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
@@ -495,8 +461,7 @@ public class CompactionTaskTest
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new
CountAggregatorFactory("cnt")};
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
@@ -513,8 +478,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
@@ -542,8 +506,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
@@ -558,8 +521,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask task = builder
.inputSpec(
@@ -579,8 +541,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask task = builder
.segments(SEGMENTS)
@@ -598,8 +559,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask task = builder
@@ -675,14 +635,12 @@ public class CompactionTaskTest
toolbox.getRowIngestionMetersFactory(),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY,
toolbox.getAppenderatorsManager()
);
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask expectedFromJson = builder
@@ -702,8 +660,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory,
- RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask task = builder
.inputSpec(
@@ -910,7 +867,6 @@ public class CompactionTaskTest
public void testCreateIngestionSchema() throws IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -992,7 +948,6 @@ public class CompactionTaskTest
null
);
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1075,7 +1030,6 @@ public class CompactionTaskTest
null
);
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1158,7 +1112,6 @@ public class CompactionTaskTest
null
);
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1229,7 +1182,6 @@ public class CompactionTaskTest
);
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1280,7 +1232,6 @@ public class CompactionTaskTest
};
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1324,7 +1275,6 @@ public class CompactionTaskTest
public void testCreateIngestionSchemaWithCustomSegments() throws IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1375,7 +1325,6 @@ public class CompactionTaskTest
// Remove one segment in the middle
segments.remove(segments.size() / 2);
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE,
SpecificSegmentsSpec.fromSegments(segments)),
@@ -1406,7 +1355,6 @@ public class CompactionTaskTest
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(),
null));
final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE,
SpecificSegmentsSpec.fromSegments(segments)),
@@ -1435,7 +1383,7 @@ public class CompactionTaskTest
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
@SuppressWarnings("unused")
@@ -1448,7 +1396,6 @@ public class CompactionTaskTest
public void testSegmentGranularityAndNullQueryGranularity() throws
IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1493,7 +1440,6 @@ public class CompactionTaskTest
public void testQueryGranularityAndNullSegmentGranularity() throws
IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1535,7 +1481,6 @@ public class CompactionTaskTest
public void testQueryGranularityAndSegmentGranularityNonNull() throws
IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1580,14 +1525,13 @@ public class CompactionTaskTest
new PeriodGranularity(Period.months(3), null, null),
BatchIOConfig.DEFAULT_DROP_EXISTING
);
- emitter.verifyValue("compact/segmentAnalyzer/fetchAndProcessMillis",
10_000L);
+ emitter.verifyEmitted("compact/segmentAnalyzer/fetchAndProcessMillis", 1);
}
@Test
public void testNullGranularitySpec() throws IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1633,7 +1577,6 @@ public class CompactionTaskTest
throws IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1679,7 +1622,6 @@ public class CompactionTaskTest
throws IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1710,7 +1652,6 @@ public class CompactionTaskTest
throws IOException
{
final Map<Interval, DataSchema> dataSchemasForIntervals =
CompactionTask.createDataSchemasForIntervals(
- clock,
toolbox,
LockGranularity.TIME_CHUNK,
new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1752,7 +1693,7 @@ public class CompactionTaskTest
Granularities.ALL,
Granularities.MINUTE
);
-
Assert.assertTrue(Granularities.SECOND.equals(chooseFinestGranularityHelper(input)));
+ Assert.assertEquals(Granularities.SECOND,
chooseFinestGranularityHelper(input));
}
@Test
@@ -1769,7 +1710,7 @@ public class CompactionTaskTest
Granularities.NONE,
Granularities.MINUTE
);
-
Assert.assertTrue(Granularities.NONE.equals(chooseFinestGranularityHelper(input)));
+ Assert.assertEquals(Granularities.NONE,
chooseFinestGranularityHelper(input));
}
@Test
@@ -1789,7 +1730,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask task = builder
.interval(Intervals.of("2000-01-01/2000-01-02"))
@@ -1802,7 +1743,7 @@ public class CompactionTaskTest
{
final Builder builder = new Builder(
DATA_SOURCE,
- segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+ segmentCacheManagerFactory
);
final CompactionTask task = builder
.interval(Intervals.of("2000-01-01/2000-01-02"))
@@ -2270,7 +2211,6 @@ public class CompactionTaskTest
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
- @JacksonInject RetryPolicyFactory retryPolicyFactory,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index ecc4f702d6a..8f68846cd64 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -64,7 +64,6 @@ import
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
@@ -754,25 +753,6 @@ public class AbstractParallelIndexSupervisorTaskTest
extends IngestionTestBase
.build();
}
- static class TestParallelIndexSupervisorTask extends
ParallelIndexSupervisorTask
- {
- TestParallelIndexSupervisorTask(
- String id,
- TaskResource taskResource,
- ParallelIndexIngestionSpec ingestionSchema,
- Map<String, Object> context
- )
- {
- super(
- id,
- null,
- taskResource,
- ingestionSchema,
- context
- );
- }
- }
-
static class LocalShuffleClient implements
ShuffleClient<GenericPartitionLocation>
{
private final IntermediaryDataManager intermediaryDataManager;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index a32aed819e0..14e1bab9540 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -40,13 +40,10 @@ import
org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
-import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -59,8 +56,6 @@ import java.util.stream.Stream;
public class ParallelIndexSupervisorTaskKillTest extends
AbstractParallelIndexSupervisorTaskTest
{
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
public ParallelIndexSupervisorTaskKillTest()
{
@@ -81,7 +76,7 @@ public class ParallelIndexSupervisorTaskKillTest extends
AbstractParallelIndexSu
Intervals.of("2017/2018"),
new ParallelIndexIOConfig(
null,
- // Sub tasks would run forever
+ // Sub-tasks would run forever
new TestInputSource(Pair.of(new TestInput(Integer.MAX_VALUE,
TaskState.SUCCESS), 4)),
new NoopInputFormat(),
false,
@@ -93,16 +88,12 @@ public class ParallelIndexSupervisorTaskKillTest extends
AbstractParallelIndexSu
Thread.sleep(100);
}
task.stopGracefully(null);
- expectedException.expect(RuntimeException.class);
-
expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class));
- getIndexingServiceClient().waitToFinish(task, 3000L,
TimeUnit.MILLISECONDS);
- final SinglePhaseParallelIndexTaskRunner runner =
(SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();
- Assert.assertTrue(runner.getRunningTaskIds().isEmpty());
- // completeSubTaskSpecs should be empty because no task has reported its
status to TaskMonitor
- Assert.assertTrue(runner.getCompleteSubTaskSpecs().isEmpty());
-
- Assert.assertEquals(4, runner.getTaskMonitor().getNumCanceledTasks());
+ Exception e = Assert.assertThrows(
+ RuntimeException.class,
+ () -> getIndexingServiceClient().waitToFinish(task, 3000L,
TimeUnit.MILLISECONDS)
+ );
+ Assert.assertTrue(e.getCause() instanceof ExecutionException);
}
@Test(timeout = 5000L)
@@ -273,28 +264,20 @@ public class ParallelIndexSupervisorTaskKillTest extends
AbstractParallelIndexSu
}
}
- private static class TestSupervisorTask extends
TestParallelIndexSupervisorTask
+ private static class TestSupervisorTask extends ParallelIndexSupervisorTask
{
private TestSupervisorTask(
ParallelIndexIngestionSpec ingestionSchema,
Map<String, Object> context
)
{
- super(
- null,
- null,
- ingestionSchema,
- context
- );
+ super(null, null, null, ingestionSchema, context);
}
@Override
SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox
toolbox)
{
- return new TestRunner(
- toolbox,
- this
- );
+ return new TestRunner(toolbox, this);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 772bdafb2b1..01d502de85c 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -290,19 +290,11 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
Assert.assertEquals(200, response.getStatus());
final ParallelIndexingPhaseProgress monitorStatus =
(ParallelIndexingPhaseProgress) response.getEntity();
- // numRunningTasks
+ // Verify the number of tasks in different states
Assert.assertEquals(runningTasks.size(), monitorStatus.getRunning());
-
- // numSucceededTasks
Assert.assertEquals(expectedSucceededTasks, monitorStatus.getSucceeded());
-
- // numFailedTasks
Assert.assertEquals(expectedFailedTask, monitorStatus.getFailed());
-
- // numCompleteTasks
Assert.assertEquals(expectedSucceededTasks + expectedFailedTask,
monitorStatus.getComplete());
-
- // numTotalTasks
Assert.assertEquals(runningTasks.size() + expectedSucceededTasks +
expectedFailedTask, monitorStatus.getTotal());
// runningSubTasks
@@ -407,7 +399,6 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
ParallelIndexIOConfig ioConfig
)
{
- // set up ingestion spec
final ParallelIndexIngestionSpec ingestionSpec = new
ParallelIndexIngestionSpec(
new DataSchema(
"dataSource",
@@ -460,7 +451,6 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
)
);
- // set up test tools
return new TestSupervisorTask(
null,
null,
@@ -503,7 +493,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
}
}
- private class TestSupervisorTask extends TestParallelIndexSupervisorTask
+ private class TestSupervisorTask extends ParallelIndexSupervisorTask
{
TestSupervisorTask(
String id,
@@ -514,6 +504,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
{
super(
id,
+ null,
taskResource,
ingestionSchema,
context
@@ -523,10 +514,7 @@ public class ParallelIndexSupervisorTaskResourceTest
extends AbstractParallelInd
@Override
SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox
toolbox)
{
- return new TestRunner(
- toolbox,
- this
- );
+ return new TestRunner(toolbox, this);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
index a14d11d6f78..15201e884d3 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
@@ -241,8 +241,7 @@ public class PartialCompactionTest extends
AbstractMultiPhaseParallelIndexingTes
{
return new Builder(
DATASOURCE,
- getSegmentCacheManagerFactory(),
- RETRY_POLICY_FACTORY
+ getSegmentCacheManagerFactory()
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]