This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 33d9d9b  Add rollup config to auto and manual compaction (#11850)
33d9d9b is described below

commit 33d9d9bd74ade384ef5feb31748b989122deb160
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Fri Oct 29 10:22:25 2021 -0700

    Add rollup config to auto and manual compaction (#11850)
    
    * add rollup to auto and manual compaction
    
    * add unit tests
    
    * add unit tests
    
    * add IT
    
    * fix checkstyle
---
 docs/configuration/index.md                        |   1 +
 docs/ingestion/compaction.md                       |  11 ++-
 .../druid/indexing/common/task/CompactionTask.java |   6 +-
 .../task/ClientCompactionTaskQuerySerdeTest.java   |  10 +-
 .../common/task/CompactionTaskParallelRunTest.java |   4 +-
 .../common/task/CompactionTaskRunTest.java         |  10 +-
 .../indexing/common/task/CompactionTaskTest.java   |  65 +++++++++++--
 .../coordinator/duty/ITAutoCompactionTest.java     |  72 ++++++++++++---
 .../duty/ITAutoCompactionUpgradeTest.java          |   2 +-
 .../indexer/wikipedia_index_rollup_queries.json    |  56 ++++++++++++
 .../indexer/wikipedia_index_task_no_rollup.json    |  76 ++++++++++++++++
 .../ClientCompactionTaskGranularitySpec.java       |  19 +++-
 .../UserCompactionTaskGranularityConfig.java       |  17 +++-
 .../server/coordinator/duty/CompactSegments.java   |   4 +-
 .../duty/NewestSegmentFirstIterator.java           |  64 ++++++++-----
 .../DataSourceCompactionConfigTest.java            |  40 +++++++-
 .../coordinator/duty/CompactSegmentsTest.java      |  68 +++++++++++++-
 .../coordinator/duty/KillCompactionConfigTest.java |   6 +-
 .../duty/NewestSegmentFirstIteratorTest.java       |  18 ++--
 .../duty/NewestSegmentFirstPolicyTest.java         | 101 +++++++++++++++++----
 .../CoordinatorCompactionConfigsResourceTest.java  |   8 +-
 21 files changed, 542 insertions(+), 116 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 6f5b053..b346f4b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -995,6 +995,7 @@ You can optionally use the `granularitySpec` object to 
configure the segment gra
 |Field|Description|Required|
 |-----|-----------|--------|
 |`segmentGranularity`|Time chunking period for the segment granularity. 
Defaults to 'null', which preserves the original segment granularity. Accepts 
all [Query granularity](../querying/granularities.md) values.|No|
+|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', 
which preserves the original setting. Note that once data is rolled up, individual 
records can no longer be recovered. |No|
 
 > Unlike manual compaction, automatic compaction does not support query 
 > granularity.
 
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index 8ccf9a9..88942ca 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -192,7 +192,8 @@ You can optionally use the `granularitySpec` object to 
configure the segment gra
     ,
     "granularitySpec": {
       "segmentGranularity": <time_period>,
-      "queryGranularity": <time_period>
+      "queryGranularity": <time_period>,
+      "rollup": true
     }
     ...
 ```
@@ -203,8 +204,9 @@ You can optionally use the `granularitySpec` object to 
configure the segment gra
 |-----|-----------|--------|
 |`segmentGranularity`|Time chunking period for the segment granularity. 
Defaults to 'null', which preserves the original segment granularity. Accepts 
all [Query granularity](../querying/granularities.md) values.|No|
 |`queryGranularity`|Time chunking period for the query granularity. Defaults 
to 'null', which preserves the original query granularity. Accepts all [Query 
granularity](../querying/granularities.md) values. Not supported for automatic 
compaction.|No|
+|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', 
which preserves the original setting. Note that once data is rolled up, individual 
records can no longer be recovered. |No|
 
-For example, to set the segment granularity to "day" and the query granularity 
to "hour":
+For example, to set the segment granularity to "day", the query granularity to 
"hour", and enable rollup:
 ```json
 {
   "type" : "compact",
@@ -213,11 +215,12 @@ For example, to set the segment granularity to "day" and 
the query granularity t
     "type": "compact",
     "inputSpec": {
       "type": "interval",
-      "interval": "2017-01-01/2018-01-01",
+      "interval": "2017-01-01/2018-01-01"
     },
     "granularitySpec": {
       "segmentGranularity":"day",
-      "queryGranularity":"hour"
+      "queryGranularity":"hour",
+      "rollup": true
     }
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index cdb637f..e2098e0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -227,7 +227,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       ));
     }
     if (granularitySpec == null && segmentGranularity != null) {
-      this.granularitySpec = new 
ClientCompactionTaskGranularitySpec(segmentGranularity, null);
+      this.granularitySpec = new 
ClientCompactionTaskGranularitySpec(segmentGranularity, null, null);
     } else {
       this.granularitySpec = granularitySpec;
     }
