This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6541178 Support segmentGranularity for auto-compaction (#10843)
6541178 is described below
commit 6541178c21839530a42af4b4675a9bc680bffca6
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Feb 12 03:03:20 2021 -0800
Support segmentGranularity for auto-compaction (#10843)
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* resolve conflict
* Support segmentGranularity for auto-compaction
* Support segmentGranularity for auto-compaction
* fix tests
* fix more tests
* fix checkstyle
* add unit tests
* fix checkstyle
* fix checkstyle
* fix checkstyle
* add unit tests
* add integration tests
* fix checkstyle
* fix checkstyle
* fix failing tests
* address comments
* address comments
* fix tests
* fix tests
* fix test
* fix test
* fix test
* fix test
* fix test
* fix test
* fix test
* fix test
---
.../NewestSegmentFirstPolicyBenchmark.java | 1 +
.../org/apache/druid/timeline/CompactionState.java | 19 +-
.../druid/timeline/partition/PartitionChunk.java | 2 +-
.../org/apache/druid/timeline/DataSegmentTest.java | 4 +-
.../common/task/AbstractBatchIndexTask.java | 6 +-
.../druid/indexing/common/task/CompactionTask.java | 35 ++-
.../druid/indexing/common/task/IndexTask.java | 3 +-
.../parallel/ParallelIndexSupervisorTask.java | 3 +-
.../task/ClientCompactionTaskQuerySerdeTest.java | 18 ++
.../common/task/CompactionTaskParallelRunTest.java | 84 +++++--
.../common/task/CompactionTaskRunTest.java | 81 +++++--
.../indexing/common/task/CompactionTaskTest.java | 41 ++++
.../coordinator/duty/ITAutoCompactionTest.java | 80 ++++++-
.../druid/tests/indexer/ITCompactionTaskTest.java | 75 +++++--
...edia_compaction_task_with_granularity_spec.json | 17 ++
...a_compaction_task_with_segment_granularity.json | 15 ++
.../client/indexing/ClientCompactionTaskQuery.java | 14 +-
.../ClientCompactionTaskQueryGranularitySpec.java | 95 ++++++++
.../client/indexing/HttpIndexingServiceClient.java | 4 +-
.../client/indexing/IndexingServiceClient.java | 1 +
.../granularity/ArbitraryGranularitySpec.java | 1 -
.../indexing/granularity/BaseGranularitySpec.java | 22 +-
.../indexing/granularity/GranularitySpec.java | 4 +
.../granularity/UniformGranularitySpec.java | 5 -
.../coordinator/DataSourceCompactionConfig.java | 34 +++
.../server/coordinator/duty/CompactSegments.java | 33 +++
.../duty/NewestSegmentFirstIterator.java | 133 +++++++++++-
.../client/indexing/NoopIndexingServiceClient.java | 1 +
.../granularity/ArbitraryGranularityTest.java | 27 +++
.../granularity/UniformGranularityTest.java | 33 ++-
.../DataSourceCompactionConfigTest.java | 98 +++++++++
.../coordinator/duty/CompactSegmentsTest.java | 241 ++++++++++++++++++++-
.../duty/NewestSegmentFirstIteratorTest.java | 10 +
.../duty/NewestSegmentFirstPolicyTest.java | 216 ++++++++++++++++--
.../druid/sql/calcite/schema/SystemSchemaTest.java | 3 +-
35 files changed, 1336 insertions(+), 123 deletions(-)
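
For orientation before the diff: the coordinator now attaches a granularitySpec to the compaction task it submits. A minimal sketch of the new wire object, assuming placeholder values for taskId, ioConfig, and tuningConfig (the granularity values mirror the serde test further down):

    ClientCompactionTaskQuery query = new ClientCompactionTaskQuery(
        taskId,        // hypothetical task id
        "wikipedia",   // illustrative datasource name
        ioConfig,      // placeholder ClientCompactionIOConfig
        tuningConfig,  // placeholder ClientCompactionTaskQueryTuningConfig
        new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
        ImmutableMap.of("key", "value")
    );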
diff --git
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 91c1409..e744bf9 100644
---
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -99,6 +99,7 @@ public class NewestSegmentFirstPolicyBenchmark
null,
null,
null,
+ null,
null
)
);
diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java
b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
index c30f427..8588717 100644
--- a/core/src/main/java/org/apache/druid/timeline/CompactionState.java
+++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
@@ -43,15 +43,20 @@ public class CompactionState
// org.apache.druid.segment.IndexSpec cannot be used here because it's in
the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;
+ // org.apache.druid.segment.indexing.granularity.GranularitySpec cannot be
used here because it's in the
+ // 'server' module which has a dependency on the 'core' module where this
class is.
+ private final Map<String, Object> granularitySpec;
@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
- @JsonProperty("indexSpec") Map<String, Object> indexSpec
+ @JsonProperty("indexSpec") Map<String, Object> indexSpec,
+ @JsonProperty("granularitySpec") Map<String, Object> granularitySpec
)
{
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec;
+ this.granularitySpec = granularitySpec;
}
@JsonProperty
@@ -66,6 +71,12 @@ public class CompactionState
return indexSpec;
}
+ @JsonProperty
+ public Map<String, Object> getGranularitySpec()
+ {
+ return granularitySpec;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -77,13 +88,14 @@ public class CompactionState
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
- Objects.equals(indexSpec, that.indexSpec);
+ Objects.equals(indexSpec, that.indexSpec) &&
+ Objects.equals(granularitySpec, that.granularitySpec);
}
@Override
public int hashCode()
{
- return Objects.hash(partitionsSpec, indexSpec);
+ return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
}
@Override
@@ -92,6 +104,7 @@ public class CompactionState
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
+ ", granularitySpec=" + granularitySpec +
'}';
}
}
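
A sketch of how the new field gets populated; the map keys below are illustrative but match the UniformGranularitySpec JSON properties used elsewhere in this patch. The spec travels as a plain Map precisely because 'core' cannot depend on GranularitySpec in 'server':

    Map<String, Object> granularitySpecMap = ImmutableMap.of(
        "segmentGranularity", "DAY",
        "queryGranularity", "NONE",
        "rollup", true
    );
    CompactionState state = new CompactionState(
        new DynamicPartitionsSpec(null, Long.MAX_VALUE),
        indexSpecMap,       // placeholder: the indexSpec already serialized to a Map
        granularitySpecMap
    );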
diff --git
a/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
b/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
index 10b43b8..54aaf03 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionChunk.java
@@ -58,7 +58,7 @@ public interface PartitionChunk<T> extends
Comparable<PartitionChunk<T>>
* Returns true if this chunk is the end of the partition. Most commonly,
that means it represents the range
* [X, infinity] for some concrete X.
*
- * @return true if the chunk is the beginning of the partition
+ * @return true if the chunk is the end of the partition
*/
boolean isEnd();
diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 5483642..66c3400 100644
--- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -120,6 +120,7 @@ public class DataSegmentTest
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+ ImmutableMap.of(),
ImmutableMap.of()
),
TEST_VERSION,
@@ -231,7 +232,8 @@ public class DataSegmentTest
{
final CompactionState compactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
- Collections.singletonMap("test", "map")
+ Collections.singletonMap("test", "map"),
+ Collections.singletonMap("test2", "map2")
);
final DataSegment segment1 = DataSegment.builder()
.dataSource("foo")
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 4a7fa0d..6f7cdf6 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -473,12 +473,14 @@ public abstract class AbstractBatchIndexTask extends
AbstractTask
public static Function<Set<DataSegment>, Set<DataSegment>>
compactionStateAnnotateFunction(
boolean storeCompactionState,
TaskToolbox toolbox,
- IndexTuningConfig tuningConfig
+ IndexTuningConfig tuningConfig,
+ GranularitySpec granularitySpec
)
{
if (storeCompactionState) {
final Map<String, Object> indexSpecMap =
tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
- final CompactionState compactionState = new
CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap);
+ final Map<String, Object> granularitySpecMap =
granularitySpec.asMap(toolbox.getJsonMapper());
+ final CompactionState compactionState = new
CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap,
granularitySpecMap);
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 530a400..5d971c3 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -141,6 +141,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final Granularity segmentGranularity;
@Nullable
+ private final GranularitySpec granularitySpec;
+ @Nullable
private final ParallelIndexTuningConfig tuningConfig;
@JsonIgnore
private final SegmentProvider segmentProvider;
@@ -172,7 +174,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec
dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[]
metricsSpec,
- @JsonProperty("segmentGranularity") @Nullable final Granularity
segmentGranularity,
+ @JsonProperty("segmentGranularity") @Deprecated @Nullable final
Granularity segmentGranularity,
+ @JsonProperty("granularitySpec") @Nullable final GranularitySpec
granularitySpec,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@@ -202,6 +205,16 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
+ if (granularitySpec == null && segmentGranularity != null) {
+ this.granularitySpec = new UniformGranularitySpec(
+ segmentGranularity,
+ null,
+ null,
+ null
+ );
+ } else {
+ this.granularitySpec = granularitySpec;
+ }
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) :
null;
this.segmentProvider = new SegmentProvider(dataSource,
this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new
PartitionConfigurationManager(this.tuningConfig);
@@ -288,7 +301,14 @@ public class CompactionTask extends AbstractBatchIndexTask
@Override
public Granularity getSegmentGranularity()
{
- return segmentGranularity;
+ return granularitySpec == null ? null :
granularitySpec.getSegmentGranularity();
+ }
+
+ @JsonProperty
+ @Nullable
+ public GranularitySpec getGranularitySpec()
+ {
+ return granularitySpec;
}
@Nullable
@@ -348,7 +368,7 @@ public class CompactionTask extends AbstractBatchIndexTask
partitionConfigurationManager,
dimensionsSpec,
metricsSpec,
- segmentGranularity,
+ getSegmentGranularity(),
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
retryPolicyFactory
@@ -892,6 +912,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private Granularity segmentGranularity;
@Nullable
+ private GranularitySpec granularitySpec;
+ @Nullable
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;
@@ -941,6 +963,12 @@ public class CompactionTask extends AbstractBatchIndexTask
return this;
}
+ public Builder granularitySpec(GranularitySpec granularitySpec)
+ {
+ this.granularitySpec = granularitySpec;
+ return this;
+ }
+
public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
@@ -966,6 +994,7 @@ public class CompactionTask extends AbstractBatchIndexTask
dimensionsSpec,
metricsSpec,
segmentGranularity,
+ granularitySpec,
tuningConfig,
context,
segmentLoaderFactory,
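
A sketch of the precedence implied by the constructor logic above, following the unit tests later in this patch: an explicit granularitySpec wins over the deprecated segmentGranularity, which is otherwise wrapped into a UniformGranularitySpec (builder arguments are placeholders):

    Builder builder = new Builder(dataSource, segmentLoaderFactory, retryPolicyFactory);
    builder.segmentGranularity(Granularities.HOUR);  // deprecated top-level field
    builder.granularitySpec(new UniformGranularitySpec(Granularities.MINUTE, Granularities.DAY, null));
    CompactionTask task = builder.build();
    // task.getSegmentGranularity() returns Granularities.MINUTE, not HOUR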
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 88779ab..517c08a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -904,7 +904,8 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
- ingestionSchema.getTuningConfig()
+ ingestionSchema.getTuningConfig(),
+ ingestionSchema.getDataSchema().getGranularitySpec()
);
// Probably we can publish atomicUpdateGroup along with segments.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index bd64fbd..af4352b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -929,7 +929,8 @@ public class ParallelIndexSupervisorTask extends
AbstractBatchIndexTask implemen
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction =
compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
- ingestionSchema.getTuningConfig()
+ ingestionSchema.getTuningConfig(),
+ ingestionSchema.getDataSchema().getGranularitySpec()
);
final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten,
segmentsToPublish, commitMetadata) ->
toolbox.getTaskActionClient().submit(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index ef3cf68..1e9628f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import
org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingServiceClient;
@@ -45,11 +46,13 @@ import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningC
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
@@ -113,6 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000,
100
),
+ new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY,
Granularities.HOUR, true),
ImmutableMap.of("key", "value")
);
@@ -186,6 +190,18 @@ public class ClientCompactionTaskQuerySerdeTest
query.getTuningConfig().getTotalNumMergeTasks().intValue(),
task.getTuningConfig().getTotalNumMergeTasks()
);
+ Assert.assertEquals(
+ query.getGranularitySpec().getQueryGranularity(),
+ task.getGranularitySpec().getQueryGranularity()
+ );
+ Assert.assertEquals(
+ query.getGranularitySpec().getSegmentGranularity(),
+ task.getGranularitySpec().getSegmentGranularity()
+ );
+ Assert.assertEquals(
+ query.getGranularitySpec().isRollup(),
+ task.getGranularitySpec().isRollup()
+ );
Assert.assertEquals(query.getContext(), task.getContext());
}
@@ -243,6 +259,7 @@ public class ClientCompactionTaskQuerySerdeTest
null
)
)
+ .granularitySpec(new UniformGranularitySpec(Granularities.DAY,
Granularities.HOUR, null))
.build();
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@@ -284,6 +301,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000,
100
),
+ new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY,
Granularities.HOUR, true),
new HashMap<>()
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 7b7005b..7fe8781 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -129,7 +129,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
}
@Test
- public void testRunParallelWithDynamicPartitioningMatchCompactionState()
+ public void testRunParallelWithDynamicPartitioningMatchCompactionState()
throws Exception
{
runIndexTask(null, true);
@@ -144,22 +144,33 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
- final CompactionState expectedState = new CompactionState(
- new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
- );
for (DataSegment segment : compactedSegments) {
Assert.assertSame(
lockGranularity == LockGranularity.TIME_CHUNK ?
NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
segment.getShardSpec().getClass()
);
// Expect compaction state to exist as store compaction state by default
+ CompactionState expectedState = new CompactionState(
+ new DynamicPartitionsSpec(null, Long.MAX_VALUE),
+
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+ getObjectMapper().readValue(
+ getObjectMapper().writeValueAsString(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ true,
+ ImmutableList.of(segment.getInterval())
+ )
+ ),
+ Map.class
+ )
+ );
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@Test
- public void testRunParallelWithHashPartitioningMatchCompactionState()
+ public void testRunParallelWithHashPartitioningMatchCompactionState() throws
Exception
{
// Hash partitioning is not supported with segment lock yet
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
@@ -176,19 +187,30 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
- final CompactionState expectedState = new CompactionState(
- new HashedPartitionsSpec(null, 3, null),
-
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
- );
for (DataSegment segment : compactedSegments) {
// Expect compaction state to exist as store compaction state by default
Assert.assertSame(HashBasedNumberedShardSpec.class,
segment.getShardSpec().getClass());
+ CompactionState expectedState = new CompactionState(
+ new HashedPartitionsSpec(null, 3, null),
+
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+ getObjectMapper().readValue(
+ getObjectMapper().writeValueAsString(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ true,
+ ImmutableList.of(segment.getInterval())
+ )
+ ),
+ Map.class
+ )
+ );
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@Test
- public void testRunParallelWithRangePartitioning()
+ public void testRunParallelWithRangePartitioning() throws Exception
{
// Range partitioning is not supported with segment lock yet
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
@@ -205,19 +227,30 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
- final CompactionState expectedState = new CompactionState(
- new SingleDimensionPartitionsSpec(7, null, "dim", false),
-
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
- );
for (DataSegment segment : compactedSegments) {
// Expect compaction state to exist as store compaction state by default
Assert.assertSame(SingleDimensionShardSpec.class,
segment.getShardSpec().getClass());
+ CompactionState expectedState = new CompactionState(
+ new SingleDimensionPartitionsSpec(7, null, "dim", false),
+
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+ getObjectMapper().readValue(
+ getObjectMapper().writeValueAsString(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ true,
+ ImmutableList.of(segment.getInterval())
+ )
+ ),
+ Map.class
+ )
+ );
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
@Test
- public void testRunParallelWithRangePartitioningWithSingleTask()
+ public void testRunParallelWithRangePartitioningWithSingleTask() throws
Exception
{
// Range partitioning is not supported with segment lock yet
Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
@@ -234,13 +267,24 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
- final CompactionState expectedState = new CompactionState(
- new SingleDimensionPartitionsSpec(7, null, "dim", false),
-
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
- );
for (DataSegment segment : compactedSegments) {
// Expect compaction state to exist as store compaction state by default
Assert.assertSame(SingleDimensionShardSpec.class,
segment.getShardSpec().getClass());
+ CompactionState expectedState = new CompactionState(
+ new SingleDimensionPartitionsSpec(7, null, "dim", false),
+
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+ getObjectMapper().readValue(
+ getObjectMapper().writeValueAsString(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ true,
+ ImmutableList.of(segment.getInterval())
+ )
+ ),
+ Map.class
+ )
+ );
Assert.assertEquals(expectedState, segment.getLastCompactionState());
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index c3f30ec..bd1f819 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -88,7 +89,6 @@ import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -131,9 +131,6 @@ public class CompactionTaskRunTest extends IngestionTestBase
0
);
- // Expecte compaction state to exist after compaction as we store compaction
state by default
- private static CompactionState DEFAULT_COMPACTION_STATE;
-
private static final List<String> TEST_ROWS = ImmutableList.of(
"2014-01-01T00:00:10Z,a,1\n",
"2014-01-01T00:00:10Z,b,2\n",
@@ -186,14 +183,26 @@ public class CompactionTaskRunTest extends
IngestionTestBase
this.lockGranularity = lockGranularity;
}
- @BeforeClass
- public static void setupClass() throws JsonProcessingException
+ public static CompactionState getDefaultCompactionState(Granularity
segmentGranularity,
+ Granularity
queryGranularity,
+ List<Interval>
intervals) throws JsonProcessingException
{
ObjectMapper mapper = new DefaultObjectMapper();
-
- DEFAULT_COMPACTION_STATE = new CompactionState(
+ // Expected compaction state to exist after compaction as we store
compaction state by default
+ return new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
- mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class)
+ mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
+ mapper.readValue(
+ mapper.writeValueAsString(
+ new UniformGranularitySpec(
+ segmentGranularity,
+ queryGranularity,
+ true,
+ intervals
+ )
+ ),
+ Map.class
+ )
);
}
@@ -238,7 +247,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(i).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.HOUR, Granularities.NONE,
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i
+ 1))),
+ segments.get(i).getLastCompactionState()
+ );
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
@@ -311,10 +323,6 @@ public class CompactionTaskRunTest extends
IngestionTestBase
final List<DataSegment> segments = resultPair.rhs;
Assert.assertEquals(6, segments.size());
- final CompactionState expectedState = new CompactionState(
- new HashedPartitionsSpec(null, 3, null),
-
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
- );
for (int i = 0; i < 3; i++) {
final Interval interval =
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1);
@@ -324,6 +332,21 @@ public class CompactionTaskRunTest extends
IngestionTestBase
interval,
segments.get(segmentIdx).getInterval()
);
+ CompactionState expectedState = new CompactionState(
+ new HashedPartitionsSpec(null, 3, null),
+
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+ getObjectMapper().readValue(
+ getObjectMapper().writeValueAsString(
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ true,
+
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i
+ 1))
+ )
+ ),
+ Map.class
+ )
+ );
Assert.assertEquals(expectedState,
segments.get(segmentIdx).getLastCompactionState());
Assert.assertSame(HashBasedNumberedShardSpec.class,
segments.get(segmentIdx).getShardSpec().getClass());
}
@@ -361,7 +384,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(i).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.HOUR, Granularities.NONE,
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i
+ 1))),
+ segments.get(i).getLastCompactionState()
+ );
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new
NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2,
(short) 1, (short) 1),
@@ -388,7 +414,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(i).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.HOUR, Granularities.NONE,
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i
+ 1))),
+ segments.get(i).getLastCompactionState()
+ );
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(
@@ -487,7 +516,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(i).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.HOUR, Granularities.NONE,
ImmutableList.of(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i
+ 1))),
+ segments.get(i).getLastCompactionState()
+ );
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new
NumberedOverwriteShardSpec(PartitionIds.NON_ROOT_GEN_START_PARTITION_ID, 0, 2,
(short) 1, (short) 1),
@@ -526,7 +558,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"),
segments.get(0).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(0).getShardSpec());
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(0).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.DAY, Granularities.NONE,
ImmutableList.of(Intervals.of("2014-01-01T00:00:00/2014-01-01T03:00:00"))),
+ segments.get(0).getLastCompactionState()
+ );
// hour segmentGranularity
final CompactionTask compactionTask2 = builder
@@ -544,7 +579,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
for (int i = 0; i < 3; i++) {
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00",
i, i + 1), segments.get(i).getInterval());
Assert.assertEquals(new NumberedShardSpec(0, 1),
segments.get(i).getShardSpec());
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(i).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.HOUR, Granularities.NONE,
ImmutableList.of(Intervals.of("2014-01-01/2014-01-02"))),
+ segments.get(i).getLastCompactionState()
+ );
}
}
@@ -781,7 +819,10 @@ public class CompactionTaskRunTest extends
IngestionTestBase
Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", i, i + 1),
segments.get(i).getInterval()
);
- Assert.assertEquals(DEFAULT_COMPACTION_STATE,
segments.get(i).getLastCompactionState());
+ Assert.assertEquals(
+ getDefaultCompactionState(Granularities.HOUR, Granularities.MINUTE,
ImmutableList.of()),
+ segments.get(i).getLastCompactionState()
+ );
if (lockGranularity == LockGranularity.SEGMENT) {
Assert.assertEquals(
new NumberedOverwriteShardSpec(32768, 0, 2, (short) 1, (short) 1),
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 21b7f73..71b603e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -356,6 +356,47 @@ public class CompactionTaskTest
}
@Test
+ public void testCreateCompactionTaskWithGranularitySpec()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.tuningConfig(createTuningConfig());
+ builder.segmentGranularity(Granularities.HOUR);
+ final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
+
+ final Builder builder2 = new Builder(
+ DATA_SOURCE,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY
+ );
+ builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder2.tuningConfig(createTuningConfig());
+ builder2.granularitySpec(new UniformGranularitySpec(Granularities.HOUR,
Granularities.DAY, null));
+ final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
+
Assert.assertEquals(taskCreatedWithGranularitySpec.getSegmentGranularity(),
taskCreatedWithSegmentGranularity.getSegmentGranularity());
+ }
+
+ @Test
+ public void
testCreateCompactionTaskWithGranularitySpecOverrideSegmentGranularity()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.tuningConfig(createTuningConfig());
+ builder.segmentGranularity(Granularities.HOUR);
+ builder.granularitySpec(new UniformGranularitySpec(Granularities.MINUTE,
Granularities.DAY, null));
+ final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
+ Assert.assertEquals(Granularities.MINUTE,
taskCreatedWithSegmentGranularity.getSegmentGranularity());
+ }
+
+ @Test
public void testSerdeWithInterval() throws IOException
{
final Builder builder = new Builder(
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 5d1d55b..91bac02 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -28,7 +28,11 @@ import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -41,7 +45,9 @@ import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
import org.joda.time.Period;
+import org.joda.time.chrono.ISOChronology;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
@@ -52,8 +58,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
@Test(groups = {TestNGGroup.COMPACTION})
@@ -160,7 +168,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
LOG.info("Auto compaction test with hash partitioning");
final HashedPartitionsSpec hashedPartitionsSpec = new
HashedPartitionsSpec(null, 3, null);
- submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1);
+ submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null);
// 2 segments published per day after compaction.
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -175,7 +183,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
"city",
false
);
- submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1);
+ submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -278,6 +286,55 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
}
+ @Test
+ public void testAutoCompactionDutyWithSegmentGranularity() throws Exception
+ {
+ loadData(INDEX_TASK);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 4 segments across 2 days (4 total)...
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ Granularity newGranularity = Granularities.YEAR;
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UniformGranularitySpec(newGranularity, null, null));
+
+ LOG.info("Auto compaction test with YEAR segment granularity");
+
+ List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(1);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(1, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ newGranularity = Granularities.DAY;
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UniformGranularitySpec(newGranularity, null, null));
+
+ LOG.info("Auto compaction test with DAY segment granularity");
+
+ // The earlier segment with YEAR granularity is still 'used' as it's not
fully overshadowed.
+ // This is because we only have newer versions on 2013-08-31 to
2013-09-01 and 2013-09-01 to 2013-09-02.
+ // The version for the YEAR segment is still the latest for 2013-01-01
to 2013-08-31 and 2013-09-02 to 2014-01-01.
+ // Hence, all three segments are available and the expected intervals
are combined from the DAY and YEAR segment granularities
+ // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and
2013-01-01 to 2014-01-01)
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(3);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(3, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+ }
+ }
+
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
@@ -314,13 +371,19 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
private void submitCompactionConfig(Integer maxRowsPerSegment, Period
skipOffsetFromLatest) throws Exception
{
- submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null),
skipOffsetFromLatest, 1);
+ submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null);
+ }
+
+ private void submitCompactionConfig(Integer maxRowsPerSegment, Period
skipOffsetFromLatest, GranularitySpec granularitySpec) throws Exception
+ {
+ submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null),
skipOffsetFromLatest, 1, granularitySpec);
}
private void submitCompactionConfig(
PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest,
- int maxNumConcurrentSubTasks
+ int maxNumConcurrentSubTasks,
+ GranularitySpec granularitySpec
) throws Exception
{
DataSourceCompactionConfig compactionConfig = new
DataSourceCompactionConfig(
@@ -348,6 +411,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
null,
1
),
+ granularitySpec,
null
);
compactionResource.submitCompactionConfig(compactionConfig);
@@ -415,11 +479,13 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
private void checkCompactionIntervals(List<String> expectedIntervals)
{
+ Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
ITRetryUtil.retryUntilTrue(
() -> {
- final List<String> actualIntervals =
coordinator.getSegmentIntervals(fullDatasourceName);
- actualIntervals.sort(null);
- return actualIntervals.equals(expectedIntervals);
+ final Set<String> actualIntervals = new
HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
+ System.out.println("ACTUAL: " + actualIntervals);
+ System.out.println("EXPECTED: " + expectedIntervalsSet);
+ return actualIntervals.equals(expectedIntervalsSet);
},
"Compaction interval check"
);
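
A sketch of the interval expansion the test above performs: each pre-compaction interval is mapped onto the bucket(s) of the new segment granularity via Granularity.getIterable, so a DAY interval lands in its enclosing YEAR bucket:

    for (Interval bucket : Granularities.YEAR.getIterable(Intervals.of("2013-08-31/2013-09-01"))) {
      // single bucket: 2013-01-01/2014-01-01
    }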
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
index ed7b44e..f4ee559 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java
@@ -23,11 +23,14 @@ import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.Interval;
+import org.joda.time.chrono.ISOChronology;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
@@ -37,7 +40,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE})
@Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -49,6 +55,8 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String COMPACTION_TASK =
"/indexer/wikipedia_compaction_task.json";
+ private static final String COMPACTION_TASK_WITH_SEGMENT_GRANULARITY =
"/indexer/wikipedia_compaction_task_with_segment_granularity.json";
+ private static final String COMPACTION_TASK_WITH_GRANULARITY_SPEC =
"/indexer/wikipedia_compaction_task_with_granularity_spec.json";
private static final String INDEX_TASK_WITH_TIMESTAMP =
"/indexer/wikipedia_with_timestamp_index_task.json";
@@ -66,24 +74,41 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
@Test
public void testCompaction() throws Exception
{
- loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE);
+ loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE, COMPACTION_TASK,
null);
+ }
+
+ @Test
+ public void testCompactionWithSegmentGranularity() throws Exception
+ {
+ loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE,
COMPACTION_TASK_WITH_SEGMENT_GRANULARITY, GranularityType.MONTH);
+ }
+
+ @Test
+ public void testCompactionWithGranularitySpec() throws Exception
+ {
+ loadDataAndCompact(INDEX_TASK, INDEX_QUERIES_RESOURCE,
COMPACTION_TASK_WITH_GRANULARITY_SPEC, GranularityType.MONTH);
}
@Test
public void testCompactionWithTimestampDimension() throws Exception
{
- loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE);
+ loadDataAndCompact(INDEX_TASK_WITH_TIMESTAMP, INDEX_QUERIES_RESOURCE,
COMPACTION_TASK, null);
}
- private void loadDataAndCompact(String indexTask, String queriesResource)
throws Exception
+ private void loadDataAndCompact(
+ String indexTask,
+ String queriesResource,
+ String compactionResource,
+ GranularityType newSegmentGranularity
+ ) throws Exception
{
loadData(indexTask);
// 4 segments across 2 days
checkNumberOfSegments(4);
- final List<String> intervalsBeforeCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
- intervalsBeforeCompaction.sort(null);
+ List<String> expectedIntervalAfterCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
+ expectedIntervalAfterCompaction.sort(null);
try (final Closeable ignored = unloader(fullDatasourceName)) {
String queryResponseTemplate;
try {
@@ -102,12 +127,23 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
queryHelper.testQueriesFromString(queryResponseTemplate);
- compactData();
+ compactData(compactionResource, newSegmentGranularity);
// The original 4 segments should be compacted into 2 new segments
checkNumberOfSegments(2);
queryHelper.testQueriesFromString(queryResponseTemplate);
- checkCompactionIntervals(intervalsBeforeCompaction);
+
+
+ if (newSegmentGranularity != null) {
+ List<String> newIntervals = new ArrayList<>();
+ for (String interval : expectedIntervalAfterCompaction) {
+ for (Interval newinterval :
newSegmentGranularity.getDefaultGranularity().getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ newIntervals.add(newinterval.toString());
+ }
+ }
+ expectedIntervalAfterCompaction = newIntervals;
+ }
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
}
}
private void loadData(String indexTask) throws Exception
@@ -124,12 +160,19 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
);
}
- private void compactData() throws Exception
+ private void compactData(String compactionResource, GranularityType
newSegmentGranularity) throws Exception
{
- final String template = getResourceAsString(COMPACTION_TASK);
- String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%",
fullDatasourceName);
+ String template = getResourceAsString(compactionResource);
+ template = StringUtils.replace(template, "%%DATASOURCE%%",
fullDatasourceName);
+ if (newSegmentGranularity != null) {
+ template = StringUtils.replace(
+ template,
+ "%%SEGMENTGRANULARITY%%",
+ newSegmentGranularity.name()
+ );
+ }
- final String taskID = indexer.submitTask(taskSpec);
+ final String taskID = indexer.submitTask(template);
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
@@ -153,13 +196,13 @@ public class ITCompactionTaskTest extends
AbstractIndexerTest
private void checkCompactionIntervals(List<String> expectedIntervals)
{
+ Set<String> expectedIntervalsSet = new HashSet<>(expectedIntervals);
ITRetryUtil.retryUntilTrue(
() -> {
- final List<String> intervalsAfterCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
- intervalsAfterCompaction.sort(null);
- System.out.println("AFTER: " + intervalsAfterCompaction);
- System.out.println("EXPECTED: " + expectedIntervals);
- return intervalsAfterCompaction.equals(expectedIntervals);
+ final Set<String> intervalsAfterCompaction = new
HashSet<>(coordinator.getSegmentIntervals(fullDatasourceName));
+ System.out.println("ACTUAL: " + intervalsAfterCompaction);
+ System.out.println("EXPECTED: " + expectedIntervalsSet);
+ return intervalsAfterCompaction.equals(expectedIntervalsSet);
},
"Compaction interval check"
);
diff --git
a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json
new file mode 100644
index 0000000..828579b
--- /dev/null
+++
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_granularity_spec.json
@@ -0,0 +1,17 @@
+{
+ "type" : "compact",
+ "dataSource" : "%%DATASOURCE%%",
+ "ioConfig" : {
+ "type": "compact",
+ "inputSpec": {
+ "type": "interval",
+ "interval": "2013-08-31/2013-09-02"
+ }
+ },
+ "granularitySpec": {
+ "segmentGranularity": "%%SEGMENTGRANULARITY%%"
+ },
+ "context" : {
+ "storeCompactionState" : true
+ }
+}
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json
new file mode 100644
index 0000000..254926f
--- /dev/null
+++
b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task_with_segment_granularity.json
@@ -0,0 +1,15 @@
+{
+ "type" : "compact",
+ "dataSource" : "%%DATASOURCE%%",
+ "ioConfig" : {
+ "type": "compact",
+ "inputSpec": {
+ "type": "interval",
+ "interval": "2013-08-31/2013-09-02"
+ }
+ },
+ "segmentGranularity": "%%SEGMENTGRANULARITY%%",
+ "context" : {
+ "storeCompactionState" : true
+ }
+}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
index d0dd175..ea41555 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
@@ -38,6 +38,7 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
private final String dataSource;
private final ClientCompactionIOConfig ioConfig;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
+ private final ClientCompactionTaskQueryGranularitySpec granularitySpec;
private final Map<String, Object> context;
@JsonCreator
@@ -46,6 +47,7 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
@JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig
tuningConfig,
+ @JsonProperty("granularitySpec")
ClientCompactionTaskQueryGranularitySpec granularitySpec,
@JsonProperty("context") Map<String, Object> context
)
{
@@ -53,6 +55,7 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
this.dataSource = dataSource;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig;
+ this.granularitySpec = granularitySpec;
this.context = context;
}
@@ -90,11 +93,18 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
}
@JsonProperty
+ public ClientCompactionTaskQueryGranularitySpec getGranularitySpec()
+ {
+ return granularitySpec;
+ }
+
+ @JsonProperty
public Map<String, Object> getContext()
{
return context;
}
+
@Override
public boolean equals(Object o)
{
@@ -109,13 +119,14 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
Objects.equals(dataSource, that.dataSource) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
+ Objects.equals(granularitySpec, that.granularitySpec) &&
Objects.equals(context, that.context);
}
@Override
public int hashCode()
{
- return Objects.hash(id, dataSource, ioConfig, tuningConfig, context);
+ return Objects.hash(id, dataSource, ioConfig, tuningConfig,
granularitySpec, context);
}
@Override
@@ -126,6 +137,7 @@ public class ClientCompactionTaskQuery implements
ClientTaskQuery
", dataSource='" + dataSource + '\'' +
", ioConfig=" + ioConfig +
", tuningConfig=" + tuningConfig +
+ ", granularitySpec=" + granularitySpec +
", context=" + context +
'}';
}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java
new file mode 100644
index 0000000..6f12ea7
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryGranularitySpec.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
+
+import java.util.Objects;
+
+public class ClientCompactionTaskQueryGranularitySpec
+{
+ private final Granularity segmentGranularity;
+ private final Granularity queryGranularity;
+ private final boolean rollup;
+
+ @JsonCreator
+ public ClientCompactionTaskQueryGranularitySpec(
+ @JsonProperty("segmentGranularity") Granularity segmentGranularity,
+ @JsonProperty("queryGranularity") Granularity queryGranularity,
+ @JsonProperty("rollup") Boolean rollup
+ )
+ {
+ this.queryGranularity = queryGranularity == null ?
BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY : queryGranularity;
+ this.rollup = rollup == null ? BaseGranularitySpec.DEFAULT_ROLLUP : rollup;
+ this.segmentGranularity = segmentGranularity == null ?
BaseGranularitySpec.DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
+ }
+
+ @JsonProperty
+ public Granularity getSegmentGranularity()
+ {
+ return segmentGranularity;
+ }
+
+ @JsonProperty
+ public Granularity getQueryGranularity()
+ {
+ return queryGranularity;
+ }
+
+ @JsonProperty
+ public boolean isRollup()
+ {
+ return rollup;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ClientCompactionTaskQueryGranularitySpec{" +
+ "segmentGranularity=" + segmentGranularity +
+ ", queryGranularity=" + queryGranularity +
+ ", rollup=" + rollup +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClientCompactionTaskQueryGranularitySpec that =
(ClientCompactionTaskQueryGranularitySpec) o;
+ return Objects.equals(segmentGranularity, that.segmentGranularity) &&
+ Objects.equals(queryGranularity, that.queryGranularity) &&
+ Objects.equals(rollup, that.rollup);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(segmentGranularity, queryGranularity, rollup);
+ }
+}
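
A sketch of the default-filling behavior of this constructor: nulls fall back to the BaseGranularitySpec defaults introduced later in this patch:

    ClientCompactionTaskQueryGranularitySpec spec =
        new ClientCompactionTaskQueryGranularitySpec(null, null, null);
    // spec.getSegmentGranularity() == Granularities.DAY
    // spec.getQueryGranularity()   == Granularities.NONE
    // spec.isRollup()              == true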
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 2b6e47b..321831f 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -78,7 +78,8 @@ public class HttpIndexingServiceClient implements
IndexingServiceClient
String idPrefix,
List<DataSegment> segments,
int compactionTaskPriority,
- ClientCompactionTaskQueryTuningConfig tuningConfig,
+ @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
+ @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
@Nullable Map<String, Object> context
)
{
@@ -99,6 +100,7 @@ public class HttpIndexingServiceClient implements
IndexingServiceClient
dataSource,
new
ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
tuningConfig,
+ granularitySpec,
context
);
return runTask(taskId, taskQuery);
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 14d4359..8eb8034 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -40,6 +40,7 @@ public interface IndexingServiceClient
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
+ @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
@Nullable Map<String, Object> context
);
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
index 0f97749..8bf6396 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
@@ -143,5 +143,4 @@ public class ArbitraryGranularitySpec extends
BaseGranularitySpec
{
return lookupTableBucketByDateTime;
}
-
}
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
index 9ff114b..779952b 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -20,10 +20,14 @@
package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -31,10 +35,15 @@ import org.joda.time.Interval;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
-abstract class BaseGranularitySpec implements GranularitySpec
+public abstract class BaseGranularitySpec implements GranularitySpec
{
+ public static final Boolean DEFAULT_ROLLUP = Boolean.TRUE;
+ public static final Granularity DEFAULT_SEGMENT_GRANULARITY =
Granularities.DAY;
+ public static final Granularity DEFAULT_QUERY_GRANULARITY =
Granularities.NONE;
+
protected List<Interval> inputIntervals;
protected final Boolean rollup;
@@ -45,7 +54,7 @@ abstract class BaseGranularitySpec implements GranularitySpec
} else {
this.inputIntervals = Collections.emptyList();
}
- this.rollup = rollup == null ? Boolean.TRUE : rollup;
+ this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
}
@Override
@@ -76,6 +85,15 @@ abstract class BaseGranularitySpec implements GranularitySpec
protected abstract LookupIntervalBuckets getLookupTableBuckets();
+ @Override
+ public Map<String, Object> asMap(ObjectMapper objectMapper)
+ {
+ return objectMapper.convertValue(
+ this,
+ new TypeReference<Map<String, Object>>() {}
+ );
+ }
+
/**
* This is a helper class to facilitate sharing the code for
sortedBucketIntervals among
* the various GranularitySpec implementations. In particular, the
UniformGranularitySpec
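A sketch of the round trip that the new asMap method enables, mirroring the unit tests later in this patch (`spec` and `objectMapper` stand in for whatever the caller holds):

    // Convert the spec to a plain Map via Jackson, then back. Polymorphic type
    // info on GranularitySpec makes the reverse conversion possible.
    Map<String, Object> map = spec.asMap(objectMapper);
    GranularitySpec roundTrip = objectMapper.convertValue(map, GranularitySpec.class);
    // roundTrip.equals(spec) should hold for uniform and arbitrary specs alike.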
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
index c3bb579..148f039 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
@@ -21,12 +21,14 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
/**
@@ -79,4 +81,6 @@ public interface GranularitySpec
Granularity getQueryGranularity();
GranularitySpec withIntervals(List<Interval> inputIntervals);
+
+ Map<String, Object> asMap(ObjectMapper objectMapper);
}
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
index 5fe6a9e..cd7c7a4 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
@@ -30,9 +29,6 @@ import java.util.List;
public class UniformGranularitySpec extends BaseGranularitySpec
{
- private static final Granularity DEFAULT_SEGMENT_GRANULARITY =
Granularities.DAY;
- private static final Granularity DEFAULT_QUERY_GRANULARITY =
Granularities.NONE;
-
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
private final IntervalsByGranularity intervalsByGranularity;
@@ -144,5 +140,4 @@ public class UniformGranularitySpec extends
BaseGranularitySpec
{
return lookupTableBucketByDateTime;
}
-
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index c5a1263..692eb81 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.segment.indexing.granularity.BaseGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Period;
import javax.annotation.Nullable;
@@ -46,6 +49,7 @@ public class DataSourceCompactionConfig
private final Integer maxRowsPerSegment;
private final Period skipOffsetFromLatest;
private final UserCompactionTaskQueryTuningConfig tuningConfig;
+ private final GranularitySpec granularitySpec;
private final Map<String, Object> taskContext;
@JsonCreator
@@ -56,6 +60,7 @@ public class DataSourceCompactionConfig
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer
maxRowsPerSegment,
@JsonProperty("skipOffsetFromLatest") @Nullable Period
skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable
UserCompactionTaskQueryTuningConfig tuningConfig,
+ @JsonProperty("granularitySpec") @Nullable GranularitySpec
granularitySpec,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
@@ -69,6 +74,24 @@ public class DataSourceCompactionConfig
this.maxRowsPerSegment = maxRowsPerSegment;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ?
DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
+ if (granularitySpec != null) {
+ Preconditions.checkArgument(
+ granularitySpec instanceof UniformGranularitySpec,
+ "Auto compaction granularitySpec only supports uniform type"
+ );
+ Preconditions.checkArgument(
+ granularitySpec.isRollup() == BaseGranularitySpec.DEFAULT_ROLLUP,
+ "Auto compaction granularitySpec only supports default rollup value"
+ );
+ Preconditions.checkArgument(
+
granularitySpec.getQueryGranularity().equals(BaseGranularitySpec.DEFAULT_QUERY_GRANULARITY),
+ "Auto compaction granularitySpec only supports default query
granularity value");
+ Preconditions.checkArgument(
+ granularitySpec.inputIntervals().isEmpty(),
+ "Auto compaction granularitySpec does not supports interval value"
+ );
+ }
+ this.granularitySpec = granularitySpec;
this.taskContext = taskContext;
}
@@ -113,6 +136,13 @@ public class DataSourceCompactionConfig
@JsonProperty
@Nullable
+ public GranularitySpec getGranularitySpec()
+ {
+ return granularitySpec;
+ }
+
+ @JsonProperty
+ @Nullable
public Map<String, Object> getTaskContext()
{
return taskContext;
@@ -131,8 +161,10 @@ public class DataSourceCompactionConfig
return taskPriority == that.taskPriority &&
inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
Objects.equals(dataSource, that.dataSource) &&
+ Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
+ Objects.equals(granularitySpec, that.granularitySpec) &&
Objects.equals(taskContext, that.taskContext);
}
@@ -143,8 +175,10 @@ public class DataSourceCompactionConfig
dataSource,
taskPriority,
inputSegmentSizeBytes,
+ maxRowsPerSegment,
skipOffsetFromLatest,
tuningConfig,
+ granularitySpec,
taskContext
);
}
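The constructor-time checks above mean an auto-compaction config accepts only a narrowly shaped granularitySpec. A sketch of what passes and what throws, using the same constructor arguments as the test fixtures further down:

    // Accepted: uniform type, default rollup and queryGranularity, no intervals.
    new DataSourceCompactionConfig(
        "dataSource", null, 500L, null, new Period(3600), null,
        new UniformGranularitySpec(Granularities.MONTH, null, null),
        null);

    // Throws IllegalArgumentException: non-default queryGranularity.
    new DataSourceCompactionConfig(
        "dataSource", null, 500L, null, new Period(3600), null,
        new UniformGranularitySpec(Granularities.MONTH, Granularities.HOUR, null),
        null);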
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 3b7ee31..dcd7fab 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -24,12 +24,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import
org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CompactionStatistics;
@@ -123,8 +125,27 @@ public class CompactSegments implements CoordinatorDuty
}
if (COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) {
final ClientCompactionTaskQuery compactionTaskQuery =
(ClientCompactionTaskQuery) response.getPayload();
+ DataSourceCompactionConfig dataSourceCompactionConfig =
compactionConfigs.get(status.getDataSource());
+ if (dataSourceCompactionConfig != null &&
dataSourceCompactionConfig.getGranularitySpec() != null) {
+ Granularity configuredSegmentGranularity =
dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity();
+ if (configuredSegmentGranularity != null
+ && compactionTaskQuery.getGranularitySpec() != null
+ &&
!configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity()))
{
+            // We will cancel the active compaction task if segmentGranularity
changes, since we will need to
+            // re-compact the interval
+ LOG.info("Canceled task[%s] as task segmentGranularity is [%s]
but compaction config "
+ + "segmentGranularity is [%s]",
+ status.getId(),
+
compactionTaskQuery.getGranularitySpec().getSegmentGranularity(),
+ configuredSegmentGranularity);
+ indexingServiceClient.cancelTask(status.getId());
+ continue;
+ }
+ }
+        // Skip this interval, as the current active compaction task is still valid
final Interval interval =
compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k
-> new ArrayList<>()).add(interval);
+ // Since we keep the current active compaction task running, we
count the active task slots
numEstimatedNonCompleteCompactionTasks +=
findMaxNumTaskSlotsUsedByOneCompactionTask(
compactionTaskQuery.getTuningConfig()
);
@@ -289,12 +310,24 @@ public class CompactSegments implements CoordinatorDuty
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
final DataSourceCompactionConfig config =
compactionConfigs.get(dataSourceName);
+ ClientCompactionTaskQueryGranularitySpec queryGranularitySpec;
+ if (config.getGranularitySpec() != null) {
+ queryGranularitySpec = new ClientCompactionTaskQueryGranularitySpec(
+ config.getGranularitySpec().getSegmentGranularity(),
+ config.getGranularitySpec().getQueryGranularity(),
+ config.getGranularitySpec().isRollup()
+ );
+ } else {
+ queryGranularitySpec = null;
+ }
+
// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment()),
+ queryGranularitySpec,
newAutoCompactionContext(config.getTaskContext())
);
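Net effect on the coordinator-issued payload: the granularity spec now rides along with the tuning config, and either may be null. Roughly (taskId, segments, tuningConfig, and context are placeholders):

    ClientCompactionTaskQuery query = new ClientCompactionTaskQuery(
        taskId,
        dataSource,
        new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
        tuningConfig,
        new ClientCompactionTaskQueryGranularitySpec(Granularities.YEAR, null, null),
        context);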
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index c03c4b4..90bfb66 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -28,19 +28,27 @@ import
org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.NumberedPartitionChunk;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.Streams;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -51,12 +59,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -75,6 +85,11 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
// searchIntervals keeps track of the current state of which interval should
be considered to search segments to
// compact.
private final Map<String, CompactibleTimelineObjectHolderCursor>
timelineIterators;
+  // This is needed for datasources that have segmentGranularity configured.
+  // If the configured segmentGranularity is finer than the current
segmentGranularity, the same set of segments
+  // can belong to multiple intervals in the timeline. We keep track of the
intervals compacted in each
+  // run of the compaction job and skip any interval that was already
compacted.
+ private final Map<String, Set<Interval>> intervalCompactedForDatasource =
new HashMap<>();
private final PriorityQueue<QueueEntry> queue = new PriorityQueue<>(
(o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o2.interval,
o1.interval)
@@ -93,12 +108,53 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
dataSources.forEach((String dataSource, VersionedIntervalTimeline<String,
DataSegment> timeline) -> {
final DataSourceCompactionConfig config =
compactionConfigs.get(dataSource);
-
+ Granularity configuredSegmentGranularity = null;
if (config != null && !timeline.isEmpty()) {
+ Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs = new
HashMap<>();
+ if (config.getGranularitySpec() != null &&
config.getGranularitySpec().getSegmentGranularity() != null) {
+ Map<Interval, Set<DataSegment>> intervalToPartitionMap = new
HashMap<>();
+ configuredSegmentGranularity =
config.getGranularitySpec().getSegmentGranularity();
+ // Create a new timeline to hold segments in the new configured
segment granularity
+ VersionedIntervalTimeline<String, DataSegment>
timelineWithConfiguredSegmentGranularity = new
VersionedIntervalTimeline<>(Comparator.naturalOrder());
+ Set<DataSegment> segments =
timeline.findNonOvershadowedObjectsInInterval(Intervals.ETERNITY,
Partitions.ONLY_COMPLETE);
+ for (DataSegment segment : segments) {
+          // Re-bucket each segment's interval by the
configuredSegmentGranularity.
+          // For example, if the original segment has interval
2020-01-28/2020-02-03 with WEEK granularity
+          // and the configuredSegmentGranularity is MONTH, the segment will
be split into two segments
+          // of 2020-01/2020-02 and 2020-02/2020-03.
+ for (Interval interval :
configuredSegmentGranularity.getIterable(segment.getInterval())) {
+ intervalToPartitionMap.computeIfAbsent(interval, k -> new
HashSet<>()).add(segment);
+ }
+ }
+ for (Map.Entry<Interval, Set<DataSegment>> partitionsPerInterval :
intervalToPartitionMap.entrySet()) {
+ Interval interval = partitionsPerInterval.getKey();
+ int partitionNum = 0;
+ Set<DataSegment> segmentSet = partitionsPerInterval.getValue();
+ int partitions = segmentSet.size();
+ for (DataSegment segment : segmentSet) {
+ DataSegment segmentsForCompact = segment.withShardSpec(new
NumberedShardSpec(partitionNum, partitions));
+            // PartitionHolder can only hold chunks of one partition space.
+            // However, a partition in the new timeline
(timelineWithConfiguredSegmentGranularity) can hold multiple
+            // partitions of the original timeline (when the new
segmentGranularity is larger than the original
+            // segmentGranularity). Hence, we group all the segments of the
original timeline into intervals bucketed
+            // by the new configuredSegmentGranularity. We then convert each
segment into a new partition space so that
+            // there is no duplicate partitionNum across all segments of
each new Interval. We have to save the
+            // original ShardSpec to convert the segments back when returning
from the iterator.
+ originalShardSpecs.put(new Pair<>(interval,
segmentsForCompact.getId()), segment.getShardSpec());
+ timelineWithConfiguredSegmentGranularity.add(
+ interval,
+ segmentsForCompact.getVersion(),
+ NumberedPartitionChunk.make(partitionNum, partitions,
segmentsForCompact)
+ );
+ partitionNum += 1;
+ }
+ }
+ timeline = timelineWithConfiguredSegmentGranularity;
+ }
final List<Interval> searchIntervals =
- findInitialSearchInterval(timeline,
config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
+ findInitialSearchInterval(timeline,
config.getSkipOffsetFromLatest(), configuredSegmentGranularity,
skipIntervals.get(dataSource));
if (!searchIntervals.isEmpty()) {
- timelineIterators.put(dataSource, new
CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
+ timelineIterators.put(dataSource, new
CompactibleTimelineObjectHolderCursor(timeline, searchIntervals,
originalShardSpecs));
}
}
});
@@ -187,10 +243,12 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
private static class CompactibleTimelineObjectHolderCursor implements
Iterator<List<DataSegment>>
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
+ private final Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs;
CompactibleTimelineObjectHolderCursor(
VersionedIntervalTimeline<String, DataSegment> timeline,
- List<Interval> totalIntervalsToSearch
+ List<Interval> totalIntervalsToSearch,
+ Map<Pair<Interval, SegmentId>, ShardSpec> originalShardSpecs
)
{
this.holders = totalIntervalsToSearch
@@ -201,6 +259,7 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
.filter(holder -> isCompactibleHolder(interval, holder))
)
.collect(Collectors.toList());
+ this.originalShardSpecs = originalShardSpecs;
}
private boolean isCompactibleHolder(Interval interval,
TimelineObjectHolder<String, DataSegment> holder)
@@ -220,6 +279,14 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
return partitionBytes > 0;
}
+ private DataSegment transformShardSpecIfNeeded(DataSegment dataSegment,
Interval interval)
+ {
+ if (originalShardSpecs != null && !originalShardSpecs.isEmpty()) {
+ return dataSegment.withShardSpec(originalShardSpecs.get(new
Pair<>(interval, dataSegment.getId())));
+ }
+ return dataSegment;
+ }
+
@Override
public boolean hasNext()
{
@@ -232,8 +299,10 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
if (holders.isEmpty()) {
throw new NoSuchElementException();
}
- return Streams.sequentialStreamFrom(holders.remove(holders.size() -
1).getObject())
+ TimelineObjectHolder<String, DataSegment> timelineObjectHolder =
holders.remove(holders.size() - 1);
+ return Streams.sequentialStreamFrom(timelineObjectHolder.getObject())
.map(PartitionChunk::getObject)
+ .map(dataSegment ->
transformShardSpecIfNeeded(dataSegment, timelineObjectHolder.getTrueInterval()))
.collect(Collectors.toList());
}
}
@@ -257,10 +326,11 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
}
}
- private boolean needsCompaction(ClientCompactionTaskQueryTuningConfig
tuningConfig, SegmentsToCompact candidates)
+ private boolean needsCompaction(DataSourceCompactionConfig config,
SegmentsToCompact candidates)
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
-
+ final ClientCompactionTaskQueryTuningConfig tuningConfig =
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment());
final PartitionsSpec partitionsSpecFromConfig =
findPartitinosSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState =
candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
@@ -314,6 +384,22 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
needsCompaction = true;
}
+    // Only check segmentGranularity, as auto compaction currently
supports only segmentGranularity
+ final Granularity segmentGranularity =
lastCompactionState.getGranularitySpec() != null ?
+
objectMapper.convertValue(lastCompactionState.getGranularitySpec(),
GranularitySpec.class).getSegmentGranularity() :
+ null;
+
+ if (config.getGranularitySpec() != null &&
+ config.getGranularitySpec().getSegmentGranularity() != null &&
+
!config.getGranularitySpec().getSegmentGranularity().equals(segmentGranularity))
{
+ log.info(
+ "Configured granularitySpec[%s] is different from the one[%s] of
segments. Needs compaction",
+ config.getGranularitySpec(),
+ segmentGranularity
+ );
+ needsCompaction = true;
+ }
+
return needsCompaction;
}
@@ -334,16 +420,25 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
final long inputSegmentSize = config.getInputSegmentSizeBytes();
while (compactibleTimelineObjectHolderCursor.hasNext()) {
- final SegmentsToCompact candidates = new
SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
-
+ List<DataSegment> segments =
compactibleTimelineObjectHolderCursor.next();
+ final SegmentsToCompact candidates = new SegmentsToCompact(segments);
if (!candidates.isEmpty()) {
final boolean isCompactibleSize = candidates.getTotalSize() <=
inputSegmentSize;
final boolean needsCompaction = needsCompaction(
-
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment()),
+ config,
candidates
);
if (isCompactibleSize && needsCompaction) {
+ if (config.getGranularitySpec() != null &&
config.getGranularitySpec().getSegmentGranularity() != null) {
+ Interval interval = candidates.getUmbrellaInterval();
+ Set<Interval> intervalsCompacted =
intervalCompactedForDatasource.computeIfAbsent(dataSourceName, k -> new
HashSet<>());
+          // Skip these candidates if we have already compacted this interval
+ if (intervalsCompacted.contains(interval)) {
+ continue;
+ }
+ intervalsCompacted.add(interval);
+ }
return candidates;
} else {
if (!needsCompaction) {
@@ -396,6 +491,7 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
private static List<Interval> findInitialSearchInterval(
VersionedIntervalTimeline<String, DataSegment> timeline,
Period skipOffset,
+ Granularity configuredSegmentGranularity,
@Nullable List<Interval> skipIntervals
)
{
@@ -407,6 +503,7 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
last.getInterval().getEnd(),
skipOffset,
+ configuredSegmentGranularity,
skipIntervals
);
@@ -447,19 +544,27 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
static List<Interval> sortAndAddSkipIntervalFromLatest(
DateTime latest,
Period skipOffset,
+ Granularity configuredSegmentGranularity,
@Nullable List<Interval> skipIntervals
)
{
final List<Interval> nonNullSkipIntervals = skipIntervals == null
? new ArrayList<>(1)
: new
ArrayList<>(skipIntervals.size());
+ final Interval skipFromLatest;
+ if (configuredSegmentGranularity != null) {
+      DateTime skipFromLatestUnbucketed = new DateTime(latest,
latest.getZone()).minus(skipOffset);
+      DateTime skipOffsetBucketToSegmentGranularity =
configuredSegmentGranularity.bucketStart(skipFromLatestUnbucketed);
+ skipFromLatest = new Interval(skipOffsetBucketToSegmentGranularity,
latest);
+ } else {
+ skipFromLatest = new Interval(skipOffset, latest);
+ }
if (skipIntervals != null) {
final List<Interval> sortedSkipIntervals = new
ArrayList<>(skipIntervals);
sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());
final List<Interval> overlapIntervals = new ArrayList<>();
- final Interval skipFromLatest = new Interval(skipOffset, latest);
for (Interval interval : sortedSkipIntervals) {
if (interval.overlaps(skipFromLatest)) {
@@ -476,7 +581,6 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
nonNullSkipIntervals.add(skipFromLatest);
}
} else {
- final Interval skipFromLatest = new Interval(skipOffset, latest);
nonNullSkipIntervals.add(skipFromLatest);
}
@@ -579,6 +683,11 @@ public class NewestSegmentFirstIterator implements
CompactionSegmentIterator
return segments.size();
}
+ private Interval getUmbrellaInterval()
+ {
+ return
JodaUtils.umbrellaInterval(segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()));
+ }
+
private long getNumberOfIntervals()
{
return
segments.stream().map(DataSegment::getInterval).distinct().count();
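The re-bucketing above leans on Granularity.getIterable to enumerate the configured-granularity buckets that a segment overlaps. The WEEK-to-MONTH example from the inline comment, sketched:

    Interval weekSegment = Intervals.of("2020-01-28/2020-02-03");
    for (Interval bucket : Granularities.MONTH.getIterable(weekSegment)) {
      // Yields 2020-01-01/2020-02-01 and 2020-02-01/2020-03-01, so the same
      // segment is tracked under both monthly intervals in the new timeline.
    }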
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 34800cc..09f4c69 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -50,6 +50,7 @@ public class NoopIndexingServiceClient implements
IndexingServiceClient
List<DataSegment> segments,
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
+ @Nullable ClientCompactionTaskQueryGranularitySpec granularitySpec,
@Nullable Map<String, Object> context
)
{
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
index a015422..25bf848 100644
---
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
+++
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
@@ -32,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
+import java.util.Map;
public class ArbitraryGranularityTest
{
@@ -218,6 +219,32 @@ public class ArbitraryGranularityTest
}
@Test
+ public void testAsMap()
+ {
+ final GranularitySpec spec = new
ArbitraryGranularitySpec(Granularities.NONE, Lists.newArrayList(
+ Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+ Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
+ Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+ Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+ ));
+
+ Map<String, Object> map = spec.asMap(JSON_MAPPER);
+ final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map,
GranularitySpec.class);
+ Assert.assertEquals(
+ "Round-trip",
+ ImmutableList.copyOf(spec.sortedBucketIntervals()),
+ ImmutableList.copyOf(rtSpec.sortedBucketIntervals())
+ );
+ Assert.assertEquals(
+ "Round-trip",
+ ImmutableList.copyOf(spec.inputIntervals()),
+ ImmutableList.copyOf(rtSpec.inputIntervals())
+ );
+ Assert.assertEquals(spec, rtSpec);
+ }
+
+ @Test
public void testNullInputIntervals()
{
final GranularitySpec spec = new
ArbitraryGranularitySpec(Granularities.NONE, null);
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
index 27760f6..0c19724 100644
---
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
+++
b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
@@ -38,10 +38,11 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
public class UniformGranularityTest
{
- private static final ObjectMapper JOSN_MAPPER = new DefaultObjectMapper();
+ private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
@Test
public void testSimple()
@@ -161,7 +162,7 @@ public class UniformGranularityTest
);
try {
- final GranularitySpec rtSpec =
JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec),
GranularitySpec.class);
+ final GranularitySpec rtSpec =
JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec),
GranularitySpec.class);
Assert.assertEquals(
"Round-trip sortedBucketIntervals",
ImmutableList.copyOf(spec.sortedBucketIntervals()),
@@ -179,6 +180,34 @@ public class UniformGranularityTest
}
@Test
+ public void testAsMap()
+ {
+ final GranularitySpec spec = new UniformGranularitySpec(
+ Granularities.DAY,
+ null,
+ Lists.newArrayList(
+ Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+ Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+ Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+ Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+ )
+ );
+ Map<String, Object> map = spec.asMap(JSON_MAPPER);
+ final GranularitySpec rtSpec = JSON_MAPPER.convertValue(map,
GranularitySpec.class);
+ Assert.assertEquals(
+ "Round-trip sortedBucketIntervals",
+ ImmutableList.copyOf(spec.sortedBucketIntervals()),
+ ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator())
+ );
+ Assert.assertEquals(
+ "Round-trip granularity",
+ spec.getSegmentGranularity(),
+ rtSpec.getSegmentGranularity()
+ );
+ Assert.assertEquals(spec, rtSpec);
+ }
+
+ @Test
public void testEquals()
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 96bfcc1..01cfabb 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -20,15 +20,20 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -56,6 +61,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -68,6 +74,7 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getSkipOffsetFromLatest(),
fromJson.getSkipOffsetFromLatest());
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ Assert.assertEquals(config.getGranularitySpec(),
fromJson.getGranularitySpec());
}
@Test
@@ -80,6 +87,7 @@ public class DataSourceCompactionConfigTest
30,
new Period(3600),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -122,6 +130,7 @@ public class DataSourceCompactionConfigTest
null,
null
),
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -164,6 +173,7 @@ public class DataSourceCompactionConfigTest
null,
null
),
+ null,
ImmutableMap.of("key", "val")
);
@@ -217,4 +227,92 @@ public class DataSourceCompactionConfigTest
OBJECT_MAPPER.readValue(json,
UserCompactionTaskQueryTuningConfig.class);
Assert.assertEquals(tuningConfig, fromJson);
}
+
+ @Test
+ public void testSerdeGranularitySpec() throws IOException
+ {
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UniformGranularitySpec(Granularities.HOUR, null, null),
+ ImmutableMap.of("key", "val")
+ );
+ final String json = OBJECT_MAPPER.writeValueAsString(config);
+ final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
+
+ Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+ Assert.assertEquals(25, fromJson.getTaskPriority());
+ Assert.assertEquals(config.getInputSegmentSizeBytes(),
fromJson.getInputSegmentSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(),
fromJson.getMaxRowsPerSegment());
+ Assert.assertEquals(config.getSkipOffsetFromLatest(),
fromJson.getSkipOffsetFromLatest());
+ Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+ Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ Assert.assertEquals(config.getGranularitySpec(),
fromJson.getGranularitySpec());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailIfGranularitySpecContainsNonDefaultQueryGranularity()
+ {
+ new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH,
null),
+ ImmutableMap.of("key", "val")
+ );
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailIfGranularitySpecContainsNonDefaultRollup()
+ {
+ new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH,
false, null),
+ ImmutableMap.of("key", "val")
+ );
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailIfGranularitySpecContainsNonEmptyInterval()
+ {
+ new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UniformGranularitySpec(Granularities.HOUR, Granularities.MONTH,
ImmutableList.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z"))),
+ ImmutableMap.of("key", "val")
+ );
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFailIfGranularitySpecIsNotUniform()
+ {
+ new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new ArbitraryGranularitySpec(null, null, null),
+ ImmutableMap.of("key", "val")
+ );
+ }
+
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index c8ef83d..c892810 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -28,16 +28,24 @@ import com.google.common.collect.Lists;
import junitparams.converters.Nullable;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.indexing.ClientCompactionIOConfig;
+import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import
org.apache.druid.client.indexing.ClientCompactionTaskQueryGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -47,8 +55,10 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
@@ -81,6 +91,8 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.io.IOException;
@@ -89,6 +101,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
@@ -101,6 +114,7 @@ public class CompactSegmentsTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final String DATA_SOURCE_PREFIX = "dataSource_";
+ private static final int PARTITION_PER_TIME_INTERVAL = 4;
// Each dataSource starts with 440 byte, 44 segments, and 11 intervals
needing compaction
private static final int TOTAL_BYTE_PER_DATASOURCE = 440;
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
@@ -144,6 +158,7 @@ public class CompactSegmentsTest
private final BiFunction<Integer, Integer, ShardSpec> shardSpecFactory;
private Map<String, VersionedIntervalTimeline<String, DataSegment>>
dataSources;
+ Map<String, List<DataSegment>> datasourceToSegments = new HashMap<>();
public CompactSegmentsTest(PartitionsSpec partitionsSpec,
BiFunction<Integer, Integer, ShardSpec> shardSpecFactory)
{
@@ -154,18 +169,23 @@ public class CompactSegmentsTest
@Before
public void setup()
{
- List<DataSegment> segments = new ArrayList<>();
+ List<DataSegment> allSegments = new ArrayList<>();
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
- for (int k = 0; k < 4; k++) {
- segments.add(createSegment(dataSource, j, true, k));
- segments.add(createSegment(dataSource, j, false, k));
+ for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
+ List<DataSegment> segmentForDatasource =
datasourceToSegments.computeIfAbsent(dataSource, key -> new ArrayList<>());
+ DataSegment dataSegment = createSegment(dataSource, j, true, k);
+ allSegments.add(dataSegment);
+ segmentForDatasource.add(dataSegment);
+ dataSegment = createSegment(dataSource, j, false, k);
+ allSegments.add(dataSegment);
+ segmentForDatasource.add(dataSegment);
}
}
}
dataSources = DataSourcesSnapshot
- .fromUsedSegments(segments, ImmutableMap.of())
+ .fromUsedSegments(allSegments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
}
@@ -351,17 +371,17 @@ public class CompactSegmentsTest
String dataSourceName = DATA_SOURCE_PREFIX + 1;
List<DataSegment> segments = new ArrayList<>();
for (int j : new int[]{0, 1, 2, 3, 7, 8}) {
- for (int k = 0; k < 4; k++) {
+ for (int k = 0; k < PARTITION_PER_TIME_INTERVAL; k++) {
DataSegment beforeNoon = createSegment(dataSourceName, j, true, k);
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day compacted (two compacted intervals
back-to-back)
- beforeNoon = beforeNoon.withLastCompactionState(new
CompactionState(partitionsSpec, ImmutableMap.of()));
- afterNoon = afterNoon.withLastCompactionState(new
CompactionState(partitionsSpec, ImmutableMap.of()));
+ beforeNoon = beforeNoon.withLastCompactionState(new
CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
+ afterNoon = afterNoon.withLastCompactionState(new
CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
}
if (j == 1) {
// Make one interval on this day compacted
- afterNoon = afterNoon.withLastCompactionState(new
CompactionState(partitionsSpec, ImmutableMap.of()));
+ afterNoon = afterNoon.withLastCompactionState(new
CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
}
segments.add(beforeNoon);
segments.add(afterNoon);
@@ -539,6 +559,205 @@ public class CompactSegmentsTest
}
@Test
+ public void testCompactWithoutGranularitySpec()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient =
Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER,
mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<List<DataSegment>> segmentsCaptor =
ArgumentCaptor.forClass(List.class);
+ ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec>
granularitySpecArgumentCaptor =
ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ segmentsCaptor.capture(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any()
+ );
+    // Only the same number of segments as the original
PARTITION_PER_TIME_INTERVAL since the segment granularity is unchanged
+ Assert.assertEquals(PARTITION_PER_TIME_INTERVAL,
segmentsCaptor.getValue().size());
+ Assert.assertNull(granularitySpecArgumentCaptor.getValue());
+ }
+
+ @Test
+ public void testCompactWithGranularitySpec()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient =
Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER,
mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ new UniformGranularitySpec(Granularities.YEAR, null, null),
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<List<DataSegment>> segmentsCaptor =
ArgumentCaptor.forClass(List.class);
+ ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec>
granularitySpecArgumentCaptor =
ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ segmentsCaptor.capture(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any()
+ );
+    // All segments are compacted at the same time since we changed the segment
granularity to YEAR and all segments
+    // are within the same year
+ Assert.assertEquals(datasourceToSegments.get(dataSource).size(),
segmentsCaptor.getValue().size());
+ Assert.assertEquals(Granularities.YEAR,
granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
+ }
+
+ @Test
+ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask()
+ {
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ final String conflictTaskId = "taskIdDummy";
+ final HttpIndexingServiceClient mockIndexingServiceClient =
Mockito.mock(HttpIndexingServiceClient.class);
+ TaskStatusPlus runningConflictCompactionTask = new TaskStatusPlus(
+ conflictTaskId,
+ "groupId",
+ "compact",
+ DateTimes.EPOCH,
+ DateTimes.EPOCH,
+ TaskState.RUNNING,
+ RunnerTaskState.RUNNING,
+ -1L,
+ TaskLocation.unknown(),
+ dataSource,
+ null
+ );
+ TaskPayloadResponse runningConflictCompactionTaskPayload = new
TaskPayloadResponse(
+ conflictTaskId,
+ new ClientCompactionTaskQuery(
+ conflictTaskId,
+ dataSource,
+ new ClientCompactionIOConfig(
+ new ClientCompactionIntervalSpec(
+ Intervals.of("2000/2099"),
+ "testSha256OfSortedSegmentIds"
+ )
+ ),
+ null,
+ new ClientCompactionTaskQueryGranularitySpec(Granularities.DAY,
null, null),
+ null
+ )
+ );
+
Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
+
Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
+
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER,
mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ new UniformGranularitySpec(Granularities.YEAR, null, null),
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ // Verify that conflict task was canceled
+ Mockito.verify(mockIndexingServiceClient).cancelTask(conflictTaskId);
+ // The active conflict task has interval of 2000/2099
+    // Make sure that we do not skip the interval of the conflict task,
+    // since we cancel that task and will have to compact its intervals with
the new segmentGranularity
+ ArgumentCaptor<List<DataSegment>> segmentsCaptor =
ArgumentCaptor.forClass(List.class);
+ ArgumentCaptor<ClientCompactionTaskQueryGranularitySpec>
granularitySpecArgumentCaptor =
ArgumentCaptor.forClass(ClientCompactionTaskQueryGranularitySpec.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ segmentsCaptor.capture(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any()
+ );
+    // All segments are compacted at the same time since we changed the segment
granularity to YEAR and all segments
+    // are within the same year
+ Assert.assertEquals(datasourceToSegments.get(dataSource).size(),
segmentsCaptor.getValue().size());
+ Assert.assertEquals(Granularities.YEAR,
granularitySpecArgumentCaptor.getValue().getSegmentGranularity());
+ }
+
+ @Test
public void testRunParallelCompactionMultipleCompactionTaskSlots()
{
final TestDruidLeaderClient leaderClient = new
TestDruidLeaderClient(JSON_MAPPER);
@@ -831,6 +1050,7 @@ public class CompactSegmentsTest
null,
null
),
+ null,
null
)
);
@@ -984,7 +1204,8 @@ public class CompactSegmentsTest
"lz4",
"longEncoding",
"longs"
- )
+ ),
+ ImmutableMap.of()
),
1,
segmentSize
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 76a84bc..b32a7b7 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -71,6 +71,7 @@ public class NewestSegmentFirstIteratorTest
final List<Interval> fullSkipIntervals =
NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
DateTimes.of("2019-01-01"),
new Period(72, 0, 0, 0),
+ null,
ImmutableList.of(
Intervals.of("2018-12-30/2018-12-31"),
Intervals.of("2018-12-24/2018-12-25")
@@ -90,6 +91,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -128,6 +130,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -166,6 +169,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -204,6 +208,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -242,6 +247,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -280,6 +286,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -318,6 +325,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -356,6 +364,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
@@ -394,6 +403,7 @@ public class NewestSegmentFirstIteratorTest
null,
null
),
+ null,
null
);
Assert.assertEquals(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 749f9d7..cc23f6a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -22,10 +22,15 @@ package org.apache.druid.server.coordinator.duty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
@@ -58,7 +63,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("P2D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("P2D"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -83,7 +88,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("PT1M"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("PT1M"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -116,7 +121,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("PT1H1M"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("PT1H1M"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -149,7 +154,7 @@ public class NewestSegmentFirstPolicyTest
public void testHugeShard()
{
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new
Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -199,7 +204,7 @@ public class NewestSegmentFirstPolicyTest
public void testManySegmentsPerShard()
{
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new
Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(800000, new
Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -259,9 +264,9 @@ public class NewestSegmentFirstPolicyTest
final CompactionSegmentIterator iterator = policy.reset(
ImmutableMap.of(
unknownDataSource,
- createCompactionConfig(10000, new Period("P2D")),
+ createCompactionConfig(10000, new Period("P2D"), null),
DATA_SOURCE,
- createCompactionConfig(10000, new Period("P2D"))
+ createCompactionConfig(10000, new Period("P2D"), null)
),
ImmutableMap.of(
DATA_SOURCE,
@@ -307,7 +312,7 @@ public class NewestSegmentFirstPolicyTest
)
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"))),
+ ImmutableMap.of(DATA_SOURCE,
createCompactionConfig(inputSegmentSizeBytes, new Period("P0D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -340,7 +345,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -361,7 +366,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), null)),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -370,11 +375,87 @@ public class NewestSegmentFirstPolicyTest
}
@Test
+ public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityEqual()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("P1D")),
+ new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("P1D"))
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.DAY, null, null))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+
+ // The P1D skip offset, aligned to the DAY segmentGranularity, skips only the latest day;
+ // everything before 2017-12-02 should be returned
+ final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+ }
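
[A minimal sketch, not the production NewestSegmentFirstIterator logic: the skip-offset tests in this hunk hinge on aligning the skip offset to the configured segmentGranularity. The sketch below assumes only Druid's public Granularity.bucketStart API; the class and helper names are illustrative.]

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Period;

public class SkipOffsetAlignmentSketch
{
  // Align the "skip from" instant down to the start of its granularity bucket, so
  // whole buckets are either skipped entirely or eligible for compaction entirely.
  static DateTime alignedSkipFrom(DateTime latest, Period skipOffset, Granularity segmentGranularity)
  {
    return segmentGranularity.bucketStart(latest.minus(skipOffset));
  }

  public static void main(String[] args)
  {
    // Latest data ends at 2017-12-03T00:00, as in the tests in this hunk.
    final DateTime latest = DateTimes.of("2017-12-03T00:00:00");
    // DAY granularity (equal to the P1D offset) skips only the latest day.
    System.out.println(alignedSkipFrom(latest, new Period("P1D"), Granularities.DAY));   // 2017-12-02T00:00:00.000Z
    // MONTH granularity (larger than the offset) widens the skip to all of December.
    System.out.println(alignedSkipFrom(latest, new Period("P1D"), Granularities.MONTH)); // 2017-12-01T00:00:00.000Z
  }
}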
+
+ @Test
+ public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularityLarger()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ // This contains segments that:
+ // - Cross the month boundary of the latest month (start in Nov and end in Dec). These should be skipped
+ // - Are fully in the latest month (start in Dec and end in Dec). These should be skipped
+ // - Do not overlap the latest month (start in Oct and end in Oct). These should not be skipped
+ new SegmentGenerateSpec(Intervals.of("2017-11-30T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
+ new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+
+ // We should only get segments in Oct
+ final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+
+ Assert.assertTrue(iterator.hasNext());
+ List<DataSegment> actual = iterator.next();
+ Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+ Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testIfSegmentsSkipOffsetWithConfiguredSegmentGranularitySmaller()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"), new Period("PT5H")),
+ new SegmentGenerateSpec(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), new Period("PT5H"))
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P1D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+
+ // We should only get segments in Oct
+ final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-14T00:00:00/2017-10-15T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(Iterables.concat(ImmutableSet.copyOf(iterator))));
+ }
+
+ @Test
public void testWithSkipIntervals()
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("P1D"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -414,7 +495,7 @@ public class NewestSegmentFirstPolicyTest
{
final Period segmentPeriod = new Period("PT1H");
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, new Period("PT1H"), null)),
ImmutableMap.of(
DATA_SOURCE,
createTimeline(
@@ -455,6 +536,113 @@ public class NewestSegmentFirstPolicyTest
);
}
+ @Test
+ public void testIteratorReturnsSegmentsInConfiguredSegmentGranularity()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ // Segments with day intervals from Oct to Dec
+ new SegmentGenerateSpec(Intervals.of("2017-10-01T00:00:00/2017-12-31T00:00:00"), new Period("P1D"))
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+
+ // We should get all segments in the timeline back since the skip offset is P0D.
+ // However, we only need to iterate 3 times (once for each month) since the configured segmentGranularity is MONTH,
+ // and hence the iterator returns all segments bucketed to the configured segmentGranularity.
+ // Month of Dec
+ Assert.assertTrue(iterator.hasNext());
+ List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-31T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ // Month of Nov
+ Assert.assertTrue(iterator.hasNext());
+ expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-11-01T00:00:00/2017-12-01T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ // Month of Oct
+ Assert.assertTrue(iterator.hasNext());
+ expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-11-01T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ // No more
+ Assert.assertFalse(iterator.hasNext());
+ }
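
[A minimal sketch of the bucketing the test above asserts, assuming only Druid's Granularity.getIterable API; the newest-first walk itself is implemented in NewestSegmentFirstIterator, not here.]

import com.google.common.collect.Lists;
import java.util.List;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class NewestBucketFirstSketch
{
  public static void main(String[] args)
  {
    // Bucket the searchable interval by the configured MONTH segmentGranularity...
    final List<Interval> buckets = Lists.newArrayList(
        Granularities.MONTH.getIterable(Intervals.of("2017-10-01/2017-12-31"))
    );
    // ...and walk the buckets newest-first, matching the Dec -> Nov -> Oct order above.
    for (Interval bucket : Lists.reverse(buckets)) {
      System.out.println(bucket);
    }
  }
}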
+
+ @Test
+ public void testIteratorReturnsSegmentsInMultipleIntervalIfConfiguredSegmentGranularityCrossBoundary()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(Intervals.of("2020-01-01/2020-01-08"), new Period("P7D")),
+ new SegmentGenerateSpec(Intervals.of("2020-01-28/2020-02-03"), new Period("P7D")),
+ new SegmentGenerateSpec(Intervals.of("2020-02-08/2020-02-15"), new Period("P7D"))
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new Period("P0D"), new UniformGranularitySpec(Granularities.MONTH, null, null))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+ // We should get the segment of "2020-01-28/2020-02-03" back twice: once when the iterator returns for Feb and
+ // once when it returns for Jan, since that segment crosses the month boundary.
+
+ // Month of Feb (returned first, since the policy iterates newest segments first)
+ List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-28/2020-02-15"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertTrue(iterator.hasNext());
+ List<DataSegment> actual = iterator.next();
+ Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+ Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+ // Month of Jan
+ expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2020-01-01/2020-02-03"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertTrue(iterator.hasNext());
+ actual = iterator.next();
+ Assert.assertEquals(expectedSegmentsToCompact.size(), actual.size());
+ Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(actual));
+ // No more
+ Assert.assertFalse(iterator.hasNext());
+ }
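
[A minimal sketch of why the cross-boundary segment is returned twice: a segment interval that touches two MONTH buckets appears in both. Only Granularity.getIterable is assumed; the class name is illustrative.]

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class CrossBoundaryBucketSketch
{
  public static void main(String[] args)
  {
    // The middle segment from the test above crosses the Jan/Feb boundary.
    final Interval segment = Intervals.of("2020-01-28/2020-02-03");
    // getIterable yields every MONTH bucket the interval touches, so the segment
    // shows up in both the Jan bucket and the Feb bucket.
    for (Interval bucket : Granularities.MONTH.getIterable(segment)) {
      System.out.println(bucket); // 2020-01-01/2020-02-01, then 2020-02-01/2020-03-01
    }
  }
}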
+
+ @Test
+ public void testIteratorDoesNotReturnCompactedInterval()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), new Period("P1D"))
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new Period("P0D"), new UniformGranularitySpec(Granularities.MINUTE, null, null))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+
+ final List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals(ImmutableSet.copyOf(expectedSegmentsToCompact), ImmutableSet.copyOf(iterator.next()));
+ // The iterator should return only once: every "minute" bucket maps back to the same already-iterated segment interval
+ Assert.assertFalse(iterator.hasNext());
+ }
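
[A minimal sketch of the deduplication this test asserts: with a MINUTE segmentGranularity every minute bucket inside the day resolves to the same single segment interval, so only one compaction candidate comes back. Illustrative only; the real logic lives in NewestSegmentFirstIterator.]

import java.util.HashSet;
import java.util.Set;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.joda.time.Interval;

public class MinuteBucketDedupeSketch
{
  public static void main(String[] args)
  {
    final Interval segment = Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00");
    final Set<Interval> candidates = new HashSet<>();
    // Each of the 1440 minute buckets overlaps the same day-long segment, so
    // collecting the overlapped segment interval yields a single candidate.
    for (Interval bucket : Granularities.MINUTE.getIterable(segment)) {
      if (segment.overlaps(bucket)) {
        candidates.add(segment);
      }
    }
    System.out.println(candidates.size()); // 1
  }
}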
+
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
@@ -546,7 +734,8 @@ public class NewestSegmentFirstPolicyTest
private DataSourceCompactionConfig createCompactionConfig(
long inputSegmentSizeBytes,
- Period skipOffsetFromLatest
+ Period skipOffsetFromLatest,
+ GranularitySpec granularitySpec
)
{
return new DataSourceCompactionConfig(
@@ -556,6 +745,7 @@ public class NewestSegmentFirstPolicyTest
null,
skipOffsetFromLatest,
null,
+ granularitySpec,
null
);
}
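
[A hedged usage sketch of the new granularitySpec argument threaded through the helper above: call sites now pass either null or a UniformGranularitySpec whose segmentGranularity drives the bucketing. Only the three-argument UniformGranularitySpec constructor already shown in this diff is assumed.]

import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;

public class GranularitySpecSketch
{
  public static void main(String[] args)
  {
    // segmentGranularity = MONTH; queryGranularity and input intervals left null,
    // exactly as at the call sites above.
    final UniformGranularitySpec spec = new UniformGranularitySpec(Granularities.MONTH, null, null);
    System.out.println(spec.getSegmentGranularity());
  }
}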
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 61e0a32..ed6599a 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -277,7 +277,8 @@ public class SystemSchemaTest extends CalciteTestBase
private final CompactionState expectedCompactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
- Collections.singletonMap("test", "map")
+ Collections.singletonMap("test", "map"),
+ Collections.singletonMap("test2", "map2")
);
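
[A hedged sketch mirroring the updated constructor call above: CompactionState now carries an extra map, which this diff adds to record the granularitySpec alongside the partitionsSpec and the existing map argument. The placeholder keys and values come straight from the test, not from a real spec.]

import java.util.Collections;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.timeline.CompactionState;

public class CompactionStateSketch
{
  public static void main(String[] args)
  {
    final CompactionState state = new CompactionState(
        new DynamicPartitionsSpec(null, null),
        Collections.singletonMap("test", "map"),   // pre-existing map argument
        Collections.singletonMap("test2", "map2")  // newly added granularitySpec map
    );
    System.out.println(state);
  }
}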
private final DataSegment publishedCompactedSegment1 = new DataSegment(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]