This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c211dcc4b3 Clean up compaction logs on coordinator (#14875)
c211dcc4b3 is described below

commit c211dcc4b310eaafe0c9acc78d3d84c24a6770c9
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 21 17:30:41 2023 +0530

    Clean up compaction logs on coordinator (#14875)
    
    Changes:
    - Move logic of `NewestSegmentFirstIterator.needsCompaction` to 
`CompactionStatus`
    to improve testability and readability
    - Capture the list of checks performed to determine if compaction is needed 
in a readable
    manner in `CompactionStatus.CHECKS`
    - Make `CompactionSegmentIterator` iterate over instances of 
`SegmentsToCompact`
    instead of `List<DataSegment>`. This allows use of the `umbrellaInterval` 
later.
    - Replace usages of `QueueEntry` with `SegmentsToCompact`
    - Move `SegmentsToCompact` out of `NewestSegmentFirstIterator`
    - Simplify `CompactionStatistics`
    - Reduce level of less important logs to debug
    - No change made to tests to ensure correctness
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   9 +-
 .../apache/druid/indexing/common/task/Tasks.java   |   2 +-
 .../server/coordinator/CompactionStatistics.java   |  73 ----
 .../druid/server/coordinator/DruidCoordinator.java |   2 +-
 .../CompactionSegmentIterator.java                 |   6 +-
 .../CompactionSegmentSearchPolicy.java             |   3 +-
 .../CompactionStatistics.java}                     |  51 ++-
 .../coordinator/compact/CompactionStatus.java      | 352 ++++++++++++++++
 .../NewestSegmentFirstIterator.java                | 467 ++++-----------------
 .../NewestSegmentFirstPolicy.java                  |   2 +-
 .../coordinator/compact/SegmentsToCompact.java     | 123 ++++++
 .../server/coordinator/duty/CompactSegments.java   |  37 +-
 .../server/coordinator/DruidCoordinatorTest.java   |   2 +-
 .../NewestSegmentFirstIteratorTest.java            |  20 +-
 .../NewestSegmentFirstPolicyTest.java              | 121 +++---
 .../coordinator/duty/CompactSegmentsTest.java      |   2 +
 .../simulate/CoordinatorSimulationBuilder.java     |   4 +-
 .../java/org/apache/druid/cli/CliCoordinator.java  |   4 +-
 18 files changed, 713 insertions(+), 567 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 5310033521..52a6e0d975 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
+import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -142,8 +142,7 @@ public class NewestSegmentFirstPolicyBenchmark
   {
     final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, 
dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
-      final List<DataSegment> segments = iterator.next();
-      blackhole.consume(segments);
+      blackhole.consume(iterator.next());
     }
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 90b7526977..28c80ac300 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -68,7 +68,7 @@ public class Tasks
    * This context is used in compaction. When it is set in the context, the 
segments created by the task
    * will fill 'lastCompactionState' in its metadata. This will be used to 
track what segments are compacted or not.
    * See {@link org.apache.druid.timeline.DataSegment} and {@link
-   * org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for 
more details.
+   * org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} 
for more details.
    */
   public static final String STORE_COMPACTION_STATE_KEY = 
"storeCompactionState";
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
deleted file mode 100644
index 676630ef09..0000000000
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator;
-
-public class CompactionStatistics
-{
-  private long byteSum;
-  private long segmentNumberCountSum;
-  private long segmentIntervalCountSum;
-
-  public CompactionStatistics(
-      long byteSum,
-      long segmentNumberCountSum,
-      long segmentIntervalCountSum
-  )
-  {
-    this.byteSum = byteSum;
-    this.segmentNumberCountSum = segmentNumberCountSum;
-    this.segmentIntervalCountSum = segmentIntervalCountSum;
-  }
-
-  public static CompactionStatistics initializeCompactionStatistics()
-  {
-    return new CompactionStatistics(0, 0, 0);
-  }
-
-  public long getByteSum()
-  {
-    return byteSum;
-  }
-
-  public long getSegmentNumberCountSum()
-  {
-    return segmentNumberCountSum;
-  }
-
-  public long getSegmentIntervalCountSum()
-  {
-    return segmentIntervalCountSum;
-  }
-
-  public void incrementCompactedByte(long incrementValue)
-  {
-    byteSum = byteSum + incrementValue;
-  }
-
-  public void incrementCompactedSegments(long incrementValue)
-  {
-    segmentNumberCountSum = segmentNumberCountSum + incrementValue;
-  }
-
-  public void incrementCompactedIntervals(long incrementValue)
-  {
-    segmentIntervalCountSum = segmentIntervalCountSum + incrementValue;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 363a5758d0..832f7790ad 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -62,10 +62,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
+import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
 import org.apache.druid.server.coordinator.duty.CollectSegmentAndServerStats;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
similarity index 90%
rename from 
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
rename to 
server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
index 64f5c16a17..bab7ca8f92 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
@@ -17,20 +17,18 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
 
-import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.timeline.DataSegment;
 
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 /**
  * Segments in the lists which are the elements of this iterator are sorted 
according to the natural segment order
  * (see {@link DataSegment#compareTo}).
  */
-public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
+public interface CompactionSegmentIterator extends Iterator<SegmentsToCompact>
 {
   /**
    * Return a map of dataSourceName to CompactionStatistics.
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
similarity index 92%
copy from 
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
copy to 
server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
index 2cbaf31d69..5a006908c3 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
 
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
similarity index 53%
rename from 
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
rename to 
server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
index 2cbaf31d69..dd672ce448 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
@@ -17,26 +17,41 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
-
-import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.SegmentTimeline;
-import org.joda.time.Interval;
-
-import java.util.List;
-import java.util.Map;
+package org.apache.druid.server.coordinator.compact;
 
 /**
- * Segment searching policy used by {@link CompactSegments}.
+ * Used to track statistics for segments in different states of compaction.
  */
-public interface CompactionSegmentSearchPolicy
+public class CompactionStatistics
 {
-  /**
-   * Reset the current states of this policy. This method should be called 
whenever iterating starts.
-   */
-  CompactionSegmentIterator reset(
-      Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, SegmentTimeline> dataSources,
-      Map<String, List<Interval>> skipIntervals
-  );
+  private long totalBytes;
+  private long numSegments;
+  private long numIntervals;
+
+  public static CompactionStatistics create()
+  {
+    return new CompactionStatistics();
+  }
+
+  public long getTotalBytes()
+  {
+    return totalBytes;
+  }
+
+  public long getNumSegments()
+  {
+    return numSegments;
+  }
+
+  public long getNumIntervals()
+  {
+    return numIntervals;
+  }
+
+  public void addFrom(SegmentsToCompact segments)
+  {
+    totalBytes += segments.getTotalBytes();
+    numIntervals += segments.getNumIntervals();
+    numSegments += segments.size();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
new file mode 100644
index 0000000000..862f2e7c5b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.compact;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
+import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Represents the status of compaction for a given list of candidate segments.
+ */
+public class CompactionStatus
+{
+  private static final CompactionStatus COMPLETE = new CompactionStatus(true, 
null);
+
+  /**
+   * List of checks performed to determine if compaction is already complete.
+   * <p>
+   * The order of the checks must be honored while evaluating them.
+   */
+  private static final List<Function<Evaluator, CompactionStatus>> CHECKS = 
Arrays.asList(
+      Evaluator::segmentsHaveBeenCompactedAtLeastOnce,
+      Evaluator::allCandidatesHaveSameCompactionState,
+      Evaluator::partitionsSpecIsUpToDate,
+      Evaluator::indexSpecIsUpToDate,
+      Evaluator::segmentGranularityIsUpToDate,
+      Evaluator::queryGranularityIsUpToDate,
+      Evaluator::rollupIsUpToDate,
+      Evaluator::dimensionsSpecIsUpToDate,
+      Evaluator::metricsSpecIsUpToDate,
+      Evaluator::transformSpecFilterIsUpToDate
+  );
+
+  private final boolean complete;
+  private final String reasonToCompact;
+
+  private CompactionStatus(boolean complete, String reason)
+  {
+    this.complete = complete;
+    this.reasonToCompact = reason;
+  }
+
+  public boolean isComplete()
+  {
+    return complete;
+  }
+
+  public String getReasonToCompact()
+  {
+    return reasonToCompact;
+  }
+
+  private static CompactionStatus incomplete(String reasonFormat, Object... 
args)
+  {
+    return new CompactionStatus(false, StringUtils.format(reasonFormat, args));
+  }
+
+  private static CompactionStatus completeIfEqual(String field, Object 
configured, Object current)
+  {
+    if (configured == null || configured.equals(current)) {
+      return COMPLETE;
+    } else {
+      return configChanged(field, configured, current);
+    }
+  }
+
+  private static CompactionStatus configChanged(String field, Object 
configured, Object current)
+  {
+    return CompactionStatus.incomplete(
+        "Configured %s[%s] is different from current %s[%s]",
+        field, configured, field, current
+    );
+  }
+
+  /**
+   * Determines the CompactionStatus of the given candidate segments by 
evaluating
+   * the {@link #CHECKS} one by one. If any check returns an incomplete status,
+   * further checks are not performed and the incomplete status is returned.
+   */
+  static CompactionStatus of(
+      SegmentsToCompact candidateSegments,
+      DataSourceCompactionConfig config,
+      ObjectMapper objectMapper
+  )
+  {
+    final Evaluator evaluator = new Evaluator(candidateSegments, config, 
objectMapper);
+    return CHECKS.stream().map(f -> f.apply(evaluator))
+                 .filter(status -> !status.isComplete())
+                 .findFirst().orElse(COMPLETE);
+  }
+
+  static PartitionsSpec 
findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
+  {
+    final PartitionsSpec partitionsSpecFromTuningConfig = 
tuningConfig.getPartitionsSpec();
+    if (partitionsSpecFromTuningConfig == null) {
+      final long maxTotalRows = 
Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
+      return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), 
maxTotalRows);
+    } else if (partitionsSpecFromTuningConfig instanceof 
DynamicPartitionsSpec) {
+      return new DynamicPartitionsSpec(
+          partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
+          ((DynamicPartitionsSpec) 
partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
+      );
+    } else {
+      return partitionsSpecFromTuningConfig;
+    }
+  }
+
+  /**
+   * Evaluates {@link #CHECKS} to determine the compaction status.
+   */
+  private static class Evaluator
+  {
+    private final ObjectMapper objectMapper;
+    private final DataSourceCompactionConfig compactionConfig;
+    private final SegmentsToCompact candidateSegments;
+    private final CompactionState lastCompactionState;
+    private final ClientCompactionTaskQueryTuningConfig tuningConfig;
+    private final ClientCompactionTaskGranularitySpec existingGranularitySpec;
+    private final UserCompactionTaskGranularityConfig 
configuredGranularitySpec;
+
+    private Evaluator(
+        SegmentsToCompact candidateSegments,
+        DataSourceCompactionConfig compactionConfig,
+        ObjectMapper objectMapper
+    )
+    {
+      Preconditions.checkArgument(!candidateSegments.isEmpty(), "Empty 
candidates");
+
+      this.candidateSegments = candidateSegments;
+      this.objectMapper = objectMapper;
+      this.lastCompactionState = 
candidateSegments.getFirst().getLastCompactionState();
+      this.compactionConfig = compactionConfig;
+      this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
+          compactionConfig.getTuningConfig(),
+          compactionConfig.getMaxRowsPerSegment(),
+          null
+      );
+
+      this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
+      if (lastCompactionState == null) {
+        this.existingGranularitySpec = null;
+      } else {
+        this.existingGranularitySpec = convertIfNotNull(
+            lastCompactionState.getGranularitySpec(),
+            ClientCompactionTaskGranularitySpec.class
+        );
+      }
+    }
+
+    private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
+    {
+      if (lastCompactionState == null) {
+        return CompactionStatus.incomplete("Not compacted yet");
+      } else {
+        return COMPLETE;
+      }
+    }
+
+    private CompactionStatus allCandidatesHaveSameCompactionState()
+    {
+      final boolean allHaveSameCompactionState = 
candidateSegments.getSegments().stream().allMatch(
+          segment -> 
lastCompactionState.equals(segment.getLastCompactionState())
+      );
+      if (allHaveSameCompactionState) {
+        return COMPLETE;
+      } else {
+        return CompactionStatus.incomplete("Candidate segments have different 
last compaction states.");
+      }
+    }
+
+    private CompactionStatus partitionsSpecIsUpToDate()
+    {
+      return CompactionStatus.completeIfEqual(
+          "partitionsSpec",
+          findPartitionsSpecFromConfig(tuningConfig),
+          lastCompactionState.getPartitionsSpec()
+      );
+    }
+
+    private CompactionStatus indexSpecIsUpToDate()
+    {
+      return CompactionStatus.completeIfEqual(
+          "indexSpec",
+          Configs.valueOrDefault(tuningConfig.getIndexSpec(), 
IndexSpec.DEFAULT),
+          objectMapper.convertValue(lastCompactionState.getIndexSpec(), 
IndexSpec.class)
+      );
+    }
+
+    private CompactionStatus segmentGranularityIsUpToDate()
+    {
+      if (configuredGranularitySpec == null
+          || configuredGranularitySpec.getSegmentGranularity() == null) {
+        return COMPLETE;
+      }
+
+      final Granularity configuredSegmentGranularity = 
configuredGranularitySpec.getSegmentGranularity();
+      final Granularity existingSegmentGranularity
+          = existingGranularitySpec == null ? null : 
existingGranularitySpec.getSegmentGranularity();
+
+      if (configuredSegmentGranularity.equals(existingSegmentGranularity)) {
+        return COMPLETE;
+      } else if (existingSegmentGranularity == null) {
+        // Candidate segments were compacted without segment granularity 
specified
+        // Check if the segments already have the desired segment granularity
+        boolean needsCompaction = 
candidateSegments.getSegments().stream().anyMatch(
+            segment -> 
!configuredSegmentGranularity.isAligned(segment.getInterval())
+        );
+        if (needsCompaction) {
+          return CompactionStatus.incomplete(
+              "Configured segmentGranularity[%s] does not align with segment 
intervals.",
+              configuredSegmentGranularity
+          );
+        }
+      } else {
+        return CompactionStatus.configChanged(
+            "segmentGranularity",
+            configuredSegmentGranularity,
+            existingSegmentGranularity
+        );
+      }
+
+      return COMPLETE;
+    }
+
+    private CompactionStatus rollupIsUpToDate()
+    {
+      if (configuredGranularitySpec == null) {
+        return COMPLETE;
+      } else {
+        return CompactionStatus.completeIfEqual(
+            "rollup",
+            configuredGranularitySpec.isRollup(),
+            existingGranularitySpec == null ? null : 
existingGranularitySpec.isRollup()
+        );
+      }
+    }
+
+    private CompactionStatus queryGranularityIsUpToDate()
+    {
+      if (configuredGranularitySpec == null) {
+        return COMPLETE;
+      } else {
+        return CompactionStatus.completeIfEqual(
+            "queryGranularity",
+            configuredGranularitySpec.getQueryGranularity(),
+            existingGranularitySpec == null ? null : 
existingGranularitySpec.getQueryGranularity()
+        );
+      }
+    }
+
+    private CompactionStatus dimensionsSpecIsUpToDate()
+    {
+      if (compactionConfig.getDimensionsSpec() == null) {
+        return COMPLETE;
+      } else {
+        final DimensionsSpec existingDimensionsSpec = 
lastCompactionState.getDimensionsSpec();
+        return CompactionStatus.completeIfEqual(
+            "dimensionsSpec",
+            compactionConfig.getDimensionsSpec().getDimensions(),
+            existingDimensionsSpec == null ? null : 
existingDimensionsSpec.getDimensions()
+        );
+      }
+    }
+
+    private CompactionStatus metricsSpecIsUpToDate()
+    {
+      final AggregatorFactory[] configuredMetricsSpec = 
compactionConfig.getMetricsSpec();
+      if (ArrayUtils.isEmpty(configuredMetricsSpec)) {
+        return COMPLETE;
+      }
+
+      final List<Object> metricSpecList = lastCompactionState.getMetricsSpec();
+      final AggregatorFactory[] existingMetricsSpec
+          = CollectionUtils.isNullOrEmpty(metricSpecList)
+            ? null : objectMapper.convertValue(metricSpecList, 
AggregatorFactory[].class);
+
+      if (existingMetricsSpec == null || 
!Arrays.deepEquals(configuredMetricsSpec, existingMetricsSpec)) {
+        return CompactionStatus.configChanged(
+            "metricsSpec",
+            Arrays.toString(configuredMetricsSpec),
+            Arrays.toString(existingMetricsSpec)
+        );
+      } else {
+        return COMPLETE;
+      }
+    }
+
+    private CompactionStatus transformSpecFilterIsUpToDate()
+    {
+      if (compactionConfig.getTransformSpec() == null) {
+        return COMPLETE;
+      }
+
+      ClientCompactionTaskTransformSpec existingTransformSpec = 
convertIfNotNull(
+          lastCompactionState.getTransformSpec(),
+          ClientCompactionTaskTransformSpec.class
+      );
+      return CompactionStatus.completeIfEqual(
+          "transformSpec filter",
+          compactionConfig.getTransformSpec().getFilter(),
+          existingTransformSpec == null ? null : 
existingTransformSpec.getFilter()
+      );
+    }
+
+    @Nullable
+    private <T> T convertIfNotNull(Object object, Class<T> clazz)
+    {
+      if (object == null) {
+        return null;
+      } else {
+        return objectMapper.convertValue(object, clazz);
+      }
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
similarity index 51%
rename from 
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
rename to 
server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
index a0a656f384..f9059dca67 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
@@ -17,21 +17,13 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
-import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
-import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
-import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -39,13 +31,7 @@ import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.SegmentUtils;
-import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentTimeline;
@@ -61,7 +47,6 @@ import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -70,7 +55,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -84,21 +68,19 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
 
   private final ObjectMapper objectMapper;
   private final Map<String, DataSourceCompactionConfig> compactionConfigs;
-  private final Map<String, CompactionStatistics> compactedSegments = new 
HashMap<>();
-  private final Map<String, CompactionStatistics> skippedSegments = new 
HashMap<>();
+  private final Map<String, CompactionStatistics> compactedSegmentStats = new 
HashMap<>();
+  private final Map<String, CompactionStatistics> skippedSegmentStats = new 
HashMap<>();
 
-  // dataSource -> intervalToFind
-  // searchIntervals keeps track of the current state of which interval should 
be considered to search segments to
-  // compact.
   private final Map<String, CompactibleTimelineObjectHolderCursor> 
timelineIterators;
+
   // This is needed for datasource that has segmentGranularity configured
   // If configured segmentGranularity in config is finer than current 
segmentGranularity, the same set of segments
   // can belong to multiple intervals in the timeline. We keep track of the 
compacted intervals between each
   // run of the compaction job and skip any interval that was already 
previously compacted.
   private final Map<String, Set<Interval>> intervalCompactedForDatasource = 
new HashMap<>();
 
-  private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
-      (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, 
o1.interval)
+  private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>(
+      (o1, o2) -> 
Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), 
o1.getUmbrellaInterval())
   );
 
   NewestSegmentFirstIterator(
@@ -112,11 +94,11 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
     this.compactionConfigs = compactionConfigs;
     this.timelineIterators = 
Maps.newHashMapWithExpectedSize(dataSources.size());
 
-    dataSources.forEach((String dataSource, SegmentTimeline timeline) -> {
+    dataSources.forEach((dataSource, timeline) -> {
       final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSource);
       Granularity configuredSegmentGranularity = null;
       if (config != null && !timeline.isEmpty()) {
-        VersionedIntervalTimeline<String, DataSegment> originalTimeline = null;
+        SegmentTimeline originalTimeline = null;
         if (config.getGranularitySpec() != null && 
config.getGranularitySpec().getSegmentGranularity() != null) {
           String temporaryVersion = DateTimes.nowUtc().toString();
           Map<Interval, Set<DataSegment>> intervalToPartitionMap = new 
HashMap<>();
@@ -175,7 +157,7 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
       }
     });
 
-    compactionConfigs.forEach((String dataSourceName, 
DataSourceCompactionConfig config) -> {
+    compactionConfigs.forEach((dataSourceName, config) -> {
       if (config == null) {
         throw new ISE("Unknown dataSource[%s]", dataSourceName);
       }
@@ -186,13 +168,13 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   @Override
   public Map<String, CompactionStatistics> totalCompactedStatistics()
   {
-    return compactedSegments;
+    return compactedSegmentStats;
   }
 
   @Override
   public Map<String, CompactionStatistics> totalSkippedStatistics()
   {
-    return skippedSegments;
+    return skippedSegmentStats;
   }
 
   @Override
@@ -202,27 +184,24 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   }
 
   @Override
-  public List<DataSegment> next()
+  public SegmentsToCompact next()
   {
     if (!hasNext()) {
       throw new NoSuchElementException();
     }
 
-    final QueueEntry entry = queue.poll();
-
+    final SegmentsToCompact entry = queue.poll();
     if (entry == null) {
       throw new NoSuchElementException();
     }
 
-    final List<DataSegment> resultSegments = entry.segments;
-
+    final List<DataSegment> resultSegments = entry.getSegments();
     Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not 
be empty");
 
     final String dataSource = resultSegments.get(0).getDataSource();
-
     updateQueue(dataSource, compactionConfigs.get(dataSource));
 
-    return resultSegments;
+    return entry;
   }
 
   /**
@@ -232,23 +211,9 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
    */
   private void updateQueue(String dataSourceName, DataSourceCompactionConfig 
config)
   {
-    final CompactibleTimelineObjectHolderCursor 
compactibleTimelineObjectHolderCursor = timelineIterators.get(
-        dataSourceName
-    );
-
-    if (compactibleTimelineObjectHolderCursor == null) {
-      log.warn("Cannot find timeline for dataSource[%s]. Skip this 
dataSource", dataSourceName);
-      return;
-    }
-
-    final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
-        dataSourceName,
-        compactibleTimelineObjectHolderCursor,
-        config
-    );
-
+    final SegmentsToCompact segmentsToCompact = 
findSegmentsToCompact(dataSourceName, config);
     if (!segmentsToCompact.isEmpty()) {
-      queue.add(new QueueEntry(segmentsToCompact.segments));
+      queue.add(segmentsToCompact);
     }
   }
 
@@ -260,13 +225,13 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
   {
     private final List<TimelineObjectHolder<String, DataSegment>> holders;
     @Nullable
-    private final VersionedIntervalTimeline<String, DataSegment> 
originalTimeline;
+    private final SegmentTimeline originalTimeline;
 
     CompactibleTimelineObjectHolderCursor(
-        VersionedIntervalTimeline<String, DataSegment> timeline,
+        SegmentTimeline timeline,
         List<Interval> totalIntervalsToSearch,
-        // originalTimeline can be nullable if timeline was not modified
-        @Nullable VersionedIntervalTimeline<String, DataSegment> 
originalTimeline
+        // originalTimeline can be null if timeline was not modified
+        @Nullable SegmentTimeline originalTimeline
     )
     {
       this.holders = totalIntervalsToSearch
@@ -313,284 +278,93 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
       List<DataSegment> candidates = 
Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
                                             .map(PartitionChunk::getObject)
                                             .collect(Collectors.toList());
-      if (originalTimeline != null) {
-        Interval umbrellaInterval = 
JodaUtils.umbrellaInterval(candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
-        return 
Lists.newArrayList(originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval,
 Partitions.ONLY_COMPLETE));
-      }
-      return candidates;
-    }
-  }
 
-  @VisibleForTesting
-  static PartitionsSpec 
findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
-  {
-    final PartitionsSpec partitionsSpecFromTuningConfig = 
tuningConfig.getPartitionsSpec();
-    if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
-      return new DynamicPartitionsSpec(
-          partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
-          ((DynamicPartitionsSpec) 
partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
-      );
-    } else {
-      final long maxTotalRows = tuningConfig.getMaxTotalRows() != null
-                                ? tuningConfig.getMaxTotalRows()
-                                : Long.MAX_VALUE;
-      return partitionsSpecFromTuningConfig == null
-             ? new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), 
maxTotalRows)
-             : partitionsSpecFromTuningConfig;
-    }
-  }
-
-  private boolean needsCompaction(DataSourceCompactionConfig config, 
SegmentsToCompact candidates)
-  {
-    Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
-    final ClientCompactionTaskQueryTuningConfig tuningConfig =
-        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null);
-    final PartitionsSpec partitionsSpecFromConfig = 
findPartitionsSpecFromConfig(tuningConfig);
-    final CompactionState lastCompactionState = 
candidates.segments.get(0).getLastCompactionState();
-    if (lastCompactionState == null) {
-      log.info("Candidate segment[%s] is not compacted yet. Needs 
compaction.", candidates.segments.get(0).getId());
-      return true;
-    }
-
-    final boolean allCandidatesHaveSameLastCompactionState = candidates
-        .segments
-        .stream()
-        .allMatch(segment -> 
lastCompactionState.equals(segment.getLastCompactionState()));
-
-    if (!allCandidatesHaveSameLastCompactionState) {
-      log.info(
-          "[%s] Candidate segments were compacted with different partitions 
spec. Needs compaction.",
-          candidates.segments.size()
-      );
-      log.debugSegments(
-          candidates.segments,
-          "Candidate segments compacted with different partiton spec"
-      );
-
-      return true;
-    }
-
-    final PartitionsSpec segmentPartitionsSpec = 
lastCompactionState.getPartitionsSpec();
-    final IndexSpec segmentIndexSpec = 
objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class);
-    final IndexSpec configuredIndexSpec;
-    if (tuningConfig.getIndexSpec() == null) {
-      configuredIndexSpec = IndexSpec.DEFAULT;
-    } else {
-      configuredIndexSpec = tuningConfig.getIndexSpec();
-    }
-    if (!Objects.equals(partitionsSpecFromConfig, segmentPartitionsSpec)) {
-      log.info(
-          "Configured partitionsSpec[%s] is differenet from "
-          + "the partitionsSpec[%s] of segments. Needs compaction.",
-          partitionsSpecFromConfig,
-          segmentPartitionsSpec
-      );
-      return true;
-    }
-    // segmentIndexSpec cannot be null.
-    if (!segmentIndexSpec.equals(configuredIndexSpec)) {
-      log.info(
-          "Configured indexSpec[%s] is different from the one[%s] of segments. 
Needs compaction",
-          configuredIndexSpec,
-          segmentIndexSpec
-      );
-      return true;
-    }
-
-    if (config.getGranularitySpec() != null) {
-
-      final ClientCompactionTaskGranularitySpec existingGranularitySpec = 
lastCompactionState.getGranularitySpec() != null ?
-                                                                          
objectMapper.convertValue(lastCompactionState.getGranularitySpec(), 
ClientCompactionTaskGranularitySpec.class) :
-                                                                          null;
-      // Checks for segmentGranularity
-      if (config.getGranularitySpec().getSegmentGranularity() != null) {
-        final Granularity existingSegmentGranularity = existingGranularitySpec 
!= null ?
-                                                       
existingGranularitySpec.getSegmentGranularity() :
-                                                       null;
-        if (existingSegmentGranularity == null) {
-          // Candidate segments were all compacted without segment granularity 
set.
-          // We need to check if all segments have the same segment 
granularity as the configured segment granularity.
-          boolean needsCompaction = candidates.segments.stream()
-                                                       .anyMatch(segment -> 
!config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
-          if (needsCompaction) {
-            log.info(
-                "Segments were previously compacted but without 
segmentGranularity in auto compaction."
-                + " Configured segmentGranularity[%s] is different from 
granularity implied by segment intervals. Needs compaction",
-                config.getGranularitySpec().getSegmentGranularity()
-            );
-            return true;
-          }
-
-        } else if 
(!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity))
 {
-          log.info(
-              "Configured segmentGranularity[%s] is different from the 
segmentGranularity[%s] of segments. Needs compaction",
-              config.getGranularitySpec().getSegmentGranularity(),
-              existingSegmentGranularity
-          );
-          return true;
-        }
-      }
-
-      // Checks for rollup
-      if (config.getGranularitySpec().isRollup() != null) {
-        final Boolean existingRollup = existingGranularitySpec != null ?
-                                       existingGranularitySpec.isRollup() :
-                                       null;
-        if (existingRollup == null || 
!config.getGranularitySpec().isRollup().equals(existingRollup)) {
-          log.info(
-              "Configured rollup[%s] is different from the rollup[%s] of 
segments. Needs compaction",
-              config.getGranularitySpec().isRollup(),
-              existingRollup
-          );
-          return true;
-        }
-      }
-
-      // Checks for queryGranularity
-      if (config.getGranularitySpec().getQueryGranularity() != null) {
-
-        final Granularity existingQueryGranularity = existingGranularitySpec 
!= null ?
-                                                     
existingGranularitySpec.getQueryGranularity() :
-                                                     null;
-        if 
(!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity))
 {
-          log.info(
-              "Configured queryGranularity[%s] is different from the 
queryGranularity[%s] of segments. Needs compaction",
-              config.getGranularitySpec().getQueryGranularity(),
-              existingQueryGranularity
-          );
-          return true;
-        }
-      }
-    }
-
-    if (config.getDimensionsSpec() != null) {
-      final DimensionsSpec existingDimensionsSpec = 
lastCompactionState.getDimensionsSpec();
-      // Checks for list of dimensions
-      if (config.getDimensionsSpec().getDimensions() != null) {
-        final List<DimensionSchema> existingDimensions = 
existingDimensionsSpec != null ?
-                                                         
existingDimensionsSpec.getDimensions() :
-                                                         null;
-        if 
(!config.getDimensionsSpec().getDimensions().equals(existingDimensions)) {
-          log.info(
-              "Configured dimensionsSpec is different from the dimensionsSpec 
of segments. Needs compaction"
-          );
-          return true;
-        }
-      }
-    }
-
-    if (config.getTransformSpec() != null) {
-      final ClientCompactionTaskTransformSpec existingTransformSpec = 
lastCompactionState.getTransformSpec() != null ?
-                                                                      
objectMapper.convertValue(lastCompactionState.getTransformSpec(), 
ClientCompactionTaskTransformSpec.class) :
-                                                                      null;
-      // Checks for filters
-      if (config.getTransformSpec().getFilter() != null) {
-        final DimFilter existingFilters = existingTransformSpec != null ?
-                                          existingTransformSpec.getFilter() :
-                                          null;
-        if (!config.getTransformSpec().getFilter().equals(existingFilters)) {
-          log.info(
-              "Configured filter[%s] is different from the filter[%s] of 
segments. Needs compaction",
-              config.getTransformSpec().getFilter(),
-              existingFilters
-          );
-          return true;
-        }
-      }
-    }
-
-    if (ArrayUtils.isNotEmpty(config.getMetricsSpec())) {
-      final AggregatorFactory[] existingMetricsSpec = 
lastCompactionState.getMetricsSpec() == null || 
lastCompactionState.getMetricsSpec().isEmpty() ?
-                                                      null :
-                                                      
objectMapper.convertValue(lastCompactionState.getMetricsSpec(), 
AggregatorFactory[].class);
-      if (existingMetricsSpec == null || 
!Arrays.deepEquals(config.getMetricsSpec(), existingMetricsSpec)) {
-        log.info(
-            "Configured metricsSpec[%s] is different from the metricsSpec[%s] 
of segments. Needs compaction",
-            Arrays.toString(config.getMetricsSpec()),
-            Arrays.toString(existingMetricsSpec)
+      if (originalTimeline == null) {
+        return candidates;
+      } else {
+        Interval umbrellaInterval = JodaUtils.umbrellaInterval(
+            
candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+        );
+        return Lists.newArrayList(
+            
originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, 
Partitions.ONLY_COMPLETE)
         );
-        return true;
       }
     }
-
-    return false;
   }
 
   /**
-   * Find segments to compact together for the given intervalToSearch. It 
progressively searches the given
-   * intervalToSearch in time order (latest first). The timeline lookup 
duration is one day. It means, the timeline is
-   * looked up for the last one day of the given intervalToSearch, and the 
next day is searched again if the size of
-   * found segments are not enough to compact. This is repeated until enough 
amount of segments are found.
+   * Finds segments to compact together for the given datasource.
    *
-   * @return segments to compact
+   * @return An empty {@link SegmentsToCompact} if there are no eligible 
candidates.
    */
   private SegmentsToCompact findSegmentsToCompact(
       final String dataSourceName,
-      final CompactibleTimelineObjectHolderCursor 
compactibleTimelineObjectHolderCursor,
       final DataSourceCompactionConfig config
   )
   {
+    final CompactibleTimelineObjectHolderCursor 
compactibleTimelineObjectHolderCursor
+        = timelineIterators.get(dataSourceName);
+    if (compactibleTimelineObjectHolderCursor == null) {
+      log.warn("Skipping dataSource[%s] as there is no timeline for it.", 
dataSourceName);
+      return SegmentsToCompact.empty();
+    }
+
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
 
     while (compactibleTimelineObjectHolderCursor.hasNext()) {
       List<DataSegment> segments = 
compactibleTimelineObjectHolderCursor.next();
-      final SegmentsToCompact candidates = new SegmentsToCompact(segments);
-      if (!candidates.isEmpty()) {
-        final boolean isCompactibleSize = candidates.getTotalSize() <= 
inputSegmentSize;
-        final boolean needsCompaction = needsCompaction(
-            config,
-            candidates
+      if (segments.isEmpty()) {
+        throw new ISE("No segment is found?");
+      }
+
+      final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
+      final Interval interval = candidates.getUmbrellaInterval();
+
+      final CompactionStatus compactionStatus = 
CompactionStatus.of(candidates, config, objectMapper);
+      if (!compactionStatus.isComplete()) {
+        log.debug(
+            "Datasource[%s], interval[%s] has [%d] segments that need to be 
compacted because [%s].",
+            dataSourceName, interval, candidates.size(), 
compactionStatus.getReasonToCompact()
         );
+      }
 
-        if (isCompactibleSize && needsCompaction) {
-          if (config.getGranularitySpec() != null && 
config.getGranularitySpec().getSegmentGranularity() != null) {
-            Interval interval = candidates.getUmbrellaInterval();
-            Set<Interval> intervalsCompacted = 
intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new 
HashSet<>());
-            // Skip this candidates if we have compacted the interval already
-            if (intervalsCompacted.contains(interval)) {
-              continue;
-            }
-            intervalsCompacted.add(interval);
-          }
-          return candidates;
+      if (compactionStatus.isComplete()) {
+        addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates);
+      } else if (candidates.getTotalBytes() > inputSegmentSize) {
+        addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates);
+        log.warn(
+            "Skipping compaction for datasource[%s], interval[%s] as total 
segment size[%d]"
+            + " is larger than allowed inputSegmentSize[%d].",
+            dataSourceName, interval, candidates.getTotalBytes(), 
inputSegmentSize
+        );
+      } else if (config.getGranularitySpec() != null
+                 && config.getGranularitySpec().getSegmentGranularity() != 
null) {
+        Set<Interval> compactedIntervals = intervalCompactedForDatasource
+            .computeIfAbsent(dataSourceName, k -> new HashSet<>());
+
+        if (compactedIntervals.contains(interval)) {
+          // Skip these candidate segments as we have already compacted this 
interval
         } else {
-          if (!needsCompaction) {
-            // Collect statistic for segments that is already compacted
-            collectSegmentStatistics(compactedSegments, dataSourceName, 
candidates);
-          } else {
-            // Collect statistic for segments that is skipped
-            // Note that if segments does not need compaction then we do not 
double count here
-            collectSegmentStatistics(skippedSegments, dataSourceName, 
candidates);
-            log.warn(
-                "total segment size[%d] for datasource[%s] and interval[%s] is 
larger than inputSegmentSize[%d]."
-                + " Continue to the next interval.",
-                candidates.getTotalSize(),
-                candidates.segments.get(0).getDataSource(),
-                candidates.segments.get(0).getInterval(),
-                inputSegmentSize
-            );
-          }
+          compactedIntervals.add(interval);
+          return candidates;
         }
       } else {
-        throw new ISE("No segment is found?");
+        return candidates;
       }
     }
-    log.info("All segments look good! Nothing to compact");
-    return new SegmentsToCompact();
+
+    log.debug("All segments look good! Nothing to compact");
+    return SegmentsToCompact.empty();
   }
 
-  private void collectSegmentStatistics(
+  private void addSegmentStatsTo(
       Map<String, CompactionStatistics> statisticsMap,
       String dataSourceName,
-      SegmentsToCompact segments)
+      SegmentsToCompact segments
+  )
   {
-    CompactionStatistics statistics = statisticsMap.computeIfAbsent(
-        dataSourceName,
-        v -> CompactionStatistics.initializeCompactionStatistics()
-    );
-    statistics.incrementCompactedByte(segments.getTotalSize());
-    statistics.incrementCompactedIntervals(segments.getNumberOfIntervals());
-    statistics.incrementCompactedSegments(segments.getNumberOfSegments());
+    statisticsMap.computeIfAbsent(dataSourceName, v -> 
CompactionStatistics.create())
+                 .addFrom(segments);
   }
 
   /**
@@ -621,10 +395,12 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
         skipIntervals
     );
 
-    // Calcuate stats of all skipped segments
+    // Collect stats for all skipped segments
     for (Interval skipInterval : fullSkipIntervals) {
-      final List<DataSegment> segments = new 
ArrayList<>(timeline.findNonOvershadowedObjectsInInterval(skipInterval, 
Partitions.ONLY_COMPLETE));
-      collectSegmentStatistics(skippedSegments, dataSourceName, new 
SegmentsToCompact(segments));
+      final List<DataSegment> segments = new ArrayList<>(
+          timeline.findNonOvershadowedObjectsInInterval(skipInterval, 
Partitions.ONLY_COMPLETE)
+      );
+      addSegmentStatsTo(skippedSegmentStats, dataSourceName, 
SegmentsToCompact.from(segments));
     }
 
     final Interval totalInterval = new 
Interval(first.getInterval().getStart(), last.getInterval().getEnd());
@@ -749,81 +525,4 @@ public class NewestSegmentFirstIterator implements 
CompactionSegmentIterator
     return filteredIntervals;
   }
 
-  private static class QueueEntry
-  {
-    private final Interval interval; // whole interval for all segments
-    private final List<DataSegment> segments;
-
-    private QueueEntry(List<DataSegment> segments)
-    {
-      Preconditions.checkArgument(segments != null && !segments.isEmpty());
-      DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
-      for (DataSegment segment : segments) {
-        if (segment.getInterval().getStart().compareTo(minStart) < 0) {
-          minStart = segment.getInterval().getStart();
-        }
-        if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
-          maxEnd = segment.getInterval().getEnd();
-        }
-      }
-      this.interval = new Interval(minStart, maxEnd);
-      this.segments = segments;
-    }
-
-    private String getDataSource()
-    {
-      return segments.get(0).getDataSource();
-    }
-  }
-
-  private static class SegmentsToCompact
-  {
-    private final List<DataSegment> segments;
-    private final long totalSize;
-
-    private SegmentsToCompact()
-    {
-      this(Collections.emptyList());
-    }
-
-    private SegmentsToCompact(List<DataSegment> segments)
-    {
-      this.segments = segments;
-      this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
-    }
-
-    private boolean isEmpty()
-    {
-      return segments.isEmpty();
-    }
-
-    private long getTotalSize()
-    {
-      return totalSize;
-    }
-
-    private long getNumberOfSegments()
-    {
-      return segments.size();
-    }
-
-    private Interval getUmbrellaInterval()
-    {
-      return 
JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
-    }
-
-    private long getNumberOfIntervals()
-    {
-      return 
segments.stream().map(DataSegment::getInterval).distinct().count();
-    }
-
-    @Override
-    public String toString()
-    {
-      return "SegmentsToCompact{" +
-             "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
-             ", totalSize=" + totalSize +
-             '}';
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
similarity index 97%
rename from 
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
rename to 
server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
index ce4f0e1066..20f6d92044 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
new file mode 100644
index 0000000000..1bc53b7dbe
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.compact;
+
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+/**
+ * List of segments to compact.
+ */
+public class SegmentsToCompact
+{
+  private static final SegmentsToCompact EMPTY_INSTANCE = new 
SegmentsToCompact();
+
+  private final List<DataSegment> segments;
+  private final Interval umbrellaInterval;
+  private final long totalBytes;
+  private final int numIntervals;
+
+  static SegmentsToCompact empty()
+  {
+    return EMPTY_INSTANCE;
+  }
+
+  public static SegmentsToCompact from(List<DataSegment> segments)
+  {
+    if (segments == null || segments.isEmpty()) {
+      return empty();
+    } else {
+      return new SegmentsToCompact(segments);
+    }
+  }
+
+  private SegmentsToCompact()
+  {
+    this.segments = Collections.emptyList();
+    this.totalBytes = 0L;
+    this.numIntervals = 0;
+    this.umbrellaInterval = null;
+  }
+
+  private SegmentsToCompact(List<DataSegment> segments)
+  {
+    this.segments = segments;
+    this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
+    this.umbrellaInterval = JodaUtils.umbrellaInterval(
+        
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+    );
+    this.numIntervals = (int) 
segments.stream().map(DataSegment::getInterval).distinct().count();
+  }
+
+  public List<DataSegment> getSegments()
+  {
+    return segments;
+  }
+
+  public DataSegment getFirst()
+  {
+    if (segments.isEmpty()) {
+      throw new NoSuchElementException("No segment to compact");
+    } else {
+      return segments.get(0);
+    }
+  }
+
+  public boolean isEmpty()
+  {
+    return segments.isEmpty();
+  }
+
+  public long getTotalBytes()
+  {
+    return totalBytes;
+  }
+
+  public int size()
+  {
+    return segments.size();
+  }
+
+  public Interval getUmbrellaInterval()
+  {
+    return umbrellaInterval;
+  }
+
+  public long getNumIntervals()
+  {
+    return numIntervals;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SegmentsToCompact{" +
+           "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+           ", totalSize=" + totalBytes +
+           '}';
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 6ba1e3919a..0c08da7c8d 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -45,10 +45,13 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
-import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
+import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.CompactionStatistics;
+import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
@@ -364,7 +367,8 @@ public class CompactSegments implements 
CoordinatorCustomDuty
     int numCompactionTasksAndSubtasks = 0;
 
     while (iterator.hasNext() && numCompactionTasksAndSubtasks < 
numAvailableCompactionTaskSlots) {
-      final List<DataSegment> segmentsToCompact = iterator.next();
+      final SegmentsToCompact entry = iterator.next();
+      final List<DataSegment> segmentsToCompact = entry.getSegments();
       if (segmentsToCompact.isEmpty()) {
         throw new ISE("segmentsToCompact is empty?");
       }
@@ -403,11 +407,13 @@ public class CompactSegments implements 
CoordinatorCustomDuty
           catch (IllegalArgumentException iae) {
             // This case can happen if the existing segment interval result in 
complicated periods.
             // Fall back to setting segmentGranularity as null
-            LOG.warn("Cannot determine segmentGranularity from interval [%s]", 
interval);
+            LOG.warn("Cannot determine segmentGranularity from interval[%s].", 
interval);
           }
         } else {
           LOG.warn(
-              "segmentsToCompact does not have the same interval. Fallback to 
not setting segmentGranularity for auto compaction task");
+              "Not setting 'segmentGranularity' for auto-compaction task as"
+              + " the segments to compact do not have the same interval."
+          );
         }
       } else {
         segmentGranularityToUse = 
config.getGranularitySpec().getSegmentGranularity();
@@ -478,13 +484,17 @@ public class CompactSegments implements 
CoordinatorCustomDuty
           newAutoCompactionContext(config.getTaskContext())
       );
 
-      LOG.info("Submitted a compactionTask[%s] for [%d] segments", taskId, 
segmentsToCompact.size());
-      LOG.infoSegments(segmentsToCompact, "Compacting segments");
+      LOG.info(
+          "Submitted a compaction task[%s] for [%d] segments in 
datasource[%s], umbrella interval[%s].",
+          taskId, segmentsToCompact.size(), dataSourceName, 
entry.getUmbrellaInterval()
+      );
+      LOG.debugSegments(segmentsToCompact, "Compacting segments");
       // Count the compaction task itself + its sub tasks
       numSubmittedTasks++;
       numCompactionTasksAndSubtasks += 
findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig());
     }
 
+    LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks);
     return numSubmittedTasks;
   }
 
@@ -505,7 +515,8 @@ public class CompactSegments implements 
CoordinatorCustomDuty
   {
     // Mark all the segments remaining in the iterator as "awaiting compaction"
     while (iterator.hasNext()) {
-      final List<DataSegment> segmentsToCompact = iterator.next();
+      final SegmentsToCompact entry = iterator.next();
+      final List<DataSegment> segmentsToCompact = entry.getSegments();
       if (!segmentsToCompact.isEmpty()) {
         final String dataSourceName = segmentsToCompact.get(0).getDataSource();
         AutoCompactionSnapshot.Builder snapshotBuilder = 
currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
@@ -536,9 +547,9 @@ public class CompactSegments implements 
CoordinatorCustomDuty
           dataSource,
           k -> new AutoCompactionSnapshot.Builder(k, 
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
       );
-      
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
-      
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
-      
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
+      
builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
+      
builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
+      
builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
     }
 
     // Statistics of all segments considered skipped after this run
@@ -550,9 +561,9 @@ public class CompactSegments implements 
CoordinatorCustomDuty
           dataSource,
           k -> new AutoCompactionSnapshot.Builder(k, 
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
       );
-      builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
-      
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
-      
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
+      
builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes());
+      
builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments());
+      
builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
     }
 
     final Map<String, AutoCompactionSnapshot> 
currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 0e75f95a1f..249dea2ce9 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -52,13 +52,13 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.ServerType;
 import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
 import 
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
 import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
 import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java
similarity index 94%
rename from 
server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
rename to 
server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java
index 35021a56a2..b4ea5d69e0 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -100,7 +100,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -190,7 +190,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, 1000L),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -235,7 +235,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -280,7 +280,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -325,7 +325,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -370,7 +370,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -415,7 +415,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
@@ -460,7 +460,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new SingleDimensionPartitionsSpec(10000, null, "dim", false),
-        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+        CompactionStatus.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), null)
         )
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java
similarity index 93%
rename from 
server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
rename to 
server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java
index e2df83ac9a..04f8f8993d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.InjectableValues;
@@ -26,7 +26,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -58,7 +57,7 @@ import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
-import org.assertj.core.api.Assertions;
+import org.apache.druid.utils.Streams;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -72,6 +71,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -208,7 +208,7 @@ public class NewestSegmentFirstPolicyTest
 
     Interval lastInterval = null;
     while (iterator.hasNext()) {
-      final List<DataSegment> segments = iterator.next();
+      final List<DataSegment> segments = iterator.next().getSegments();
       lastInterval = segments.get(0).getInterval();
 
       Interval prevInterval = null;
@@ -264,7 +264,7 @@ public class NewestSegmentFirstPolicyTest
 
     Interval lastInterval = null;
     while (iterator.hasNext()) {
-      final List<DataSegment> segments = iterator.next();
+      final List<DataSegment> segments = iterator.next().getSegments();
       lastInterval = segments.get(0).getInterval();
 
       Interval prevInterval = null;
@@ -352,9 +352,13 @@ public class NewestSegmentFirstPolicyTest
     );
     expectedSegmentsToCompact2.sort(Comparator.naturalOrder());
 
-    Assertions.assertThat(iterator)
-              .toIterable()
-              .containsExactly(expectedSegmentsToCompact, 
expectedSegmentsToCompact2);
+    Set<List<DataSegment>> observedSegments = 
Streams.sequentialStreamFrom(iterator)
+                                                     
.map(SegmentsToCompact::getSegments)
+                                                     
.collect(Collectors.toSet());
+    Assert.assertEquals(
+        observedSegments,
+        ImmutableSet.of(expectedSegmentsToCompact, expectedSegmentsToCompact2)
+    );
   }
 
   @Test
@@ -419,7 +423,13 @@ public class NewestSegmentFirstPolicyTest
     );
 
     Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), 
ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+    Set<DataSegment> observedSegmentsToCompact = 
Streams.sequentialStreamFrom(iterator)
+                                                        .flatMap(s -> 
s.getSegments().stream())
+                                                        
.collect(Collectors.toSet());
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        observedSegmentsToCompact
+    );
   }
 
   @Test
