This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 33d9d9b Add rollup config to auto and manual compaction (#11850)
33d9d9b is described below
commit 33d9d9bd74ade384ef5feb31748b989122deb160
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Oct 29 10:22:25 2021 -0700
Add rollup config to auto and manual compaction (#11850)
* add rollup to auto and manual compaction
* add unit tests
* add unit tests
* add IT
* fix checkstyle
---
docs/configuration/index.md | 1 +
docs/ingestion/compaction.md | 11 ++-
.../druid/indexing/common/task/CompactionTask.java | 6 +-
.../task/ClientCompactionTaskQuerySerdeTest.java | 10 +-
.../common/task/CompactionTaskParallelRunTest.java | 4 +-
.../common/task/CompactionTaskRunTest.java | 10 +-
.../indexing/common/task/CompactionTaskTest.java | 65 +++++++++++--
.../coordinator/duty/ITAutoCompactionTest.java | 72 ++++++++++++---
.../duty/ITAutoCompactionUpgradeTest.java | 2 +-
.../indexer/wikipedia_index_rollup_queries.json | 56 ++++++++++++
.../indexer/wikipedia_index_task_no_rollup.json | 76 ++++++++++++++++
.../ClientCompactionTaskGranularitySpec.java | 19 +++-
.../UserCompactionTaskGranularityConfig.java | 17 +++-
.../server/coordinator/duty/CompactSegments.java | 4 +-
.../duty/NewestSegmentFirstIterator.java | 64 ++++++++-----
.../DataSourceCompactionConfigTest.java | 40 +++++++-
.../coordinator/duty/CompactSegmentsTest.java | 68 +++++++++++++-
.../coordinator/duty/KillCompactionConfigTest.java | 6 +-
.../duty/NewestSegmentFirstIteratorTest.java | 18 ++--
.../duty/NewestSegmentFirstPolicyTest.java | 101 +++++++++++++++++----
.../CoordinatorCompactionConfigsResourceTest.java | 8 +-
21 files changed, 542 insertions(+), 116 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 6f5b053..b346f4b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -995,6 +995,7 @@ You can optionally use the `granularitySpec` object to
configure the segment gra
|Field|Description|Required|
|-----|-----------|--------|
|`segmentGranularity`|Time chunking period for the segment granularity.
Defaults to 'null', which preserves the original segment granularity. Accepts
all [Query granularity](../querying/granularities.md) values.|No|
+|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null',
which preserves the original setting. Note that once data is rolled up, individual
records can no longer be recovered.|No|
> Unlike manual compaction, automatic compaction does not support query
> granularity.
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index 8ccf9a9..88942ca 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -192,7 +192,8 @@ You can optionally use the `granularitySpec` object to
configure the segment gra
,
"granularitySpec": {
"segmentGranularity": <time_period>,
- "queryGranularity": <time_period>
+ "queryGranularity": <time_period>,
+ "rollup": true
}
...
```
@@ -203,8 +204,9 @@ You can optionally use the `granularitySpec` object to
configure the segment gra
|-----|-----------|--------|
|`segmentGranularity`|Time chunking period for the segment granularity.
Defaults to 'null', which preserves the original segment granularity. Accepts
all [Query granularity](../querying/granularities.md) values.|No|
|`queryGranularity`|Time chunking period for the query granularity. Defaults
to 'null', which preserves the original query granularity. Accepts all [Query
granularity](../querying/granularities.md) values. Not supported for automatic
compaction.|No|
+|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null',
which preserves the original setting. Note that once data is rolled up, individual
records can no longer be recovered.|No|
-For example, to set the segment granularity to "day" and the query granularity
to "hour":
+For example, to set the segment granularity to "day", the query granularity to
"hour", and enable rollup:
```json
{
"type" : "compact",
@@ -213,11 +215,12 @@ For example, to set the segment granularity to "day" and
the query granularity t
"type": "compact",
"inputSpec": {
"type": "interval",
- "interval": "2017-01-01/2018-01-01",
+ "interval": "2017-01-01/2018-01-01"
},
"granularitySpec": {
"segmentGranularity":"day",
- "queryGranularity":"hour"
+ "queryGranularity":"hour",
+ "rollup": true
}
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index cdb637f..e2098e0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -227,7 +227,7 @@ public class CompactionTask extends AbstractBatchIndexTask
));
}
if (granularitySpec == null && segmentGranularity != null) {
- this.granularitySpec = new
ClientCompactionTaskGranularitySpec(segmentGranularity, null);
+ this.granularitySpec = new
ClientCompactionTaskGranularitySpec(segmentGranularity, null, null);
} else {
this.granularitySpec = granularitySpec;
}
@@ -600,7 +600,7 @@ public class CompactionTask extends AbstractBatchIndexTask
dimensionsSpec,
metricsSpec,
granularitySpec == null
- ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse,
null)
+ ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse,
null, null)
: granularitySpec.withSegmentGranularity(segmentGranularityToUse)
);
@@ -729,7 +729,7 @@ public class CompactionTask extends AbstractBatchIndexTask
final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(
Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()),
queryGranularityToUse,
- rollup.get(),
+ granularitySpec.isRollup() == null ? rollup.get() :
granularitySpec.isRollup(),
Collections.singletonList(totalInterval)
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index fd7b699..8d78d32 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -116,7 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000,
100
),
- new ClientCompactionTaskGranularitySpec(Granularities.DAY,
Granularities.HOUR),
+ new ClientCompactionTaskGranularitySpec(Granularities.DAY,
Granularities.HOUR, true),
ImmutableMap.of("key", "value")
);
@@ -203,6 +203,10 @@ public class ClientCompactionTaskQuerySerdeTest
task.getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(
+ query.getGranularitySpec().isRollup(),
+ task.getGranularitySpec().isRollup()
+ );
+ Assert.assertEquals(
query.getIoConfig().isDropExisting(),
task.getIoConfig().isDropExisting()
);
@@ -264,7 +268,7 @@ public class ClientCompactionTaskQuerySerdeTest
null
)
)
- .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR))
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR,
true))
.build();
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@@ -307,7 +311,7 @@ public class ClientCompactionTaskQuerySerdeTest
1000,
100
),
- new ClientCompactionTaskGranularitySpec(Granularities.DAY,
Granularities.HOUR),
+ new ClientCompactionTaskGranularitySpec(Granularities.DAY,
Granularities.HOUR, true),
new HashMap<>()
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 1732b3b..1d4b03a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -463,7 +463,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
// Set the dropExisting flag to true in the IOConfig of the compaction
task
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
- .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
@@ -496,7 +496,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
final CompactionTask compactionTask = builder
.inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
- .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
.build();
final Set<DataSegment> compactedSegments = runTask(compactionTask);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 19c6575..eebd875 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -605,7 +605,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// day segmentGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
- .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.DAY, null))
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@@ -626,7 +626,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// hour segmentGranularity
final CompactionTask compactionTask2 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
- .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.HOUR, null))
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.HOUR, null, null))
.build();
resultPair = runTask(compactionTask2);
@@ -660,7 +660,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// day queryGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
- .granularitySpec(new ClientCompactionTaskGranularitySpec(null,
Granularities.SECOND))
+ .granularitySpec(new ClientCompactionTaskGranularitySpec(null,
Granularities.SECOND, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@@ -705,7 +705,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// day segmentGranularity and day queryGranularity
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
- .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY))
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY, null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@@ -737,7 +737,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final CompactionTask compactionTask1 = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
- .granularitySpec(new ClientCompactionTaskGranularitySpec(null, null))
+ .granularitySpec(new ClientCompactionTaskGranularitySpec(null, null,
null))
.build();
Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 43d503d..e01f047 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -385,7 +385,7 @@ public class CompactionTaskTest
);
builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder2.tuningConfig(createTuningConfig());
- builder2.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
+ builder2.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY,
null));
final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
Assert.assertEquals(
taskCreatedWithGranularitySpec.getSegmentGranularity(),
@@ -404,7 +404,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
- builder.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY));
+ builder.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY,
null));
try {
builder.build();
}
@@ -433,7 +433,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
- builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null,
Granularities.DAY));
+ builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null,
Granularities.DAY, null));
try {
builder.build();
}
@@ -462,7 +462,7 @@ public class CompactionTaskTest
builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
builder.tuningConfig(createTuningConfig());
builder.segmentGranularity(Granularities.HOUR);
- builder.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
+ builder.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY,
null));
final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
Assert.assertEquals(Granularities.HOUR,
taskCreatedWithSegmentGranularity.getSegmentGranularity());
}
@@ -1315,7 +1315,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
- new ClientCompactionTaskGranularitySpec(new
PeriodGranularity(Period.months(3), null, null), null),
+ new ClientCompactionTaskGranularitySpec(new
PeriodGranularity(Period.months(3), null, null), null, null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
@@ -1353,7 +1353,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
- new ClientCompactionTaskGranularitySpec(null, new
PeriodGranularity(Period.months(3), null, null)),
+ new ClientCompactionTaskGranularitySpec(null, new
PeriodGranularity(Period.months(3), null, null), null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
@@ -1391,7 +1391,8 @@ public class CompactionTaskTest
null,
new ClientCompactionTaskGranularitySpec(
new PeriodGranularity(Period.months(3), null, null),
- new PeriodGranularity(Period.months(3), null, null)
+ new PeriodGranularity(Period.months(3), null, null),
+ null
),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
@@ -1467,7 +1468,7 @@ public class CompactionTaskTest
new PartitionConfigurationManager(TUNING_CONFIG),
null,
null,
- new ClientCompactionTaskGranularitySpec(null, null),
+ new ClientCompactionTaskGranularitySpec(null, null, null),
COORDINATOR_CLIENT,
segmentCacheManagerFactory,
RETRY_POLICY_FACTORY,
@@ -1494,6 +1495,54 @@ public class CompactionTaskTest
}
@Test
+ public void testGranularitySpecWithNotNullRollup()
+ throws IOException, SegmentLoadingException
+ {
+ final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ toolbox,
+ LockGranularity.TIME_CHUNK,
+ new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
+ new PartitionConfigurationManager(TUNING_CONFIG),
+ null,
+ null,
+ new ClientCompactionTaskGranularitySpec(null, null, true),
+ COORDINATOR_CLIENT,
+ segmentCacheManagerFactory,
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
+ );
+
+ Assert.assertEquals(6, ingestionSpecs.size());
+ for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
+
Assert.assertTrue(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
+ }
+ }
+
+ @Test
+ public void testGranularitySpecWithNullRollup()
+ throws IOException, SegmentLoadingException
+ {
+ final List<ParallelIndexIngestionSpec> ingestionSpecs =
CompactionTask.createIngestionSchema(
+ toolbox,
+ LockGranularity.TIME_CHUNK,
+ new SegmentProvider(DATA_SOURCE, new
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
+ new PartitionConfigurationManager(TUNING_CONFIG),
+ null,
+ null,
+ new ClientCompactionTaskGranularitySpec(null, null, null),
+ COORDINATOR_CLIENT,
+ segmentCacheManagerFactory,
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
+ );
+ Assert.assertEquals(6, ingestionSpecs.size());
+ for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
+ // Expect false since the rollup value in the metadata of existing segments is
null
+
Assert.assertFalse(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
+ }
+ }
+
+ @Test
public void testChooseFinestGranularityWithNulls()
{
List<Granularity> input = Arrays.asList(
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 83cc761..495ecf1 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.tests.coordinator.duty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
@@ -71,6 +73,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
private static final String INDEX_TASK =
"/indexer/wikipedia_index_task.json";
+ private static final String INDEX_TASK_NO_ROLLUP =
"/indexer/wikipedia_index_task_no_rollup.json";
+ private static final String INDEX_ROLLUP_QUERIES_RESOURCE =
"/indexer/wikipedia_index_rollup_queries.json";
private static final String INDEX_QUERIES_RESOURCE =
"/indexer/wikipedia_index_queries.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
private static final Period NO_SKIP_OFFSET = Period.seconds(0);
@@ -290,7 +294,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), true);
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
LOG.info("Auto compaction test with YEAR segment granularity");
@@ -307,7 +311,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to true
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), true);
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
LOG.info("Auto compaction test with DAY segment granularity");
@@ -340,7 +344,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
LOG.info("Auto compaction test with YEAR segment granularity");
@@ -357,7 +361,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
newGranularity = Granularities.DAY;
// Set dropExisting to false
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
LOG.info("Auto compaction test with DAY segment granularity");
@@ -397,7 +401,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
Granularity newGranularity = Granularities.YEAR;
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null));
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null, null));
LOG.info("Auto compaction test with YEAR segment granularity");
@@ -439,7 +443,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
// Segments were compacted and already has DAY granularity since it was
initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.DAY;
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null));
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null));
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@@ -471,7 +475,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
// Segments were compacted and already has DAY granularity since it was
initially ingested with DAY granularity.
// Now set auto compaction with DAY granularity in the granularitySpec
Granularity newGranularity = Granularities.YEAR;
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null));
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null));
forceTriggerAutoCompaction(1);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
@@ -495,7 +499,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We wil have one segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR)
@@ -521,7 +525,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to true
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of
2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@@ -553,7 +557,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to false
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We wil have one segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR)
@@ -579,7 +583,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
newGranularity = Granularities.MONTH;
// Set dropExisting to false
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
// Since dropExisting is set to true...
// This will submit a single compaction task for interval of
2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
@@ -605,6 +609,38 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
}
+ @Test
+ public void testAutoCompactionDutyWithRollup() throws Exception
+ {
+ loadData(INDEX_TASK_NO_ROLLUP);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ Map<String, Object> expectedResult = ImmutableMap.of(
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%",
ImmutableList.of(ImmutableMap.of("events",
ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ false
+ );
+ forceTriggerAutoCompaction(2);
+ expectedResult = ImmutableMap.of(
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%",
ImmutableList.of(ImmutableMap.of("events",
ImmutableList.of(ImmutableList.of(516.0))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+
+ List<TaskResponseObject> compactTasksBefore =
indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ // Verify that rolled-up segments do not get compacted again
+ forceTriggerAutoCompaction(2);
+ List<TaskResponseObject> compactTasksAfter =
indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+ }
+ }
+
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
@@ -626,6 +662,11 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
private void verifyQuery(String queryResource) throws Exception
{
+ verifyQuery(queryResource, ImmutableMap.of());
+ }
+
+ private void verifyQuery(String queryResource, Map<String, Object>
expectedResults) throws Exception
+ {
String queryResponseTemplate;
try {
InputStream is =
AbstractITBatchIndexTest.class.getResourceAsStream(queryResource);
@@ -634,13 +675,18 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
catch (IOException e) {
throw new ISE(e, "could not read query file: %s", queryResource);
}
-
queryResponseTemplate = StringUtils.replace(
queryResponseTemplate,
"%%DATASOURCE%%",
fullDatasourceName
);
-
+ for (Map.Entry<String, Object> entry : expectedResults.entrySet()) {
+ queryResponseTemplate = StringUtils.replace(
+ queryResponseTemplate,
+ entry.getKey(),
+ jsonMapper.writeValueAsString(entry.getValue())
+ );
+ }
queryHelper.testQueriesFromString(queryResponseTemplate);
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index b960707..a4b1ed6 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -96,7 +96,7 @@ public class ITAutoCompactionUpgradeTest extends
AbstractIndexerTest
null,
1
),
- new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.YEAR, null,
null),
new UserCompactionTaskIOConfig(true),
null
);
diff --git
a/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json
b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json
new file mode 100644
index 0000000..93c58bd
--- /dev/null
+++
b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json
@@ -0,0 +1,56 @@
+[
+ {
+ "description": "rows count",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter": {
+ "type": "selector",
+ "dimension": "language",
+ "value": "en",
+ "extractionFn": null
+ },
+ "aggregations":[
+ {
+ "type": "count",
+ "name": "count"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "result" : {
+ "count":%%EXPECTED_COUNT_RESULT%%
+ }
+ }
+ ]
+ },
+ {
+ "description": "scan with filter",
+ "query":{
+ "queryType" : "scan",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter": {
+ "type": "selector",
+ "dimension": "language",
+ "value": "en",
+ "extractionFn": null
+ },
+ "columns": [
+ "added"
+ ],
+ "resultFormat":"compactedList"
+ },
+ "expectedResults": %%EXPECTED_SCAN_RESULT%%,
+ "fieldsToTest": ["events"]
+ }
+]
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json
b/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json
new file mode 100644
index 0000000..821838b
--- /dev/null
+++
b/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json
@@ -0,0 +1,76 @@
+{
+ "type": "index",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ },
+ {
+ "name": "thetaSketch",
+ "type": "thetaSketch",
+ "fieldName": "user"
+ },
+ {
+ "name": "quantilesDoublesSketch",
+ "type": "quantilesDoublesSketch",
+ "fieldName": "delta"
+ },
+ {
+ "name": "HLLSketchBuild",
+ "type": "HLLSketchBuild",
+ "fieldName": "user"
+ }
+ ],
+ "granularitySpec": {
+ "segmentGranularity": "DAY",
+ "queryGranularity": "DAY",
+ "rollup": false,
+ "intervals" : [ "2013-08-31/2013-09-02" ]
+ },
+ "parser": {
+ "parseSpec": {
+ "format" : "json",
+ "timestampSpec": {
+ "column": "timestamp"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ {"type": "string", "name": "language", "createBitmapIndex":
false}
+ ]
+ }
+ }
+ }
+ },
+ "ioConfig": {
+ "type": "index",
+ "firehose": {
+ "type": "local",
+ "baseDir": "/resources/data/batch_index/json",
+ "filter": "wikipedia_index_data*"
+ }
+ },
+ "tuningConfig": {
+ "type": "index",
+ "maxRowsPerSegment": 10,
+ "awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%%
+ }
+ }
+}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
index 74fd559..3ba732c 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
@@ -40,15 +40,18 @@ public class ClientCompactionTaskGranularitySpec
{
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
+ private final Boolean rollup;
@JsonCreator
public ClientCompactionTaskGranularitySpec(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
- @JsonProperty("queryGranularity") Granularity queryGranularity
+ @JsonProperty("queryGranularity") Granularity queryGranularity,
+ @JsonProperty("rollup") Boolean rollup
)
{
this.queryGranularity = queryGranularity;
this.segmentGranularity = segmentGranularity;
+ this.rollup = rollup;
}
@JsonProperty
@@ -63,9 +66,15 @@ public class ClientCompactionTaskGranularitySpec
return queryGranularity;
}
+ @JsonProperty
+ public Boolean isRollup()
+ {
+ return rollup;
+ }
+
public ClientCompactionTaskGranularitySpec
withSegmentGranularity(Granularity segmentGranularity)
{
- return new ClientCompactionTaskGranularitySpec(segmentGranularity,
queryGranularity);
+ return new ClientCompactionTaskGranularitySpec(segmentGranularity,
queryGranularity, rollup);
}
@Override
@@ -79,13 +88,14 @@ public class ClientCompactionTaskGranularitySpec
}
ClientCompactionTaskGranularitySpec that =
(ClientCompactionTaskGranularitySpec) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) &&
- Objects.equals(queryGranularity, that.queryGranularity);
+ Objects.equals(queryGranularity, that.queryGranularity) &&
+ Objects.equals(rollup, that.rollup);
}
@Override
public int hashCode()
{
- return Objects.hash(segmentGranularity, queryGranularity);
+ return Objects.hash(segmentGranularity, queryGranularity, rollup);
}
@Override
@@ -94,6 +104,7 @@ public class ClientCompactionTaskGranularitySpec
return "ClientCompactionTaskGranularitySpec{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
+ ", rollup=" + rollup +
'}';
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java
index 9623e2a..d6b320a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java
@@ -39,15 +39,18 @@ public class UserCompactionTaskGranularityConfig
{
private final Granularity segmentGranularity;
private final Granularity queryGranularity;
+ private final Boolean rollup;
@JsonCreator
public UserCompactionTaskGranularityConfig(
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
- @JsonProperty("queryGranularity") Granularity queryGranularity
+ @JsonProperty("queryGranularity") Granularity queryGranularity,
+ @JsonProperty("rollup") Boolean rollup
)
{
this.queryGranularity = queryGranularity;
this.segmentGranularity = segmentGranularity;
+ this.rollup = rollup;
}
@JsonProperty("segmentGranularity")
@@ -62,6 +65,12 @@ public class UserCompactionTaskGranularityConfig
return queryGranularity;
}
+ @JsonProperty("rollup")
+ public Boolean isRollup()
+ {
+ return rollup;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -73,13 +82,14 @@ public class UserCompactionTaskGranularityConfig
}
UserCompactionTaskGranularityConfig that =
(UserCompactionTaskGranularityConfig) o;
return Objects.equals(segmentGranularity, that.segmentGranularity) &&
- Objects.equals(queryGranularity, that.queryGranularity);
+ Objects.equals(queryGranularity, that.queryGranularity) &&
+ Objects.equals(rollup, that.rollup);
}
@Override
public int hashCode()
{
- return Objects.hash(segmentGranularity, queryGranularity);
+ return Objects.hash(segmentGranularity, queryGranularity, rollup);
}
@Override
@@ -88,6 +98,7 @@ public class UserCompactionTaskGranularityConfig
return "UserCompactionTaskGranularityConfig{" +
"segmentGranularity=" + segmentGranularity +
", queryGranularity=" + queryGranularity +
+ ", rollup=" + rollup +
'}';
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 9bb6f9c..0b2a586 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -345,7 +345,9 @@ public class CompactSegments implements CoordinatorDuty
if (config.getGranularitySpec() != null) {
queryGranularitySpec = new ClientCompactionTaskGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
- config.getGranularitySpec().getQueryGranularity()
+ config.getGranularitySpec().getQueryGranularity(),
+ config.getGranularitySpec().isRollup()
+
);
} else {
queryGranularitySpec = null;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 3f9ee93..0291178 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -36,7 +37,6 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
-import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.CompactionState;
@@ -309,7 +309,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
}
@VisibleForTesting
- static PartitionsSpec
findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
+ static PartitionsSpec
findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
{
final PartitionsSpec partitionsSpecFromTuningConfig =
tuningConfig.getPartitionsSpec();
if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
@@ -332,7 +332,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final ClientCompactionTaskQueryTuningConfig tuningConfig =
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment());
- final PartitionsSpec partitionsSpecFromConfig =
findPartitinosSpecFromConfig(tuningConfig);
+ final PartitionsSpec partitionsSpecFromConfig =
findPartitionsSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState =
candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
log.info("Candidate segment[%s] is not compacted yet. Needs
compaction.", candidates.segments.get(0).getId());
@@ -384,32 +384,48 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return true;
}
- if (config.getGranularitySpec() != null &&
config.getGranularitySpec().getSegmentGranularity() != null) {
- // Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity
- final Granularity existingSegmentGranularity =
lastCompactionState.getGranularitySpec() != null ?
-
objectMapper.convertValue(lastCompactionState.getGranularitySpec(),
GranularitySpec.class).getSegmentGranularity() :
- null;
- if (existingSegmentGranularity == null) {
- // Candidate segments were all compacted without segment granularity set.
- // We need to check if all segments have the same segment granularity as the configured segment granularity.
- boolean needsCompaction = candidates.segments.stream()
- .anyMatch(segment ->
!config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
- if (needsCompaction) {
+ if (config.getGranularitySpec() != null) {
+
+ final ClientCompactionTaskGranularitySpec existingGranularitySpec =
lastCompactionState.getGranularitySpec() != null ?
+
objectMapper.convertValue(lastCompactionState.getGranularitySpec(),
ClientCompactionTaskGranularitySpec.class) :
+ null;
+ // Checks for segmentGranularity
+ if (config.getGranularitySpec().getSegmentGranularity() != null) {
+ final Granularity existingSegmentGranularity = existingGranularitySpec
!= null ?
+
existingGranularitySpec.getSegmentGranularity() :
+ null;
+ if (existingSegmentGranularity == null) {
+ // Candidate segments were all compacted without segment granularity set.
+ // We need to check if all segments have the same segment granularity as the configured segment granularity.
+ boolean needsCompaction = candidates.segments.stream()
+ .anyMatch(segment ->
!config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
+ if (needsCompaction) {
+ log.info(
+ "Segments were previously compacted but without
segmentGranularity in auto compaction."
+ + " Configured segmentGranularity[%s] is different from
granularity implied by segment intervals. Needs compaction",
+ config.getGranularitySpec().getSegmentGranularity()
+ );
+ return true;
+ }
+
+ } else if
(!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity))
{
log.info(
- "Segments were previously compacted but without
segmentGranularity in auto compaction."
- + " Configured segmentGranularity[%s] is different from
granularity implied by segment intervals. Needs compaction",
- config.getGranularitySpec().getSegmentGranularity()
+ "Configured segmentGranularity[%s] is different from the
segmentGranularity[%s] of segments. Needs compaction",
+ config.getGranularitySpec().getSegmentGranularity(),
+ existingSegmentGranularity
);
return true;
}
+ }
- } else if
(!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity))
{
- log.info(
- "Configured segmentGranularity[%s] is different from the
segmentGranularity[%s] of segments. Needs compaction",
- config.getGranularitySpec().getSegmentGranularity(),
- existingSegmentGranularity
- );
- return true;
+ // Checks for rollup
+ if (config.getGranularitySpec().isRollup() != null) {
+ final Boolean existingRollup = existingGranularitySpec != null ?
+ existingGranularitySpec.isRollup() :
+ null;
+ if (existingRollup == null ||
!config.getGranularitySpec().isRollup().equals(existingRollup)) {
+ return true;
+ }
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index c798d29..9ff6eac 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -238,7 +238,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
null,
ImmutableMap.of("key", "val")
);
@@ -265,7 +265,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR,
Granularities.MONTH),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR,
Granularities.MONTH, null),
null,
ImmutableMap.of("key", "val")
);
@@ -308,7 +308,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(null, null),
+ new UserCompactionTaskGranularityConfig(null, null, null),
null,
ImmutableMap.of("key", "val")
);
@@ -326,6 +326,36 @@ public class DataSourceCompactionConfigTest
}
@Test
+ public void testSerdeGranularitySpecWithRollup() throws IOException
+ {
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ null,
+ ImmutableMap.of("key", "val")
+ );
+ final String json = OBJECT_MAPPER.writeValueAsString(config);
+ final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json,
DataSourceCompactionConfig.class);
+
+ Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+ Assert.assertEquals(25, fromJson.getTaskPriority());
+ Assert.assertEquals(config.getInputSegmentSizeBytes(),
fromJson.getInputSegmentSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(),
fromJson.getMaxRowsPerSegment());
+ Assert.assertEquals(config.getSkipOffsetFromLatest(),
fromJson.getSkipOffsetFromLatest());
+ Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+ Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ Assert.assertEquals(config.getGranularitySpec(),
fromJson.getGranularitySpec());
+ Assert.assertNotNull(config.getGranularitySpec());
+ Assert.assertNotNull(fromJson.getGranularitySpec());
+ Assert.assertEquals(config.getGranularitySpec().isRollup(),
fromJson.getGranularitySpec().isRollup());
+ }
+
+ @Test
public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
{
final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
@@ -335,7 +365,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
new UserCompactionTaskIOConfig(true),
ImmutableMap.of("key", "val")
);
@@ -363,7 +393,7 @@ public class DataSourceCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
new UserCompactionTaskIOConfig(null),
ImmutableMap.of("key", "val")
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index f4cdd9c..494f567 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -829,7 +829,7 @@ public class CompactSegmentsTest
null,
null
),
- new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.YEAR, null,
null),
null,
null
)
@@ -852,7 +852,65 @@ public class CompactSegmentsTest
Assert.assertEquals(datasourceToSegments.get(dataSource).size(),
segmentsCaptor.getValue().size());
ClientCompactionTaskGranularitySpec actual =
granularitySpecArgumentCaptor.getValue();
Assert.assertNotNull(actual);
- ClientCompactionTaskGranularitySpec expected = new
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
+ ClientCompactionTaskGranularitySpec expected = new
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testCompactWithRollupInGranularitySpec()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient =
Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new
ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ new UserCompactionTaskGranularityConfig(Granularities.YEAR, null,
true),
+ null,
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<List<DataSegment>> segmentsCaptor =
ArgumentCaptor.forClass(List.class);
+ ArgumentCaptor<ClientCompactionTaskGranularitySpec>
granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
+ ClientCompactionTaskGranularitySpec.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ segmentsCaptor.capture(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertEquals(datasourceToSegments.get(dataSource).size(),
segmentsCaptor.getValue().size());
+ ClientCompactionTaskGranularitySpec actual =
granularitySpecArgumentCaptor.getValue();
+ Assert.assertNotNull(actual);
+ ClientCompactionTaskGranularitySpec expected = new
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, true);
Assert.assertEquals(expected, actual);
}
@@ -888,7 +946,7 @@ public class CompactSegmentsTest
null
),
null,
- new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
+ new ClientCompactionTaskGranularitySpec(Granularities.DAY, null,
null),
null
)
);
@@ -923,7 +981,7 @@ public class CompactSegmentsTest
null,
null
),
- new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.YEAR, null,
null),
null,
null
)
@@ -951,7 +1009,7 @@ public class CompactSegmentsTest
Assert.assertEquals(datasourceToSegments.get(dataSource).size(),
segmentsCaptor.getValue().size());
ClientCompactionTaskGranularitySpec actual =
granularitySpecArgumentCaptor.getValue();
Assert.assertNotNull(actual);
- ClientCompactionTaskGranularitySpec expected = new
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
+ ClientCompactionTaskGranularitySpec expected = new
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
Assert.assertEquals(expected, actual);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index 4aad5df..a7d1d37 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -230,7 +230,7 @@ public class KillCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
null,
ImmutableMap.of("key", "val")
);
@@ -242,7 +242,7 @@ public class KillCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
null,
ImmutableMap.of("key", "val")
);
@@ -346,7 +346,7 @@ public class KillCompactionConfigTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
null,
ImmutableMap.of("key", "val")
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 0cf0ded..092bc9f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -97,7 +97,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -137,7 +137,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -177,7 +177,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -217,7 +217,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -257,7 +257,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -297,7 +297,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -337,7 +337,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -377,7 +377,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
@@ -417,7 +417,7 @@ public class NewestSegmentFirstIteratorTest
);
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
- NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+ NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment())
)
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 42cc20c..b79156f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null,
null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -422,7 +422,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -448,7 +448,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -557,7 +557,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -606,7 +606,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -641,7 +641,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -664,7 +664,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -688,7 +688,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -708,7 +708,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null,
null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -741,7 +741,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=DAY
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null,
null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -774,7 +774,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -797,7 +797,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -817,7 +817,7 @@ public class NewestSegmentFirstPolicyTest
// Auto compaction config sets segmentGranularity=YEAR
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -840,7 +840,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -865,6 +865,7 @@ public class NewestSegmentFirstPolicyTest
null,
DateTimeZone.forTimeZone(TimeZone.getTimeZone("Asia/Bangkok"))
),
+ null,
null
)
)
@@ -891,7 +892,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -915,6 +916,7 @@ public class NewestSegmentFirstPolicyTest
DateTimes.of("2012-01-02T00:05:00.000Z"),
DateTimeZone.UTC
),
+ null,
null
)
)
@@ -936,6 +938,66 @@ public class NewestSegmentFirstPolicyTest
}
@Test
+ public void
testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentRollup()
+ {
+ // Same indexSpec as what is set in the auto compaction config
+ Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new
TypeReference<Map<String, Object>>() {});
+ // Same partitionsSpec as what is set in the auto compaction config
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+
+ // Create segments that were compacted (CompactionState != null) and have
+ // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+ // rollup=true for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+ // and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00
+ final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, indexSpec,
ImmutableMap.of("rollup", "false"))
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, indexSpec,
ImmutableMap.of("rollup", "true"))
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+ )
+ );
+
+ // Auto compaction config sets rollup=true
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+ // We should get interval 2017-10-03T00:00:00/2017-10-04T00:00:00 first and then
interval 2017-10-01T00:00:00/2017-10-02T00:00:00 (newest first); the rollup=true interval is skipped.
+ Assert.assertTrue(iterator.hasNext());
+ List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ Assert.assertTrue(iterator.hasNext());
+ expectedSegmentsToCompact = new ArrayList<>(
+
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ // No more
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void
testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
{
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -944,7 +1006,7 @@ public class NewestSegmentFirstPolicyTest
);
final CompactionSegmentIterator iterator = policy.reset(
- ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR,
null))),
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR,
null, null))),
ImmutableMap.of(DATA_SOURCE, timeline),
Collections.emptyMap()
);
@@ -971,7 +1033,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(),
null, null, null);
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec,
new TypeReference<Map<String, Object>>() {});
- PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
+ PartitionsSpec partitionsSpec =
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
null));
// Create segments that were compacted (CompactionState != null) and have
segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline =
createTimeline(
@@ -995,6 +1057,7 @@ public class NewestSegmentFirstPolicyTest
null,
DateTimeZone.UTC
),
+ null,
null
)
)
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index c130dc3..353550d 100644
---
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -55,7 +55,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
ImmutableMap.of("key", "val")
);
@@ -150,7 +150,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
true),
null,
ImmutableMap.of("key", "val")
);
@@ -190,7 +190,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
null,
ImmutableMap.of("key", "val")
);
@@ -311,7 +311,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new Period(3600),
null,
- new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null,
null),
null,
ImmutableMap.of("key", "val")
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]