This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4576152  Make dropExisting flag for Compaction configurable and add 
warning documentations (#11070)
4576152 is described below

commit 4576152e4a0213d17048a330e7089aa9d89f3972
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Apr 9 00:12:28 2021 -0700

    Make dropExisting flag for Compaction configurable and add warning 
documentations (#11070)
    
    * Make dropExisting flag for Compaction configurable
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix test
    
    * add tests
    
    * fix spelling
    
    * fix docs
    
    * add IT
    
    * fix test
    
    * fix doc
    
    * fix doc
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   1 +
 docs/configuration/index.md                        |  10 ++
 docs/ingestion/compaction.md                       |  12 +-
 docs/ingestion/native-batch.md                     |   7 +-
 .../indexing/common/task/CompactionIOConfig.java   |  20 ++-
 .../druid/indexing/common/task/CompactionTask.java |  29 ++--
 .../druid/indexing/common/task/IndexTask.java      |   1 -
 .../task/ClientCompactionTaskQuerySerdeTest.java   |  12 +-
 .../common/task/CompactionTaskParallelRunTest.java |  46 ++++++-
 .../common/task/CompactionTaskRunTest.java         |  83 ++++++++++-
 .../indexing/common/task/CompactionTaskTest.java   |  89 ++++++++----
 .../coordinator/duty/ITAutoCompactionTest.java     | 151 +++++++++++++++++++--
 .../client/indexing/ClientCompactionIOConfig.java  |  21 ++-
 .../client/indexing/HttpIndexingServiceClient.java |   3 +-
 .../client/indexing/IndexingServiceClient.java     |   1 +
 .../apache/druid/segment/indexing/IOConfig.java    |   1 +
 .../coordinator/DataSourceCompactionConfig.java    |  12 ++
 .../coordinator/UserCompactionTaskIOConfig.java    |  39 +++---
 .../server/coordinator/duty/CompactSegments.java   |   6 +
 .../client/indexing/NoopIndexingServiceClient.java |   1 +
 .../DataSourceCompactionConfigTest.java            |  64 +++++++++
 .../coordinator/duty/CompactSegmentsTest.java      | 115 +++++++++++++++-
 .../duty/NewestSegmentFirstIteratorTest.java       |   9 ++
 .../duty/NewestSegmentFirstPolicyTest.java         |   1 +
 website/.spelling                                  |   1 +
 25 files changed, 644 insertions(+), 91 deletions(-)

diff --git 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index e744bf9..b93053e 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -100,6 +100,7 @@ public class NewestSegmentFirstPolicyBenchmark
               null,
               null,
               null,
+              null,
               null
           )
       );
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 30cc5eb..b56f752 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -867,6 +867,7 @@ A description of the compaction config is:
 |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task 