@@ -446,7 +456,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     Assert.assertTrue(iterator.hasNext());
-    List<DataSegment> actual = iterator.next();
+    List<DataSegment> actual = iterator.next().getSegments();
     Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
     Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), 
ImmutableSet.copyOf(actual));
     Assert.assertFalse(iterator.hasNext());
@@ -472,7 +482,13 @@ public class NewestSegmentFirstPolicyTest
     );
 
     Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), 
ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+    Set<DataSegment> observedSegmentsToCompact = 
Streams.sequentialStreamFrom(iterator)
+                                                        .flatMap(s -> 
s.getSegments().stream())
+                                                        
.collect(Collectors.toSet());
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        observedSegmentsToCompact
+    );
   }
 
   @Test
@@ -585,7 +601,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // Month of Nov
     Assert.assertTrue(iterator.hasNext());
@@ -594,7 +610,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // Month of Oct
     Assert.assertTrue(iterator.hasNext());
@@ -603,7 +619,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -631,7 +647,7 @@ public class NewestSegmentFirstPolicyTest
         
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"),
 Partitions.ONLY_COMPLETE)
     );
     Assert.assertTrue(iterator.hasNext());
-    List<DataSegment> actual = iterator.next();
+    List<DataSegment> actual = iterator.next().getSegments();
     Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
     Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), 
