This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a618c5dd0de Refactor: Miscellaneous batch task cleanup (#16730)
a618c5dd0de is described below

commit a618c5dd0dea8917986166e1d4d988f7101ec88d
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 12 19:42:51 2024 -0700

    Refactor: Miscellaneous batch task cleanup (#16730)
    
    Changes
    - No functional change
    - Remove unused method `IndexTuningConfig.withPartitionsSpec()`
    - Remove unused method `ParallelIndexTuningConfig.withPartitionsSpec()`
    - Remove redundant method `CompactTask.emitIngestionModeMetrics()`
    - Remove Clock argument from 
`CompactionTask.createDataSchemasForIntervals()` as it was only needed
    for one test which was just verifying the value passed by the test itself. 
The code now uses a `Stopwatch`
instead and the test simply verifies that the metric has been emitted.
    - Other minor cleanup changes
---
 .../msq/indexing/MSQCompactionRunnerTest.java      |  1 -
 .../druid/indexing/common/task/CompactionTask.java | 70 ++++++----------
 .../druid/indexing/common/task/IndexTask.java      | 25 ------
 .../common/task/NativeCompactionRunner.java        | 14 ++--
 .../batch/parallel/ParallelIndexTuningConfig.java  | 39 ---------
 .../task/ClientCompactionTaskQuerySerdeTest.java   |  5 +-
 .../common/task/CompactionTaskParallelRunTest.java | 45 ++++------
 .../common/task/CompactionTaskRunTest.java         | 66 +++++----------
 .../indexing/common/task/CompactionTaskTest.java   | 96 ++++------------------
 .../AbstractParallelIndexSupervisorTaskTest.java   | 20 -----
 .../ParallelIndexSupervisorTaskKillTest.java       | 35 ++------
 .../ParallelIndexSupervisorTaskResourceTest.java   | 20 +----
 .../task/batch/parallel/PartialCompactionTest.java |  3 +-
 13 files changed, 104 insertions(+), 335 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 35eca8cfcb4..6f1a4396ada 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -369,7 +369,6 @@ public class MSQCompactionRunnerTest
         new ClientCompactionTaskTransformSpec(dimFilter);
     final CompactionTask.Builder builder = new CompactionTask.Builder(
         DATA_SOURCE,
-        null,
         null
     );
     IndexSpec indexSpec = createIndexSpec();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index fe4d09d8481..8659eb0f397 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -48,7 +48,6 @@ import org.apache.druid.indexer.Property;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -62,6 +61,7 @@ import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
@@ -100,7 +100,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -125,8 +124,6 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
 {
   public static final String TYPE = "compact";
   private static final Logger log = new Logger(CompactionTask.class);
-  private static final Clock UTC_CLOCK = Clock.systemUTC();
-
 
   /**
    * The CompactionTask creates and runs multiple IndexTask instances. When 
the {@link AppenderatorsManager}
@@ -449,27 +446,12 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
     return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
   }
 
-  @VisibleForTesting
-  void emitCompactIngestionModeMetrics(
-      ServiceEmitter emitter,
-      boolean isDropExisting
-  )
-  {
-
-    if (emitter == null) {
-      return;
-    }
-    emitMetric(emitter, "ingest/count", 1);
-  }
-
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
-    // emit metric for compact ingestion mode:
-    emitCompactIngestionModeMetrics(toolbox.getEmitter(), 
ioConfig.isDropExisting());
+    emitMetric(toolbox.getEmitter(), "ingest/count", 1);
 
     final Map<Interval, DataSchema> intervalDataSchemas = 
createDataSchemasForIntervals(
-        UTC_CLOCK,
         toolbox,
         getTaskLockHelper().getLockGranularityToUse(),
         segmentProvider,
@@ -489,13 +471,13 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
   }
 
   /**
-   * Generate dataschema for segments in each interval
-   * @return
-   * @throws IOException
+   * Generate dataschema for segments in each interval.
+   *
+   * @throws IOException if an exception occurs while retrieving used segments 
to
+   * determine schemas.
    */
   @VisibleForTesting
   static Map<Interval, DataSchema> createDataSchemasForIntervals(
-      final Clock clock,
       final TaskToolbox toolbox,
       final LockGranularity lockGranularityInUse,
       final SegmentProvider segmentProvider,
@@ -506,13 +488,13 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
       final ServiceMetricEvent.Builder metricBuilder
   ) throws IOException
   {
-    final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = 
retrieveRelevantTimelineHolders(
+    final Iterable<DataSegment> timelineSegments = 
retrieveRelevantTimelineHolders(
         toolbox,
         segmentProvider,
         lockGranularityInUse
     );
 
-    if (timelineSegments.isEmpty()) {
+    if (!timelineSegments.iterator().hasNext()) {
       return Collections.emptyMap();
     }
 
@@ -524,7 +506,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
           Comparators.intervalsByStartThenEnd()
       );
 
-      for (final DataSegment dataSegment : 
VersionedIntervalTimeline.getAllObjects(timelineSegments)) {
+      for (final DataSegment dataSegment : timelineSegments) {
         intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new 
ArrayList<>())
                           .add(dataSegment);
       }
@@ -557,7 +539,6 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
         // creates new granularitySpec and set segmentGranularity
         Granularity segmentGranularityToUse = 
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
         final DataSchema dataSchema = createDataSchema(
-            clock,
             toolbox.getEmitter(),
             metricBuilder,
             segmentProvider.dataSource,
@@ -576,18 +557,17 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
     } else {
       // given segment granularity
       final DataSchema dataSchema = createDataSchema(
-          clock,
           toolbox.getEmitter(),
           metricBuilder,
           segmentProvider.dataSource,
           JodaUtils.umbrellaInterval(
               Iterables.transform(
-                  VersionedIntervalTimeline.getAllObjects(timelineSegments),
+                  timelineSegments,
                   DataSegment::getInterval
               )
           ),
           lazyFetchSegments(
-              VersionedIntervalTimeline.getAllObjects(timelineSegments),
+              timelineSegments,
               toolbox.getSegmentCacheManager(),
               toolbox.getIndexIO()
           ),
@@ -600,7 +580,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
     }
   }
 
-  private static List<TimelineObjectHolder<String, DataSegment>> 
retrieveRelevantTimelineHolders(
+  private static Iterable<DataSegment> retrieveRelevantTimelineHolders(
       TaskToolbox toolbox,
       SegmentProvider segmentProvider,
       LockGranularity lockGranularityInUse
@@ -612,11 +592,10 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = 
SegmentTimeline
         .forSegments(usedSegments)
         .lookup(segmentProvider.interval);
-    return timelineSegments;
+    return VersionedIntervalTimeline.getAllObjects(timelineSegments);
   }
 
   private static DataSchema createDataSchema(
-      Clock clock,
       ServiceEmitter emitter,
       ServiceMetricEvent.Builder metricBuilder,
       String dataSource,
@@ -636,24 +615,30 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
         dimensionsSpec == null,
         metricsSpec == null
     );
-    long start = clock.millis();
+
+    final Stopwatch stopwatch = Stopwatch.createStarted();
     try {
       existingSegmentAnalyzer.fetchAndProcessIfNeeded();
     }
     finally {
       if (emitter != null) {
-        
emitter.emit(metricBuilder.setMetric("compact/segmentAnalyzer/fetchAndProcessMillis",
 clock.millis() - start));
+        emitter.emit(
+            metricBuilder.setMetric(
+                "compact/segmentAnalyzer/fetchAndProcessMillis",
+                stopwatch.millisElapsed()
+            )
+        );
       }
     }
 
     final Granularity queryGranularityToUse;
     if (granularitySpec.getQueryGranularity() == null) {
       queryGranularityToUse = existingSegmentAnalyzer.getQueryGranularity();
-      log.info("Generate compaction task spec with segments original query 
granularity [%s]", queryGranularityToUse);
+      log.info("Generate compaction task spec with segments original query 
granularity[%s]", queryGranularityToUse);
     } else {
       queryGranularityToUse = granularitySpec.getQueryGranularity();
       log.info(
-          "Generate compaction task spec with new query granularity overrided 
from input [%s]",
+          "Generate compaction task spec with new query granularity overrided 
from input[%s].",
           queryGranularityToUse
       );
     }
@@ -1033,7 +1018,6 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
   {
     private final String dataSource;
     private final SegmentCacheManagerFactory segmentCacheManagerFactory;
-    private final RetryPolicyFactory retryPolicyFactory;
 
     private CompactionIOConfig ioConfig;
     @Nullable
@@ -1054,13 +1038,11 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
 
     public Builder(
         String dataSource,
-        SegmentCacheManagerFactory segmentCacheManagerFactory,
-        RetryPolicyFactory retryPolicyFactory
+        SegmentCacheManagerFactory segmentCacheManagerFactory
     )
     {
       this.dataSource = dataSource;
       this.segmentCacheManagerFactory = segmentCacheManagerFactory;
-      this.retryPolicyFactory = retryPolicyFactory;
     }
 
     public Builder interval(Interval interval)
@@ -1288,7 +1270,9 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
       );
     }
 
-    @Override
+    /**
+     * Creates a copy of this tuning config with the partition spec changed.
+     */
     public CompactionTuningConfig withPartitionsSpec(PartitionsSpec 
partitionsSpec)
     {
       return new CompactionTuningConfig(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index cc253f46a52..dc6c07b6b83 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1471,31 +1471,6 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler, Pe
       );
     }
 
-    public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
-    {
-      return new IndexTuningConfig(
-          appendableIndexSpec,
-          maxRowsInMemory,
-          maxBytesInMemory,
-          skipBytesInMemoryOverheadCheck,
-          partitionsSpec,
-          indexSpec,
-          indexSpecForIntermediatePersists,
-          maxPendingPersists,
-          forceGuaranteedRollup,
-          reportParseExceptions,
-          pushTimeout,
-          basePersistDirectory,
-          segmentWriteOutMediumFactory,
-          logParseExceptions,
-          maxParseExceptions,
-          maxSavedParseExceptions,
-          maxColumnsToMerge,
-          awaitSegmentAvailabilityTimeoutMillis,
-          numPersistThreads
-      );
-    }
-
     @JsonProperty
     @Override
     public AppendableIndexSpec getAppendableIndexSpec()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 722c6010b20..f2eacb8c1c6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -56,11 +56,14 @@ import java.util.stream.IntStream;
 
 public class NativeCompactionRunner implements CompactionRunner
 {
-  private static final Logger log = new Logger(NativeCompactionRunner.class);
   public static final String TYPE = "native";
+
+  private static final Logger log = new Logger(NativeCompactionRunner.class);
   private static final boolean STORE_COMPACTION_STATE = true;
+
   @JsonIgnore
   private final SegmentCacheManagerFactory segmentCacheManagerFactory;
+
   @JsonIgnore
   private final CurrentSubTaskHolder currentSubTaskHolder = new 
CurrentSubTaskHolder(
       (taskObject, config) -> {
@@ -183,7 +186,6 @@ public class NativeCompactionRunner implements 
CompactionRunner
     final PartitionConfigurationManager partitionConfigurationManager =
         new 
NativeCompactionRunner.PartitionConfigurationManager(compactionTask.getTuningConfig());
 
-
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
createIngestionSpecs(
         intervalDataSchemaMap,
         taskToolbox,
@@ -278,8 +280,11 @@ public class NativeCompactionRunner implements 
CompactionRunner
     return failCnt == 0 ? TaskStatus.success(compactionTaskId) : 
TaskStatus.failure(compactionTaskId, msg);
   }
 
-  @VisibleForTesting
-  ParallelIndexSupervisorTask newTask(CompactionTask compactionTask, String 
baseSequenceName, ParallelIndexIngestionSpec ingestionSpec)
+  private ParallelIndexSupervisorTask newTask(
+      CompactionTask compactionTask,
+      String baseSequenceName,
+      ParallelIndexIngestionSpec ingestionSpec
+  )
   {
     return new ParallelIndexSupervisorTask(
         compactionTask.getId(),
@@ -305,7 +310,6 @@ public class NativeCompactionRunner implements 
CompactionRunner
   @VisibleForTesting
   static class PartitionConfigurationManager
   {
-    @Nullable
     private final CompactionTask.CompactionTuningConfig tuningConfig;
 
     PartitionConfigurationManager(@Nullable 
CompactionTask.CompactionTuningConfig tuningConfig)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 7e33261ee7c..d97d9beff34 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -275,45 +275,6 @@ public class ParallelIndexTuningConfig extends 
IndexTuningConfig
     return maxAllowedLockCount;
   }
 
-  @Override
-  public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec 
partitionsSpec)
-  {
-    return new ParallelIndexTuningConfig(
-        null,
-        null,
-        getAppendableIndexSpec(),
-        getMaxRowsInMemory(),
-        getMaxBytesInMemory(),
-        isSkipBytesInMemoryOverheadCheck(),
-        null,
-        null,
-        getSplitHintSpec(),
-        partitionsSpec,
-        getIndexSpec(),
-        getIndexSpecForIntermediatePersists(),
-        getMaxPendingPersists(),
-        isForceGuaranteedRollup(),
-        isReportParseExceptions(),
-        getPushTimeout(),
-        getSegmentWriteOutMediumFactory(),
-        null,
-        getMaxNumConcurrentSubTasks(),
-        getMaxRetry(),
-        getTaskStatusCheckPeriodMs(),
-        getChatHandlerTimeout(),
-        getChatHandlerNumRetries(),
-        getMaxNumSegmentsToMerge(),
-        getTotalNumMergeTasks(),
-        isLogParseExceptions(),
-        getMaxParseExceptions(),
-        getMaxSavedParseExceptions(),
-        getMaxColumnsToMerge(),
-        getAwaitSegmentAvailabilityTimeoutMillis(),
-        getMaxAllowedLockCount(),
-        getNumPersistThreads()
-    );
-  }
-
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 3d6c8085c98..fcded3ab9ee 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -43,8 +43,6 @@ import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TestUtils;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
@@ -340,8 +338,7 @@ public class ClientCompactionTaskQuerySerdeTest
   {
     CompactionTask.Builder compactionTaskBuilder = new CompactionTask.Builder(
         "datasource",
-        new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER),
-        new RetryPolicyFactory(new RetryPolicyConfig())
+        new SegmentCacheManagerFactory(TestIndex.INDEX_IO, MAPPER)
     )
         .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), 
"testSha256OfSortedSegmentIds"), true)
         .tuningConfig(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index ba9a6e3e2be..188ea3cdd07 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -41,8 +41,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexer.report.IngestionStatsAndErrors;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.CompactionTask.Builder;
@@ -114,7 +112,6 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
   }
 
   private static final String DATA_SOURCE = "test";
-  private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new 
RetryPolicyFactory(new RetryPolicyConfig());
   private static final Interval INTERVAL_TO_INDEX = 
Intervals.of("2014-01-01/2014-01-02");
 
   private final LockGranularity lockGranularity;
@@ -160,8 +157,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -215,8 +211,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -281,8 +276,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -332,8 +326,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
 
     final CompactionTask compactionTask = builder
@@ -395,8 +388,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -449,8 +441,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -500,8 +491,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -551,8 +541,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -582,8 +571,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -639,8 +627,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -702,8 +689,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
     runIndexTask(null, true);
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -752,8 +738,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
     runIndexTask(null, true);
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
@@ -844,8 +829,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         // Set the dropExisting flag to true in the IOConfig of the compaction 
task
@@ -891,8 +875,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 599a24fac80..54902d5f7c6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -45,8 +45,6 @@ import 
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.report.IngestionStatsAndErrors;
 import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -180,7 +178,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
   }
 
   private static final String DATA_SOURCE = "test";
-  private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new 
RetryPolicyFactory(new RetryPolicyConfig());
   private final OverlordClient overlordClient;
   private final CoordinatorClient coordinatorClient;
   private final SegmentCacheManagerFactory segmentCacheManagerFactory;
@@ -284,8 +281,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -352,8 +348,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -452,8 +447,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask1 = builder
@@ -547,8 +541,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -659,8 +652,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day segmentGranularity
@@ -729,8 +721,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask1 = builder
@@ -764,8 +755,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day segmentGranularity
@@ -809,8 +799,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day segmentGranularity
@@ -869,8 +858,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day segmentGranularity
@@ -935,8 +923,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day segmentGranularity
@@ -1004,8 +991,7 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day queryGranularity
@@ -1058,8 +1044,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // day segmentGranularity and day queryGranularity
@@ -1097,8 +1082,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask1 = builder
@@ -1150,8 +1134,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -1212,8 +1195,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // Setup partial compaction:
@@ -1368,8 +1350,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     // Setup partial interval compaction:
@@ -1476,8 +1457,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
@@ -1548,8 +1528,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -1603,8 +1582,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -1694,8 +1672,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -1826,8 +1803,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask compactionTask = builder
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 48a7932a241..b138a25469f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -57,8 +57,6 @@ import org.apache.druid.guice.IndexingServiceTuningConfigModule;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
-import org.apache.druid.indexing.common.RetryPolicyConfig;
-import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
@@ -84,8 +82,6 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Comparators;
-import org.apache.druid.java.util.emitter.core.NoopEmitter;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -148,15 +144,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -198,7 +191,6 @@ public class CompactionTaskTest
   private static final Map<DataSegment, File> SEGMENT_MAP = new HashMap<>();
   private static final CoordinatorClient COORDINATOR_CLIENT = new 
TestCoordinatorClient(SEGMENT_MAP);
   private static final ObjectMapper OBJECT_MAPPER = 
setupInjectablesInObjectMapper(new DefaultObjectMapper());
-  private static final RetryPolicyFactory RETRY_POLICY_FACTORY = new 
RetryPolicyFactory(new RetryPolicyConfig());
   private static final String CONFLICTING_SEGMENT_GRANULARITY_FORMAT =
       "Conflicting segment granularities found %s(segmentGranularity) and 
%s(granularitySpec.segmentGranularity).\n"
       + "Remove `segmentGranularity` and set the 
`granularitySpec.segmentGranularity` to the expected granularity";
@@ -375,8 +367,6 @@ public class CompactionTaskTest
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
-  @Mock
-  private Clock clock;
   private StubServiceEmitter emitter;
 
   @Before
@@ -389,7 +379,6 @@ public class CompactionTaskTest
         testIndexIO,
         SEGMENT_MAP
     );
-    Mockito.when(clock.millis()).thenReturn(0L, 10_000L);
     segmentCacheManagerFactory = new 
SegmentCacheManagerFactory(TestIndex.INDEX_IO, OBJECT_MAPPER);
   }
 
@@ -398,8 +387,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
@@ -408,8 +396,7 @@ public class CompactionTaskTest
 
     final Builder builder2 = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder2.tuningConfig(createTuningConfig());
@@ -421,33 +408,12 @@ public class CompactionTaskTest
     );
   }
 
-  @Test
-  public void testCompactionTaskEmitter()
-  {
-    final Builder builder = new Builder(
-        DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
-    );
-    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
-    builder.tuningConfig(createTuningConfig());
-    builder.segmentGranularity(Granularities.HOUR);
-    final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
-
-    // null emitter should work
-    taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(null, 
false);
-    // non-null should also work
-    ServiceEmitter noopEmitter = new ServiceEmitter("service", "host", new 
NoopEmitter());
-    
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, 
false);
-    
taskCreatedWithSegmentGranularity.emitCompactIngestionModeMetrics(noopEmitter, 
true);
-  }
-
   @Test(expected = IAE.class)
   public void 
testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE()
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
@@ -477,7 +443,7 @@ public class CompactionTaskTest
         new ClientCompactionTaskTransformSpec(new SelectorDimFilter("dim1", 
"foo", null));
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
@@ -495,8 +461,7 @@ public class CompactionTaskTest
     AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new 
CountAggregatorFactory("cnt")};
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
@@ -513,8 +478,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
@@ -542,8 +506,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
@@ -558,8 +521,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     final CompactionTask task = builder
         .inputSpec(
@@ -579,8 +541,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     final CompactionTask task = builder
         .segments(SEGMENTS)
@@ -598,8 +559,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask task = builder
@@ -675,14 +635,12 @@ public class CompactionTaskTest
         toolbox.getRowIngestionMetersFactory(),
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY,
         toolbox.getAppenderatorsManager()
     );
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     final CompactionTask expectedFromJson = builder
@@ -702,8 +660,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory,
-        RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     final CompactionTask task = builder
         .inputSpec(
@@ -910,7 +867,6 @@ public class CompactionTaskTest
   public void testCreateIngestionSchema() throws IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -992,7 +948,6 @@ public class CompactionTaskTest
         null
     );
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1075,7 +1030,6 @@ public class CompactionTaskTest
         null
     );
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1158,7 +1112,6 @@ public class CompactionTaskTest
         null
     );
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1229,7 +1182,6 @@ public class CompactionTaskTest
     );
 
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1280,7 +1232,6 @@ public class CompactionTaskTest
     };
 
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1324,7 +1275,6 @@ public class CompactionTaskTest
   public void testCreateIngestionSchemaWithCustomSegments() throws IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1375,7 +1325,6 @@ public class CompactionTaskTest
     // Remove one segment in the middle
     segments.remove(segments.size() / 2);
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, 
SpecificSegmentsSpec.fromSegments(segments)),
@@ -1406,7 +1355,6 @@ public class CompactionTaskTest
     
indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(),
 null));
     final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, 
SpecificSegmentsSpec.fromSegments(segments)),
@@ -1435,7 +1383,7 @@ public class CompactionTaskTest
 
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
 
     @SuppressWarnings("unused")
@@ -1448,7 +1396,6 @@ public class CompactionTaskTest
   public void testSegmentGranularityAndNullQueryGranularity() throws 
IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1493,7 +1440,6 @@ public class CompactionTaskTest
   public void testQueryGranularityAndNullSegmentGranularity() throws 
IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1535,7 +1481,6 @@ public class CompactionTaskTest
   public void testQueryGranularityAndSegmentGranularityNonNull() throws 
IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1580,14 +1525,13 @@ public class CompactionTaskTest
         new PeriodGranularity(Period.months(3), null, null),
         BatchIOConfig.DEFAULT_DROP_EXISTING
     );
-    emitter.verifyValue("compact/segmentAnalyzer/fetchAndProcessMillis", 
10_000L);
+    emitter.verifyEmitted("compact/segmentAnalyzer/fetchAndProcessMillis", 1);
   }
 
   @Test
   public void testNullGranularitySpec() throws IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1633,7 +1577,6 @@ public class CompactionTaskTest
       throws IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1679,7 +1622,6 @@ public class CompactionTaskTest
       throws IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1710,7 +1652,6 @@ public class CompactionTaskTest
       throws IOException
   {
     final Map<Interval, DataSchema> dataSchemasForIntervals = 
CompactionTask.createDataSchemasForIntervals(
-        clock,
         toolbox,
         LockGranularity.TIME_CHUNK,
         new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
@@ -1752,7 +1693,7 @@ public class CompactionTaskTest
         Granularities.ALL,
         Granularities.MINUTE
     );
-    
Assert.assertTrue(Granularities.SECOND.equals(chooseFinestGranularityHelper(input)));
+    Assert.assertEquals(Granularities.SECOND, 
chooseFinestGranularityHelper(input));
   }
 
   @Test
