This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c211dcc4b3 Clean up compaction logs on coordinator (#14875)
c211dcc4b3 is described below
commit c211dcc4b310eaafe0c9acc78d3d84c24a6770c9
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 21 17:30:41 2023 +0530
Clean up compaction logs on coordinator (#14875)
Changes:
- Move the logic of `NewestSegmentFirstIterator.needsCompaction` to `CompactionStatus`
  to improve testability and readability
- Capture the list of checks performed to determine if compaction is needed in a
  readable manner in `CompactionStatus.CHECKS` (see the sketch after this list)
- Make `CompactionSegmentIterator` iterate over instances of `SegmentsToCompact`
  instead of `List<DataSegment>`. This allows use of the `umbrellaInterval` later.
- Replace usages of `QueueEntry` with `SegmentsToCompact`
- Move `SegmentsToCompact` out of `NewestSegmentFirstIterator`
- Simplify `CompactionStatistics`
- Reduce the level of less important logs to debug
- No changes made to tests, to ensure correctness
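For context, a minimal sketch of the ordered-check pattern introduced by this change
(names simplified for illustration; the real implementation is in the new file
CompactionStatus.java further down in this diff). The checks are evaluated in order
and the first incomplete status short-circuits with its reason:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Supplier;

    class StatusSketch
    {
      static final StatusSketch COMPLETE = new StatusSketch(true, null);

      final boolean complete;
      final String reason;

      StatusSketch(boolean complete, String reason)
      {
        this.complete = complete;
        this.reason = reason;
      }

      // Evaluate the checks in order; the first incomplete status wins.
      static StatusSketch evaluate(List<Supplier<StatusSketch>> checks)
      {
        return checks.stream()
                     .map(Supplier::get)
                     .filter(status -> !status.complete)
                     .findFirst()
                     .orElse(COMPLETE);
      }
    }

    // Example: StatusSketch.evaluate(Arrays.asList(
    //     () -> StatusSketch.COMPLETE,
    //     () -> new StatusSketch(false, "not compacted yet")))
    // returns the incomplete status with reason "not compacted yet".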
---
.../NewestSegmentFirstPolicyBenchmark.java | 9 +-
.../apache/druid/indexing/common/task/Tasks.java | 2 +-
.../server/coordinator/CompactionStatistics.java | 73 ----
.../druid/server/coordinator/DruidCoordinator.java | 2 +-
.../CompactionSegmentIterator.java | 6 +-
.../CompactionSegmentSearchPolicy.java | 3 +-
.../CompactionStatistics.java} | 51 ++-
.../coordinator/compact/CompactionStatus.java | 352 ++++++++++++++++
.../NewestSegmentFirstIterator.java | 467 ++++-----------------
.../NewestSegmentFirstPolicy.java | 2 +-
.../coordinator/compact/SegmentsToCompact.java | 123 ++++++
.../server/coordinator/duty/CompactSegments.java | 37 +-
.../server/coordinator/DruidCoordinatorTest.java | 2 +-
.../NewestSegmentFirstIteratorTest.java | 20 +-
.../NewestSegmentFirstPolicyTest.java | 121 +++---
.../coordinator/duty/CompactSegmentsTest.java | 2 +
.../simulate/CoordinatorSimulationBuilder.java | 4 +-
.../java/org/apache/druid/cli/CliCoordinator.java | 4 +-
18 files changed, 713 insertions(+), 567 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 5310033521..52a6e0d975 100644
--- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -142,8 +142,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
- final List<DataSegment> segments = iterator.next();
- blackhole.consume(segments);
+ blackhole.consume(iterator.next());
}
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index 90b7526977..28c80ac300 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -68,7 +68,7 @@ public class Tasks
* This context is used in compaction. When it is set in the context, the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to track what segments are compacted or not.
* See {@link org.apache.druid.timeline.DataSegment} and {@link
- * org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for more details.
+ * org.apache.druid.server.coordinator.compact.NewestSegmentFirstIterator} for more details.
*/
public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
deleted file mode 100644
index 676630ef09..0000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/CompactionStatistics.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator;
-
-public class CompactionStatistics
-{
- private long byteSum;
- private long segmentNumberCountSum;
- private long segmentIntervalCountSum;
-
- public CompactionStatistics(
- long byteSum,
- long segmentNumberCountSum,
- long segmentIntervalCountSum
- )
- {
- this.byteSum = byteSum;
- this.segmentNumberCountSum = segmentNumberCountSum;
- this.segmentIntervalCountSum = segmentIntervalCountSum;
- }
-
- public static CompactionStatistics initializeCompactionStatistics()
- {
- return new CompactionStatistics(0, 0, 0);
- }
-
- public long getByteSum()
- {
- return byteSum;
- }
-
- public long getSegmentNumberCountSum()
- {
- return segmentNumberCountSum;
- }
-
- public long getSegmentIntervalCountSum()
- {
- return segmentIntervalCountSum;
- }
-
- public void incrementCompactedByte(long incrementValue)
- {
- byteSum = byteSum + incrementValue;
- }
-
- public void incrementCompactedSegments(long incrementValue)
- {
- segmentNumberCountSum = segmentNumberCountSum + incrementValue;
- }
-
- public void incrementCompactedIntervals(long incrementValue)
- {
- segmentIntervalCountSum = segmentIntervalCountSum + incrementValue;
- }
-}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 363a5758d0..832f7790ad 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -62,10 +62,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CollectSegmentAndServerStats;
import org.apache.druid.server.coordinator.duty.CompactSegments;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
similarity index 90%
rename from server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
rename to server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
index 64f5c16a17..bab7ca8f92 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentIterator.java
@@ -17,20 +17,18 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
-import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.timeline.DataSegment;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
/**
* Segments in the lists which are the elements of this iterator are sorted according to the natural segment order
* (see {@link DataSegment#compareTo}).
*/
-public interface CompactionSegmentIterator extends Iterator<List<DataSegment>>
+public interface CompactionSegmentIterator extends Iterator<SegmentsToCompact>
{
/**
* Return a map of dataSourceName to CompactionStatistics.
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
similarity index 92%
copy from server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
copy to server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
index 2cbaf31d69..5a006908c3 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
similarity index 53%
rename from server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
rename to server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
index 2cbaf31d69..dd672ce448 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatistics.java
@@ -17,26 +17,41 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
-
-import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.SegmentTimeline;
-import org.joda.time.Interval;
-
-import java.util.List;
-import java.util.Map;
+package org.apache.druid.server.coordinator.compact;
/**
- * Segment searching policy used by {@link CompactSegments}.
+ * Used to track statistics for segments in different states of compaction.
*/
-public interface CompactionSegmentSearchPolicy
+public class CompactionStatistics
{
- /**
- * Reset the current states of this policy. This method should be called whenever iterating starts.
- */
- CompactionSegmentIterator reset(
- Map<String, DataSourceCompactionConfig> compactionConfigs,
- Map<String, SegmentTimeline> dataSources,
- Map<String, List<Interval>> skipIntervals
- );
+ private long totalBytes;
+ private long numSegments;
+ private long numIntervals;
+
+ public static CompactionStatistics create()
+ {
+ return new CompactionStatistics();
+ }
+
+ public long getTotalBytes()
+ {
+ return totalBytes;
+ }
+
+ public long getNumSegments()
+ {
+ return numSegments;
+ }
+
+ public long getNumIntervals()
+ {
+ return numIntervals;
+ }
+
+ public void addFrom(SegmentsToCompact segments)
+ {
+ totalBytes += segments.getTotalBytes();
+ numIntervals += segments.getNumIntervals();
+ numSegments += segments.size();
+ }
}
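(Aside: a hedged usage sketch of the simplified CompactionStatistics API above;
illustrative only, not part of this commit. It assumes a SegmentsToCompact
instance, a class added later in this diff.)

    // Accumulate per-datasource stats from a batch of candidate segments.
    static CompactionStatistics statsFor(SegmentsToCompact candidates)
    {
      final CompactionStatistics stats = CompactionStatistics.create();
      stats.addFrom(candidates); // adds totalBytes, numIntervals and numSegments
      return stats;
    }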
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
new file mode 100644
index 0000000000..862f2e7c5b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.compact;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
+import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.utils.CollectionUtils;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Represents the status of compaction for a given list of candidate segments.
+ */
+public class CompactionStatus
+{
+ private static final CompactionStatus COMPLETE = new CompactionStatus(true, null);
+
+ /**
+ * List of checks performed to determine if compaction is already complete.
+ * <p>
+ * The order of the checks must be honored while evaluating them.
+ */
+ private static final List<Function<Evaluator, CompactionStatus>> CHECKS = Arrays.asList(
+ Evaluator::segmentsHaveBeenCompactedAtLeastOnce,
+ Evaluator::allCandidatesHaveSameCompactionState,
+ Evaluator::partitionsSpecIsUpToDate,
+ Evaluator::indexSpecIsUpToDate,
+ Evaluator::segmentGranularityIsUpToDate,
+ Evaluator::queryGranularityIsUpToDate,
+ Evaluator::rollupIsUpToDate,
+ Evaluator::dimensionsSpecIsUpToDate,
+ Evaluator::metricsSpecIsUpToDate,
+ Evaluator::transformSpecFilterIsUpToDate
+ );
+
+ private final boolean complete;
+ private final String reasonToCompact;
+
+ private CompactionStatus(boolean complete, String reason)
+ {
+ this.complete = complete;
+ this.reasonToCompact = reason;
+ }
+
+ public boolean isComplete()
+ {
+ return complete;
+ }
+
+ public String getReasonToCompact()
+ {
+ return reasonToCompact;
+ }
+
+ private static CompactionStatus incomplete(String reasonFormat, Object... args)
+ {
+ return new CompactionStatus(false, StringUtils.format(reasonFormat, args));
+ }
+
+ private static CompactionStatus completeIfEqual(String field, Object configured, Object current)
+ {
+ if (configured == null || configured.equals(current)) {
+ return COMPLETE;
+ } else {
+ return configChanged(field, configured, current);
+ }
+ }
+
+ private static CompactionStatus configChanged(String field, Object configured, Object current)
+ {
+ return CompactionStatus.incomplete(
+ "Configured %s[%s] is different from current %s[%s]",
+ field, configured, field, current
+ );
+ }
+
+ /**
+ * Determines the CompactionStatus of the given candidate segments by evaluating
+ * the {@link #CHECKS} one by one. If any check returns an incomplete status,
+ * further checks are not performed and the incomplete status is returned.
+ */
+ static CompactionStatus of(
+ SegmentsToCompact candidateSegments,
+ DataSourceCompactionConfig config,
+ ObjectMapper objectMapper
+ )
+ {
+ final Evaluator evaluator = new Evaluator(candidateSegments, config, objectMapper);
+ return CHECKS.stream().map(f -> f.apply(evaluator))
+ .filter(status -> !status.isComplete())
+ .findFirst().orElse(COMPLETE);
+ }
+
+ static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
+ {
+ final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
+ if (partitionsSpecFromTuningConfig == null) {
+ final long maxTotalRows = Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
+ return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows);
+ } else if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
+ return new DynamicPartitionsSpec(
+ partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
+ ((DynamicPartitionsSpec) partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
+ );
+ } else {
+ return partitionsSpecFromTuningConfig;
+ }
+ }
+
+ /**
+ * Evaluates {@link #CHECKS} to determine the compaction status.
+ */
+ private static class Evaluator
+ {
+ private final ObjectMapper objectMapper;
+ private final DataSourceCompactionConfig compactionConfig;
+ private final SegmentsToCompact candidateSegments;
+ private final CompactionState lastCompactionState;
+ private final ClientCompactionTaskQueryTuningConfig tuningConfig;
+ private final ClientCompactionTaskGranularitySpec existingGranularitySpec;
+ private final UserCompactionTaskGranularityConfig configuredGranularitySpec;
+
+ private Evaluator(
+ SegmentsToCompact candidateSegments,
+ DataSourceCompactionConfig compactionConfig,
+ ObjectMapper objectMapper
+ )
+ {
+ Preconditions.checkArgument(!candidateSegments.isEmpty(), "Empty candidates");
+
+ this.candidateSegments = candidateSegments;
+ this.objectMapper = objectMapper;
+ this.lastCompactionState = candidateSegments.getFirst().getLastCompactionState();
+ this.compactionConfig = compactionConfig;
+ this.tuningConfig = ClientCompactionTaskQueryTuningConfig.from(
+ compactionConfig.getTuningConfig(),
+ compactionConfig.getMaxRowsPerSegment(),
+ null
+ );
+
+ this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
+ if (lastCompactionState == null) {
+ this.existingGranularitySpec = null;
+ } else {
+ this.existingGranularitySpec = convertIfNotNull(
+ lastCompactionState.getGranularitySpec(),
+ ClientCompactionTaskGranularitySpec.class
+ );
+ }
+ }
+
+ private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
+ {
+ if (lastCompactionState == null) {
+ return CompactionStatus.incomplete("Not compacted yet");
+ } else {
+ return COMPLETE;
+ }
+ }
+
+ private CompactionStatus allCandidatesHaveSameCompactionState()
+ {
+ final boolean allHaveSameCompactionState = candidateSegments.getSegments().stream().allMatch(
+ segment -> lastCompactionState.equals(segment.getLastCompactionState())
+ );
+ if (allHaveSameCompactionState) {
+ return COMPLETE;
+ } else {
+ return CompactionStatus.incomplete("Candidate segments have different
last compaction states.");
+ }
+ }
+
+ private CompactionStatus partitionsSpecIsUpToDate()
+ {
+ return CompactionStatus.completeIfEqual(
+ "partitionsSpec",
+ findPartitionsSpecFromConfig(tuningConfig),
+ lastCompactionState.getPartitionsSpec()
+ );
+ }
+
+ private CompactionStatus indexSpecIsUpToDate()
+ {
+ return CompactionStatus.completeIfEqual(
+ "indexSpec",
+ Configs.valueOrDefault(tuningConfig.getIndexSpec(), IndexSpec.DEFAULT),
+ objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class)
+ );
+ }
+
+ private CompactionStatus segmentGranularityIsUpToDate()
+ {
+ if (configuredGranularitySpec == null
+ || configuredGranularitySpec.getSegmentGranularity() == null) {
+ return COMPLETE;
+ }
+
+ final Granularity configuredSegmentGranularity = configuredGranularitySpec.getSegmentGranularity();
+ final Granularity existingSegmentGranularity
+ = existingGranularitySpec == null ? null : existingGranularitySpec.getSegmentGranularity();
+
+ if (configuredSegmentGranularity.equals(existingSegmentGranularity)) {
+ return COMPLETE;
+ } else if (existingSegmentGranularity == null) {
+ // Candidate segments were compacted without segment granularity specified
+ // Check if the segments already have the desired segment granularity
+ boolean needsCompaction = candidateSegments.getSegments().stream().anyMatch(
+ segment -> !configuredSegmentGranularity.isAligned(segment.getInterval())
+ );
+ if (needsCompaction) {
+ return CompactionStatus.incomplete(
+ "Configured segmentGranularity[%s] does not align with segment
intervals.",
+ configuredSegmentGranularity
+ );
+ }
+ } else {
+ return CompactionStatus.configChanged(
+ "segmentGranularity",
+ configuredSegmentGranularity,
+ existingSegmentGranularity
+ );
+ }
+
+ return COMPLETE;
+ }
+
+ private CompactionStatus rollupIsUpToDate()
+ {
+ if (configuredGranularitySpec == null) {
+ return COMPLETE;
+ } else {
+ return CompactionStatus.completeIfEqual(
+ "rollup",
+ configuredGranularitySpec.isRollup(),
+ existingGranularitySpec == null ? null : existingGranularitySpec.isRollup()
+ );
+ }
+ }
+
+ private CompactionStatus queryGranularityIsUpToDate()
+ {
+ if (configuredGranularitySpec == null) {
+ return COMPLETE;
+ } else {
+ return CompactionStatus.completeIfEqual(
+ "queryGranularity",
+ configuredGranularitySpec.getQueryGranularity(),
+ existingGranularitySpec == null ? null : existingGranularitySpec.getQueryGranularity()
+ );
+ }
+ }
+
+ private CompactionStatus dimensionsSpecIsUpToDate()
+ {
+ if (compactionConfig.getDimensionsSpec() == null) {
+ return COMPLETE;
+ } else {
+ final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
+ return CompactionStatus.completeIfEqual(
+ "dimensionsSpec",
+ compactionConfig.getDimensionsSpec().getDimensions(),
+ existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions()
+ );
+ }
+ }
+
+ private CompactionStatus metricsSpecIsUpToDate()
+ {
+ final AggregatorFactory[] configuredMetricsSpec = compactionConfig.getMetricsSpec();
+ if (ArrayUtils.isEmpty(configuredMetricsSpec)) {
+ return COMPLETE;
+ }
+
+ final List<Object> metricSpecList = lastCompactionState.getMetricsSpec();
+ final AggregatorFactory[] existingMetricsSpec
+ = CollectionUtils.isNullOrEmpty(metricSpecList)
+ ? null : objectMapper.convertValue(metricSpecList, AggregatorFactory[].class);
+
+ if (existingMetricsSpec == null || !Arrays.deepEquals(configuredMetricsSpec, existingMetricsSpec)) {
+ return CompactionStatus.configChanged(
+ "metricsSpec",
+ Arrays.toString(configuredMetricsSpec),
+ Arrays.toString(existingMetricsSpec)
+ );
+ } else {
+ return COMPLETE;
+ }
+ }
+
+ private CompactionStatus transformSpecFilterIsUpToDate()
+ {
+ if (compactionConfig.getTransformSpec() == null) {
+ return COMPLETE;
+ }
+
+ ClientCompactionTaskTransformSpec existingTransformSpec = convertIfNotNull(
+ lastCompactionState.getTransformSpec(),
+ ClientCompactionTaskTransformSpec.class
+ );
+ return CompactionStatus.completeIfEqual(
+ "transformSpec filter",
+ compactionConfig.getTransformSpec().getFilter(),
+ existingTransformSpec == null ? null : existingTransformSpec.getFilter()
+ );
+ }
+
+ @Nullable
+ private <T> T convertIfNotNull(Object object, Class<T> clazz)
+ {
+ if (object == null) {
+ return null;
+ } else {
+ return objectMapper.convertValue(object, clazz);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
similarity index 51%
rename from server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
rename to server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
index a0a656f384..f9059dca67 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIterator.java
@@ -17,21 +17,13 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
-import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
-import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
-import org.apache.druid.data.input.impl.DimensionSchema;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -39,13 +31,7 @@ import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.SegmentUtils;
-import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
-import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
@@ -61,7 +47,6 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -70,7 +55,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
@@ -84,21 +68,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private final ObjectMapper objectMapper;
private final Map<String, DataSourceCompactionConfig> compactionConfigs;
- private final Map<String, CompactionStatistics> compactedSegments = new HashMap<>();
- private final Map<String, CompactionStatistics> skippedSegments = new HashMap<>();
+ private final Map<String, CompactionStatistics> compactedSegmentStats = new HashMap<>();
+ private final Map<String, CompactionStatistics> skippedSegmentStats = new HashMap<>();
- // dataSource -> intervalToFind
- // searchIntervals keeps track of the current state of which interval should be considered to search segments to
- // compact.
private final Map<String, CompactibleTimelineObjectHolderCursor> timelineIterators;
+
// This is needed for datasource that has segmentGranularity configured
// If configured segmentGranularity in config is finer than current segmentGranularity, the same set of segments
// can belong to multiple intervals in the timeline. We keep track of the compacted intervals between each
// run of the compaction job and skip any interval that was already previously compacted.
private final Map<String, Set<Interval>> intervalCompactedForDatasource = new HashMap<>();
- private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
- (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval, o1.interval)
+ private final PriorityQueue<SegmentsToCompact> queue = new PriorityQueue<>(
+ (o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval())
);
NewestSegmentFirstIterator(
@@ -112,11 +94,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
this.compactionConfigs = compactionConfigs;
this.timelineIterators = Maps.newHashMapWithExpectedSize(dataSources.size());
- dataSources.forEach((String dataSource, SegmentTimeline timeline) -> {
+ dataSources.forEach((dataSource, timeline) -> {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
- VersionedIntervalTimeline<String, DataSegment> originalTimeline = null;
+ SegmentTimeline originalTimeline = null;
if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
String temporaryVersion = DateTimes.nowUtc().toString();
Map<Interval, Set<DataSegment>> intervalToPartitionMap = new HashMap<>();
@@ -175,7 +157,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
});
- compactionConfigs.forEach((String dataSourceName, DataSourceCompactionConfig config) -> {
+ compactionConfigs.forEach((dataSourceName, config) -> {
if (config == null) {
throw new ISE("Unknown dataSource[%s]", dataSourceName);
}
@@ -186,13 +168,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
@Override
public Map<String, CompactionStatistics> totalCompactedStatistics()
{
- return compactedSegments;
+ return compactedSegmentStats;
}
@Override
public Map<String, CompactionStatistics> totalSkippedStatistics()
{
- return skippedSegments;
+ return skippedSegmentStats;
}
@Override
@@ -202,27 +184,24 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
@Override
- public List<DataSegment> next()
+ public SegmentsToCompact next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
- final QueueEntry entry = queue.poll();
-
+ final SegmentsToCompact entry = queue.poll();
if (entry == null) {
throw new NoSuchElementException();
}
- final List<DataSegment> resultSegments = entry.segments;
-
+ final List<DataSegment> resultSegments = entry.getSegments();
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");
final String dataSource = resultSegments.get(0).getDataSource();
-
updateQueue(dataSource, compactionConfigs.get(dataSource));
- return resultSegments;
+ return entry;
}
/**
@@ -232,23 +211,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
*/
private void updateQueue(String dataSourceName, DataSourceCompactionConfig config)
{
- final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor = timelineIterators.get(
- dataSourceName
- );
-
- if (compactibleTimelineObjectHolderCursor == null) {
- log.warn("Cannot find timeline for dataSource[%s]. Skip this
dataSource", dataSourceName);
- return;
- }
-
- final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(
- dataSourceName,
- compactibleTimelineObjectHolderCursor,
- config
- );
-
+ final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
if (!segmentsToCompact.isEmpty()) {
- queue.add(new QueueEntry(segmentsToCompact.segments));
+ queue.add(segmentsToCompact);
}
}
@@ -260,13 +225,13 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
@Nullable
- private final VersionedIntervalTimeline<String, DataSegment> originalTimeline;
+ private final SegmentTimeline originalTimeline;
CompactibleTimelineObjectHolderCursor(
- VersionedIntervalTimeline<String, DataSegment> timeline,
+ SegmentTimeline timeline,
List<Interval> totalIntervalsToSearch,
- // originalTimeline can be nullable if timeline was not modified
- @Nullable VersionedIntervalTimeline<String, DataSegment> originalTimeline
+ // originalTimeline can be null if timeline was not modified
+ @Nullable SegmentTimeline originalTimeline
)
{
this.holders = totalIntervalsToSearch
@@ -313,284 +278,93 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
List<DataSegment> candidates = Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
.collect(Collectors.toList());
- if (originalTimeline != null) {
- Interval umbrellaInterval = JodaUtils.umbrellaInterval(candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
- return Lists.newArrayList(originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, Partitions.ONLY_COMPLETE));
- }
- return candidates;
- }
- }
- @VisibleForTesting
- static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
- {
- final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
- if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
- return new DynamicPartitionsSpec(
- partitionsSpecFromTuningConfig.getMaxRowsPerSegment(),
- ((DynamicPartitionsSpec) partitionsSpecFromTuningConfig).getMaxTotalRowsOr(Long.MAX_VALUE)
- );
- } else {
- final long maxTotalRows = tuningConfig.getMaxTotalRows() != null
- ? tuningConfig.getMaxTotalRows()
- : Long.MAX_VALUE;
- return partitionsSpecFromTuningConfig == null
- ? new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows)
- : partitionsSpecFromTuningConfig;
- }
- }
-
- private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCompact candidates)
- {
- Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
- final ClientCompactionTaskQueryTuningConfig tuningConfig =
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
- final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
- final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
- if (lastCompactionState == null) {
- log.info("Candidate segment[%s] is not compacted yet. Needs
compaction.", candidates.segments.get(0).getId());
- return true;
- }
-
- final boolean allCandidatesHaveSameLastCompactionState = candidates
- .segments
- .stream()
- .allMatch(segment -> lastCompactionState.equals(segment.getLastCompactionState()));
-
- if (!allCandidatesHaveSameLastCompactionState) {
- log.info(
- "[%s] Candidate segments were compacted with different partitions
spec. Needs compaction.",
- candidates.segments.size()
- );
- log.debugSegments(
- candidates.segments,
- "Candidate segments compacted with different partiton spec"
- );
-
- return true;
- }
-
- final PartitionsSpec segmentPartitionsSpec = lastCompactionState.getPartitionsSpec();
- final IndexSpec segmentIndexSpec = objectMapper.convertValue(lastCompactionState.getIndexSpec(), IndexSpec.class);
- final IndexSpec configuredIndexSpec;
- if (tuningConfig.getIndexSpec() == null) {
- configuredIndexSpec = IndexSpec.DEFAULT;
- } else {
- configuredIndexSpec = tuningConfig.getIndexSpec();
- }
- if (!Objects.equals(partitionsSpecFromConfig, segmentPartitionsSpec)) {
- log.info(
- "Configured partitionsSpec[%s] is differenet from "
- + "the partitionsSpec[%s] of segments. Needs compaction.",
- partitionsSpecFromConfig,
- segmentPartitionsSpec
- );
- return true;
- }
- // segmentIndexSpec cannot be null.
- if (!segmentIndexSpec.equals(configuredIndexSpec)) {
- log.info(
- "Configured indexSpec[%s] is different from the one[%s] of segments.
Needs compaction",
- configuredIndexSpec,
- segmentIndexSpec
- );
- return true;
- }
-
- if (config.getGranularitySpec() != null) {
-
- final ClientCompactionTaskGranularitySpec existingGranularitySpec = lastCompactionState.getGranularitySpec() != null ?
- objectMapper.convertValue(lastCompactionState.getGranularitySpec(), ClientCompactionTaskGranularitySpec.class) :
- null;
- // Checks for segmentGranularity
- if (config.getGranularitySpec().getSegmentGranularity() != null) {
- final Granularity existingSegmentGranularity = existingGranularitySpec != null ?
- existingGranularitySpec.getSegmentGranularity() :
- null;
- if (existingSegmentGranularity == null) {
- // Candidate segments were all compacted without segment granularity set.
- // We need to check if all segments have the same segment granularity as the configured segment granularity.
- boolean needsCompaction = candidates.segments.stream()
- .anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
- if (needsCompaction) {
- log.info(
- "Segments were previously compacted but without
segmentGranularity in auto compaction."
- + " Configured segmentGranularity[%s] is different from
granularity implied by segment intervals. Needs compaction",
- config.getGranularitySpec().getSegmentGranularity()
- );
- return true;
- }
-
- } else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
- log.info(
- "Configured segmentGranularity[%s] is different from the
segmentGranularity[%s] of segments. Needs compaction",
- config.getGranularitySpec().getSegmentGranularity(),
- existingSegmentGranularity
- );
- return true;
- }
- }
-
- // Checks for rollup
- if (config.getGranularitySpec().isRollup() != null) {
- final Boolean existingRollup = existingGranularitySpec != null ?
- existingGranularitySpec.isRollup() :
- null;
- if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) {
- log.info(
- "Configured rollup[%s] is different from the rollup[%s] of
segments. Needs compaction",
- config.getGranularitySpec().isRollup(),
- existingRollup
- );
- return true;
- }
- }
-
- // Checks for queryGranularity
- if (config.getGranularitySpec().getQueryGranularity() != null) {
-
- final Granularity existingQueryGranularity = existingGranularitySpec != null ?
- existingGranularitySpec.getQueryGranularity() :
- null;
- if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) {
- log.info(
- "Configured queryGranularity[%s] is different from the
queryGranularity[%s] of segments. Needs compaction",
- config.getGranularitySpec().getQueryGranularity(),
- existingQueryGranularity
- );
- return true;
- }
- }
- }
-
- if (config.getDimensionsSpec() != null) {
- final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
- // Checks for list of dimensions
- if (config.getDimensionsSpec().getDimensions() != null) {
- final List<DimensionSchema> existingDimensions = existingDimensionsSpec != null ?
- existingDimensionsSpec.getDimensions() :
- null;
- if (!config.getDimensionsSpec().getDimensions().equals(existingDimensions)) {
- log.info(
- "Configured dimensionsSpec is different from the dimensionsSpec
of segments. Needs compaction"
- );
- return true;
- }
- }
- }
-
- if (config.getTransformSpec() != null) {
- final ClientCompactionTaskTransformSpec existingTransformSpec = lastCompactionState.getTransformSpec() != null ?
- objectMapper.convertValue(lastCompactionState.getTransformSpec(), ClientCompactionTaskTransformSpec.class) :
- null;
- // Checks for filters
- if (config.getTransformSpec().getFilter() != null) {
- final DimFilter existingFilters = existingTransformSpec != null ?
- existingTransformSpec.getFilter() :
- null;
- if (!config.getTransformSpec().getFilter().equals(existingFilters)) {
- log.info(
- "Configured filter[%s] is different from the filter[%s] of
segments. Needs compaction",
- config.getTransformSpec().getFilter(),
- existingFilters
- );
- return true;
- }
- }
- }
-
- if (ArrayUtils.isNotEmpty(config.getMetricsSpec())) {
- final AggregatorFactory[] existingMetricsSpec = lastCompactionState.getMetricsSpec() == null || lastCompactionState.getMetricsSpec().isEmpty() ?
- null :
- objectMapper.convertValue(lastCompactionState.getMetricsSpec(), AggregatorFactory[].class);
- if (existingMetricsSpec == null || !Arrays.deepEquals(config.getMetricsSpec(), existingMetricsSpec)) {
- log.info(
- "Configured metricsSpec[%s] is different from the metricsSpec[%s] of segments. Needs compaction",
- Arrays.toString(config.getMetricsSpec()),
- Arrays.toString(existingMetricsSpec)
+ if (originalTimeline == null) {
+ return candidates;
+ } else {
+ Interval umbrellaInterval = JodaUtils.umbrellaInterval(
+ candidates.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ );
+ return Lists.newArrayList(
+ originalTimeline.findNonOvershadowedObjectsInInterval(umbrellaInterval, Partitions.ONLY_COMPLETE)
);
- return true;
}
}
-
- return false;
}
/**
- * Find segments to compact together for the given intervalToSearch. It progressively searches the given
- * intervalToSearch in time order (latest first). The timeline lookup duration is one day. It means, the timeline is
- * looked up for the last one day of the given intervalToSearch, and the next day is searched again if the size of
- * found segments are not enough to compact. This is repeated until enough amount of segments are found.
+ * Finds segments to compact together for the given datasource.
*
- * @return segments to compact
+ * @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
*/
private SegmentsToCompact findSegmentsToCompact(
final String dataSourceName,
- final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor,
final DataSourceCompactionConfig config
)
{
+ final CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor
+ = timelineIterators.get(dataSourceName);
+ if (compactibleTimelineObjectHolderCursor == null) {
+ log.warn("Skipping dataSource[%s] as there is no timeline for it.", dataSourceName);
+ return SegmentsToCompact.empty();
+ }
+
final long inputSegmentSize = config.getInputSegmentSizeBytes();
while (compactibleTimelineObjectHolderCursor.hasNext()) {
List<DataSegment> segments = compactibleTimelineObjectHolderCursor.next();
- final SegmentsToCompact candidates = new SegmentsToCompact(segments);
- if (!candidates.isEmpty()) {
- final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
- final boolean needsCompaction = needsCompaction(
- config,
- candidates
+ if (segments.isEmpty()) {
+ throw new ISE("No segment is found?");
+ }
+
+ final SegmentsToCompact candidates = SegmentsToCompact.from(segments);
+ final Interval interval = candidates.getUmbrellaInterval();
+
+ final CompactionStatus compactionStatus = CompactionStatus.of(candidates, config, objectMapper);
+ if (!compactionStatus.isComplete()) {
+ log.debug(
+ "Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
+ dataSourceName, interval, candidates.size(), compactionStatus.getReasonToCompact()
);
+ }
- if (isCompactibleSize && needsCompaction) {
- if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
- Interval interval = candidates.getUmbrellaInterval();
- Set<Interval> intervalsCompacted = intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new HashSet<>());
- // Skip this candidates if we have compacted the interval already
- if (intervalsCompacted.contains(interval)) {
- continue;
- }
- intervalsCompacted.add(interval);
- }
- return candidates;
+ if (compactionStatus.isComplete()) {
+ addSegmentStatsTo(compactedSegmentStats, dataSourceName, candidates);
+ } else if (candidates.getTotalBytes() > inputSegmentSize) {
+ addSegmentStatsTo(skippedSegmentStats, dataSourceName, candidates);
+ log.warn(
+ "Skipping compaction for datasource[%s], interval[%s] as total
segment size[%d]"
+ + " is larger than allowed inputSegmentSize[%d].",
+ dataSourceName, interval, candidates.getTotalBytes(),
inputSegmentSize
+ );
+ } else if (config.getGranularitySpec() != null
+ && config.getGranularitySpec().getSegmentGranularity() != null) {
+ Set<Interval> compactedIntervals = intervalCompactedForDatasource
+ .computeIfAbsent(dataSourceName, k -> new HashSet<>());
+
+ if (compactedIntervals.contains(interval)) {
+ // Skip these candidate segments as we have already compacted this interval
} else {
- if (!needsCompaction) {
- // Collect statistic for segments that is already compacted
- collectSegmentStatistics(compactedSegments, dataSourceName, candidates);
- } else {
- // Collect statistic for segments that is skipped
- // Note that if segments does not need compaction then we do not double count here
- collectSegmentStatistics(skippedSegments, dataSourceName, candidates);
- log.warn(
- "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
- + " Continue to the next interval.",
- candidates.getTotalSize(),
- candidates.segments.get(0).getDataSource(),
- candidates.segments.get(0).getInterval(),
- inputSegmentSize
- );
- }
+ compactedIntervals.add(interval);
+ return candidates;
}
} else {
- throw new ISE("No segment is found?");
+ return candidates;
}
}
- log.info("All segments look good! Nothing to compact");
- return new SegmentsToCompact();
+
+ log.debug("All segments look good! Nothing to compact");
+ return SegmentsToCompact.empty();
}
- private void collectSegmentStatistics(
+ private void addSegmentStatsTo(
Map<String, CompactionStatistics> statisticsMap,
String dataSourceName,
- SegmentsToCompact segments)
+ SegmentsToCompact segments
+ )
{
- CompactionStatistics statistics = statisticsMap.computeIfAbsent(
- dataSourceName,
- v -> CompactionStatistics.initializeCompactionStatistics()
- );
- statistics.incrementCompactedByte(segments.getTotalSize());
- statistics.incrementCompactedIntervals(segments.getNumberOfIntervals());
- statistics.incrementCompactedSegments(segments.getNumberOfSegments());
+ statisticsMap.computeIfAbsent(dataSourceName, v -> CompactionStatistics.create())
+ .addFrom(segments);
}
/**
@@ -621,10 +395,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
skipIntervals
);
- // Calcuate stats of all skipped segments
+ // Collect stats for all skipped segments
for (Interval skipInterval : fullSkipIntervals) {
- final List<DataSegment> segments = new ArrayList<>(timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE));
- collectSegmentStatistics(skippedSegments, dataSourceName, new SegmentsToCompact(segments));
+ final List<DataSegment> segments = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(skipInterval, Partitions.ONLY_COMPLETE)
+ );
+ addSegmentStatsTo(skippedSegmentStats, dataSourceName, SegmentsToCompact.from(segments));
}
final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
@@ -749,81 +525,4 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return filteredIntervals;
}
- private static class QueueEntry
- {
- private final Interval interval; // whole interval for all segments
- private final List<DataSegment> segments;
-
- private QueueEntry(List<DataSegment> segments)
- {
- Preconditions.checkArgument(segments != null && !segments.isEmpty());
- DateTime minStart = DateTimes.MAX, maxEnd = DateTimes.MIN;
- for (DataSegment segment : segments) {
- if (segment.getInterval().getStart().compareTo(minStart) < 0) {
- minStart = segment.getInterval().getStart();
- }
- if (segment.getInterval().getEnd().compareTo(maxEnd) > 0) {
- maxEnd = segment.getInterval().getEnd();
- }
- }
- this.interval = new Interval(minStart, maxEnd);
- this.segments = segments;
- }
-
- private String getDataSource()
- {
- return segments.get(0).getDataSource();
- }
- }
-
- private static class SegmentsToCompact
- {
- private final List<DataSegment> segments;
- private final long totalSize;
-
- private SegmentsToCompact()
- {
- this(Collections.emptyList());
- }
-
- private SegmentsToCompact(List<DataSegment> segments)
- {
- this.segments = segments;
- this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
- }
-
- private boolean isEmpty()
- {
- return segments.isEmpty();
- }
-
- private long getTotalSize()
- {
- return totalSize;
- }
-
- private long getNumberOfSegments()
- {
- return segments.size();
- }
-
- private Interval getUmbrellaInterval()
- {
- return JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
- }
-
- private long getNumberOfIntervals()
- {
- return segments.stream().map(DataSegment::getInterval).distinct().count();
- }
-
- @Override
- public String toString()
- {
- return "SegmentsToCompact{" +
- "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
- ", totalSize=" + totalSize +
- '}';
- }
- }
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
similarity index 97%
rename from server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
rename to server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
index ce4f0e1066..20f6d92044 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
new file mode 100644
index 0000000000..1bc53b7dbe
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/SegmentsToCompact.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.compact;
+
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+/**
+ * List of segments to compact.
+ */
+public class SegmentsToCompact
+{
+ private static final SegmentsToCompact EMPTY_INSTANCE = new SegmentsToCompact();
+
+ private final List<DataSegment> segments;
+ private final Interval umbrellaInterval;
+ private final long totalBytes;
+ private final int numIntervals;
+
+ static SegmentsToCompact empty()
+ {
+ return EMPTY_INSTANCE;
+ }
+
+ public static SegmentsToCompact from(List<DataSegment> segments)
+ {
+ if (segments == null || segments.isEmpty()) {
+ return empty();
+ } else {
+ return new SegmentsToCompact(segments);
+ }
+ }
+
+ private SegmentsToCompact()
+ {
+ this.segments = Collections.emptyList();
+ this.totalBytes = 0L;
+ this.numIntervals = 0;
+ this.umbrellaInterval = null;
+ }
+
+ private SegmentsToCompact(List<DataSegment> segments)
+ {
+ this.segments = segments;
+ this.totalBytes = segments.stream().mapToLong(DataSegment::getSize).sum();
+ this.umbrellaInterval = JodaUtils.umbrellaInterval(
+ segments.stream().map(DataSegment::getInterval).collect(Collectors.toList())
+ );
+ this.numIntervals = (int) segments.stream().map(DataSegment::getInterval).distinct().count();
+ }
+
+ public List<DataSegment> getSegments()
+ {
+ return segments;
+ }
+
+ public DataSegment getFirst()
+ {
+ if (segments.isEmpty()) {
+ throw new NoSuchElementException("No segment to compact");
+ } else {
+ return segments.get(0);
+ }
+ }
+
+ public boolean isEmpty()
+ {
+ return segments.isEmpty();
+ }
+
+ public long getTotalBytes()
+ {
+ return totalBytes;
+ }
+
+ public int size()
+ {
+ return segments.size();
+ }
+
+ public Interval getUmbrellaInterval()
+ {
+ return umbrellaInterval;
+ }
+
+ public long getNumIntervals()
+ {
+ return numIntervals;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SegmentsToCompact{" +
+ "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+ ", totalSize=" + totalBytes +
+ '}';
+ }
+}
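(Aside: a hypothetical usage of the new SegmentsToCompact API above; illustrative
only, not part of this commit.)

    import java.util.Collections;
    import java.util.List;
    import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
    import org.apache.druid.timeline.DataSegment;

    class SegmentsToCompactExample
    {
      static void describe(List<DataSegment> eligibleSegments)
      {
        final SegmentsToCompact candidates = SegmentsToCompact.from(eligibleSegments);
        if (!candidates.isEmpty()) {
          // The umbrella interval spans the earliest start to the latest end.
          System.out.println(candidates.getUmbrellaInterval() + ": " + candidates.getTotalBytes() + " bytes");
        }
        // A null or empty list maps to the shared empty instance.
        assert SegmentsToCompact.from(Collections.emptyList()).isEmpty();
      }
    }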
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 6ba1e3919a..0c08da7c8d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -45,10 +45,13 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
-import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentIterator;
+import org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.CompactionStatistics;
+import org.apache.druid.server.coordinator.compact.SegmentsToCompact;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
@@ -364,7 +367,8 @@ public class CompactSegments implements CoordinatorCustomDuty
int numCompactionTasksAndSubtasks = 0;
while (iterator.hasNext() && numCompactionTasksAndSubtasks < numAvailableCompactionTaskSlots) {
- final List<DataSegment> segmentsToCompact = iterator.next();
+ final SegmentsToCompact entry = iterator.next();
+ final List<DataSegment> segmentsToCompact = entry.getSegments();
if (segmentsToCompact.isEmpty()) {
throw new ISE("segmentsToCompact is empty?");
}
@@ -403,11 +407,13 @@ public class CompactSegments implements
CoordinatorCustomDuty
catch (IllegalArgumentException iae) {
// This case can happen if the existing segment interval result in complicated periods.
// Fall back to setting segmentGranularity as null
- LOG.warn("Cannot determine segmentGranularity from interval [%s]",
interval);
+ LOG.warn("Cannot determine segmentGranularity from interval[%s].",
interval);
}
} else {
LOG.warn(
- "segmentsToCompact does not have the same interval. Fallback to
not setting segmentGranularity for auto compaction task");
+ "Not setting 'segmentGranularity' for auto-compaction task as"
+ + " the segments to compact do not have the same interval."
+ );
}
} else {
        segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity();
@@ -478,13 +484,17 @@ public class CompactSegments implements CoordinatorCustomDuty
         newAutoCompactionContext(config.getTaskContext())
       );
-    LOG.info("Submitted a compactionTask[%s] for [%d] segments", taskId, segmentsToCompact.size());
-    LOG.infoSegments(segmentsToCompact, "Compacting segments");
+    LOG.info(
+        "Submitted a compaction task[%s] for [%d] segments in datasource[%s], umbrella interval[%s].",
+        taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
+    );
+ LOG.debugSegments(segmentsToCompact, "Compacting segments");
// Count the compaction task itself + its sub tasks
numSubmittedTasks++;
      numCompactionTasksAndSubtasks += findMaxNumTaskSlotsUsedByOneCompactionTask(config.getTuningConfig());
}
+ LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks);
return numSubmittedTasks;
}
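The loop above budgets by task slots rather than by task count: each submitted
compaction task also reserves slots for all of its sub-tasks. A minimal,
self-contained sketch of that budget check follows; Entry and slotsNeeded are
illustrative stand-ins, not names from the patch.

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    public class SubmissionLoopSketch
    {
      // Minimal stand-in for SegmentsToCompact: only what the loop needs.
      static class Entry
      {
        final int numSegments;
        final int slotsNeeded; // the task itself plus its sub-tasks

        Entry(int numSegments, int slotsNeeded)
        {
          this.numSegments = numSegments;
          this.slotsNeeded = slotsNeeded;
        }
      }

      public static void main(String[] args)
      {
        Queue<Entry> queue = new ArrayDeque<>();
        queue.add(new Entry(10, 4));
        queue.add(new Entry(3, 1));
        queue.add(new Entry(7, 4));

        final int numAvailableCompactionTaskSlots = 5;
        int numSubmittedTasks = 0;
        int usedSlots = 0;

        Iterator<Entry> iterator = queue.iterator();
        // Keep submitting while entries remain and the slots consumed so far
        // stay under the available budget.
        while (iterator.hasNext() && usedSlots < numAvailableCompactionTaskSlots) {
          Entry entry = iterator.next();
          numSubmittedTasks++;            // count the compaction task itself
          usedSlots += entry.slotsNeeded; // plus all of its sub-tasks
        }

        System.out.println("Submitted a total of [" + numSubmittedTasks + "] compaction tasks.");
      }
    }

With a budget of 5 slots this submits two tasks: the check runs before a
task's slots are counted, so the last submission may carry the total slightly
past the budget, matching the loop in the patch.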
@@ -505,7 +515,8 @@ public class CompactSegments implements CoordinatorCustomDuty
{
// Mark all the segments remaining in the iterator as "awaiting compaction"
while (iterator.hasNext()) {
- final List<DataSegment> segmentsToCompact = iterator.next();
+ final SegmentsToCompact entry = iterator.next();
+ final List<DataSegment> segmentsToCompact = entry.getSegments();
if (!segmentsToCompact.isEmpty()) {
final String dataSourceName = segmentsToCompact.get(0).getDataSource();
        AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent(
@@ -536,9 +547,9 @@ public class CompactSegments implements CoordinatorCustomDuty
dataSource,
        k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
      );
-      builder.incrementBytesCompacted(dataSourceCompactedStatistics.getByteSum());
-      builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getSegmentNumberCountSum());
-      builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getSegmentIntervalCountSum());
+      builder.incrementBytesCompacted(dataSourceCompactedStatistics.getTotalBytes());
+      builder.incrementSegmentCountCompacted(dataSourceCompactedStatistics.getNumSegments());
+      builder.incrementIntervalCountCompacted(dataSourceCompactedStatistics.getNumIntervals());
}
// Statistics of all segments considered skipped after this run
@@ -550,9 +561,9 @@ public class CompactSegments implements CoordinatorCustomDuty
dataSource,
        k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING)
      );
-      builder.incrementBytesSkipped(dataSourceSkippedStatistics.getByteSum());
-      builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getSegmentNumberCountSum());
-      builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getSegmentIntervalCountSum());
+      builder.incrementBytesSkipped(dataSourceSkippedStatistics.getTotalBytes());
+      builder.incrementSegmentCountSkipped(dataSourceSkippedStatistics.getNumSegments());
+      builder.incrementIntervalCountSkipped(dataSourceSkippedStatistics.getNumIntervals());
}
    final Map<String, AutoCompactionSnapshot> currentAutoCompactionSnapshotPerDataSource = new HashMap<>();
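These hunks switch the snapshot builders over to the renamed
CompactionStatistics getters (getTotalBytes, getNumSegments, getNumIntervals).
A hedged mock-up of such a simplified statistics holder and the per-datasource
computeIfAbsent accumulation; Stats and add(...) are illustrative, not the
real API.

    import java.util.HashMap;
    import java.util.Map;

    public class CompactionStatisticsSketch
    {
      // Three counters exposed via getters named like those in the diff above.
      static class Stats
      {
        private long totalBytes;
        private long numSegments;
        private long numIntervals;

        void add(long bytes, long segments, long intervals)
        {
          totalBytes += bytes;
          numSegments += segments;
          numIntervals += intervals;
        }

        long getTotalBytes() { return totalBytes; }
        long getNumSegments() { return numSegments; }
        long getNumIntervals() { return numIntervals; }
      }

      public static void main(String[] args)
      {
        // Per-datasource accumulation, analogous to the computeIfAbsent calls
        // on the snapshot builder map in the duty.
        Map<String, Stats> statsByDatasource = new HashMap<>();
        statsByDatasource.computeIfAbsent("wiki", k -> new Stats()).add(1_000L, 10, 2);
        statsByDatasource.computeIfAbsent("wiki", k -> new Stats()).add(500L, 5, 1);

        Stats wiki = statsByDatasource.get("wiki");
        System.out.println(wiki.getTotalBytes() + " bytes in " + wiki.getNumSegments() + " segments");
      }
    }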
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 0e75f95a1f..249dea2ce9 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -52,13 +52,13 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java
similarity index 94%
rename from
server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
rename to
server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java
index 35021a56a2..b4ea5d69e0 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstIteratorTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -100,7 +100,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -190,7 +190,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -235,7 +235,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -280,7 +280,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -325,7 +325,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -370,7 +370,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -415,7 +415,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
@@ -460,7 +460,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
- NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
+ CompactionStatus.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment(), null)
)
);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java
similarity index 93%
rename from
server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
rename to
server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java
index e2df83ac9a..04f8f8993d 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.server.coordinator.duty;
+package org.apache.druid.server.coordinator.compact;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
@@ -26,7 +26,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -58,7 +57,7 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
-import org.assertj.core.api.Assertions;
+import org.apache.druid.utils.Streams;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -72,6 +71,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
@@ -208,7 +208,7 @@ public class NewestSegmentFirstPolicyTest
Interval lastInterval = null;
while (iterator.hasNext()) {
- final List<DataSegment> segments = iterator.next();
+ final List<DataSegment> segments = iterator.next().getSegments();
lastInterval = segments.get(0).getInterval();
Interval prevInterval = null;
@@ -264,7 +264,7 @@ public class NewestSegmentFirstPolicyTest
Interval lastInterval = null;
while (iterator.hasNext()) {
- final List<DataSegment> segments = iterator.next();
+ final List<DataSegment> segments = iterator.next().getSegments();
lastInterval = segments.get(0).getInterval();
Interval prevInterval = null;
@@ -352,9 +352,13 @@ public class NewestSegmentFirstPolicyTest
);
expectedSegmentsToCompact2.sort(Comparator.naturalOrder());
-    Assertions.assertThat(iterator)
-              .toIterable()
-              .containsExactly(expectedSegmentsToCompact, expectedSegmentsToCompact2);
+    Set<List<DataSegment>> observedSegments = Streams.sequentialStreamFrom(iterator)
+                                                     .map(SegmentsToCompact::getSegments)
+                                                     .collect(Collectors.toSet());
+ Assert.assertEquals(
+ observedSegments,
+ ImmutableSet.of(expectedSegmentsToCompact, expectedSegmentsToCompact2)
+ );
}
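The rewritten assertions drain the iterator through a stream and compare sets,
so the order in which entries come off the iterator no longer matters. Below
is a plain-JDK sketch of what a helper like
org.apache.druid.utils.Streams.sequentialStreamFrom is assumed to do, together
with the set-collection idiom used in these tests.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class IteratorToStreamSketch
    {
      // Assumed behavior of Streams.sequentialStreamFrom(iterator): wrap an
      // Iterator in a sequential Stream.
      static <T> Stream<T> sequentialStreamFrom(Iterator<T> iterator)
      {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
            false
        );
      }

      public static void main(String[] args)
      {
        Iterator<String> iterator = Arrays.asList("b", "a", "b").iterator();
        // Collecting into a Set makes the comparison order-insensitive, which
        // is why the tests can drop AssertJ's order-sensitive containsExactly.
        Set<String> observed = sequentialStreamFrom(iterator).collect(Collectors.toSet());
        System.out.println(observed); // prints [a, b] in some order
      }
    }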
@Test
@@ -419,7 +423,13 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+    Set<DataSegment> observedSegmentsToCompact = Streams.sequentialStreamFrom(iterator)
+                                                        .flatMap(s -> s.getSegments().stream())
+                                                        .collect(Collectors.toSet());
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ observedSegmentsToCompact
+ );
}
@Test
@@ -446,7 +456,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertTrue(iterator.hasNext());
- List<DataSegment> actual = iterator.next();
+ List<DataSegment> actual = iterator.next().getSegments();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
Assert.assertFalse(iterator.hasNext());
@@ -472,7 +482,13 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+    Set<DataSegment> observedSegmentsToCompact = Streams.sequentialStreamFrom(iterator)
+                                                        .flatMap(s -> s.getSegments().stream())
+                                                        .collect(Collectors.toSet());
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ observedSegmentsToCompact
+ );
}
@Test
@@ -585,7 +601,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// Month of Nov
Assert.assertTrue(iterator.hasNext());
@@ -594,7 +610,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// Month of Oct
Assert.assertTrue(iterator.hasNext());
@@ -603,7 +619,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -631,7 +647,7 @@ public class NewestSegmentFirstPolicyTest
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"),
Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
- List<DataSegment> actual = iterator.next();
+ List<DataSegment> actual = iterator.next().getSegments();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
// Month of Jan
@@ -639,7 +655,7 @@ public class NewestSegmentFirstPolicyTest
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"),
Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
- actual = iterator.next();
+ actual = iterator.next().getSegments();
Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
// No more
@@ -663,7 +679,10 @@ public class NewestSegmentFirstPolicyTest
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
Partitions.ONLY_COMPLETE)
);
Assert.assertTrue(iterator.hasNext());
-    Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next()));
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next().getSegments())
+ );
// Iterator should return only once since all the "minute" interval of the
iterator contains the same interval
Assert.assertFalse(iterator.hasNext());
}
@@ -689,7 +708,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -701,7 +720,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -734,7 +753,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -767,7 +786,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -798,7 +817,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -810,7 +829,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -841,7 +860,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -853,7 +872,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -893,7 +912,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -905,7 +924,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -944,7 +963,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -956,7 +975,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -996,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1004,7 +1023,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1016,7 +1035,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
    // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1056,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1064,7 +1083,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1076,7 +1095,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// Dimensions=["foo", "bar"] for interval
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1130,7 +1149,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1138,7 +1157,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1146,7 +1165,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1175,7 +1194,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// filter=SelectorDimFilter("dim1", "foo", null) for interval
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1250,7 +1269,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1258,7 +1277,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1266,7 +1285,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1299,7 +1318,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
// Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// metricsSpec={CountAggregatorFactory("cnt")} for interval
2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1374,7 +1393,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1382,7 +1401,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
Assert.assertTrue(iterator.hasNext());
expectedSegmentsToCompact = new ArrayList<>(
@@ -1390,7 +1409,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1436,7 +1455,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1448,7 +1467,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config
    IndexSpec newIndexSpec = IndexSpec.builder().withBitmapSerdeFactory(new ConciseBitmapSerdeFactory()).build();
    Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final SegmentTimeline timeline = createTimeline(
@@ -1487,7 +1506,7 @@ public class NewestSegmentFirstPolicyTest
);
Assert.assertEquals(
ImmutableSet.copyOf(expectedSegmentsToCompact),
- ImmutableSet.copyOf(iterator.next())
+ ImmutableSet.copyOf(iterator.next().getSegments())
);
// No more
Assert.assertFalse(iterator.hasNext());
@@ -1497,7 +1516,7 @@ public class NewestSegmentFirstPolicyTest
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
{
NullHandling.initializeForTests();
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
+    PartitionsSpec partitionsSpec = CompactionStatus.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
final SegmentTimeline timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
@@ -1691,7 +1710,7 @@ public class NewestSegmentFirstPolicyTest
{
Interval expectedSegmentIntervalStart = to;
while (iterator.hasNext()) {
- final List<DataSegment> segments = iterator.next();
+ final List<DataSegment> segments = iterator.next().getSegments();
final Interval firstInterval = segments.get(0).getInterval();
Assert.assertTrue(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 1843adf9c2..6884d25975 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -75,6 +75,8 @@ import
org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
+import
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.CompactionState;
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 5ad7ffab36..776c2f836c 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -47,9 +47,9 @@ import
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyF
import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
import
org.apache.druid.server.coordinator.balancer.DiskNormalizedCostBalancerStrategyFactory;
import
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
+import
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.rules.Rule;
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index dcc8c1a95a..5327f0b3a7 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -81,7 +81,8 @@ import
org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
import
org.apache.druid.server.coordinator.balancer.CachingCostBalancerStrategyConfig;
-import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
+import
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
@@ -92,7 +93,6 @@ import
org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
-import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.http.ClusterResource;
import org.apache.druid.server.http.CompactionResource;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]