TuningConfig](#automatic-compaction-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction 
tasks.|no|
 |`granularitySpec`|Custom `granularitySpec` to describe the 
`segmentGranularity` for the compacted segments. See [Automatic compaction 
granularitySpec](#automatic-compaction-granularityspec)|No|
+|`ioConfig`|IO config for compaction tasks. See below [Compaction Task 
IOConfig](#automatic-compaction-ioconfig).|no|
 
 An example of compaction config is:
 
@@ -918,6 +919,15 @@ You can optionally use the `granularitySpec` object to 
configure the segment gra
 
 > Unlike manual compaction, automatic compaction does not support query 
 > granularity.
 
+###### Automatic compaction IOConfig
+
+Auto compaction supports a subset of the [IOConfig for Parallel 
task](../ingestion/native-batch.md).
+The below is a list of the supported configurations for auto compaction.
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|--------|
+|`dropExisting`|If `true`, then the generated compaction task drops (mark 
unused) all existing segments fully contained by the umbrella interval of the 
compacted segments when the task publishes new segments. If compaction fails, 
Druid does not drop or mark unused any segments. WARNING: this functionality is 
still in beta and can result in temporary data unavailability for data within 
the compacted `interval`. Note that changing this config does not cause 
intervals to be compacted again. [...]
+
 ### Overlord
 
 For general Overlord Process information, see [here](../design/overlord.md).
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index f6846dc..9f34381 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -54,7 +54,7 @@ See [Setting up a manual compaction 
task](#setting-up-manual-compaction) for mor
 ## Data handling with compaction
 During compaction, Druid overwrites the original set of segments with the 
compacted set. Druid also locks the segments for the time interval being 
compacted to ensure data consistency. By default, compaction tasks do not 
modify the underlying data. You can configure the compaction task to change the 
query granularity or add or remove dimensions in the compaction task. This 
means that the only changes to query results should be the result of 
intentional, not automatic, changes.
 
-For compaction tasks, `dropExisting` for underlying ingestion tasks is "true". 
This means that Druid can drop (mark unused) all the existing segments fully 
within interval for the compaction task. For an example of why this is 
important, see the suggestion for reindexing with finer granularity under 
[Implementation considerations](native-batch.md#implementation-considerations). 
+For compaction tasks, `dropExisting` in `ioConfig` can be set to "true" for 
Druid to drop (mark unused) all existing segments fully contained by the 
interval of the compaction task. For an example of why this is important, see 
the suggestion for reindexing with finer granularity under [Implementation 
considerations](native-batch.md#implementation-considerations). WARNING: this 
functionality is still in beta and can result in temporary data unavailability 
for data within the compaction ta [...]
 
 If an ingestion task needs to write data to a segment for a time interval 
locked for compaction, by default the ingestion task supersedes the compaction 
task and the compaction task fails without finishing. For manual compaction 
tasks you can adjust the input spec interval to avoid conflicts between 
ingestion and compaction. For automatic compaction, you can set the 
`skipOffsetFromLatest` key to adjustment the auto compaction starting point 
from the current time to reduce the chance of c [...]
 
@@ -158,10 +158,12 @@ This task doesn't specify a `granularitySpec` so Druid 
retains the original segm
 
 The compaction `ioConfig` requires specifying `inputSpec` as follows:
 
-|Field|Description|Required|
-|-----|-----------|--------|
-|`type`|Task type. Should be `compact`|Yes|
-|`inputSpec`|Input specification|Yes|
+|Field|Description|Default|Required?|
+|-----|-----------|-------|--------|
+|`type`|Task type. Should be `compact`|none|Yes|
+|`inputSpec`|Input specification|none|Yes|
+|`dropExisting`|If `true`, then the compaction task drops (mark unused) all 
existing segments fully contained by either the `interval` in the `interval` 
type `inputSpec` or the umbrella interval of the `segments` in the `segment` 
type `inputSpec` when the task publishes new compacted segments. If compaction 
fails, Druid does not drop or mark unused any segments. WARNING: this 
functionality is still in beta and can result in temporary data unavailability 
for data within the compaction tas [...]
+
 
 There are two supported `inputSpec`s for now.
 
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 5e7b337..48539bb 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -91,7 +91,8 @@ The supported compression formats for native batch ingestion 
are `bz2`, `gz`, `x
   `granularitySpec`'s intervals, the portion of those segments outside the new 
segments' intervals will still be visible.
 - You can set `dropExisting` flag in the `ioConfig` to true if you want the 
ingestion task to drop all existing segments that 
   start and end within your `granularitySpec`'s intervals. This applies 
whether or not the new data covers all existing segments. 
-  `dropExisting` only applies when `appendToExisting` is false and the  
`granularitySpec` contains an `interval`. 
+  `dropExisting` only applies when `appendToExisting` is false and the  
`granularitySpec` contains an `interval`. WARNING: this 
+  functionality is still in beta and can result in temporary data 
unavailability for data within the specified `interval`
   
   The following examples demonstrate when to set the `dropExisting` property 
to true in the `ioConfig`:
   
@@ -220,7 +221,7 @@ that range if there's some stray data with unexpected 
timestamps.
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to 
parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, 
effectively appending to the segment set instead of replacing it. This means 
that you can append new segments to any datasource regardless of its original 
partitioning scheme. You must use the `dynamic` partitioning type for the 
appended segments. If you specify a different partitioning type, the task fails 
with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the 
`granularitySpec` contains an`interval`, then the ingestion task drops (mark 
unused) all existing segments fully contained by the specified `interval` when 
the task publishes new segments. If ingestion fails, Druid does not drop or 
mark unused any segments. In the case of misconfiguration where either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec`, Druid does not drop any segments even if  [...]
+|dropExisting|If `true` and `appendToExisting` is `false` and the 
`granularitySpec` contains an`interval`, then the ingestion task drops (mark 
unused) all existing segments fully contained by the specified `interval` when 
the task publishes new segments. If ingestion fails, Druid does not drop or 
mark unused any segments. In the case of misconfiguration where either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec`, Druid does not drop any segments even if  [...]
 
 ### `tuningConfig`
 
@@ -749,7 +750,7 @@ that range if there's some stray data with unexpected 
timestamps.
 |type|The task type, this should always be "index".|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to 
parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, 
effectively appending to the segment set instead of replacing it. This means 
that you can append new segments to any datasource regardless of its original 
partitioning scheme. You must use the `dynamic` partitioning type for the 
appended segments. If you specify a different partitioning type, the task fails 
with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the 
`granularitySpec` contains an`interval`, then the ingestion task drops (mark 
unused) all existing segments fully contained by the specified `interval` when 
the task publishes new segments. If ingestion fails, Druid does not drop or 
mark unused any segments. In the case of misconfiguration where either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec`, Druid does not drop any segments even if  [...]
+|dropExisting|If `true` and `appendToExisting` is `false` and the 
`granularitySpec` contains an`interval`, then the ingestion task drops (mark 
unused) all existing segments fully contained by the specified `interval` when 
the task publishes new segments. If ingestion fails, Druid does not drop or 
mark unused any segments. In the case of misconfiguration where either 
`appendToExisting` is `true` or `interval` is not specified in 
`granularitySpec`, Druid does not drop any segments even if  [...]
 
 ### `tuningConfig`
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
index 60972de..faedd3d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.segment.indexing.IOConfig;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
@@ -36,11 +37,16 @@ import java.util.Objects;
 public class CompactionIOConfig implements IOConfig
 {
   private final CompactionInputSpec inputSpec;
+  private final boolean dropExisting;
 
   @JsonCreator
-  public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec 
inputSpec)
+  public CompactionIOConfig(
+      @JsonProperty("inputSpec") CompactionInputSpec inputSpec,
+      @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+  )
   {
     this.inputSpec = inputSpec;
+    this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : 
dropExisting;
   }
 
   @JsonProperty
@@ -49,6 +55,12 @@ public class CompactionIOConfig implements IOConfig
     return inputSpec;
   }
 
+  @JsonProperty
+  public boolean isDropExisting()
+  {
+    return dropExisting;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -59,13 +71,14 @@ public class CompactionIOConfig implements IOConfig
       return false;
     }
     CompactionIOConfig that = (CompactionIOConfig) o;
-    return Objects.equals(inputSpec, that.inputSpec);
+    return dropExisting == that.dropExisting &&
+           Objects.equals(inputSpec, that.inputSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(inputSpec);
+    return Objects.hash(inputSpec, dropExisting);
   }
 
   @Override
@@ -73,6 +86,7 @@ public class CompactionIOConfig implements IOConfig
   {
     return "CompactionIOConfig{" +
            "inputSpec=" + inputSpec +
+           ", dropExisting=" + dropExisting +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 18f5f71..1961de1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -203,11 +203,11 @@ public class CompactionTask extends AbstractBatchIndexTask
     if (ioConfig != null) {
       this.ioConfig = ioConfig;
     } else if (interval != null) {
-      this.ioConfig = new CompactionIOConfig(new 
CompactionIntervalSpec(interval, null));
+      this.ioConfig = new CompactionIOConfig(new 
CompactionIntervalSpec(interval, null), null);
     } else {
       // We already checked segments is not null or empty above.
       //noinspection ConstantConditions
-      this.ioConfig = new 
CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
+      this.ioConfig = new 
CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null);
     }
 
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
@@ -424,7 +424,8 @@ public class CompactionTask extends AbstractBatchIndexTask
         granularitySpec,
         toolbox.getCoordinatorClient(),
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        ioConfig.isDropExisting()
     );
     final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
         .range(0, ingestionSpecs.size())
@@ -532,7 +533,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       final CoordinatorClient coordinatorClient,
       final SegmentLoaderFactory segmentLoaderFactory,
-      final RetryPolicyFactory retryPolicyFactory
+      final RetryPolicyFactory retryPolicyFactory,
+      final boolean dropExisting
   ) throws IOException, SegmentLoadingException
   {
     NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, 
DataSegment>>> pair = prepareSegments(
@@ -614,7 +616,8 @@ public class CompactionTask extends AbstractBatchIndexTask
                     interval,
                     coordinatorClient,
                     segmentLoaderFactory,
-                    retryPolicyFactory
+                    retryPolicyFactory,
+                    dropExisting
                 ),
                 compactionTuningConfig
             )
@@ -641,7 +644,8 @@ public class CompactionTask extends AbstractBatchIndexTask
                   segmentProvider.interval,
                   coordinatorClient,
                   segmentLoaderFactory,
-                  retryPolicyFactory
+                  retryPolicyFactory,
+                  dropExisting
               ),
               compactionTuningConfig
           )
@@ -655,7 +659,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       Interval interval,
       CoordinatorClient coordinatorClient,
       SegmentLoaderFactory segmentLoaderFactory,
-      RetryPolicyFactory retryPolicyFactory
+      RetryPolicyFactory retryPolicyFactory,
+      boolean dropExisting
   )
   {
     return new ParallelIndexIOConfig(
@@ -675,7 +680,7 @@ public class CompactionTask extends AbstractBatchIndexTask
         ),
         null,
         false,
-        true
+        dropExisting
     );
   }
 
