This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4576152 Make dropExisting flag for Compaction configurable and add
warning documentations (#11070)
4576152 is described below
commit 4576152e4a0213d17048a330e7089aa9d89f3972
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Apr 9 00:12:28 2021 -0700
Make dropExisting flag for Compaction configurable and add warning
documentations (#11070)
* Make dropExisting flag for Compaction configurable
* fix checkstyle
* fix checkstyle
* fix test
* add tests
* fix spelling
* fix docs
* add IT
* fix test
* fix doc
* fix doc
---
.../NewestSegmentFirstPolicyBenchmark.java | 1 +
docs/configuration/index.md | 10 ++
docs/ingestion/compaction.md | 12 +-
docs/ingestion/native-batch.md | 7 +-
.../indexing/common/task/CompactionIOConfig.java | 20 ++-
.../druid/indexing/common/task/CompactionTask.java | 29 ++--
.../druid/indexing/common/task/IndexTask.java | 1 -
.../task/ClientCompactionTaskQuerySerdeTest.java | 12 +-
.../common/task/CompactionTaskParallelRunTest.java | 46 ++++++-
.../common/task/CompactionTaskRunTest.java | 83 ++++++++++-
.../indexing/common/task/CompactionTaskTest.java | 89 ++++++++----
.../coordinator/duty/ITAutoCompactionTest.java | 151 +++++++++++++++++++--
.../client/indexing/ClientCompactionIOConfig.java | 21 ++-
.../client/indexing/HttpIndexingServiceClient.java | 3 +-
.../client/indexing/IndexingServiceClient.java | 1 +
.../apache/druid/segment/indexing/IOConfig.java | 1 +
.../coordinator/DataSourceCompactionConfig.java | 12 ++
.../coordinator/UserCompactionTaskIOConfig.java | 39 +++---
.../server/coordinator/duty/CompactSegments.java | 6 +
.../client/indexing/NoopIndexingServiceClient.java | 1 +
.../DataSourceCompactionConfigTest.java | 64 +++++++++
.../coordinator/duty/CompactSegmentsTest.java | 115 +++++++++++++++-
.../duty/NewestSegmentFirstIteratorTest.java | 9 ++
.../duty/NewestSegmentFirstPolicyTest.java | 1 +
website/.spelling | 1 +
25 files changed, 644 insertions(+), 91 deletions(-)
diff --git
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index e744bf9..b93053e 100644
---
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -100,6 +100,7 @@ public class NewestSegmentFirstPolicyBenchmark
null,
null,
null,
+ null,
null
)
);
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 30cc5eb..b56f752 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -867,6 +867,7 @@ A description of the compaction config is:
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task
TuningConfig](#automatic-compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction
tasks.|no|
|`granularitySpec`|Custom `granularitySpec` to describe the
`segmentGranularity` for the compacted segments. See [Automatic compaction
granularitySpec](#automatic-compaction-granularityspec)|No|
+|`ioConfig`|IO config for compaction tasks. See below [Compaction Task
IOConfig](#automatic-compaction-ioconfig).|no|
An example of compaction config is:
@@ -918,6 +919,15 @@ You can optionally use the `granularitySpec` object to
configure the segment gra
> Unlike manual compaction, automatic compaction does not support query
> granularity.
+###### Automatic compaction IOConfig
+
+Auto compaction supports a subset of the [IOConfig for Parallel
task](../ingestion/native-batch.md).
+The following table lists the supported configurations for auto compaction.
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|--------|
+|`dropExisting`|If `true`, then the generated compaction task drops (marks
unused) all existing segments fully contained by the umbrella interval of the
compacted segments when the task publishes new segments. If compaction fails,
Druid does not drop or mark unused any segments. WARNING: this functionality is
still in beta and can result in temporary data unavailability for data within
the compacted `interval`. Note that changing this config does not cause
intervals to be compacted again. [...]
+
### Overlord
For general Overlord Process information, see [here](../design/overlord.md).
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index f6846dc..9f34381 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -54,7 +54,7 @@ See [Setting up a manual compaction
task](#setting-up-manual-compaction) for mor
## Data handling with compaction
During compaction, Druid overwrites the original set of segments with the
compacted set. Druid also locks the segments for the time interval being
compacted to ensure data consistency. By default, compaction tasks do not
modify the underlying data. You can configure the compaction task to change the
query granularity or add or remove dimensions in the compaction task. This
means that the only changes to query results should be the result of
intentional, not automatic, changes.
-For compaction tasks, `dropExisting` for underlying ingestion tasks is "true".
This means that Druid can drop (mark unused) all the existing segments fully
within interval for the compaction task. For an example of why this is
important, see the suggestion for reindexing with finer granularity under
[Implementation considerations](native-batch.md#implementation-considerations).
+For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for
Druid to drop (mark unused) all existing segments fully contained by the
interval of the compaction task. For an example of why this is important, see
the suggestion for reindexing with finer granularity under [Implementation
considerations](native-batch.md#implementation-considerations). WARNING: this
functionality is still in beta and can result in temporary data unavailability
for data within the compaction ta [...]
If an ingestion task needs to write data to a segment for a time interval
locked for compaction, by default the ingestion task supersedes the compaction
task and the compaction task fails without finishing. For manual compaction
tasks you can adjust the input spec interval to avoid conflicts between
ingestion and compaction. For automatic compaction, you can set the
`skipOffsetFromLatest` key to adjust the auto compaction starting point
from the current time to reduce the chance of c [...]
@@ -158,10 +158,12 @@ This task doesn't specify a `granularitySpec` so Druid
retains the original segm
The compaction `ioConfig` requires specifying `inputSpec` as follows:
-|Field|Description|Required|
-|-----|-----------|--------|
-|`type`|Task type. Should be `compact`|Yes|
-|`inputSpec`|Input specification|Yes|
+|Field|Description|Default|Required?|
+|-----|-----------|-------|--------|
+|`type`|Task type. Should be `compact`|none|Yes|
+|`inputSpec`|Input specification|none|Yes|
+|`dropExisting`|If `true`, then the compaction task drops (marks unused) all
existing segments fully contained by either the `interval` in the `interval`
type `inputSpec` or the umbrella interval of the `segments` in the `segment`
type `inputSpec` when the task publishes new compacted segments. If compaction
fails, Druid does not drop or mark unused any segments. WARNING: this
functionality is still in beta and can result in temporary data unavailability
for data within the compaction tas [...]
+
There are two supported `inputSpec`s for now.
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 5e7b337..48539bb 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -91,7 +91,8 @@ The supported compression formats for native batch ingestion
are `bz2`, `gz`, `x
`granularitySpec`'s intervals, the portion of those segments outside the new
segments' intervals will still be visible.
- You can set `dropExisting` flag in the `ioConfig` to true if you want the
ingestion task to drop all existing segments that
start and end within your `granularitySpec`'s intervals. This applies
whether or not the new data covers all existing segments.
- `dropExisting` only applies when `appendToExisting` is false and the
`granularitySpec` contains an `interval`.
+ `dropExisting` only applies when `appendToExisting` is false and the
`granularitySpec` contains an `interval`. WARNING: this
+ functionality is still in beta and can result in temporary data
unavailability for data within the specified `interval`.
The following examples demonstrate when to set the `dropExisting` property
to true in the `ioConfig`:
@@ -220,7 +221,7 @@ that range if there's some stray data with unexpected
timestamps.
|type|The task type, this should always be `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to
parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version,
effectively appending to the segment set instead of replacing it. This means
that you can append new segments to any datasource regardless of its original
partitioning scheme. You must use the `dynamic` partitioning type for the
appended segments. If you specify a different partitioning type, the task fails
with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the
`granularitySpec` contains an`interval`, then the ingestion task drops (mark
unused) all existing segments fully contained by the specified `interval` when
the task publishes new segments. If ingestion fails, Druid does not drop or
mark unused any segments. In the case of misconfiguration where either
`appendToExisting` is `true` or `interval` is not specified in
`granularitySpec`, Druid does not drop any segments even if [...]
+|dropExisting|If `true` and `appendToExisting` is `false` and the
`granularitySpec` contains an `interval`, then the ingestion task drops (marks
unused) all existing segments fully contained by the specified `interval` when
the task publishes new segments. If ingestion fails, Druid does not drop or
mark unused any segments. In the case of misconfiguration where either
`appendToExisting` is `true` or `interval` is not specified in
`granularitySpec`, Druid does not drop any segments even if [...]
### `tuningConfig`
@@ -749,7 +750,7 @@ that range if there's some stray data with unexpected
timestamps.
|type|The task type, this should always be "index".|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to
parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version,
effectively appending to the segment set instead of replacing it. This means
that you can append new segments to any datasource regardless of its original
partitioning scheme. You must use the `dynamic` partitioning type for the
appended segments. If you specify a different partitioning type, the task fails
with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the
`granularitySpec` contains an`interval`, then the ingestion task drops (mark
unused) all existing segments fully contained by the specified `interval` when
the task publishes new segments. If ingestion fails, Druid does not drop or
mark unused any segments. In the case of misconfiguration where either
`appendToExisting` is `true` or `interval` is not specified in
`granularitySpec`, Druid does not drop any segments even if [...]
+|dropExisting|If `true` and `appendToExisting` is `false` and the
`granularitySpec` contains an `interval`, then the ingestion task drops (marks
unused) all existing segments fully contained by the specified `interval` when
the task publishes new segments. If ingestion fails, Druid does not drop or
mark unused any segments. In the case of misconfiguration where either
`appendToExisting` is `true` or `interval` is not specified in
`granularitySpec`, Druid does not drop any segments even if [...]
### `tuningConfig`
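The documentation changes above describe `dropExisting` as dropping (marking unused) every existing segment that is fully contained by the task's interval. The sketch below illustrates only that containment rule. It is a simplification, not Druid's implementation: `DropExistingSketch`, its `Interval` class, and `segmentsToDrop` are hypothetical names, intervals are treated as half-open, and segment versions and overshadowing are ignored.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class DropExistingSketch {

    /** Hypothetical stand-in for a segment's time interval, half-open: [start, end). */
    public static final class Interval {
        final Instant start;
        final Instant end;

        Interval(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        public static Interval of(String start, String end) {
            return new Interval(Instant.parse(start), Instant.parse(end));
        }

        /** True when {@code other} both starts and ends inside this interval. */
        public boolean contains(Interval other) {
            return !other.start.isBefore(start) && !other.end.isAfter(end);
        }
    }

    /**
     * Returns the existing segments that would be marked unused. With the flag
     * off nothing is dropped; with it on, only fully contained segments are.
     */
    public static List<Interval> segmentsToDrop(
            Interval taskInterval, List<Interval> existing, boolean dropExisting) {
        List<Interval> dropped = new ArrayList<>();
        if (!dropExisting) {
            return dropped;
        }
        for (Interval segment : existing) {
            if (taskInterval.contains(segment)) {
                dropped.add(segment);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        Interval task = Interval.of("2014-01-01T00:00:00Z", "2014-01-02T00:00:00Z");
        List<Interval> existing = List.of(
            Interval.of("2014-01-01T00:00:00Z", "2014-01-01T01:00:00Z"), // fully inside
            Interval.of("2014-01-01T23:00:00Z", "2014-01-02T00:00:00Z"), // inside, shares the end
            Interval.of("2014-01-01T23:00:00Z", "2014-01-02T01:00:00Z")  // extends past the end
        );
        System.out.println(segmentsToDrop(task, existing, true).size());  // prints 2
        System.out.println(segmentsToDrop(task, existing, false).size()); // prints 0
    }
}
```

The third segment overlaps the task interval but is not contained by it, so it stays in place even when the flag is set.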
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
index 60972de..faedd3d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.segment.indexing.IOConfig;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -36,11 +37,16 @@ import java.util.Objects;
public class CompactionIOConfig implements IOConfig
{
private final CompactionInputSpec inputSpec;
+ private final boolean dropExisting;
@JsonCreator
- public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec
inputSpec)
+ public CompactionIOConfig(
+ @JsonProperty("inputSpec") CompactionInputSpec inputSpec,
+ @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+ )
{
this.inputSpec = inputSpec;
+ this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING :
dropExisting;
}
@JsonProperty
@@ -49,6 +55,12 @@ public class CompactionIOConfig implements IOConfig
return inputSpec;
}
+ @JsonProperty
+ public boolean isDropExisting()
+ {
+ return dropExisting;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -59,13 +71,14 @@ public class CompactionIOConfig implements IOConfig
return false;
}
CompactionIOConfig that = (CompactionIOConfig) o;
- return Objects.equals(inputSpec, that.inputSpec);
+ return dropExisting == that.dropExisting &&
+ Objects.equals(inputSpec, that.inputSpec);
}
@Override
public int hashCode()
{
- return Objects.hash(inputSpec);
+ return Objects.hash(inputSpec, dropExisting);
}
@Override
@@ -73,6 +86,7 @@ public class CompactionIOConfig implements IOConfig
{
return "CompactionIOConfig{" +
"inputSpec=" + inputSpec +
+ ", dropExisting=" + dropExisting +
'}';
}
}
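The `CompactionIOConfig` change above accepts a nullable `Boolean` and falls back to a default when the property is omitted. A minimal, dependency-free sketch of that pattern follows. `NullableFlagSketch` and `IoConfig` are hypothetical names, the Jackson annotations are left out, and the default of `false` is an assumption based on the `DEFAULT_DROP_EXISTING = false` constant that this commit removes from `IndexTask`.

```java
public class NullableFlagSketch {

    // Assumed default; mirrors the constant removed from IndexTask in this commit.
    static final boolean DEFAULT_DROP_EXISTING = false;

    /** Hypothetical, simplified analogue of an IO config holding the flag. */
    public static final class IoConfig {
        private final boolean dropExisting;

        // A boxed Boolean lets callers (or a JSON deserializer) pass null
        // to mean "not specified", which is distinct from an explicit false.
        public IoConfig(Boolean dropExisting) {
            this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
        }

        public boolean isDropExisting() {
            return dropExisting;
        }
    }

    public static void main(String[] args) {
        System.out.println(new IoConfig(null).isDropExisting());  // prints false
        System.out.println(new IoConfig(true).isDropExisting());  // prints true
        System.out.println(new IoConfig(false).isDropExisting()); // prints false
    }
}
```

Storing the resolved value in a primitive `boolean` keeps `equals`, `hashCode`, and the getter free of null checks.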
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 18f5f71..1961de1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -203,11 +203,11 @@ public class CompactionTask extends AbstractBatchIndexTask
if (ioConfig != null) {
this.ioConfig = ioConfig;
} else if (interval != null) {
- this.ioConfig = new CompactionIOConfig(new
CompactionIntervalSpec(interval, null));
+ this.ioConfig = new CompactionIOConfig(new
CompactionIntervalSpec(interval, null), null);
} else {
// We already checked segments is not null or empty above.
//noinspection ConstantConditions
- this.ioConfig = new
CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
+ this.ioConfig = new
CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null);
}
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
@@ -424,7 +424,8 @@ public class CompactionTask extends AbstractBatchIndexTask
granularitySpec,
toolbox.getCoordinatorClient(),
segmentLoaderFactory,
- retryPolicyFactory
+ retryPolicyFactory,
+ ioConfig.isDropExisting()
);
final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
.range(0, ingestionSpecs.size())
@@ -532,7 +533,8 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory,
- final RetryPolicyFactory retryPolicyFactory
+ final RetryPolicyFactory retryPolicyFactory,
+ final boolean dropExisting
) throws IOException, SegmentLoadingException
{
NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String,
DataSegment>>> pair = prepareSegments(
@@ -614,7 +616,8 @@ public class CompactionTask extends AbstractBatchIndexTask
interval,
coordinatorClient,
segmentLoaderFactory,
- retryPolicyFactory
+ retryPolicyFactory,
+ dropExisting
),
compactionTuningConfig
)
@@ -641,7 +644,8 @@ public class CompactionTask extends AbstractBatchIndexTask
segmentProvider.interval,
coordinatorClient,
segmentLoaderFactory,
- retryPolicyFactory
+ retryPolicyFactory,
+ dropExisting
),
compactionTuningConfig
)
@@ -655,7 +659,8 @@ public class CompactionTask extends AbstractBatchIndexTask
Interval interval,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
- RetryPolicyFactory retryPolicyFactory
+ RetryPolicyFactory retryPolicyFactory,
+ boolean dropExisting
)
{
return new ParallelIndexIOConfig(
@@ -675,7 +680,7 @@ public class CompactionTask extends AbstractBatchIndexTask
),
null,
false,
- true
+ dropExisting
);
}
@@ -1062,7 +1067,13 @@ public class CompactionTask extends
AbstractBatchIndexTask
public Builder inputSpec(CompactionInputSpec inputSpec)
{
- this.ioConfig = new CompactionIOConfig(inputSpec);
+ this.ioConfig = new CompactionIOConfig(inputSpec, null);
+ return this;
+ }
+
+ public Builder inputSpec(CompactionInputSpec inputSpec, Boolean
dropExisting)
+ {
+ this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting);
return this;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 75ae4d2..3b759bc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1046,7 +1046,6 @@ public class IndexTask extends AbstractBatchIndexTask
implements ChatHandler
public static class IndexIOConfig implements BatchIOConfig
{
private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
- private static final boolean DEFAULT_DROP_EXISTING = false;
private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 6e93f3a..04b1b00 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -83,7 +83,8 @@ public class ClientCompactionTaskQuerySerdeTest
new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"
- )
+ ),
+ true
),
new ClientCompactionTaskQueryTuningConfig(
null,
@@ -201,6 +202,10 @@ public class ClientCompactionTaskQuerySerdeTest
query.getGranularitySpec().getSegmentGranularity(),
task.getGranularitySpec().getSegmentGranularity()
);
+ Assert.assertEquals(
+ query.getIoConfig().isDropExisting(),
+ task.getIoConfig().isDropExisting()
+ );
Assert.assertEquals(query.getContext(), task.getContext());
}
@@ -214,7 +219,7 @@ public class ClientCompactionTaskQuerySerdeTest
new RetryPolicyFactory(new RetryPolicyConfig())
);
final CompactionTask task = builder
- .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"))
+ .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"), true)
.tuningConfig(
new ParallelIndexTuningConfig(
null,
@@ -269,7 +274,8 @@ public class ClientCompactionTaskQuerySerdeTest
new ClientCompactionIntervalSpec(
Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"
- )
+ ),
+ true
),
new ClientCompactionTaskQueryTuningConfig(
100,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 95826fa..95f7e3a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -443,7 +443,7 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
}
@Test
- public void testCompactionDropSegmentsOfInputInterval()
+ public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
{
runIndexTask(null, true);
@@ -459,7 +459,8 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
RETRY_POLICY_FACTORY
);
final CompactionTask compactionTask = builder
- .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+ // Set the dropExisting flag to true in the IOConfig of the compaction task
+ .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
.granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
.build();
@@ -475,6 +476,47 @@ public class CompactionTaskParallelRunTest extends
AbstractParallelIndexSupervis
}
}
+ @Test
+ public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
+ {
+ runIndexTask(null, true);
+
+ Collection<DataSegment> usedSegments =
getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE,
ImmutableList.of(INTERVAL_TO_INDEX));
+ Assert.assertEquals(3, usedSegments.size());
+ for (DataSegment segment : usedSegments) {
+ Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
+ }
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ getSegmentLoaderFactory(),
+ RETRY_POLICY_FACTORY
+ );
+ final CompactionTask compactionTask = builder
+ .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+ .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
+ .granularitySpec(new
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+ .build();
+
+ final Set<DataSegment> compactedSegments = runTask(compactionTask);
+
+ usedSegments =
getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE,
ImmutableList.of(INTERVAL_TO_INDEX));
+ // None of the HOUR segments were dropped because the MINUTE segments do not fully cover the 3-hour interval.
+ Assert.assertEquals(6, usedSegments.size());
+ int hourSegmentCount = 0;
+ int minuteSegmentCount = 0;
+ for (DataSegment segment : usedSegments) {
+ if (Granularities.MINUTE.isAligned(segment.getInterval())) {
+ minuteSegmentCount++;
+ }
+ if (Granularities.HOUR.isAligned(segment.getInterval())) {
+ hourSegmentCount++;
+ }
+ }
+ Assert.assertEquals(3, hourSegmentCount);
+ Assert.assertEquals(3, minuteSegmentCount);
+ }
+
private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean
appendToExisting)
{
ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 384d26f..03acabd 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -803,7 +803,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
@Test
- public void
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact()
throws Exception
+ public void
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue()
throws Exception
{
// This test fails with segment lock because of the bug reported in
https://github.com/apache/druid/issues/10911.
if (lockGranularity == LockGranularity.SEGMENT) {
@@ -842,8 +842,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
final CompactionTask partialCompactionTask = builder
- .interval(compactionPartialInterval)
.segmentGranularity(Granularities.MINUTE)
+ // Set dropExisting to true
+ .inputSpec(new CompactionIntervalSpec(compactionPartialInterval,
null), true)
.build();
final Pair<TaskStatus, List<DataSegment>> partialCompactionResult =
runTask(partialCompactionTask);
@@ -865,8 +866,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
final CompactionTask fullCompactionTask = builder
- .interval(Intervals.of("2014-01-01/2014-01-02"))
.segmentGranularity(null)
+ // Set dropExisting to true
+ .inputSpec(new
CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), true)
.build();
final Pair<TaskStatus, List<DataSegment>> fullCompactionResult =
runTask(fullCompactionTask);
@@ -904,6 +906,81 @@ public class CompactionTaskRunTest extends
IngestionTestBase
}
@Test
+ public void
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse()
throws Exception
+ {
+ // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ return;
+ }
+
+ runIndexTask();
+
+ final Set<DataSegment> expectedSegments = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY
+ );
+
+ final Interval partialInterval =
Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+ final CompactionTask partialCompactionTask = builder
+ .segmentGranularity(Granularities.MINUTE)
+ // Set dropExisting to false
+ .inputSpec(new CompactionIntervalSpec(partialInterval, null), false)
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> partialCompactionResult =
runTask(partialCompactionTask);
+ Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+ // All segments in the previous expectedSegments should still appear because they have a larger segment granularity.
+ expectedSegments.addAll(partialCompactionResult.rhs);
+
+ final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+
+ Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+ final CompactionTask fullCompactionTask = builder
+ .segmentGranularity(null)
+ // Set dropExisting to false
+ .inputSpec(new
CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), false)
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> fullCompactionResult =
runTask(fullCompactionTask);
+ Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+ final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+ segmentsAfterFullCompaction.sort(
+ (s1, s2) ->
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(),
s2.getInterval())
+ );
+
+ Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+ for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+ Assert.assertEquals(
+ Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d",
i, i + 1)),
+ segmentsAfterFullCompaction.get(i).getInterval()
+ );
+ }
+ }
+
+ @Test
public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws
Exception
{
runIndexTask();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index f9f3a66..c4faa5b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -109,6 +109,7 @@ import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.IOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -848,7 +849,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -865,7 +867,8 @@ public class CompactionTaskTest
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -919,7 +922,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -937,7 +941,8 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -991,7 +996,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1009,7 +1015,8 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1063,7 +1070,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1081,7 +1089,8 @@ public class CompactionTaskTest
SEGMENT_INTERVALS,
tuningConfig,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1125,7 +1134,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
ingestionSpecs.sort(
@@ -1143,7 +1153,8 @@ public class CompactionTaskTest
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1167,7 +1178,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1185,7 +1197,8 @@ public class CompactionTaskTest
Arrays.asList(customMetricsSpec),
SEGMENT_INTERVALS,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1202,7 +1215,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1219,7 +1233,8 @@ public class CompactionTaskTest
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1243,7 +1258,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1266,7 +1282,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1300,7 +1317,8 @@ public class CompactionTaskTest
new ClientCompactionTaskGranularitySpec(new
PeriodGranularity(Period.months(3), null, null), null),
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
@@ -1319,7 +1337,8 @@ public class CompactionTaskTest
AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null),
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1336,7 +1355,8 @@ public class CompactionTaskTest
new ClientCompactionTaskGranularitySpec(null, new
PeriodGranularity(Period.months(3), null, null)),
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1353,7 +1373,8 @@ public class CompactionTaskTest
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
- new PeriodGranularity(Period.months(3), null, null)
+ new PeriodGranularity(Period.months(3), null, null),
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1373,7 +1394,8 @@ public class CompactionTaskTest
),
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
new DimensionsSpec(getDimensionSchema(new
DoubleDimensionSchema("string_to_double")))
@@ -1392,7 +1414,8 @@ public class CompactionTaskTest
AGGREGATORS,
Collections.singletonList(COMPACTION_INTERVAL),
new PeriodGranularity(Period.months(3), null, null),
- new PeriodGranularity(Period.months(3), null, null)
+ new PeriodGranularity(Period.months(3), null, null),
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1409,7 +1432,8 @@ public class CompactionTaskTest
null,
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1426,7 +1450,8 @@ public class CompactionTaskTest
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1444,7 +1469,8 @@ public class CompactionTaskTest
new ClientCompactionTaskGranularitySpec(null, null),
COORDINATOR_CLIENT,
segmentLoaderFactory,
- RETRY_POLICY_FACTORY
+ RETRY_POLICY_FACTORY,
+ IOConfig.DEFAULT_DROP_EXISTING
);
final List<DimensionsSpec> expectedDimensionsSpec =
getExpectedDimensionsSpecForAutoGeneration();
@@ -1461,7 +1487,8 @@ public class CompactionTaskTest
AGGREGATORS,
SEGMENT_INTERVALS,
Granularities.MONTH,
- Granularities.NONE
+ Granularities.NONE,
+ IOConfig.DEFAULT_DROP_EXISTING
);
}
@@ -1569,7 +1596,8 @@ public class CompactionTaskTest
List<AggregatorFactory> expectedMetricsSpec,
List<Interval> expectedSegmentIntervals,
Granularity expectedSegmentGranularity,
- Granularity expectedQueryGranularity
+ Granularity expectedQueryGranularity,
+ boolean expectedDropExisting
)
{
assertIngestionSchema(
@@ -1615,7 +1643,8 @@ public class CompactionTaskTest
null
),
expectedSegmentGranularity,
- expectedQueryGranularity
+ expectedQueryGranularity,
+ expectedDropExisting
);
}
@@ -1626,7 +1655,8 @@ public class CompactionTaskTest
List<Interval> expectedSegmentIntervals,
CompactionTask.CompactionTuningConfig expectedTuningConfig,
Granularity expectedSegmentGranularity,
- Granularity expectedQueryGranularity
+ Granularity expectedQueryGranularity,
+ boolean expectedDropExisting
)
{
Preconditions.checkArgument(
@@ -1673,6 +1703,7 @@ public class CompactionTaskTest
// assert ioConfig
final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
Assert.assertFalse(ioConfig.isAppendToExisting());
+ Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting());
final InputSource inputSource = ioConfig.getInputSource();
Assert.assertTrue(inputSource instanceof DruidInputSource);
final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 990d573..05593c4 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -35,6 +35,7 @@ import
org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
@@ -168,7 +169,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
LOG.info("Auto compaction test with hash partitioning");
final HashedPartitionsSpec hashedPartitionsSpec = new
HashedPartitionsSpec(null, 3, null);
- submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null);
+ submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null,
false);
// 2 segments published per day after compaction.
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -183,7 +184,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
"city",
false
);
- submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null);
+ submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null,
false);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -287,7 +288,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
@Test
- public void testAutoCompactionDutyWithSegmentGranularity() throws Exception
+ public void
testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws
Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -298,7 +299,8 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR;
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null));
+ // Set dropExisting to true
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), true);
LOG.info("Auto compaction test with YEAR segment granularity");
@@ -314,10 +316,12 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.DAY;
- submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null));
+ // Set dropExisting to true
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), true);
LOG.info("Auto compaction test with DAY segment granularity");
+ // Since dropExisting is set to true...
// The earlier segment with YEAR granularity will be dropped
post-compaction
// Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to
2013-09-02.
expectedIntervalAfterCompaction = new ArrayList<>();
@@ -334,6 +338,58 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
@Test
+ public void
testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingFalse() throws
Exception
+ {
+ loadData(INDEX_TASK);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 4 segments across 2 days (4 total)...
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ Granularity newGranularity = Granularities.YEAR;
+ // Set dropExisting to false
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+ LOG.info("Auto compaction test with YEAR segment granularity");
+
+ List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(1);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(1, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ newGranularity = Granularities.DAY;
+ // Set dropExisting to false
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+ LOG.info("Auto compaction test with DAY segment granularity");
+
+ // Since dropExisting is set to false...
+ // The earlier segment with YEAR granularity is still 'used' as it’s not
fully overshadowed.
+ // This is because we only have newer version on 2013-08-31 to
2013-09-01 and 2013-09-01 to 2013-09-02.
+ // The version for the YEAR segment is still the latest for 2013-01-01
to 2013-08-31 and 2013-09-02 to 2014-01-01.
+ // Hence, all three segments are available and the expected intervals
are combined from the DAY and YEAR segment granularities
+ // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and
2013-01-01 to 2014-01-01)
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(3);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(3, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+ }
+ }
+
+ @Test
public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion()
throws Exception
{
loadData(INDEX_TASK);
@@ -437,7 +493,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
@Test
- public void
testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
throws Exception
+ public void
testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue()
throws Exception
{
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -448,7 +504,8 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
Granularity newGranularity = Granularities.YEAR;
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null));
+ // Set dropExisting to true
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
// We wil have one segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR)
@@ -473,7 +530,9 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
checkCompactionIntervals(expectedIntervalAfterCompaction);
newGranularity = Granularities.MONTH;
- submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null));
+ // Set dropExisting to true
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+ // Since dropExisting is set to true...
// This will submit a single compaction task for interval of
2013-01-01/2014-01-01 with MONTH granularity
expectedIntervalAfterCompaction = new ArrayList<>();
// The previous segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR) will be dropped
@@ -491,6 +550,71 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
}
+ @Test
+ public void
testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingFalse()
throws Exception
+ {
+ loadData(INDEX_TASK);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 4 segments across 2 days (4 total)...
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ Granularity newGranularity = Granularities.YEAR;
+ // Set dropExisting to false
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+ List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+ // We will have one segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR)
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(1);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ loadData(INDEX_TASK);
+ verifySegmentsCount(5);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ // 5 segments. 1 compacted YEAR segment and 4 newly ingested DAY
segments across 2 days
+ // We will have one segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR) from the compaction earlier
+ // two segments with interval of 2013-08-31/2013-09-01 (newly ingested
with DAY)
+ // and two segments with interval of 2013-09-01/2013-09-02 (newly
ingested with DAY)
+ expectedIntervalAfterCompaction.addAll(intervalsBeforeCompaction);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ newGranularity = Granularities.MONTH;
+ // Set dropExisting to false
+ submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+ // Since dropExisting is set to false...
+ // This will submit a single compaction task for interval of
2013-01-01/2014-01-01 with MONTH granularity
+ expectedIntervalAfterCompaction = new ArrayList<>();
+ // Since dropExisting is set to false...
+ // We will have one segment with interval of 2013-01-01/2014-01-01
(compacted with YEAR) from before the compaction
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : Granularities.YEAR.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ // one segment with interval of 2013-09-01/2013-10-01 (compacted with
MONTH)
+ // and one segment with interval of 2013-10-01/2013-11-01 (compacted
with MONTH)
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : Granularities.MONTH.getIterable(new
Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+
+ forceTriggerAutoCompaction(3);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+ }
+ }
+
private void loadData(String indexTask) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
@@ -537,14 +661,20 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
private void submitCompactionConfig(Integer maxRowsPerSegment, Period
skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec)
throws Exception
{
- submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null),
skipOffsetFromLatest, 1, granularitySpec);
+ submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest,
granularitySpec, false);
+ }
+
+ private void submitCompactionConfig(Integer maxRowsPerSegment, Period
skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec,
boolean dropExisting) throws Exception
+ {
+ submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null),
skipOffsetFromLatest, 1, granularitySpec, dropExisting);
}
private void submitCompactionConfig(
PartitionsSpec partitionsSpec,
Period skipOffsetFromLatest,
int maxNumConcurrentSubTasks,
- UserCompactionTaskGranularityConfig granularitySpec
+ UserCompactionTaskGranularityConfig granularitySpec,
+ boolean dropExisting
) throws Exception
{
DataSourceCompactionConfig compactionConfig = new
DataSourceCompactionConfig(
@@ -573,6 +703,7 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
1
),
granularitySpec,
+ !dropExisting ? null : new UserCompactionTaskIOConfig(true),
null
);
compactionResource.submitCompactionConfig(compactionConfig);
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index eec0954..0419bc2 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -21,7 +21,9 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.indexing.IOConfig;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -34,11 +36,16 @@ public class ClientCompactionIOConfig
private static final String TYPE = "compact";
private final ClientCompactionIntervalSpec inputSpec;
+ private final boolean dropExisting;
@JsonCreator
- public ClientCompactionIOConfig(@JsonProperty("inputSpec")
ClientCompactionIntervalSpec inputSpec)
+ public ClientCompactionIOConfig(
+ @JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec,
+ @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+ )
{
this.inputSpec = inputSpec;
+ this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING
: dropExisting;
}
@JsonProperty
@@ -53,6 +60,12 @@ public class ClientCompactionIOConfig
return inputSpec;
}
+ @JsonProperty
+ public boolean isDropExisting()
+ {
+ return dropExisting;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -63,13 +76,14 @@ public class ClientCompactionIOConfig
return false;
}
ClientCompactionIOConfig that = (ClientCompactionIOConfig) o;
- return Objects.equals(inputSpec, that.inputSpec);
+ return dropExisting == that.dropExisting &&
+ Objects.equals(inputSpec, that.inputSpec);
}
@Override
public int hashCode()
{
- return Objects.hash(inputSpec);
+ return Objects.hash(inputSpec, dropExisting);
}
@Override
@@ -77,6 +91,7 @@ public class ClientCompactionIOConfig
{
return "ClientCompactionIOConfig{" +
"inputSpec=" + inputSpec +
+ ", dropExisting=" + dropExisting +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index bc9f9c5..d8f8f35 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -80,6 +80,7 @@ public class HttpIndexingServiceClient implements
IndexingServiceClient
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable Boolean dropExisting,
@Nullable Map<String, Object> context
)
{
@@ -98,7 +99,7 @@ public class HttpIndexingServiceClient implements
IndexingServiceClient
final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
taskId,
dataSource,
- new
ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
+ new
ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments),
dropExisting),
tuningConfig,
granularitySpec,
context
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index f7c9712..14dfcfe 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -41,6 +41,7 @@ public interface IndexingServiceClient
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable Boolean dropExisting,
@Nullable Map<String, Object> context
);
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
index b178480..58f84c2 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
@@ -30,4 +30,5 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
})
public interface IOConfig
{
+ boolean DEFAULT_DROP_EXISTING = false;
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index d64eb08..e7a4240 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -47,6 +47,7 @@ public class DataSourceCompactionConfig
private final Period skipOffsetFromLatest;
private final UserCompactionTaskQueryTuningConfig tuningConfig;
private final UserCompactionTaskGranularityConfig granularitySpec;
+ private final UserCompactionTaskIOConfig ioConfig;
private final Map<String, Object> taskContext;
@JsonCreator
@@ -58,6 +59,7 @@ public class DataSourceCompactionConfig
@JsonProperty("skipOffsetFromLatest") @Nullable Period
skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable
UserCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") @Nullable
UserCompactionTaskGranularityConfig granularitySpec,
+ @JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
{
@@ -71,6 +73,7 @@ public class DataSourceCompactionConfig
this.maxRowsPerSegment = maxRowsPerSegment;
this.skipOffsetFromLatest = skipOffsetFromLatest == null ?
DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
this.tuningConfig = tuningConfig;
+ this.ioConfig = ioConfig;
if (granularitySpec != null) {
Preconditions.checkArgument(
granularitySpec.getQueryGranularity() == null,
@@ -121,6 +124,13 @@ public class DataSourceCompactionConfig
@JsonProperty
@Nullable
+ public UserCompactionTaskIOConfig getIoConfig()
+ {
+ return ioConfig;
+ }
+
+ @JsonProperty
+ @Nullable
public UserCompactionTaskGranularityConfig getGranularitySpec()
{
return granularitySpec;
@@ -150,6 +160,7 @@ public class DataSourceCompactionConfig
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(granularitySpec, that.granularitySpec) &&
+ Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(taskContext, that.taskContext);
}
@@ -164,6 +175,7 @@ public class DataSourceCompactionConfig
skipOffsetFromLatest,
tuningConfig,
granularitySpec,
+ ioConfig,
taskContext
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
similarity index 54%
copy from
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
copy to
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
index 60972de..df5af28 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
@@ -17,36 +17,39 @@
* under the License.
*/
-package org.apache.druid.indexing.common.task;
+package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.segment.indexing.IOConfig;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
- * {@link IOConfig} for {@link CompactionTask}. Should be synchronized with
{@link
- * org.apache.druid.client.indexing.ClientCompactionIOConfig}.
- *
- * @see CompactionInputSpec
+ * Spec containing IO configs for Auto Compaction.
+ * This class mimics JSON field names for fields supported in auto compaction
with
+ * the corresponding fields in {@link IOConfig}.
+ * This is done for end-user ease of use. Basically, end users will use the
same syntax / JSON structure to set
+ * IO configs for Auto Compaction as they would for any other ingestion task.
+ * Note that this class simply holds IO configs and passes them to the compaction task
spec.
*/
-@JsonTypeName("compact")
-public class CompactionIOConfig implements IOConfig
+public class UserCompactionTaskIOConfig
{
- private final CompactionInputSpec inputSpec;
+ private final boolean dropExisting;
@JsonCreator
- public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec
inputSpec)
+ public UserCompactionTaskIOConfig(
+ @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+ )
{
- this.inputSpec = inputSpec;
+ this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING
: dropExisting;
}
@JsonProperty
- public CompactionInputSpec getInputSpec()
+ public boolean isDropExisting()
{
- return inputSpec;
+ return dropExisting;
}
@Override
@@ -58,21 +61,21 @@ public class CompactionIOConfig implements IOConfig
if (o == null || getClass() != o.getClass()) {
return false;
}
- CompactionIOConfig that = (CompactionIOConfig) o;
- return Objects.equals(inputSpec, that.inputSpec);
+ UserCompactionTaskIOConfig that = (UserCompactionTaskIOConfig) o;
+ return dropExisting == that.dropExisting;
}
@Override
public int hashCode()
{
- return Objects.hash(inputSpec);
+ return Objects.hash(dropExisting);
}
@Override
public String toString()
{
- return "CompactionIOConfig{" +
- "inputSpec=" + inputSpec +
+ return "UserCompactionTaskIOConfig{" +
+ "dropExisting=" + dropExisting +
'}';
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b047758..5d31751 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -320,6 +320,11 @@ public class CompactSegments implements CoordinatorDuty
queryGranularitySpec = null;
}
+ Boolean dropExisting = null;
+ if (config.getIoConfig() != null) {
+ dropExisting = config.getIoConfig().isDropExisting();
+ }
+
// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
"coordinator-issued",
@@ -327,6 +332,7 @@ public class CompactSegments implements CoordinatorDuty
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(),
config.getMaxRowsPerSegment()),
queryGranularitySpec,
+ dropExisting,
newAutoCompactionContext(config.getTaskContext())
);
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index c5a85fc..600b4be 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -51,6 +51,7 @@ public class NoopIndexingServiceClient implements
IndexingServiceClient
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable Boolean dropExisting,
@Nullable Map<String, Object> context
)
{
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index efe04e3..c798d29 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -58,6 +58,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -84,6 +85,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -127,6 +129,7 @@ public class DataSourceCompactionConfigTest
null
),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -170,6 +173,7 @@ public class DataSourceCompactionConfigTest
null
),
null,
+ null,
ImmutableMap.of("key", "val")
);
@@ -235,6 +239,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -261,6 +266,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR,
Granularities.MONTH),
+ null,
ImmutableMap.of("key", "val")
);
}
@@ -276,6 +282,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -302,6 +309,62 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(null, null),
+ null,
+ ImmutableMap.of("key", "val")
+ );
+ final String json = OBJECT_MAPPER.writeValueAsString(config);
+ final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
+
+ Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+ Assert.assertEquals(25, fromJson.getTaskPriority());
+ Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+ Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+ Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+ Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+ }
+
+ @Test
+ public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
+ {
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskIOConfig(true),
+ ImmutableMap.of("key", "val")
+ );
+ final String json = OBJECT_MAPPER.writeValueAsString(config);
+ final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
+
+ Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+ Assert.assertEquals(25, fromJson.getTaskPriority());
+ Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+ Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+ Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+ Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+ Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+ Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+ Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
+ }
+
+ @Test
+ public void testSerdeIOConfigWithNullDropExisting() throws IOException
+ {
+ final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+ new UserCompactionTaskIOConfig(null),
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -315,5 +378,6 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+ Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 4bec733..90b3b5d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -592,6 +593,7 @@ public class CompactSegmentsTest
null
),
null,
+ null,
null
)
);
@@ -605,6 +607,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
// Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same
@@ -613,6 +616,110 @@ public class CompactSegmentsTest
}
@Test
+ public void testCompactWithNotNullIOConfig()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ new UserCompactionTaskIOConfig(true),
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<Boolean> dropExistingCapture = ArgumentCaptor.forClass(Boolean.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ dropExistingCapture.capture(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertEquals(true, dropExistingCapture.getValue());
+ }
+
+ @Test
+ public void testCompactWithNullIOConfig()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ null,
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<Boolean> dropExistingCapture = ArgumentCaptor.forClass(Boolean.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ dropExistingCapture.capture(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertNull(dropExistingCapture.getValue());
+ }
+
+ @Test
public void testCompactWithGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
@@ -646,6 +753,7 @@ public class CompactSegmentsTest
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+ null,
null
)
);
@@ -659,6 +767,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
@@ -698,7 +807,8 @@ public class CompactSegmentsTest
new ClientCompactionIntervalSpec(
Intervals.of("2000/2099"),
"testSha256OfSortedSegmentIds"
- )
+ ),
+ null
),
null,
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
@@ -737,6 +847,7 @@ public class CompactSegmentsTest
null
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+ null,
null
)
);
@@ -755,6 +866,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
@@ -1059,6 +1171,7 @@ public class CompactSegmentsTest
null
),
null,
+ null,
null
)
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index b32a7b7..0cf0ded 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -92,6 +92,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -131,6 +132,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -170,6 +172,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -209,6 +212,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -248,6 +252,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -287,6 +292,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -326,6 +332,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -365,6 +372,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
@@ -404,6 +412,7 @@ public class NewestSegmentFirstIteratorTest
null
),
null,
+ null,
null
);
Assert.assertEquals(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 9b7f600..5ccd52c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -1068,6 +1068,7 @@ public class NewestSegmentFirstPolicyTest
skipOffsetFromLatest,
null,
granularitySpec,
+ null,
null
);
}
diff --git a/website/.spelling b/website/.spelling
index ed803c7..5c7237b 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1656,6 +1656,7 @@ HadoopIndexTasks
HttpEmitter
HttpPostEmitter
InetAddress.getLocalHost
+IOConfig
JRE8u60
KeyManager
L1