@@ -1769,7 +1710,7 @@ public class CompactionTaskTest
         Granularities.NONE,
         Granularities.MINUTE
     );
-    
Assert.assertTrue(Granularities.NONE.equals(chooseFinestGranularityHelper(input)));
+    Assert.assertEquals(Granularities.NONE, 
chooseFinestGranularityHelper(input));
   }
 
   @Test
@@ -1789,7 +1730,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     final CompactionTask task = builder
         .interval(Intervals.of("2000-01-01/2000-01-02"))
@@ -1802,7 +1743,7 @@ public class CompactionTaskTest
   {
     final Builder builder = new Builder(
         DATA_SOURCE,
-        segmentCacheManagerFactory, RETRY_POLICY_FACTORY
+        segmentCacheManagerFactory
     );
     final CompactionTask task = builder
         .interval(Intervals.of("2000-01-01/2000-01-02"))
@@ -2270,7 +2211,6 @@ public class CompactionTaskTest
         @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
         @JacksonInject CoordinatorClient coordinatorClient,
         @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
-        @JacksonInject RetryPolicyFactory retryPolicyFactory,
         @JacksonInject AppenderatorsManager appenderatorsManager
     )
     {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index ecc4f702d6a..8f68846cd64 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -64,7 +64,6 @@ import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactor
 import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.overlord.Segments;
@@ -754,25 +753,6 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
         .build();
   }
 