@@ -1062,7 +1067,13 @@ public class CompactionTask extends 
AbstractBatchIndexTask
 
     public Builder inputSpec(CompactionInputSpec inputSpec)
     {
-      this.ioConfig = new CompactionIOConfig(inputSpec);
+      this.ioConfig = new CompactionIOConfig(inputSpec, null);
+      return this;
+    }
+
+    public Builder inputSpec(CompactionInputSpec inputSpec, Boolean 
dropExisting)
+    {
+      this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting);
       return this;
     }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 75ae4d2..3b759bc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1046,7 +1046,6 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
   public static class IndexIOConfig implements BatchIOConfig
   {
     private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
-    private static final boolean DEFAULT_DROP_EXISTING = false;
 
     private final FirehoseFactory firehoseFactory;
     private final InputSource inputSource;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 6e93f3a..04b1b00 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -83,7 +83,8 @@ public class ClientCompactionTaskQuerySerdeTest
             new ClientCompactionIntervalSpec(
                 Intervals.of("2019/2020"),
                 "testSha256OfSortedSegmentIds"
-            )
+            ),
+            true
         ),
         new ClientCompactionTaskQueryTuningConfig(
             null,
@@ -201,6 +202,10 @@ public class ClientCompactionTaskQuerySerdeTest
         query.getGranularitySpec().getSegmentGranularity(),
         task.getGranularitySpec().getSegmentGranularity()
     );