ImmutableSet.copyOf(actual));
     // Month of Jan
@@ -639,7 +655,7 @@ public class NewestSegmentFirstPolicyTest
         
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"),
 Partitions.ONLY_COMPLETE)
     );
     Assert.assertTrue(iterator.hasNext());
-    actual = iterator.next();
+    actual = iterator.next().getSegments();
     Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
     Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), 
ImmutableSet.copyOf(actual));
     // No more
@@ -663,7 +679,10 @@ public class NewestSegmentFirstPolicyTest
         
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
 Partitions.ONLY_COMPLETE)
     );
     Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), 
ImmutableSet.copyOf(iterator.next()));
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next().getSegments())
+    );
     // Iterator should return only once since all the "minute" interval of the 
iterator contains the same interval
     Assert.assertFalse(iterator.hasNext());
   }
@@ -689,7 +708,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -701,7 +720,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -734,7 +753,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -767,7 +786,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -798,7 +817,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -810,7 +829,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -841,7 +860,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -853,7 +872,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -893,7 +912,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -905,7 +924,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -944,7 +963,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -956,7 +975,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -996,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1004,7 +1023,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1016,7 +1035,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // queryGranularity=DAY for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1056,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1064,7 +1083,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1076,7 +1095,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // Dimensions=["foo", "bar"] for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1130,7 +1149,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1138,7 +1157,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1146,7 +1165,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1175,7 +1194,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // filter=SelectorDimFilter("dim1", "foo", null) for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1250,7 +1269,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1258,7 +1277,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1266,7 +1285,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1299,7 +1318,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // metricsSpec={CountAggregatorFactory("cnt")} for interval 
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1374,7 +1393,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1382,7 +1401,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     Assert.assertTrue(iterator.hasNext());
     expectedSegmentsToCompact = new ArrayList<>(
@@ -1390,7 +1409,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1436,7 +1455,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1448,7 +1467,7 @@ public class NewestSegmentFirstPolicyTest
     // Different indexSpec as what is set in the auto compaction config
     IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new 
ConciseBitmapSerdeFactory()).build();
     Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, 