-  static class TestParallelIndexSupervisorTask extends 
ParallelIndexSupervisorTask
-  {
-    TestParallelIndexSupervisorTask(
-        String id,
-        TaskResource taskResource,
-        ParallelIndexIngestionSpec ingestionSchema,
-        Map<String, Object> context
-    )
-    {
-      super(
-          id,
-          null,
-          taskResource,
-          ingestionSchema,
-          context
-      );
-    }
-  }
-
   static class LocalShuffleClient implements 
ShuffleClient<GenericPartitionLocation>
   {
     private final IntermediaryDataManager intermediaryDataManager;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index a32aed819e0..14e1bab9540 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -40,13 +40,10 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
-import org.hamcrest.CoreMatchers;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -59,8 +56,6 @@ import java.util.stream.Stream;
 
 public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest
 {
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
 
   public ParallelIndexSupervisorTaskKillTest()
   {
@@ -81,7 +76,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
         Intervals.of("2017/2018"),
         new ParallelIndexIOConfig(
             null,
-            // Sub tasks would run forever
+            // Sub-tasks would run forever
             new TestInputSource(Pair.of(new TestInput(Integer.MAX_VALUE, 
TaskState.SUCCESS), 4)),
             new NoopInputFormat(),
             false,
@@ -93,16 +88,12 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
       Thread.sleep(100);
     }
     task.stopGracefully(null);
-    expectedException.expect(RuntimeException.class);
-    
expectedException.expectCause(CoreMatchers.instanceOf(ExecutionException.class));
-    getIndexingServiceClient().waitToFinish(task, 3000L, 
TimeUnit.MILLISECONDS);
 
-    final SinglePhaseParallelIndexTaskRunner runner = 
(SinglePhaseParallelIndexTaskRunner) task.getCurrentRunner();
-    Assert.assertTrue(runner.getRunningTaskIds().isEmpty());
-    // completeSubTaskSpecs should be empty because no task has reported its 
status to TaskMonitor
-    Assert.assertTrue(runner.getCompleteSubTaskSpecs().isEmpty());
-
-    Assert.assertEquals(4, runner.getTaskMonitor().getNumCanceledTasks());
+    Exception e = Assert.assertThrows(
+        RuntimeException.class,
+        () -> getIndexingServiceClient().waitToFinish(task, 3000L, 
TimeUnit.MILLISECONDS)
+    );
+    Assert.assertTrue(e.getCause() instanceof ExecutionException);
   }
 
   @Test(timeout = 5000L)
@@ -273,28 +264,20 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
     }
   }
 