+    Assert.assertEquals(
+        query.getIoConfig().isDropExisting(),
+        task.getIoConfig().isDropExisting()
+    );
     Assert.assertEquals(query.getContext(), task.getContext());
   }
 
@@ -214,7 +219,7 @@ public class ClientCompactionTaskQuerySerdeTest
         new RetryPolicyFactory(new RetryPolicyConfig())
     );
     final CompactionTask task = builder
-        .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), 
"testSha256OfSortedSegmentIds"))
+        .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), 
"testSha256OfSortedSegmentIds"), true)
         .tuningConfig(
             new ParallelIndexTuningConfig(
                 null,
@@ -269,7 +274,8 @@ public class ClientCompactionTaskQuerySerdeTest
             new ClientCompactionIntervalSpec(
                 Intervals.of("2019/2020"),
                 "testSha256OfSortedSegmentIds"
-            )
+            ),
+            true
         ),
         new ClientCompactionTaskQueryTuningConfig(
             100,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 95826fa..95f7e3a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -443,7 +443,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
   }
 
   @Test
-  public void testCompactionDropSegmentsOfInputInterval()
+  public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
   {
     runIndexTask(null, true);
 
@@ -459,7 +459,8 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
-        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        // Set the dropExisting flag to true in the IOConfig of the compaction 
task
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
         
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
         .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
         .build();
@@ -475,6 +476,47 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
     }
   }
 
+  @Test
+  public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
+  {
+    runIndexTask(null, true);
+
+    Collection<DataSegment> usedSegments = 
getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, 
ImmutableList.of(INTERVAL_TO_INDEX));
+    Assert.assertEquals(3, usedSegments.size());
+    for (DataSegment segment : usedSegments) {
+      Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
+    }
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentLoaderFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+        .build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+
+    usedSegments = 
getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, 
ImmutableList.of(INTERVAL_TO_INDEX));
+    // All the HOUR segments did not get dropped since MINUTES segments did 
not fully covering the 3 HOURS interval.
+    Assert.assertEquals(6, usedSegments.size());
+    int hourSegmentCount = 0;
+    int minuteSegmentCount = 0;
+    for (DataSegment segment : usedSegments) {
+      if (Granularities.MINUTE.isAligned(segment.getInterval())) {
+        minuteSegmentCount++;
+      }
+      if (Granularities.MINUTE.isAligned(segment.getInterval())) {
+        hourSegmentCount++;
+      }
+    }
+    Assert.assertEquals(3, hourSegmentCount);
+    Assert.assertEquals(3, minuteSegmentCount);
+  }
+
   private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean 
appendToExisting)
   {
     ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 384d26f..03acabd 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -803,7 +803,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
   }
 
   @Test
-  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() 
throws Exception
+  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue()
 throws Exception
   {
     // This test fails with segment lock because of the bug reported in 
https://github.com/apache/druid/issues/10911.
     if (lockGranularity == LockGranularity.SEGMENT) {
@@ -842,8 +842,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
     );
 
     final CompactionTask partialCompactionTask = builder
-        .interval(compactionPartialInterval)
         .segmentGranularity(Granularities.MINUTE)
+        // Set dropExisting to true
+        .inputSpec(new CompactionIntervalSpec(compactionPartialInterval, 
null), true)
         .build();
 
     final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = 
runTask(partialCompactionTask);
@@ -865,8 +866,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
     Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
 
     final CompactionTask fullCompactionTask = builder
-        .interval(Intervals.of("2014-01-01/2014-01-02"))
         .segmentGranularity(null)
+        // Set dropExisting to true
+        .inputSpec(new 
CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), true)
         .build();
 
     final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = 
runTask(fullCompactionTask);
@@ -904,6 +906,81 @@ public class CompactionTaskRunTest extends 
IngestionTestBase
   }
 
   @Test
+  public void 
testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse()
 throws Exception
+  {
+    // This test fails with segment lock because of the bug reported in 
https://github.com/apache/druid/issues/10911.
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    runIndexTask();
+
+    final Set<DataSegment> expectedSegments = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final Interval partialInterval = 
Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+    final CompactionTask partialCompactionTask = builder
+        .segmentGranularity(Granularities.MINUTE)
+        // Set dropExisting to false
+        .inputSpec(new CompactionIntervalSpec(partialInterval, null), false)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = 
runTask(partialCompactionTask);
+    Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+    // All segments in the previous expectedSegments should still appear as 
they have larger segment granularity.
+    expectedSegments.addAll(partialCompactionResult.rhs);
+
+    final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+    final CompactionTask fullCompactionTask = builder
+        .segmentGranularity(null)
+        // Set dropExisting to false
+        .inputSpec(new 
CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), false)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = 
runTask(fullCompactionTask);
+    Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+    final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    segmentsAfterFullCompaction.sort(
+        (s1, s2) -> 
Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), 
s2.getInterval())
+    );
+
+    Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+    for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+      Assert.assertEquals(
+          Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", 
i, i + 1)),
+          segmentsAfterFullCompaction.get(i).getInterval()
+      );
+    }
+  }
+
+  @Test
   public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws 