@@ -600,7 +600,7 @@ public class CompactionTask extends AbstractBatchIndexTask
             dimensionsSpec,
             metricsSpec,
             granularitySpec == null
-            ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, 
null)
+            ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, 
null, null)
             : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
         );
 
@@ -729,7 +729,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     final GranularitySpec uniformGranularitySpec = new UniformGranularitySpec(
         Preconditions.checkNotNull(granularitySpec.getSegmentGranularity()),
         queryGranularityToUse,
-        rollup.get(),
+        granularitySpec.isRollup() == null ? rollup.get() : 
granularitySpec.isRollup(),
         Collections.singletonList(totalInterval)
     );
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index fd7b699..8d78d32 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -116,7 +116,7 @@ public class ClientCompactionTaskQuerySerdeTest
             1000,
             100
         ),
-        new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR),
+        new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true),
         ImmutableMap.of("key", "value")
     );
 
@@ -203,6 +203,10 @@ public class ClientCompactionTaskQuerySerdeTest
         task.getGranularitySpec().getSegmentGranularity()
     );
     Assert.assertEquals(
+        query.getGranularitySpec().isRollup(),
+        task.getGranularitySpec().isRollup()
+    );
+    Assert.assertEquals(
         query.getIoConfig().isDropExisting(),
         task.getIoConfig().isDropExisting()
     );
@@ -264,7 +268,7 @@ public class ClientCompactionTaskQuerySerdeTest
                 null
             )
         )
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR))
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, 
true))
         .build();
 
     final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@@ -307,7 +311,7 @@ public class ClientCompactionTaskQuerySerdeTest
             1000,
             100
         ),
-        new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR),
+        new ClientCompactionTaskGranularitySpec(Granularities.DAY, 
Granularities.HOUR, true),
         new HashMap<>()
     );
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 1732b3b..1d4b03a 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -463,7 +463,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
         // Set the dropExisting flag to true in the IOConfig of the compaction 
task
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
         
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
@@ -496,7 +496,7 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
     final CompactionTask compactionTask = builder
         .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
         
.tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null, null))
         .build();
 
     final Set<DataSegment> compactedSegments = runTask(compactionTask);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 19c6575..eebd875 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -605,7 +605,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     // day segmentGranularity
     final CompactionTask compactionTask1 = builder
         .interval(Intervals.of("2014-01-01/2014-01-02"))
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, null))
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null))
         .build();
 
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@@ -626,7 +626,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     // hour segmentGranularity
     final CompactionTask compactionTask2 = builder
         .interval(Intervals.of("2014-01-01/2014-01-02"))
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.HOUR, null))
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.HOUR, null, null))
         .build();
 
     resultPair = runTask(compactionTask2);
@@ -660,7 +660,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     // day queryGranularity
     final CompactionTask compactionTask1 = builder
         .interval(Intervals.of("2014-01-01/2014-01-02"))
-        .granularitySpec(new ClientCompactionTaskGranularitySpec(null, 
Granularities.SECOND))
+        .granularitySpec(new ClientCompactionTaskGranularitySpec(null, 
Granularities.SECOND, null))
         .build();
 
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@@ -705,7 +705,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     // day segmentGranularity and day queryGranularity
     final CompactionTask compactionTask1 = builder
         .interval(Intervals.of("2014-01-01/2014-01-02"))
-        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY))
+        .granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.DAY, null))
         .build();
 
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
@@ -737,7 +737,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
 
     final CompactionTask compactionTask1 = builder
         .interval(Intervals.of("2014-01-01/2014-01-02"))
-        .granularitySpec(new ClientCompactionTaskGranularitySpec(null, null))
+        .granularitySpec(new ClientCompactionTaskGranularitySpec(null, null, 
null))
         .build();
 
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 43d503d..e01f047 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -385,7 +385,7 @@ public class CompactionTaskTest
     );
     builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder2.tuningConfig(createTuningConfig());
-    builder2.granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
+    builder2.granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, 
null));
     final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
     Assert.assertEquals(
         taskCreatedWithGranularitySpec.getSegmentGranularity(),
@@ -404,7 +404,7 @@ public class CompactionTaskTest
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
     builder.segmentGranularity(Granularities.HOUR);
-    builder.granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY));
+    builder.granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY, 
null));
     try {
       builder.build();
     }