-  private static class TestSupervisorTask extends 
TestParallelIndexSupervisorTask
+  private static class TestSupervisorTask extends ParallelIndexSupervisorTask
   {
     private TestSupervisorTask(
         ParallelIndexIngestionSpec ingestionSchema,
         Map<String, Object> context
     )
     {
-      super(
-          null,
-          null,
-          ingestionSchema,
-          context
-      );
+      super(null, null, null, ingestionSchema, context);
     }
 
     @Override
     SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox 
toolbox)
     {
-      return new TestRunner(
-          toolbox,
-          this
-      );
+      return new TestRunner(toolbox, this);
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 772bdafb2b1..01d502de85c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -290,19 +290,11 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
     Assert.assertEquals(200, response.getStatus());
     final ParallelIndexingPhaseProgress monitorStatus = 
(ParallelIndexingPhaseProgress) response.getEntity();
 
-    // numRunningTasks
+    // Verify the number of tasks in different states
     Assert.assertEquals(runningTasks.size(), monitorStatus.getRunning());
-
-    // numSucceededTasks
     Assert.assertEquals(expectedSucceededTasks, monitorStatus.getSucceeded());
-
-    // numFailedTasks
     Assert.assertEquals(expectedFailedTask, monitorStatus.getFailed());
-
-    // numCompleteTasks
     Assert.assertEquals(expectedSucceededTasks + expectedFailedTask, 
monitorStatus.getComplete());
-
-    // numTotalTasks
     Assert.assertEquals(runningTasks.size() + expectedSucceededTasks + 
expectedFailedTask, monitorStatus.getTotal());
 
     // runningSubTasks
@@ -407,7 +399,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
       ParallelIndexIOConfig ioConfig
   )
   {
-    // set up ingestion spec
     final ParallelIndexIngestionSpec ingestionSpec = new 
ParallelIndexIngestionSpec(
         new DataSchema(
             "dataSource",
@@ -460,7 +451,6 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
         )
     );
 
-    // set up test tools
     return new TestSupervisorTask(
         null,
         null,
@@ -503,7 +493,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
     }
   }
 
-  private class TestSupervisorTask extends TestParallelIndexSupervisorTask
+  private class TestSupervisorTask extends ParallelIndexSupervisorTask
   {
     TestSupervisorTask(
         String id,
@@ -514,6 +504,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
     {
       super(
           id,
+          null,
           taskResource,
           ingestionSchema,
           context
@@ -523,10 +514,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
     @Override
     SinglePhaseParallelIndexTaskRunner createSinglePhaseTaskRunner(TaskToolbox 
toolbox)
     {
-      return new TestRunner(
-          toolbox,
-          this
-      );
+      return new TestRunner(toolbox, this);
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
index a14d11d6f78..15201e884d3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialCompactionTest.java
@@ -241,8 +241,7 @@ public class PartialCompactionTest extends AbstractMultiPhaseParallelIndexingTes
   {
     return new Builder(
         DATASOURCE,
-        getSegmentCacheManagerFactory(),
-        RETRY_POLICY_FACTORY
+        getSegmentCacheManagerFactory()
     );
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to