Exception
   {
     runIndexTask();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index f9f3a66..c4faa5b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -109,6 +109,7 @@ import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.IOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -848,7 +849,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -865,7 +867,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -919,7 +922,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -937,7 +941,8 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         tuningConfig,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -991,7 +996,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1009,7 +1015,8 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         tuningConfig,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1063,7 +1070,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1081,7 +1089,8 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         tuningConfig,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1125,7 +1134,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
 
     ingestionSpecs.sort(
@@ -1143,7 +1153,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1167,7 +1178,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
 
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
@@ -1185,7 +1197,8 @@ public class CompactionTaskTest
         Arrays.asList(customMetricsSpec),
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1202,7 +1215,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1219,7 +1233,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1243,7 +1258,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1266,7 +1282,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1300,7 +1317,8 @@ public class CompactionTaskTest
         new ClientCompactionTaskGranularitySpec(new 
PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
@@ -1319,7 +1337,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null),
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1336,7 +1355,8 @@ public class CompactionTaskTest
         new ClientCompactionTaskGranularitySpec(null, new 
PeriodGranularity(Period.months(3), null, null)),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1353,7 +1373,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        new PeriodGranularity(Period.months(3), null, null)
+        new PeriodGranularity(Period.months(3), null, null),
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1373,7 +1394,8 @@ public class CompactionTaskTest
         ),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
@@ -1392,7 +1414,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null),
-        new PeriodGranularity(Period.months(3), null, null)
+        new PeriodGranularity(Period.months(3), null, null),
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1409,7 +1432,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1426,7 +1450,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1444,7 +1469,8 @@ public class CompactionTaskTest
         new ClientCompactionTaskGranularitySpec(null, null),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1461,7 +1487,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1569,7 +1596,8 @@ public class CompactionTaskTest
       List<AggregatorFactory> expectedMetricsSpec,
       List<Interval> expectedSegmentIntervals,
       Granularity expectedSegmentGranularity,
-      Granularity expectedQueryGranularity
+      Granularity expectedQueryGranularity,
+      boolean expectedDropExisting
   )
   {
     assertIngestionSchema(
@@ -1615,7 +1643,8 @@ public class CompactionTaskTest
             null
         ),
         expectedSegmentGranularity,
-        expectedQueryGranularity
+        expectedQueryGranularity,
+        expectedDropExisting
     );
   }
 
@@ -1626,7 +1655,8 @@ public class CompactionTaskTest
       List<Interval> expectedSegmentIntervals,
       CompactionTask.CompactionTuningConfig expectedTuningConfig,
       Granularity expectedSegmentGranularity,
-      Granularity expectedQueryGranularity
+      Granularity expectedQueryGranularity,
+      boolean expectedDropExisting
   )
   {
     Preconditions.checkArgument(
@@ -1673,6 +1703,7 @@ public class CompactionTaskTest
       // assert ioConfig
       final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
       Assert.assertFalse(ioConfig.isAppendToExisting());
+      Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting());
       final InputSource inputSource = ioConfig.getInputSource();
       Assert.assertTrue(inputSource instanceof DruidInputSource);
       final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 990d573..05593c4 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -35,6 +35,7 @@ import 
org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CompactionResourceTestClient;
@@ -168,7 +169,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       LOG.info("Auto compaction test with hash partitioning");
 
       final HashedPartitionsSpec hashedPartitionsSpec = new 
HashedPartitionsSpec(null, 3, null);
-      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null);
+      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, 
false);
       // 2 segments published per day after compaction.
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -183,7 +184,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           "city",
           false
       );
-      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null);
+      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, 
false);
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -287,7 +288,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
   }
 
   @Test
-  public void testAutoCompactionDutyWithSegmentGranularity() throws Exception
+  public void 
testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws 
Exception
   {
     loadData(INDEX_TASK);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -298,7 +299,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
       Granularity newGranularity = Granularities.YEAR;
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), true);
 
       LOG.info("Auto compaction test with YEAR segment granularity");
 
@@ -314,10 +316,12 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
       newGranularity = Granularities.DAY;
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), true);
 
       LOG.info("Auto compaction test with DAY segment granularity");
 
+      // Since dropExisting is set to true...
       // The earlier segment with YEAR granularity will be dropped 
post-compaction
       // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 
2013-09-02.
       expectedIntervalAfterCompaction = new ArrayList<>();
@@ -334,6 +338,58 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
   }
 
   @Test