@@ -433,7 +433,7 @@ public class CompactionTaskTest
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
     builder.segmentGranularity(Granularities.HOUR);
-    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, 
Granularities.DAY));
+    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, 
Granularities.DAY, null));
     try {
       builder.build();
     }
@@ -462,7 +462,7 @@ public class CompactionTaskTest
     builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
     builder.tuningConfig(createTuningConfig());
     builder.segmentGranularity(Granularities.HOUR);
-    builder.granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY));
+    builder.granularitySpec(new 
ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, 
null));
     final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
     Assert.assertEquals(Granularities.HOUR, 
taskCreatedWithSegmentGranularity.getSegmentGranularity());
   }
@@ -1315,7 +1315,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(TUNING_CONFIG),
         null,
         null,
-        new ClientCompactionTaskGranularitySpec(new 
PeriodGranularity(Period.months(3), null, null), null),
+        new ClientCompactionTaskGranularitySpec(new 
PeriodGranularity(Period.months(3), null, null), null, null),
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
@@ -1353,7 +1353,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(TUNING_CONFIG),
         null,
         null,
-        new ClientCompactionTaskGranularitySpec(null, new 
PeriodGranularity(Period.months(3), null, null)),
+        new ClientCompactionTaskGranularitySpec(null, new 
PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
@@ -1391,7 +1391,8 @@ public class CompactionTaskTest
         null,
         new ClientCompactionTaskGranularitySpec(
             new PeriodGranularity(Period.months(3), null, null),
-            new PeriodGranularity(Period.months(3), null, null)
+            new PeriodGranularity(Period.months(3), null, null),
+            null
         ),
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
@@ -1467,7 +1468,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(TUNING_CONFIG),
         null,
         null,
-        new ClientCompactionTaskGranularitySpec(null, null),
+        new ClientCompactionTaskGranularitySpec(null, null, null),
         COORDINATOR_CLIENT,
         segmentCacheManagerFactory,
         RETRY_POLICY_FACTORY,
@@ -1494,6 +1495,54 @@ public class CompactionTaskTest
   }
 
   @Test
+  public void testGranularitySpecWithNotNullRollup()
+      throws IOException, SegmentLoadingException
+  {
+    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        LockGranularity.TIME_CHUNK,
+        new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
+        new PartitionConfigurationManager(TUNING_CONFIG),
+        null,
+        null,
+        new ClientCompactionTaskGranularitySpec(null, null, true),
+        COORDINATOR_CLIENT,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
+    );
+
+    Assert.assertEquals(6, ingestionSpecs.size());
+    for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
+      
Assert.assertTrue(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
+    }
+  }
+
+  @Test
+  public void testGranularitySpecWithNullRollup()
+      throws IOException, SegmentLoadingException
+  {
+    final List<ParallelIndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        LockGranularity.TIME_CHUNK,
+        new SegmentProvider(DATA_SOURCE, new 
CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
+        new PartitionConfigurationManager(TUNING_CONFIG),
+        null,
+        null,
+        new ClientCompactionTaskGranularitySpec(null, null, null),
+        COORDINATOR_CLIENT,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
+    );
+    Assert.assertEquals(6, ingestionSpecs.size());
+    for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
+      // Expect false since the rollup value in the metadata of existing segments is 
null
+      
Assert.assertFalse(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
+    }
+  }
+
+  @Test
   public void testChooseFinestGranularityWithNulls()
   {
     List<Granularity> input = Arrays.asList(
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 83cc761..495ecf1 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.tests.coordinator.duty;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
@@ -71,6 +73,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 {
   private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
   private static final String INDEX_TASK = 
"/indexer/wikipedia_index_task.json";
+  private static final String INDEX_TASK_NO_ROLLUP = 
"/indexer/wikipedia_index_task_no_rollup.json";
+  private static final String INDEX_ROLLUP_QUERIES_RESOURCE = 
"/indexer/wikipedia_index_rollup_queries.json";
   private static final String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_index_queries.json";
   private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
   private static final Period NO_SKIP_OFFSET = Period.seconds(0);
@@ -290,7 +294,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to true
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), true);
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
 
       LOG.info("Auto compaction test with YEAR segment granularity");
 
@@ -307,7 +311,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       newGranularity = Granularities.DAY;
       // Set dropExisting to true
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), true);
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
 
       LOG.info("Auto compaction test with DAY segment granularity");
 
@@ -340,7 +344,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to false
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
 
       LOG.info("Auto compaction test with YEAR segment granularity");
 
@@ -357,7 +361,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       newGranularity = Granularities.DAY;
       // Set dropExisting to false
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null), false);
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
 
       LOG.info("Auto compaction test with DAY segment granularity");
 