new TypeReference<Map<String, Object>>() {});
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final SegmentTimeline timeline = createTimeline(
@@ -1487,7 +1506,7 @@ public class NewestSegmentFirstPolicyTest
     );
     Assert.assertEquals(
         ImmutableSet.copyOf(expectedSegmentsToCompact),
-        ImmutableSet.copyOf(iterator.next())
+        ImmutableSet.copyOf(iterator.next().getSegments())
     );
     // No more
     Assert.assertFalse(iterator.hasNext());
@@ -1497,7 +1516,7 @@ public class NewestSegmentFirstPolicyTest
   public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
   {
     NullHandling.initializeForTests();
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
+    PartitionsSpec partitionsSpec = 
CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null, null));
     final SegmentTimeline timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@@ -1691,7 +1710,7 @@ public class NewestSegmentFirstPolicyTest
   {
     Interval expectedSegmentIntervalStart = to;
     while (iterator.hasNext()) {
-      final List<DataSegment> segments = iterator.next();
+      final List<DataSegment> segments = iterator.next().getSegments();
 
       final Interval firstInterval = segments.get(0).getInterval();
       Assert.assertTrue(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 1843adf9c2..6884d25975 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -75,6 +75,8 @@ import 
org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
+import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.CompactionState;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 5ad7ffab36..776c2f836c 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -47,9 +47,9 @@ import 
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyF
 import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
 import 
org.apache.druid.server.coordinator.balancer.DiskNormalizedCostBalancerStrategyFactory;
 import 
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
+import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
 import org.apache.druid.server.coordinator.rules.Rule;
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java 
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index dcc8c1a95a..5327f0b3a7 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -81,7 +81,8 @@ import 
org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.KillStalePendingSegments;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
 import 
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
+import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
@@ -92,7 +93,6 @@ import 
org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
 import org.apache.druid.server.coordinator.duty.KillRules;
 import org.apache.druid.server.coordinator.duty.KillSupervisors;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.http.ClusterResource;
 import org.apache.druid.server.http.CompactionResource;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to