+  public void 
testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingFalse() throws 
Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      Granularity newGranularity = Granularities.YEAR;
+      // Set dropExisting to false
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+      LOG.info("Auto compaction test with YEAR segment granularity");
+
+      List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(1);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      newGranularity = Granularities.DAY;
+      // Set dropExisting to false
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+      LOG.info("Auto compaction test with DAY segment granularity");
+
+      // Since dropExisting is set to false...
+      // The earlier segment with YEAR granularity is still 'used' as it’s not 
fully overshaowed.
+      // This is because we only have newer version on 2013-08-31 to 
2013-09-01 and 2013-09-01 to 2013-09-02.
+      // The version for the YEAR segment is still the latest for 2013-01-01 
to 2013-08-31 and 2013-09-02 to 2014-01-01.
+      // Hence, all three segments are available and the expected intervals 
are combined from the DAY and YEAR segment granularities
+      // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 
2013-01-01 to 2014-01-01)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(3);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(3, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+    }
+  }
+
+  @Test
   public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() 
throws Exception
   {
     loadData(INDEX_TASK);
@@ -437,7 +493,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
   }
 
   @Test
-  public void 
testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
 throws Exception
+  public void 
testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue()
 throws Exception
   {
     loadData(INDEX_TASK);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -448,7 +504,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
       Granularity newGranularity = Granularities.YEAR;
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
 
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR)
@@ -473,7 +530,9 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
       newGranularity = Granularities.MONTH;
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+      // Since dropExisting is set to true...
       // This will submit a single compaction task for interval of 
2013-01-01/2014-01-01 with MONTH granularity
       expectedIntervalAfterCompaction = new ArrayList<>();
       // The previous segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR) will be dropped
@@ -491,6 +550,71 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
+  @Test
+  public void 
testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingFalse()
 throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = 
coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      Granularity newGranularity = Granularities.YEAR;
+      // Set dropExisting to false
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+      List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+      // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(1);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      loadData(INDEX_TASK);
+      verifySegmentsCount(5);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      // 5 segments. 1 compacted YEAR segment and 4 newly ingested DAY 
segments across 2 days
+      // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR) from the compaction earlier
+      // two segments with interval of 2013-08-31/2013-09-01 (newly ingested 
with DAY)
+      // and two segments with interval of 2013-09-01/2013-09-02 (newly 
ingested with DAY)
+      expectedIntervalAfterCompaction.addAll(intervalsBeforeCompaction);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      newGranularity = Granularities.MONTH;
+      // Set dropExisting to false
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+      // Since dropExisting is set to true...
+      // This will submit a single compaction task for interval of 
2013-01-01/2014-01-01 with MONTH granularity
+      expectedIntervalAfterCompaction = new ArrayList<>();
+      // Since dropExisting is set to false...
+      // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR) from before the compaction
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : Granularities.YEAR.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      // one segments with interval of 2013-09-01/2013-10-01 (compacted with 
MONTH)
+      // and one segments with interval of 2013-10-01/2013-11-01 (compacted 
with MONTH)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : Granularities.MONTH.getIterable(new 
Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+
+      forceTriggerAutoCompaction(3);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
@@ -537,14 +661,20 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
   private void submitCompactionConfig(Integer maxRowsPerSegment, Period 
skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec) 
throws Exception
   {
-    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), 
skipOffsetFromLatest, 1, granularitySpec);
+    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, 
granularitySpec, false);
+  }
+
+  private void submitCompactionConfig(Integer maxRowsPerSegment, Period 
skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, 
boolean dropExisting) throws Exception
+  {
+    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), 
skipOffsetFromLatest, 1, granularitySpec, dropExisting);
   }
 
   private void submitCompactionConfig(
       PartitionsSpec partitionsSpec,
       Period skipOffsetFromLatest,
       int maxNumConcurrentSubTasks,
-      UserCompactionTaskGranularityConfig granularitySpec
+      UserCompactionTaskGranularityConfig granularitySpec,
+      boolean dropExisting
   ) throws Exception
   {
     DataSourceCompactionConfig compactionConfig = new 
DataSourceCompactionConfig(
@@ -573,6 +703,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
             1
         ),
         granularitySpec,
+        !dropExisting ? null : new UserCompactionTaskIOConfig(true),
         null
     );
     compactionResource.submitCompactionConfig(compactionConfig);
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index eec0954..0419bc2 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -21,7 +21,9 @@ package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.indexing.IOConfig;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
@@ -34,11 +36,16 @@ public class ClientCompactionIOConfig
   private static final String TYPE = "compact";
 
   private final ClientCompactionIntervalSpec inputSpec;
+  private final boolean dropExisting;
 
   @JsonCreator
-  public ClientCompactionIOConfig(@JsonProperty("inputSpec") 
ClientCompactionIntervalSpec inputSpec)
+  public ClientCompactionIOConfig(
+      @JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec,
+      @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+  )
   {
     this.inputSpec = inputSpec;
+    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING 
: dropExisting;
   }
 
   @JsonProperty
@@ -53,6 +60,12 @@ public class ClientCompactionIOConfig
     return inputSpec;
   }
 
+  @JsonProperty
+  public boolean isDropExisting()
+  {
+    return dropExisting;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -63,13 +76,14 @@ public class ClientCompactionIOConfig
       return false;
     }
     ClientCompactionIOConfig that = (ClientCompactionIOConfig) o;