@@ -397,7 +401,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
 
       Granularity newGranularity = Granularities.YEAR;
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null));
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null));
 
       LOG.info("Auto compaction test with YEAR segment granularity");
 
@@ -439,7 +443,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       // Segments were compacted and already has DAY granularity since it was 
initially ingested with DAY granularity.
       // Now set auto compaction with DAY granularity in the granularitySpec
       Granularity newGranularity = Granularities.DAY;
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null));
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null));
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@@ -471,7 +475,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       // Segments were compacted and already has DAY granularity since it was 
initially ingested with DAY granularity.
       // Now set auto compaction with DAY granularity in the granularitySpec
       Granularity newGranularity = Granularities.YEAR;
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null));
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null));
       forceTriggerAutoCompaction(1);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
@@ -495,7 +499,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to true
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
 
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR)
@@ -521,7 +525,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       newGranularity = Granularities.MONTH;
       // Set dropExisting to true
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
       // Since dropExisting is set to true...
       // This will submit a single compaction task for interval of 
2013-01-01/2014-01-01 with MONTH granularity
       expectedIntervalAfterCompaction = new ArrayList<>();
@@ -553,7 +557,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to false
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
 
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR)
@@ -579,7 +583,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       newGranularity = Granularities.MONTH;
       // Set dropExisting to false
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
       // Since dropExisting is set to true...
       // This will submit a single compaction task for interval of 
2013-01-01/2014-01-01 with MONTH granularity
       expectedIntervalAfterCompaction = new ArrayList<>();
@@ -605,6 +609,38 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithRollup() throws Exception
+  {
+    loadData(INDEX_TASK_NO_ROLLUP);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, null, true),
+          false
+      );
+      forceTriggerAutoCompaction(2);
+      expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", 
ImmutableList.of(ImmutableMap.of("events", 
ImmutableList.of(ImmutableList.of(516.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+
+      List<TaskResponseObject> compactTasksBefore = 
indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify rollup segments do not get compacted again
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = 
indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
@@ -626,6 +662,11 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
   private void verifyQuery(String queryResource) throws Exception
   {
+    verifyQuery(queryResource, ImmutableMap.of());
+  }
+
+  private void verifyQuery(String queryResource, Map<String, Object> 
expectedResults) throws Exception
+  {
     String queryResponseTemplate;
     try {
       InputStream is = 
AbstractITBatchIndexTest.class.getResourceAsStream(queryResource);
@@ -634,13 +675,18 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     catch (IOException e) {
       throw new ISE(e, "could not read query file: %s", queryResource);
     }
-
     queryResponseTemplate = StringUtils.replace(
         queryResponseTemplate,
         "%%DATASOURCE%%",
         fullDatasourceName
     );
-
+    for (Map.Entry<String, Object> entry : expectedResults.entrySet()) {
+      queryResponseTemplate = StringUtils.replace(
+          queryResponseTemplate,
+          entry.getKey(),
+          jsonMapper.writeValueAsString(entry.getValue())
+      );
+    }
     queryHelper.testQueriesFromString(queryResponseTemplate);
   }
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index b960707..a4b1ed6 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -96,7 +96,7 @@ public class ITAutoCompactionUpgradeTest extends 
AbstractIndexerTest
             null,
             1
         ),
-        new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, 
null),
         new UserCompactionTaskIOConfig(true),
         null
     );
diff --git 
a/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json
 
b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json
new file mode 100644
index 0000000..93c58bd
--- /dev/null
+++ 
b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_queries.json
@@ -0,0 +1,56 @@
+[
+  {
+    "description": "rows count",
+    "query":{
+      "queryType" : "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"day",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-01T00:00"
+      ],
+      "filter": {
+        "type": "selector",
+        "dimension": "language",
+        "value": "en",
+        "extractionFn": null
+      },
+      "aggregations":[
+        {
+          "type": "count",
+          "name": "count"
+        }
+      ]
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-31T00:00:00.000Z",
+        "result" : {
+          "count":%%EXPECTED_COUNT_RESULT%%
+        }
+      }
+    ]
+  },
+  {
+    "description": "scan with filter",
+    "query":{
+      "queryType" : "scan",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"day",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-01T00:00"
+      ],
+      "filter": {
+        "type": "selector",
+        "dimension": "language",
+        "value": "en",
+        "extractionFn": null
+      },
+      "columns": [
+        "added"
+      ],
+      "resultFormat":"compactedList"
+    },
+    "expectedResults": %%EXPECTED_SCAN_RESULT%%,
+    "fieldsToTest": ["events"]
+  }
+]
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json
new file mode 100644
index 0000000..821838b
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task_no_rollup.json
@@ -0,0 +1,76 @@
+{
+  "type": "index",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        },
+        {
+          "name": "thetaSketch",
+          "type": "thetaSketch",
+          "fieldName": "user"
+        },
+        {
+          "name": "quantilesDoublesSketch",
+          "type": "quantilesDoublesSketch",
+          "fieldName": "delta"
+        },
+        {
+          "name": "HLLSketchBuild",
+          "type": "HLLSketchBuild",
+          "fieldName": "user"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "DAY",
+        "rollup": false,
+        "intervals" : [ "2013-08-31/2013-09-02" ]
+      },
+      "parser": {
+        "parseSpec": {
+          "format" : "json",
+          "timestampSpec": {
+            "column": "timestamp"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              {"type": "string", "name": "language", "createBitmapIndex": false}
+            ]
+          }
+        }
+      }
+    },
+    "ioConfig": {
+      "type": "index",
+      "firehose": {
+        "type": "local",
+        "baseDir": "/resources/data/batch_index/json",
+        "filter": "wikipedia_index_data*"
+      }
+    },
+    "tuningConfig": {
+      "type": "index",
+      "maxRowsPerSegment": 10,
+      "awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%%
+    }
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
index 74fd559..3ba732c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskGranularitySpec.java
@@ -40,15 +40,18 @@ public class ClientCompactionTaskGranularitySpec
 {
   private final Granularity segmentGranularity;
   private final Granularity queryGranularity;
+  private final Boolean rollup;
 
   @JsonCreator
   public ClientCompactionTaskGranularitySpec(
       @JsonProperty("segmentGranularity") Granularity segmentGranularity,
-      @JsonProperty("queryGranularity") Granularity queryGranularity
+      @JsonProperty("queryGranularity") Granularity queryGranularity,
+      @JsonProperty("rollup") Boolean rollup
   )
   {
     this.queryGranularity = queryGranularity;
     this.segmentGranularity = segmentGranularity;
+    this.rollup = rollup;
   }
 
   @JsonProperty
@@ -63,9 +66,15 @@ public class ClientCompactionTaskGranularitySpec
     return queryGranularity;
   }
 
+  @JsonProperty
+  public Boolean isRollup()
+  {
+    return rollup;
+  }
+
   public ClientCompactionTaskGranularitySpec withSegmentGranularity(Granularity segmentGranularity)
   {
-    return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity);
+    return new ClientCompactionTaskGranularitySpec(segmentGranularity, queryGranularity, rollup);
   }
 
   @Override
@@ -79,13 +88,14 @@ public class ClientCompactionTaskGranularitySpec
     }
     ClientCompactionTaskGranularitySpec that = (ClientCompactionTaskGranularitySpec) o;
     return Objects.equals(segmentGranularity, that.segmentGranularity) &&
-           Objects.equals(queryGranularity, that.queryGranularity);
+           Objects.equals(queryGranularity, that.queryGranularity) &&
+           Objects.equals(rollup, that.rollup);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(segmentGranularity, queryGranularity);
+    return Objects.hash(segmentGranularity, queryGranularity, rollup);
   }
 
   @Override
@@ -94,6 +104,7 @@ public class ClientCompactionTaskGranularitySpec
     return "ClientCompactionTaskGranularitySpec{" +
            "segmentGranularity=" + segmentGranularity +
            ", queryGranularity=" + queryGranularity +
+           ", rollup=" + rollup +
            '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java
index 9623e2a..d6b320a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskGranularityConfig.java
@@ -39,15 +39,18 @@ public class UserCompactionTaskGranularityConfig
 {
   private final Granularity segmentGranularity;
   private final Granularity queryGranularity;
+  private final Boolean rollup;
 
   @JsonCreator
   public UserCompactionTaskGranularityConfig(
       @JsonProperty("segmentGranularity") Granularity segmentGranularity,
-      @JsonProperty("queryGranularity") Granularity queryGranularity
+      @JsonProperty("queryGranularity") Granularity queryGranularity,
+      @JsonProperty("rollup") Boolean rollup
   )
   {
     this.queryGranularity = queryGranularity;
     this.segmentGranularity = segmentGranularity;
+    this.rollup = rollup;
   }
 
   @JsonProperty("segmentGranularity")
@@ -62,6 +65,12 @@ public class UserCompactionTaskGranularityConfig
     return queryGranularity;
   }
 
+  @JsonProperty("rollup")
+  public Boolean isRollup()
+  {
+    return rollup;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -73,13 +82,14 @@ public class UserCompactionTaskGranularityConfig
     }
     UserCompactionTaskGranularityConfig that = (UserCompactionTaskGranularityConfig) o;
     return Objects.equals(segmentGranularity, that.segmentGranularity) &&