-    return Objects.equals(inputSpec, that.inputSpec);
+    return dropExisting == that.dropExisting &&
+           Objects.equals(inputSpec, that.inputSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(inputSpec);
+    return Objects.hash(inputSpec, dropExisting);
   }
 
   @Override
@@ -77,6 +91,7 @@ public class ClientCompactionIOConfig
   {
     return "ClientCompactionIOConfig{" +
            "inputSpec=" + inputSpec +
+           ", dropExisting=" + dropExisting +
            '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index bc9f9c5..d8f8f35 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -80,6 +80,7 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   )
   {
@@ -98,7 +99,7 @@ public class HttpIndexingServiceClient implements 
IndexingServiceClient
     final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
         taskId,
         dataSource,
-        new 
ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
+        new 
ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), 
dropExisting),
         tuningConfig,
         granularitySpec,
         context
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index f7c9712..14dfcfe 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -41,6 +41,7 @@ public interface IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   );
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java 
b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
index b178480..58f84c2 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
@@ -30,4 +30,5 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 })
 public interface IOConfig
 {
+  boolean DEFAULT_DROP_EXISTING = false;
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index d64eb08..e7a4240 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -47,6 +47,7 @@ public class DataSourceCompactionConfig
   private final Period skipOffsetFromLatest;
   private final UserCompactionTaskQueryTuningConfig tuningConfig;
   private final UserCompactionTaskGranularityConfig granularitySpec;
+  private final UserCompactionTaskIOConfig ioConfig;
   private final Map<String, Object> taskContext;
 
   @JsonCreator
@@ -58,6 +59,7 @@ public class DataSourceCompactionConfig
       @JsonProperty("skipOffsetFromLatest") @Nullable Period 
skipOffsetFromLatest,
       @JsonProperty("tuningConfig") @Nullable 
UserCompactionTaskQueryTuningConfig tuningConfig,
       @JsonProperty("granularitySpec") @Nullable 
UserCompactionTaskGranularityConfig granularitySpec,
+      @JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig,
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
   {
@@ -71,6 +73,7 @@ public class DataSourceCompactionConfig
     this.maxRowsPerSegment = maxRowsPerSegment;
     this.skipOffsetFromLatest = skipOffsetFromLatest == null ? 
DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
     this.tuningConfig = tuningConfig;
+    this.ioConfig = ioConfig;
     if (granularitySpec != null) {
       Preconditions.checkArgument(
           granularitySpec.getQueryGranularity() == null,
@@ -121,6 +124,13 @@ public class DataSourceCompactionConfig
 
   @JsonProperty
   @Nullable
+  public UserCompactionTaskIOConfig getIoConfig()
+  {
+    return ioConfig;
+  }
+
+  @JsonProperty
+  @Nullable
   public UserCompactionTaskGranularityConfig getGranularitySpec()
   {
     return granularitySpec;
@@ -150,6 +160,7 @@ public class DataSourceCompactionConfig
            Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
            Objects.equals(tuningConfig, that.tuningConfig) &&
            Objects.equals(granularitySpec, that.granularitySpec) &&
+           Objects.equals(ioConfig, that.ioConfig) &&
            Objects.equals(taskContext, that.taskContext);
   }
 
@@ -164,6 +175,7 @@ public class DataSourceCompactionConfig
         skipOffsetFromLatest,
         tuningConfig,
         granularitySpec,
+        ioConfig,
         taskContext
     );
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
similarity index 54%
copy from 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
copy to 
server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
index 60972de..df5af28 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
@@ -17,36 +17,39 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.common.task;
+package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.segment.indexing.IOConfig;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
- * {@link IOConfig} for {@link CompactionTask}. Should be synchronized with 
{@link
- * org.apache.druid.client.indexing.ClientCompactionIOConfig}.
- *
- * @see CompactionInputSpec
+ * Spec containing IO configs for Auto Compaction.
+ * This class mimics JSON field names for fields supported in auto compaction 
with
+ * the corresponding fields in {@link IOConfig}.
+ * This is done for end-user ease of use. Basically, end-user will use the 
same syntax / JSON structure to set
+ * IO configs for Auto Compaction as they would for any other ingestion task.
+ * Note that this class simply holds IO configs and pass it to compaction task 
spec.
  */
-@JsonTypeName("compact")
-public class CompactionIOConfig implements IOConfig
+public class UserCompactionTaskIOConfig
 {
-  private final CompactionInputSpec inputSpec;
+  private final boolean dropExisting;
 
   @JsonCreator
-  public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec 
inputSpec)
+  public UserCompactionTaskIOConfig(
+      @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+  )
   {
-    this.inputSpec = inputSpec;
+    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING 
: dropExisting;
   }
 
   @JsonProperty
-  public CompactionInputSpec getInputSpec()
+  public boolean isDropExisting()
   {
-    return inputSpec;
+    return dropExisting;
   }
 
   @Override
@@ -58,21 +61,21 @@ public class CompactionIOConfig implements IOConfig
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    CompactionIOConfig that = (CompactionIOConfig) o;
-    return Objects.equals(inputSpec, that.inputSpec);
+    UserCompactionTaskIOConfig that = (UserCompactionTaskIOConfig) o;
+    return dropExisting == that.dropExisting;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(inputSpec);
+    return Objects.hash(dropExisting);
   }
 
   @Override
   public String toString()
   {
-    return "CompactionIOConfig{" +
-           "inputSpec=" + inputSpec +
+    return "UserCompactionTaskIOConfig{" +
+           "dropExisting=" + dropExisting +
            '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b047758..5d31751 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -320,6 +320,11 @@ public class CompactSegments implements CoordinatorDuty
           queryGranularitySpec = null;
         }
 
+        Boolean dropExisting = null;
+        if (config.getIoConfig() != null) {
+          dropExisting = config.getIoConfig().isDropExisting();
+        }
+
         // make tuningConfig
         final String taskId = indexingServiceClient.compactSegments(
             "coordinator-issued",
@@ -327,6 +332,7 @@ public class CompactSegments implements CoordinatorDuty
             config.getTaskPriority(),
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment()),
             queryGranularitySpec,
+            dropExisting,
             newAutoCompactionContext(config.getTaskContext())
         );
 
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index c5a85fc..600b4be 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -51,6 +51,7 @@ public class NoopIndexingServiceClient implements 
IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   )
   {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index efe04e3..c798d29 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -58,6 +58,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -84,6 +85,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -127,6 +129,7 @@ public class DataSourceCompactionConfigTest
             null
         ),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -170,6 +173,7 @@ public class DataSourceCompactionConfigTest
             null
         ),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
 