-           Objects.equals(queryGranularity, that.queryGranularity);
+           Objects.equals(queryGranularity, that.queryGranularity) &&
+           Objects.equals(rollup, that.rollup);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(segmentGranularity, queryGranularity);
+    return Objects.hash(segmentGranularity, queryGranularity, rollup);
   }
 
   @Override
@@ -88,6 +98,7 @@ public class UserCompactionTaskGranularityConfig
     return "UserCompactionTaskGranularityConfig{" +
            "segmentGranularity=" + segmentGranularity +
            ", queryGranularity=" + queryGranularity +
+           ", rollup=" + rollup +
            '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 9bb6f9c..0b2a586 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -345,7 +345,9 @@ public class CompactSegments implements CoordinatorDuty
         if (config.getGranularitySpec() != null) {
           queryGranularitySpec = new ClientCompactionTaskGranularitySpec(
               config.getGranularitySpec().getSegmentGranularity(),
-              config.getGranularitySpec().getQueryGranularity()
+              config.getGranularitySpec().getQueryGranularity(),
+              config.getGranularitySpec().isRollup()
+
           );
         } else {
           queryGranularitySpec = null;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 3f9ee93..0291178 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -36,7 +37,6 @@ import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.SegmentUtils;
-import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.CompactionState;
@@ -309,7 +309,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   }
 
   @VisibleForTesting
-  static PartitionsSpec findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
+  static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
   {
     final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
     if (partitionsSpecFromTuningConfig instanceof DynamicPartitionsSpec) {
@@ -332,7 +332,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
     Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
     final ClientCompactionTaskQueryTuningConfig tuningConfig =
         ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
-    final PartitionsSpec partitionsSpecFromConfig = findPartitinosSpecFromConfig(tuningConfig);
+    final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
     final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
     if (lastCompactionState == null) {
       log.info("Candidate segment[%s] is not compacted yet. Needs compaction.", candidates.segments.get(0).getId());
@@ -384,32 +384,48 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       return true;
     }
 
-    if (config.getGranularitySpec() != null && config.getGranularitySpec().getSegmentGranularity() != null) {
-      // Only checks for segmentGranularity as auto compaction currently only supports segmentGranularity
-      final Granularity existingSegmentGranularity = lastCompactionState.getGranularitySpec() != null ?
-                                                     objectMapper.convertValue(lastCompactionState.getGranularitySpec(), GranularitySpec.class).getSegmentGranularity() :
-                                                     null;
-      if (existingSegmentGranularity == null) {
-        // Candidate segments were all compacted without segment granularity set.
-        // We need to check if all segments have the same segment granularity as the configured segment granularity.
-        boolean needsCompaction = candidates.segments.stream()
-                                             .anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
-        if (needsCompaction) {
+    if (config.getGranularitySpec() != null) {
+
+      final ClientCompactionTaskGranularitySpec existingGranularitySpec = lastCompactionState.getGranularitySpec() != null ?
+                                                                          objectMapper.convertValue(lastCompactionState.getGranularitySpec(), ClientCompactionTaskGranularitySpec.class) :
+                                                                          null;
+      // Checks for segmentGranularity
+      if (config.getGranularitySpec().getSegmentGranularity() != null) {
+        final Granularity existingSegmentGranularity = existingGranularitySpec != null ?
+                                                       existingGranularitySpec.getSegmentGranularity() :
+                                                       null;
+        if (existingSegmentGranularity == null) {
+          // Candidate segments were all compacted without segment granularity set.
+          // We need to check if all segments have the same segment granularity as the configured segment granularity.
+          boolean needsCompaction = candidates.segments.stream()
+                                                       .anyMatch(segment -> !config.getGranularitySpec().getSegmentGranularity().isAligned(segment.getInterval()));
+          if (needsCompaction) {
+            log.info(
+                "Segments were previously compacted but without segmentGranularity in auto compaction."
+                + " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
+                config.getGranularitySpec().getSegmentGranularity()
+            );
+            return true;
+          }
+
+        } else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
           log.info(
-              "Segments were previously compacted but without segmentGranularity in auto compaction."
-              + " Configured segmentGranularity[%s] is different from granularity implied by segment intervals. Needs compaction",
-              config.getGranularitySpec().getSegmentGranularity()
+              "Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
+              config.getGranularitySpec().getSegmentGranularity(),
+              existingSegmentGranularity
           );
           return true;
         }
+      }
 
-      } else if (!config.getGranularitySpec().getSegmentGranularity().equals(existingSegmentGranularity)) {
-        log.info(
-            "Configured segmentGranularity[%s] is different from the segmentGranularity[%s] of segments. Needs compaction",
-            config.getGranularitySpec().getSegmentGranularity(),
-            existingSegmentGranularity
-        );
-        return true;
+      // Checks for rollup
+      if (config.getGranularitySpec().isRollup() != null) {
+        final Boolean existingRollup = existingGranularitySpec != null ?
+                                       existingGranularitySpec.isRollup() :
+                                       null;
+        if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) {
+          return true;
+        }
       }
     }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index c798d29..9ff6eac 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -238,7 +238,7 @@ public class DataSourceCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -265,7 +265,7 @@ public class DataSourceCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, 
Granularities.MONTH),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, 
Granularities.MONTH, null),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -308,7 +308,7 @@ public class DataSourceCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(null, null),
+        new UserCompactionTaskGranularityConfig(null, null, null),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -326,6 +326,36 @@ public class DataSourceCompactionConfigTest
   }
 
   @Test