@@ -235,6 +239,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -261,6 +266,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, 
Granularities.MONTH),
+        null,
         ImmutableMap.of("key", "val")
     );
   }
@@ -276,6 +282,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -302,6 +309,62 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(null, null),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), 
fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), 
fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), 
fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), 
fromJson.getGranularitySpec());
+  }
+
+  @Test
+  public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskIOConfig(true),
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), 
fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), 
fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), 
fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), 
fromJson.getGranularitySpec());
+    Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
+  }
+
+  @Test
+  public void testSerdeIOConfigWithNullDropExisting() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskIOConfig(null),
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -315,5 +378,6 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
     Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
     Assert.assertEquals(config.getGranularitySpec(), 
fromJson.getGranularitySpec());
+    Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 4bec733..90b3b5d 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -592,6 +593,7 @@ public class CompactSegmentsTest
                 null
             ),
             null,
+            null,
             null
         )
     );
@@ -605,6 +607,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     // Only the same amount of segments as the original 
PARTITION_PER_TIME_INTERVAL since segment granulartity is the same
@@ -613,6 +616,110 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void testCompactWithNotNullIOConfig()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, 
mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new 
ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            new UserCompactionTaskIOConfig(true),
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<Boolean> dropExistingCapture = 
ArgumentCaptor.forClass(Boolean.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dropExistingCapture.capture(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertEquals(true, dropExistingCapture.getValue());
+  }
+
+  @Test
+  public void testCompactWithNullIOConfig()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, 
mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new 
ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<Boolean> dropExistingCapture = 
ArgumentCaptor.forClass(Boolean.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dropExistingCapture.capture(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertNull(dropExistingCapture.getValue());
+  }
+
+  @Test
   public void testCompactWithGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
@@ -646,6 +753,7 @@ public class CompactSegmentsTest
                 null
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+            null,
             null
         )
     );
@@ -659,6 +767,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     // All segments is compact at the same time since we changed the segment 
granularity to YEAR and all segment
@@ -698,7 +807,8 @@ public class CompactSegmentsTest
                 new ClientCompactionIntervalSpec(
                     Intervals.of("2000/2099"),
                     "testSha256OfSortedSegmentIds"
-                )
+                ),
+                null
             ),
             null,
             new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
@@ -737,6 +847,7 @@ public class CompactSegmentsTest
                 null
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+            null,
             null
         )
     );
@@ -755,6 +866,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     // All segments is compact at the same time since we changed the segment 
granularity to YEAR and all segment
@@ -1059,6 +1171,7 @@ public class CompactSegmentsTest
                   null
               ),
               null,
+              null,
               null
           )
       );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index b32a7b7..0cf0ded 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -92,6 +92,7 @@ public class NewestSegmentFirstIteratorTest
         null,
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -131,6 +132,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -170,6 +172,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -209,6 +212,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -248,6 +252,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -287,6 +292,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -326,6 +332,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -365,6 +372,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -404,6 +412,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 9b7f600..5ccd52c 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -1068,6 +1068,7 @@ public class NewestSegmentFirstPolicyTest
         skipOffsetFromLatest,
         null,
         granularitySpec,
+        null,
         null
     );
   }
diff --git a/website/.spelling b/website/.spelling
index ed803c7..5c7237b 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1656,6 +1656,7 @@ HadoopIndexTasks
 HttpEmitter
 HttpPostEmitter
 InetAddress.getLocalHost
+IOConfig
 JRE8u60
 KeyManager
 L1

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to