+  public void testSerdeGranularitySpecWithRollup() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UserCompactionTaskGranularityConfig(null, null, true),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, 
DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), 
fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), 
fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), 
fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), 
fromJson.getGranularitySpec());
+    Assert.assertNotNull(config.getGranularitySpec());
+    Assert.assertNotNull(fromJson.getGranularitySpec());
+    Assert.assertEquals(config.getGranularitySpec().isRollup(), 
fromJson.getGranularitySpec().isRollup());
+  }
+
+  @Test
   public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
   {
     final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
@@ -335,7 +365,7 @@ public class DataSourceCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         new UserCompactionTaskIOConfig(true),
         ImmutableMap.of("key", "val")
     );
@@ -363,7 +393,7 @@ public class DataSourceCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         new UserCompactionTaskIOConfig(null),
         ImmutableMap.of("key", "val")
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index f4cdd9c..494f567 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -829,7 +829,7 @@ public class CompactSegmentsTest
                 null,
                 null
             ),
-            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, 
null),
             null,
             null
         )
@@ -852,7 +852,65 @@ public class CompactSegmentsTest
     Assert.assertEquals(datasourceToSegments.get(dataSource).size(), 
segmentsCaptor.getValue().size());
     ClientCompactionTaskGranularitySpec actual = 
granularitySpecArgumentCaptor.getValue();
     Assert.assertNotNull(actual);
-    ClientCompactionTaskGranularitySpec expected = new 
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
+    ClientCompactionTaskGranularitySpec expected = new 
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testCompactWithRollupInGranularitySpec()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = 
Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new 
CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new 
ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, 
true),
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<List<DataSegment>> segmentsCaptor = 
ArgumentCaptor.forClass(List.class);
+    ArgumentCaptor<ClientCompactionTaskGranularitySpec> 
granularitySpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskGranularitySpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        segmentsCaptor.capture(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertEquals(datasourceToSegments.get(dataSource).size(), 
segmentsCaptor.getValue().size());
+    ClientCompactionTaskGranularitySpec actual = 
granularitySpecArgumentCaptor.getValue();
+    Assert.assertNotNull(actual);
+    ClientCompactionTaskGranularitySpec expected = new 
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, true);
     Assert.assertEquals(expected, actual);
   }
 
@@ -888,7 +946,7 @@ public class CompactSegmentsTest
                 null
             ),
             null,
-            new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
+            new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, 
null),
             null
         )
     );
@@ -923,7 +981,7 @@ public class CompactSegmentsTest
                 null,
                 null
             ),
-            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+            new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, 
null),
             null,
             null
         )
@@ -951,7 +1009,7 @@ public class CompactSegmentsTest
     Assert.assertEquals(datasourceToSegments.get(dataSource).size(), 
segmentsCaptor.getValue().size());
     ClientCompactionTaskGranularitySpec actual = 
granularitySpecArgumentCaptor.getValue();
     Assert.assertNotNull(actual);
-    ClientCompactionTaskGranularitySpec expected = new 
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null);
+    ClientCompactionTaskGranularitySpec expected = new 
ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null);
     Assert.assertEquals(expected, actual);
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index 4aad5df..a7d1d37 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -230,7 +230,7 @@ public class KillCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -242,7 +242,7 @@ public class KillCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -346,7 +346,7 @@ public class KillCompactionConfigTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         null,
         ImmutableMap.of("key", "val")
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 0cf0ded..092bc9f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -97,7 +97,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -137,7 +137,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -177,7 +177,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, 1000L),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -217,7 +217,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -257,7 +257,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -297,7 +297,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -337,7 +337,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -377,7 +377,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
@@ -417,7 +417,7 @@ public class NewestSegmentFirstIteratorTest
     );
     Assert.assertEquals(
         new SingleDimensionPartitionsSpec(10000, null, "dim", false),
-        NewestSegmentFirstIterator.findPartitinosSpecFromConfig(
+        NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
             
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment())
         )
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 42cc20c..b79156f 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -395,7 +395,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, 
null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -422,7 +422,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -448,7 +448,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P1D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -557,7 +557,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -606,7 +606,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -641,7 +641,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MINUTE, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -664,7 +664,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.MONTH, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -688,7 +688,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -708,7 +708,7 @@ public class NewestSegmentFirstPolicyTest
 
     // Auto compaction config sets segmentGranularity=DAY
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, 
null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -721,7 +721,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -741,7 +741,7 @@ public class NewestSegmentFirstPolicyTest
 
     // Auto compaction config sets segmentGranularity=DAY
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.DAY, null, 
null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -754,7 +754,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -774,7 +774,7 @@ public class NewestSegmentFirstPolicyTest
 
     // Auto compaction config sets segmentGranularity=YEAR
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -797,7 +797,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -817,7 +817,7 @@ public class NewestSegmentFirstPolicyTest
 
     // Auto compaction config sets segmentGranularity=YEAR
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.YEAR, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -840,7 +840,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -865,6 +865,7 @@ public class NewestSegmentFirstPolicyTest
                                     null,
                                     
DateTimeZone.forTimeZone(TimeZone.getTimeZone("Asia/Bangkok"))
                                 ),
+                                null,
                                 null
                             )
                         )
@@ -891,7 +892,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -915,6 +916,7 @@ public class NewestSegmentFirstPolicyTest
                                     DateTimes.of("2012-01-02T00:05:00.000Z"),
                                     DateTimeZone.UTC
                                 ),
+                                null,
                                 null
                             )
                         )
@@ -936,6 +938,66 @@ public class NewestSegmentFirstPolicyTest
   }
 
   @Test
+  public void 
testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentRollup()
+  {
+    // Same indexSpec as what is set in the auto compaction config
+    Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new 
TypeReference<Map<String, Object>>() {});
+    // Same partitionsSpec as what is set in the auto compaction config
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+
+    // Create segments that were compacted (CompactionState != null) and have
+    // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+    // rollup=true for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+    // and rollup=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00
+    final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, indexSpec, 
ImmutableMap.of("rollup", "false"))
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, indexSpec, 
ImmutableMap.of("rollup", "true"))
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+        )
+    );
+
+    // Auto compaction config sets rollup=true
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(null, null, true))),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // We should get interval 2017-10-03T00:00:00/2017-10-04T00:00:00 first, then 
interval 2017-10-01T00:00:00/2017-10-02T00:00:00; 2017-10-02/2017-10-03 is skipped because its rollup=true already matches the config.
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
 Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        
timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
 Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
   public void 
testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
   {
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -944,7 +1006,7 @@ public class NewestSegmentFirstPolicyTest
     );
 
     final CompactionSegmentIterator iterator = policy.reset(
-        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, 
null))),
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(130000, new 
Period("P0D"), new UserCompactionTaskGranularityConfig(Granularities.HOUR, 
null, null))),
         ImmutableMap.of(DATA_SOURCE, timeline),
         Collections.emptyMap()
     );
@@ -971,7 +1033,7 @@ public class NewestSegmentFirstPolicyTest
     // Different indexSpec as what is set in the auto compaction config
     IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), 
null, null, null);
     Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, 
new TypeReference<Map<String, Object>>() {});
-    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitinosSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
+    PartitionsSpec partitionsSpec = 
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null,
 null));
 
     // Create segments that were compacted (CompactionState != null) and have 
segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = 
createTimeline(
@@ -995,6 +1057,7 @@ public class NewestSegmentFirstPolicyTest
                                     null,
                                     DateTimeZone.UTC
                                 ),
+                                null,
                                 null
                             )
                         )
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index c130dc3..353550d 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -55,7 +55,7 @@ public class CoordinatorCompactionConfigsResourceTest
       null,
       new Period(3600),
       null,
-      new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+      new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
       null,
       ImmutableMap.of("key", "val")
   );
@@ -150,7 +150,7 @@ public class CoordinatorCompactionConfigsResourceTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
true),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -190,7 +190,7 @@ public class CoordinatorCompactionConfigsResourceTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         null,
         ImmutableMap.of("key", "val")
     );
@@ -311,7 +311,7 @@ public class CoordinatorCompactionConfigsResourceTest
         null,
         new Period(3600),
         null,
-        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, 
null),
         null,
         ImmutableMap.of("key", "val")
     );

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to