This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ad6a73  Add segmentGranularity support for CompactionTask (#6758)
9ad6a73 is described below

commit 9ad6a733a58e81ef2e0dee067b1df8477af1dab4
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Thu Jan 3 17:50:45 2019 -0800

    Add segmentGranularity support for CompactionTask (#6758)
    
    * Add segmentGranularity support
    
    * add doc and fix combination of options
    
    * improve doc
---
 docs/content/ingestion/compaction.md               |  21 +-
 docs/content/ingestion/ingestion-spec.md           |   9 +-
 .../druid/indexing/common/task/CompactionTask.java | 148 ++++---
 .../common/task/CompactionTaskRunTest.java         | 482 +++++++++++++++++++++
 .../indexing/common/task/CompactionTaskTest.java   | 274 +++++++++---
 .../druid/indexing/common/task/IndexTaskTest.java  |   2 +-
 .../indexing/common/task/IngestionTestBase.java    | 100 ++++-
 .../granularity/UniformGranularitySpec.java        |   2 +-
 8 files changed, 916 insertions(+), 122 deletions(-)

diff --git a/docs/content/ingestion/compaction.md 
b/docs/content/ingestion/compaction.md
index cd7345f..2991584 100644
--- a/docs/content/ingestion/compaction.md
+++ b/docs/content/ingestion/compaction.md
@@ -34,6 +34,7 @@ Compaction tasks merge all segments of the given interval. 
The syntax is:
     "interval": <interval to specify segments to be merged>,
     "dimensions" <custom dimensionsSpec>,
     "keepSegmentGranularity": <true or false>,
+    "segmentGranularity": <segment granularity after compaction>,
     "targetCompactionSizeBytes": <target size of compacted segments>
     "tuningConfig" <index task tuningConfig>,
     "context": <task context>
@@ -47,11 +48,23 @@ Compaction tasks merge all segments of the given interval. 
The syntax is:
 |`dataSource`|DataSource name to be compacted|Yes|
 |`interval`|Interval of segments to be compacted|Yes|
 |`dimensions`|Custom dimensionsSpec. The compaction task will use this 
dimensionsSpec, if it exists, instead of generating one. See below for more 
details.|No|
-|`keepSegmentGranularity`|If set to true, compactionTask will keep the time 
chunk boundaries and merge segments only if they fall into the same time 
chunk.|No (default = true)|
+|`segmentGranularity`|If this is set, compactionTask will change the segment 
granularity for the given interval. See [segmentGranularity of Uniform 
Granularity Spec](./ingestion-spec.html#uniform-granularity-spec) for more 
details. See the table below for the resulting behavior, and the example 
sketch that follows it.|No|
+|`keepSegmentGranularity`|Deprecated. Use `segmentGranularity` instead. See 
the table below for its behavior.|No|
 |`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be 
used with `targetPartitionSize`, `maxTotalRows`, or `numShards` in 
tuningConfig.|No|
 |`tuningConfig`|[Index task 
tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No|
 |`context`|[Task 
context](../ingestion/locking-and-priority.html#task-context)|No|
 
+### Effective segmentGranularity based on `segmentGranularity` and 
`keepSegmentGranularity`
+
+|segmentGranularity|keepSegmentGranularity|Effective segmentGranularity|
+|------------------|----------------------|-----------------------|
+|Non-null|True|Error|
+|Non-null|False|Given segmentGranularity|
+|Non-null|Null|Given segmentGranularity|
+|Null|True|Original segmentGranularity|
+|Null|False|ALL segmentGranularity. All events will fall into a single time 
chunk.|
+|Null|Null|Original segmentGranularity|
+
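+For illustration, a compaction task that sets `segmentGranularity` might look 
+like the following sketch (the dataSource name and interval here are 
+hypothetical):
+
+```json
+{
+  "type": "compact",
+  "dataSource": "wikipedia",
+  "interval": "2017-01-01/2017-02-01",
+  "segmentGranularity": "WEEK"
+}
+```
+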
 An example of a compaction task is
 
 ```json
@@ -63,9 +76,9 @@ An example of compaction task is
 ```
 
 This compaction task reads _all segments_ of the interval 
`2017-01-01/2018-01-01` and results in new segments.
-Note that intervals of the input segments are merged into a single interval of 
`2017-01-01/2018-01-01` no matter what the segmentGranularity was.
-To control the number of result segments, you can set `targetPartitionSize` or 
`numShards`. See 
[indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more 
details.
-To merge each day's worth of data into separate segments, you can submit 
multiple `compact` tasks, one for each day. They will run in parallel.
+Since both `segmentGranularity` and `keepSegmentGranularity` are null, the 
original segment granularity is preserved after compaction.
+To control the number of result segments per time chunk, you can set 
`targetPartitionSize` or `numShards`. See 
[indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more 
details.
+Note that multiple compactionTasks can run at the same time. For example, you 
can run 12 compactionTasks, one per month, instead of running a single task 
for the entire year.
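+For instance, the first two of those monthly tasks might be sketched as 
+below; each is submitted as a separate task (the dataSource name is 
+hypothetical):
+
+```json
+{"type": "compact", "dataSource": "wikipedia", "interval": "2017-01-01/2017-02-01"}
+```
+
+```json
+{"type": "compact", "dataSource": "wikipedia", "interval": "2017-02-01/2017-03-01"}
+```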
 
 A compaction task internally generates an `index` task spec for performing 
compaction work with some fixed parameters.
 For example, its `firehose` is always the 
[ingestSegmentSpec](./firehose.html#ingestsegmentfirehose), and 
`dimensionsSpec` and `metricsSpec`
diff --git a/docs/content/ingestion/ingestion-spec.md 
b/docs/content/ingestion/ingestion-spec.md
index 5f54906..2eb5670 100644
--- a/docs/content/ingestion/ingestion-spec.md
+++ b/docs/content/ingestion/ingestion-spec.md
@@ -271,7 +271,8 @@ for the `comment` column.
  
 ## GranularitySpec
 
-The default granularity spec is `uniform`, and can be changed by setting the 
`type` field.
+The granularitySpec defines how a dataSource is partitioned into [time 
chunks](../design/index.html#datasources-and-segments).
+The default granularitySpec is `uniform`, and can be changed by setting the 
`type` field.
 Currently, `uniform` and `arbitrary` types are supported.
 
 ### Uniform Granularity Spec
@@ -280,8 +281,8 @@ This spec is used to generate segments with uniform 
intervals.
 
 | Field | Type | Description | Required |
 |-------|------|-------------|----------|
-| segmentGranularity | string | The granularity to create segments at. | no 
(default == 'DAY') |
-| queryGranularity | string | The minimum granularity to be able to query 
results at and the granularity of the data inside the segment. E.g. a value of 
"minute" will mean that data is aggregated at minutely granularity. That is, if 
there are collisions in the tuple (minute(timestamp), dimensions), then it will 
aggregate values together using the aggregators instead of storing individual 
rows. A granularity of 'NONE' means millisecond granularity.| no (default == 
'NONE') |
+| segmentGranularity | string | The granularity to create time chunks at. 
Multiple segments can be created per time chunk. For example, with 'DAY' 
`segmentGranularity`, the events of the same day fall into the same time 
chunk, which can optionally be further partitioned into multiple segments 
based on other configurations and input size. See 
[Granularity](../querying/granularities.html) for supported granularities.| no 
(default == 'DAY') |
+| queryGranularity | string | The minimum granularity to be able to query 
results at and the granularity of the data inside the segment. E.g. a value of 
"minute" will mean that data is aggregated at minutely granularity. That is, if 
there are collisions in the tuple (minute(timestamp), dimensions), then it will 
aggregate values together using the aggregators instead of storing individual 
rows. A granularity of 'NONE' means millisecond granularity. See 
[Granularity](../querying/granularities.html) for supported granularities.| no 
(default == 'NONE') |
 | rollup | boolean | rollup or not | no (default == true) |
 | intervals | string | A list of intervals for the raw data being ingested. 
Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may 
skip the determine-partitions phase, which results in faster ingestion. |
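+
+For reference, a minimal `uniform` granularitySpec using these fields might 
+look like the following sketch (the interval value is illustrative):
+
+```json
+{
+  "type": "uniform",
+  "segmentGranularity": "DAY",
+  "queryGranularity": "NONE",
+  "rollup": true,
+  "intervals": ["2017-01-01/2017-02-01"]
+}
+```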
 
@@ -291,7 +292,7 @@ This spec is used to generate segments with arbitrary 
intervals (it tries to cre
 
 | Field | Type | Description | Required |
 |-------|------|-------------|----------|
-| queryGranularity | string | The minimum granularity to be able to query 
results at and the granularity of the data inside the segment. E.g. a value of 
"minute" will mean that data is aggregated at minutely granularity. That is, if 
there are collisions in the tuple (minute(timestamp), dimensions), then it will 
aggregate values together using the aggregators instead of storing individual 
rows. A granularity of 'NONE' means millisecond granularity.| no (default == 
'NONE') |
+| queryGranularity | string | The minimum granularity to be able to query 
results at and the granularity of the data inside the segment. E.g. a value of 
"minute" will mean that data is aggregated at minutely granularity. That is, if 
there are collisions in the tuple (minute(timestamp), dimensions), then it will 
aggregate values together using the aggregators instead of storing individual 
rows. A granularity of 'NONE' means millisecond granularity. See 
[Granularity](../querying/granularities.html) for supported granularities.| no 
(default == 'NONE') |
 | rollup | boolean | rollup or not | no (default == true) |
 | intervals | string | A list of intervals for the raw data being ingested. 
Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may 
skip the determine-partitions phase, which results in faster ingestion. |
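+
+For reference, an `arbitrary` granularitySpec sketch might look like the 
+following; note that the intervals need not be of uniform length (values are 
+illustrative):
+
+```json
+{
+  "type": "arbitrary",
+  "queryGranularity": "NONE",
+  "rollup": true,
+  "intervals": ["2017-01-01/2017-01-20", "2017-01-20/2017-02-03"]
+}
+```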
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 3c69923..69cd5c6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -54,6 +54,8 @@ import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -64,8 +66,8 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -90,6 +92,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -99,12 +102,14 @@ public class CompactionTask extends AbstractTask
 {
   private static final Logger log = new Logger(CompactionTask.class);
   private static final String TYPE = "compact";
-  private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
 
   private final Interval interval;
   private final List<DataSegment> segments;
   private final DimensionsSpec dimensionsSpec;
-  private final boolean keepSegmentGranularity;
+  @Deprecated
+  @Nullable
+  private final Boolean keepSegmentGranularity;
+  private final Granularity segmentGranularity;
   @Nullable
   private final Long targetCompactionSizeBytes;
   @Nullable
@@ -135,7 +140,8 @@ public class CompactionTask extends AbstractTask
       @Nullable @JsonProperty("interval") final Interval interval,
       @Nullable @JsonProperty("segments") final List<DataSegment> segments,
       @Nullable @JsonProperty("dimensions") final DimensionsSpec 
dimensionsSpec,
-      @Nullable @JsonProperty("keepSegmentGranularity") final Boolean 
keepSegmentGranularity,
+      @Nullable @JsonProperty("keepSegmentGranularity") @Deprecated final 
Boolean keepSegmentGranularity,
+      @Nullable @JsonProperty("segmentGranularity") final Granularity 
segmentGranularity,
       @Nullable @JsonProperty("targetCompactionSizeBytes") final Long 
targetCompactionSizeBytes,
       @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig 
tuningConfig,
       @Nullable @JsonProperty("context") final Map<String, Object> context,
@@ -153,12 +159,19 @@ public class CompactionTask extends AbstractTask
       throw new IAE("Interval[%s] is empty, must specify a nonempty interval", 
interval);
     }
 
+    if ((keepSegmentGranularity != null && keepSegmentGranularity) && 
segmentGranularity != null) {
+      throw new IAE("keepSegmentGranularity and segmentGranularity can't be 
used together");
+    }
+
+    if (keepSegmentGranularity != null) {
+      log.warn("keepSegmentGranularity is deprecated. Set a proper 
segmentGranularity instead");
+    }
+
     this.interval = interval;
     this.segments = segments;
     this.dimensionsSpec = dimensionsSpec;
-    this.keepSegmentGranularity = keepSegmentGranularity == null
-                                  ? DEFAULT_KEEP_SEGMENT_GRANULARITY
-                                  : keepSegmentGranularity;
+    this.keepSegmentGranularity = keepSegmentGranularity;
+    this.segmentGranularity = segmentGranularity;
     this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
@@ -188,11 +201,19 @@ public class CompactionTask extends AbstractTask
   }
 
   @JsonProperty
-  public boolean isKeepSegmentGranularity()
+  @Deprecated
+  @Nullable
+  public Boolean isKeepSegmentGranularity()
   {
     return keepSegmentGranularity;
   }
 
+  @JsonProperty
+  public Granularity getSegmentGranularity()
+  {
+    return segmentGranularity;
+  }
+
   @Nullable
   @JsonProperty
   public Long getTargetCompactionSizeBytes()
@@ -243,6 +264,7 @@ public class CompactionTask extends AbstractTask
           partitionConfigurationManager,
           dimensionsSpec,
           keepSegmentGranularity,
+          segmentGranularity,
           jsonMapper
       ).stream()
       .map(spec -> new IndexTask(
@@ -300,7 +322,8 @@ public class CompactionTask extends AbstractTask
       final SegmentProvider segmentProvider,
       final PartitionConfigurationManager partitionConfigurationManager,
       final DimensionsSpec dimensionsSpec,
-      final boolean keepSegmentGranularity,
+      @Nullable final Boolean keepSegmentGranularity,
+      @Nullable final Granularity segmentGranularity,
       final ObjectMapper jsonMapper
   ) throws IOException, SegmentLoadingException
   {
@@ -326,62 +349,83 @@ public class CompactionTask extends AbstractTask
         queryableIndexAndSegments
     );
 
-    if (keepSegmentGranularity) {
-      // If keepSegmentGranularity = true, create indexIngestionSpec per 
segment interval, so that we can run an index
-      // task per segment interval.
+    if (segmentGranularity == null) {
+      if (keepSegmentGranularity != null && !keepSegmentGranularity) {
+        // all granularity
+        final DataSchema dataSchema = createDataSchema(
+            segmentProvider.dataSource,
+            segmentProvider.interval,
+            queryableIndexAndSegments,
+            dimensionsSpec,
+            Granularities.ALL,
+            jsonMapper
+        );
 
-      //noinspection unchecked,ConstantConditions
-      final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> 
intervalToSegments = queryableIndexAndSegments
-          .stream()
-          .collect(
-              Collectors.toMap(
-                  // rhs can't be null here so we skip null checking and 
supress the warning with the above comment
-                  p -> p.rhs.getInterval(),
-                  Lists::newArrayList,
-                  (l1, l2) -> {
-                    l1.addAll(l2);
-                    return l1;
-                  }
+        return Collections.singletonList(
+            new IndexIngestionSpec(
+                dataSchema,
+                createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+                compactionTuningConfig
+            )
+        );
+      } else {
+        // original granularity
+        final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> 
intervalToSegments = new TreeMap<>(
+            Comparators.intervalsByStartThenEnd()
+        );
+        //noinspection ConstantConditions
+        queryableIndexAndSegments.forEach(
+            p -> intervalToSegments.computeIfAbsent(p.rhs.getInterval(), k -> 
new ArrayList<>())
+                                   .add(p)
+        );
+
+        final List<IndexIngestionSpec> specs = new 
ArrayList<>(intervalToSegments.size());
+        for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : 
intervalToSegments.entrySet()) {
+          final Interval interval = entry.getKey();
+          final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = 
entry.getValue();
+          final DataSchema dataSchema = createDataSchema(
+              segmentProvider.dataSource,
+              interval,
+              segmentsToCompact,
+              dimensionsSpec,
+              
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(),
+              jsonMapper
+          );
+
+          specs.add(
+              new IndexIngestionSpec(
+                  dataSchema,
+                  createIoConfig(toolbox, dataSchema, interval),
+                  compactionTuningConfig
               )
           );
-      final List<IndexIngestionSpec> specs = new 
ArrayList<>(intervalToSegments.size());
-      for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : 
intervalToSegments.entrySet()) {
-        final Interval interval = entry.getKey();
-        final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = 
entry.getValue();
+        }
+
+        return specs;
+      }
+    } else {
+      if (keepSegmentGranularity != null && keepSegmentGranularity) {
+        // error
+        throw new ISE("segmentGranularity[%s] and keepSegmentGranularity can't 
be used together", segmentGranularity);
+      } else {
+        // given segment granularity
         final DataSchema dataSchema = createDataSchema(
             segmentProvider.dataSource,
-            interval,
-            segmentsToCompact,
+            segmentProvider.interval,
+            queryableIndexAndSegments,
             dimensionsSpec,
+            segmentGranularity,
             jsonMapper
         );
 
-        specs.add(
+        return Collections.singletonList(
             new IndexIngestionSpec(
                 dataSchema,
-                createIoConfig(toolbox, dataSchema, interval),
+                createIoConfig(toolbox, dataSchema, segmentProvider.interval),
                 compactionTuningConfig
             )
         );
       }
-
-      return specs;
-    } else {
-      final DataSchema dataSchema = createDataSchema(
-          segmentProvider.dataSource,
-          segmentProvider.interval,
-          queryableIndexAndSegments,
-          dimensionsSpec,
-          jsonMapper
-      );
-
-      return Collections.singletonList(
-          new IndexIngestionSpec(
-              dataSchema,
-              createIoConfig(toolbox, dataSchema, segmentProvider.interval),
-              compactionTuningConfig
-          )
-      );
     }
   }
 
@@ -419,6 +463,7 @@ public class CompactionTask extends AbstractTask
       Interval totalInterval,
       List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
       DimensionsSpec dimensionsSpec,
+      Granularity segmentGranularity,
       ObjectMapper jsonMapper
   )
   {
@@ -447,7 +492,8 @@ public class CompactionTask extends AbstractTask
       return isRollup != null && isRollup;
     });
 
-    final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Preconditions.checkNotNull(segmentGranularity),
         Granularities.NONE,
         rollup,
         Collections.singletonList(totalInterval)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
new file mode 100644
index 0000000..5020baa
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
+import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.loading.NoopDataSegmentKiller;
+import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class CompactionTaskRunTest extends IngestionTestBase
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
+      new TimestampSpec(
+          "ts",
+          "auto",
+          null
+      ),
+      new DimensionsSpec(
+          DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
+          Collections.emptyList(),
+          Collections.emptyList()
+      ),
+      null,
+      Arrays.asList("ts", "dim", "val"),
+      false,
+      0
+  );
+
+  private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private ExecutorService exec;
+
+  public CompactionTaskRunTest()
+  {
+    TestUtils testUtils = new TestUtils();
+    rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
+  }
+
+  @Before
+  public void setup()
+  {
+    exec = Execs.multiThreaded(2, "compaction-task-run-test-%d");
+  }
+
+  @After
+  public void teardown()
+  {
+    exec.shutdownNow();
+  }
+
+  @Test
+  public void testRun() throws Exception
+  {
+    runIndexTask();
+
+    final CompactionTask compactionTask = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    final Pair<TaskStatus, List<DataSegment>> resultPair = 
runTask(compactionTask);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    final List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(3, segments.size());
+
+    for (int i = 0; i < 3; i++) {
+      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
+      Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(i).getShardSpec());
+    }
+  }
+
+  @Test
+  public void testRunCompactionTwiceWithoutKeepSegmentGranularity() throws 
Exception
+  {
+    runIndexTask();
+
+    final CompactionTask compactionTask1 = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(1, segments.size());
+
+    Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
+    Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(0).getShardSpec());
+
+    final CompactionTask compactionTask2 = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        false,
+        null,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    resultPair = runTask(compactionTask2);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    segments = resultPair.rhs;
+    Assert.assertEquals(1, segments.size());
+
+    Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
+    Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(0).getShardSpec());
+  }
+
+  @Test
+  public void testRunCompactionTwiceWithKeepSegmentGranularity() throws 
Exception
+  {
+    runIndexTask();
+
+    final CompactionTask compactionTask1 = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        true,
+        null,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    List<DataSegment> segments = resultPair.rhs;
+    Assert.assertEquals(3, segments.size());
+
+    for (int i = 0; i < 3; i++) {
+      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
+      Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(i).getShardSpec());
+    }
+
+    final CompactionTask compactionTask2 = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        true,
+        null,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    resultPair = runTask(compactionTask2);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    segments = resultPair.rhs;
+    Assert.assertEquals(3, segments.size());
+
+    for (int i = 0; i < 3; i++) {
+      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
+      Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(i).getShardSpec());
+    }
+  }
+
+  @Test
+  public void testWithSegmentGranularity() throws Exception
+  {
+    runIndexTask();
+
+    // day segmentGranularity
+    final CompactionTask compactionTask1 = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        null,
+        Granularities.DAY,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    List<DataSegment> segments = resultPair.rhs;
+
+    Assert.assertEquals(1, segments.size());
+
+    Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), 
segments.get(0).getInterval());
+    Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(0).getShardSpec());
+
+    // hour segmentGranularity
+    final CompactionTask compactionTask2 = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        Intervals.of("2014-01-01/2014-01-02"),
+        null,
+        null,
+        null,
+        Granularities.HOUR,
+        null,
+        null,
+        null,
+        getObjectMapper(),
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    resultPair = runTask(compactionTask2);
+
+    Assert.assertTrue(resultPair.lhs.isSuccess());
+
+    segments = resultPair.rhs;
+    Assert.assertEquals(3, segments.size());
+
+    for (int i = 0; i < 3; i++) {
+      
Assert.assertEquals(Intervals.of("2014-01-01T0%d:00:00/2014-01-01T0%d:00:00", 
i, i + 1), segments.get(i).getInterval());
+      Assert.assertEquals(new NumberedShardSpec(0, 0), 
segments.get(i).getShardSpec());
+    }
+  }
+
+  private Pair<TaskStatus, List<DataSegment>> runIndexTask() throws Exception
+  {
+    File tmpDir = temporaryFolder.newFolder();
+    File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, 
StandardCharsets.UTF_8)) {
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,b,2\n");
+      writer.write("2014-01-01T00:00:10Z,c,3\n");
+      writer.write("2014-01-01T01:00:20Z,a,1\n");
+      writer.write("2014-01-01T01:00:20Z,b,2\n");
+      writer.write("2014-01-01T01:00:20Z,c,3\n");
+      writer.write("2014-01-01T02:00:30Z,a,1\n");
+      writer.write("2014-01-01T02:00:30Z,b,2\n");
+      writer.write("2014-01-01T02:00:30Z,c,3\n");
+    }
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        createIngestionSpec(
+            tmpDir,
+            DEFAULT_PARSE_SPEC,
+            new UniformGranularitySpec(
+                Granularities.HOUR,
+                Granularities.MINUTE,
+                null
+            ),
+            IndexTaskTest.createTuningConfig(2, 2, null, 2L, null, null, 
false, false, true),
+            false
+        ),
+        null,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
+
+    return runTask(indexTask);
+  }
+
+  private Pair<TaskStatus, List<DataSegment>> runTask(Task task) throws 
Exception
+  {
+    getLockbox().add(task);
+    getTaskStorage().insert(task, TaskStatus.running(task.getId()));
+    final LocalTaskActionClient actionClient = createActionClient(task);
+
+    final File deepStorageDir = temporaryFolder.newFolder();
+    final ObjectMapper objectMapper = getObjectMapper();
+    objectMapper.registerSubtypes(
+        new NamedType(LocalLoadSpec.class, "local")
+    );
+    objectMapper.registerSubtypes(LocalDataSegmentPuller.class);
+
+    final List<DataSegment> segments = new ArrayList<>();
+    final DataSegmentPusher pusher = new LocalDataSegmentPusher(
+        new LocalDataSegmentPusherConfig()
+        {
+          @Override
+          public File getStorageDirectory()
+          {
+            return deepStorageDir;
+          }
+        },
+        objectMapper
+    )
+    {
+      @Override
+      public DataSegment push(File file, DataSegment segment, boolean 
useUniquePath) throws IOException
+      {
+        segments.add(segment);
+        return super.push(file, segment, useUniquePath);
+      }
+    };
+
+    final SegmentLoader loader = new SegmentLoaderLocalCacheManager(
+        getIndexIO(),
+        new SegmentLoaderConfig() {
+          @Override
+          public List<StorageLocationConfig> getLocations()
+          {
+            return ImmutableList.of(
+                new StorageLocationConfig()
+                {
+                  @Override
+                  public File getPath()
+                  {
+                    return deepStorageDir;
+                  }
+                }
+            );
+          }
+        },
+        objectMapper
+    );
+
+    final TaskToolbox box = new TaskToolbox(
+        null,
+        actionClient,
+        null,
+        pusher,
+        new NoopDataSegmentKiller(),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        loader,
+        objectMapper,
+        temporaryFolder.newFolder(),
+        getIndexIO(),
+        null,
+        null,
+        null,
+        getIndexMerger(),
+        null,
+        null,
+        null,
+        null,
+        new NoopTestTaskFileWriter()
+    );
+
+    if (task.isReady(box.getTaskActionClient())) {
+      TaskStatus status = task.run(box);
+      shutdownTask(task);
+      Collections.sort(segments);
+      return Pair.of(status, segments);
+    } else {
+      throw new ISE("task is not ready");
+    }
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 5f09460..867cdfe 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -54,10 +54,13 @@ import 
org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -84,7 +87,7 @@ import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
@@ -97,6 +100,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -130,26 +134,16 @@ public class CompactionTaskTest
   private static final String DATA_SOURCE = "dataSource";
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
-  private static final Interval COMPACTION_INTERVAL = 
Intervals.of("2017-01-01/2017-06-01");
+  private static final Interval COMPACTION_INTERVAL = 
Intervals.of("2017-01-01/2017-07-01");
   private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
       Intervals.of("2017-01-01/2017-02-01"),
       Intervals.of("2017-02-01/2017-03-01"),
       Intervals.of("2017-03-01/2017-04-01"),
       Intervals.of("2017-04-01/2017-05-01"),
-      Intervals.of("2017-05-01/2017-06-01")
-  );
-  private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = 
ImmutableMap.of(
-      Intervals.of("2017-01-01/2017-02-01"),
-      new StringDimensionSchema(MIXED_TYPE_COLUMN),
-      Intervals.of("2017-02-01/2017-03-01"),
-      new StringDimensionSchema(MIXED_TYPE_COLUMN),
-      Intervals.of("2017-03-01/2017-04-01"),
-      new StringDimensionSchema(MIXED_TYPE_COLUMN),
-      Intervals.of("2017-04-01/2017-05-01"),
-      new StringDimensionSchema(MIXED_TYPE_COLUMN),
       Intervals.of("2017-05-01/2017-06-01"),
-      new DoubleDimensionSchema(MIXED_TYPE_COLUMN)
+      Intervals.of("2017-06-01/2017-07-01")
   );
+  private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = 
new HashMap<>();
   private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
 
   private static Map<String, DimensionSchema> DIMENSIONS;
@@ -166,12 +160,19 @@ public class CompactionTaskTest
   @BeforeClass
   public static void setupClass()
   {
+    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-01-01/2017-02-01"), new 
StringDimensionSchema(MIXED_TYPE_COLUMN));
+    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-02-01/2017-03-01"), new 
StringDimensionSchema(MIXED_TYPE_COLUMN));
+    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-03-01/2017-04-01"), new 
StringDimensionSchema(MIXED_TYPE_COLUMN));
+    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-04-01/2017-05-01"), new 
StringDimensionSchema(MIXED_TYPE_COLUMN));
+    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-05-01/2017-06-01"), new 
DoubleDimensionSchema(MIXED_TYPE_COLUMN));
+    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new 
DoubleDimensionSchema(MIXED_TYPE_COLUMN));
+
     DIMENSIONS = new HashMap<>();
     AGGREGATORS = new HashMap<>();
 
     DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new 
LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
     DIMENSIONS.put(TIMESTAMP_COLUMN, new 
LongDimensionSchema(TIMESTAMP_COLUMN));
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       final StringDimensionSchema schema = new StringDimensionSchema(
           "string_dim_" + i,
           null,
@@ -179,15 +180,15 @@ public class CompactionTaskTest
       );
       DIMENSIONS.put(schema.getName(), schema);
     }
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + 
i);
       DIMENSIONS.put(schema.getName(), schema);
     }
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       final FloatDimensionSchema schema = new 
FloatDimensionSchema("float_dim_" + i);
       DIMENSIONS.put(schema.getName(), schema);
     }
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       final DoubleDimensionSchema schema = new 
DoubleDimensionSchema("double_dim_" + i);
       DIMENSIONS.put(schema.getName(), schema);
     }
@@ -198,8 +199,8 @@ public class CompactionTaskTest
     AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", 
"float_dim_3"));
     AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", 
"double_dim_4"));
 
-    segmentMap = new HashMap<>(5);
-    for (int i = 0; i < 5; i++) {
+    segmentMap = new HashMap<>(SEGMENT_INTERVALS.size());
+    for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       final Interval segmentInterval = 
Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
       segmentMap.put(
           new DataSegment(
@@ -254,9 +255,9 @@ public class CompactionTaskTest
   {
     final List<String> dimensions = new ArrayList<>();
     dimensions.add(TIMESTAMP_COLUMN);
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 6; i++) {
       int postfix = i + startIndex;
-      postfix = postfix >= 5 ? postfix - 5 : postfix;
+      postfix = postfix >= 6 ? postfix - 6 : postfix;
       dimensions.add("string_dim_" + postfix);
       dimensions.add("long_dim_" + postfix);
       dimensions.add("float_dim_" + postfix);
@@ -335,6 +336,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -366,6 +368,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -395,6 +398,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -408,11 +412,16 @@ public class CompactionTaskTest
               s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
           )
       );
-      Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS);
+      Assert.assertEquals(6, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, Granularities.MONTH);
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
Collections.singletonList(COMPACTION_INTERVAL));
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          Granularities.ALL
+      );
     }
   }
 
@@ -420,7 +429,7 @@ public class CompactionTaskTest
   public void testCreateIngestionSchemaWithTargetPartitionSize() throws 
IOException, SegmentLoadingException
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
-        5,
+        6,
         500000,
         1000000L,
         null,
@@ -451,6 +460,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, tuningConfig),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -464,15 +474,22 @@ public class CompactionTaskTest
               s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
           )
       );
-      Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, tuningConfig);
+      Assert.assertEquals(6, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          SEGMENT_INTERVALS,
+          tuningConfig,
+          Granularities.MONTH
+      );
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
           Collections.singletonList(COMPACTION_INTERVAL),
-          tuningConfig
+          tuningConfig,
+          Granularities.ALL
       );
     }
   }
@@ -484,7 +501,7 @@ public class CompactionTaskTest
         null,
         500000,
         1000000L,
-        5L,
+        6L,
         null,
         null,
         null,
@@ -512,6 +529,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, tuningConfig),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -525,15 +543,22 @@ public class CompactionTaskTest
               s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
           )
       );
-      Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, tuningConfig);
+      Assert.assertEquals(6, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          SEGMENT_INTERVALS,
+          tuningConfig,
+          Granularities.MONTH
+      );
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
           Collections.singletonList(COMPACTION_INTERVAL),
-          tuningConfig
+          tuningConfig,
+          Granularities.ALL
       );
     }
   }
@@ -573,6 +598,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, tuningConfig),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -586,15 +612,22 @@ public class CompactionTaskTest
               s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
           )
       );
-      Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, tuningConfig);
+      Assert.assertEquals(6, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          SEGMENT_INTERVALS,
+          tuningConfig,
+          Granularities.MONTH
+      );
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
           Collections.singletonList(COMPACTION_INTERVAL),
-          tuningConfig
+          tuningConfig,
+          Granularities.ALL
       );
     }
   }
@@ -635,6 +668,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         customSpec,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
 
@@ -645,20 +679,22 @@ public class CompactionTaskTest
               s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
           )
       );
-      Assert.assertEquals(5, ingestionSpecs.size());
-      final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
-      IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
+      Assert.assertEquals(6, ingestionSpecs.size());
+      final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(6);
+      IntStream.range(0, 6).forEach(i -> dimensionsSpecs.add(customSpec));
       assertIngestionSchema(
           ingestionSpecs,
           dimensionsSpecs,
-          SEGMENT_INTERVALS
+          SEGMENT_INTERVALS,
+          Granularities.MONTH
       );
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
           ingestionSpecs,
           Collections.singletonList(customSpec),
-          Collections.singletonList(COMPACTION_INTERVAL)
+          Collections.singletonList(COMPACTION_INTERVAL),
+          Granularities.ALL
       );
     }
   }
@@ -672,6 +708,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -685,11 +722,16 @@ public class CompactionTaskTest
               s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
           )
       );
-      Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS);
+      Assert.assertEquals(6, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, Granularities.MONTH);
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
Collections.singletonList(COMPACTION_INTERVAL));
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          Granularities.ALL
+      );
     }
   }
 
@@ -709,6 +751,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
   }
@@ -728,6 +771,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
+        null,
         objectMapper
     );
   }
@@ -749,6 +793,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         new NoopChatHandlerProvider(),
@@ -760,7 +805,7 @@ public class CompactionTaskTest
   public void testTargetPartitionSizeWithPartitionConfig() throws IOException, 
SegmentLoadingException
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
-        5,
+        6,
         500000,
         1000000L,
         null,
@@ -786,15 +831,133 @@ public class CompactionTaskTest
         null
     );
     expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("targetCompactionSizeBytes[5] cannot be 
used with");
+    expectedException.expectMessage("targetCompactionSizeBytes[6] cannot be 
used with");
     final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
-        new PartitionConfigurationManager(5L, tuningConfig),
+        new PartitionConfigurationManager(6L, tuningConfig),
         null,
         keepSegmentGranularity,
+        null,
+        objectMapper
+    );
+  }
+
+  @Test
+  public void testSegmentGranularity() throws IOException, 
SegmentLoadingException
+  {
+    final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
+        null,
+        null,
+        new PeriodGranularity(Period.months(3), null, null),
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
+        new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
+    );
+
+    ingestionSpecs.sort(
+        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+        )
+    );
+    Assert.assertEquals(1, ingestionSpecs.size());
+    assertIngestionSchema(
+        ingestionSpecs,
+        expectedDimensionsSpec,
+        Collections.singletonList(COMPACTION_INTERVAL),
+        new PeriodGranularity(Period.months(3), null, null)
+    );
+  }
+
+  @Test
+  public void testSegmentGranularityWithFalseKeepSegmentGranularity() throws 
IOException, SegmentLoadingException
+  {
+    final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
+        null,
+        false,
+        new PeriodGranularity(Period.months(3), null, null),
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
+        new DimensionsSpec(getDimensionSchema(new 
DoubleDimensionSchema("string_to_double")))
+    );
+
+    ingestionSpecs.sort(
+        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+        )
+    );
+    Assert.assertEquals(1, ingestionSpecs.size());
+    assertIngestionSchema(
+        ingestionSpecs,
+        expectedDimensionsSpec,
+        Collections.singletonList(COMPACTION_INTERVAL),
+        new PeriodGranularity(Period.months(3), null, null)
+    );
+  }
+
+  @Test
+  public void testNullSegmentGranularityAndNullKeepSegmentGranularity() throws 
IOException, SegmentLoadingException
+  {
+    final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
+        null,
+        null,
+        null,
         objectMapper
     );
+    final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
+        true
+    );
+
+    ingestionSpecs.sort(
+        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+        )
+    );
+    Assert.assertEquals(6, ingestionSpecs.size());
+    assertIngestionSchema(
+        ingestionSpecs,
+        expectedDimensionsSpec,
+        SEGMENT_INTERVALS,
+        Granularities.MONTH
+    );
+  }
+
+  @Test
+  public void testUseKeepSegmentGranularityAndSegmentGranularityTogether()
+  {
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("keepSegmentGranularity and 
segmentGranularity can't be used together");
+    final CompactionTask task = new CompactionTask(
+        null,
+        null,
+        DATA_SOURCE,
+        COMPACTION_INTERVAL,
+        null,
+        null,
+        true,
+        Granularities.YEAR,
+        null,
+        createTuningConfig(),
+        ImmutableMap.of("testKey", "testContext"),
+        objectMapper,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null,
+        rowIngestionMetersFactory
+    );
   }
 
   private static List<DimensionsSpec> 
getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
@@ -805,6 +968,7 @@ public class CompactionTaskTest
           new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
           new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
           new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))),
           new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
       );
     } else {
@@ -838,6 +1002,10 @@ public class CompactionTaskTest
         new LongDimensionSchema("long_dim_3"),
         new FloatDimensionSchema("float_dim_3"),
         new DoubleDimensionSchema("double_dim_3"),
+        new StringDimensionSchema("string_dim_5"),
+        new LongDimensionSchema("long_dim_5"),
+        new FloatDimensionSchema("float_dim_5"),
+        new DoubleDimensionSchema("double_dim_5"),
         mixedTypeColumn
     );
   }
@@ -845,7 +1013,8 @@ public class CompactionTaskTest
   private static void assertIngestionSchema(
       List<IndexIngestionSpec> ingestionSchemas,
       List<DimensionsSpec> expectedDimensionsSpecs,
-      List<Interval> expectedSegmentIntervals
+      List<Interval> expectedSegmentIntervals,
+      Granularity expectedSegmentGranularity
   )
   {
     assertIngestionSchema(
@@ -877,7 +1046,8 @@ public class CompactionTaskTest
             null,
             null,
             null
-        )
+        ),
+        expectedSegmentGranularity
     );
   }
 
@@ -885,7 +1055,8 @@ public class CompactionTaskTest
       List<IndexIngestionSpec> ingestionSchemas,
       List<DimensionsSpec> expectedDimensionsSpecs,
       List<Interval> expectedSegmentIntervals,
-      IndexTuningConfig expectedTuningConfig
+      IndexTuningConfig expectedTuningConfig,
+      Granularity expectedSegmentGranularity
   )
   {
     Preconditions.checkArgument(
@@ -917,7 +1088,8 @@ public class CompactionTaskTest
                                                                     .collect(Collectors.toSet());
       Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
       Assert.assertEquals(
-          new ArbitraryGranularitySpec(
+          new UniformGranularitySpec(
+              expectedSegmentGranularity,
               Granularities.NONE,
               false,
               Collections.singletonList(expectedSegmentIntervals.get(i))
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index a9616bf..b07759e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -1739,7 +1739,7 @@ public class IndexTaskTest
     );
   }
 
-  private static IndexTuningConfig createTuningConfig(
+  static IndexTuningConfig createTuningConfig(
       @Nullable Integer targetPartitionSize,
       @Nullable Integer maxRowsInMemory,
       @Nullable Long maxBytesInMemory,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index f707b86..a9d68b5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -20,40 +20,70 @@
 package org.apache.druid.indexing.common.task;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionToolbox;
 import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.SQLMetadataConnector;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.junit.Before;
 import org.junit.Rule;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.Map;
+
 public abstract class IngestionTestBase
 {
+  public static final String DATA_SOURCE = "test";
+
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
 
   private final TestUtils testUtils = new TestUtils();
   private final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
-  private final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-  private final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+  private TaskStorage taskStorage;
+  private IndexerSQLMetadataStorageCoordinator storageCoordinator;
+  private TaskLockbox lockbox;
 
-  public IngestionTestBase()
+  @Before
+  public void setUp()
   {
+    final SQLMetadataConnector connector = derbyConnectorRule.getConnector();
+    connector.createTaskTables();
+    taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
+    storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
+        objectMapper,
+        derbyConnectorRule.metadataTablesConfigSupplier().get(),
+        derbyConnectorRule.getConnector()
+    );
+    lockbox = new TaskLockbox(taskStorage);
   }
 
-  public TaskActionClient createActionClient(Task task)
+  public LocalTaskActionClient createActionClient(Task task)
   {
     return new LocalTaskActionClient(task, taskStorage, createTaskActionToolbox(), new TaskAuditLogConfig(false));
   }
@@ -64,6 +94,11 @@ public abstract class IngestionTestBase
     taskStorage.insert(task, TaskStatus.running(task.getId()));
   }
 
+  public void shutdownTask(Task task)
+  {
+    lockbox.remove(task);
+  }
+
   public ObjectMapper getObjectMapper()
   {
     return objectMapper;
@@ -81,11 +116,6 @@ public abstract class IngestionTestBase
 
   public TaskActionToolbox createTaskActionToolbox()
   {
-    final IndexerSQLMetadataStorageCoordinator storageCoordinator = new IndexerSQLMetadataStorageCoordinator(
-        objectMapper,
-        derbyConnectorRule.metadataTablesConfigSupplier().get(),
-        derbyConnectorRule.getConnector()
-    );
     storageCoordinator.start();
     return new TaskActionToolbox(
         lockbox,
@@ -106,4 +136,54 @@ public abstract class IngestionTestBase
   {
     return testUtils.getTestIndexMergerV9();
   }
+
+  public IndexTask.IndexIngestionSpec createIngestionSpec(
+      File baseDir,
+      ParseSpec parseSpec,
+      GranularitySpec granularitySpec,
+      IndexTuningConfig tuningConfig,
+      boolean appendToExisting
+  )
+  {
+    return createIngestionSpec(baseDir, parseSpec, TransformSpec.NONE, granularitySpec, tuningConfig, appendToExisting);
+  }
+
+  public IndexTask.IndexIngestionSpec createIngestionSpec(
+      File baseDir,
+      ParseSpec parseSpec,
+      TransformSpec transformSpec,
+      GranularitySpec granularitySpec,
+      IndexTuningConfig tuningConfig,
+      boolean appendToExisting
+  )
+  {
+    return new IndexTask.IndexIngestionSpec(
+        new DataSchema(
+            DATA_SOURCE,
+            objectMapper.convertValue(
+                new StringInputRowParser(parseSpec, null),
+                Map.class
+            ),
+            new AggregatorFactory[]{
+                new LongSumAggregatorFactory("val", "val")
+            },
+            granularitySpec != null ? granularitySpec : new UniformGranularitySpec(
+                Granularities.DAY,
+                Granularities.MINUTE,
+                Collections.singletonList(Intervals.of("2014/2015"))
+            ),
+            transformSpec,
+            objectMapper
+        ),
+        new IndexTask.IndexIOConfig(
+            new LocalFirehoseFactory(
+                baseDir,
+                "druid*",
+                null
+            ),
+            appendToExisting
+        ),
+        tuningConfig
+    );
+  }
 }
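As an illustrative use of the new helper (not part of the patch), a subclass test might build a spec like this; tmpDir, parseSpec, and tuningConfig are assumed to be supplied by the test:

    // Passing null for granularitySpec falls back to the DAY/MINUTE
    // UniformGranularitySpec over 2014/2015 defined above; input files
    // matching "druid*" are read from baseDir via LocalFirehoseFactory.
    final IndexTask.IndexIngestionSpec spec = createIngestionSpec(
        tmpDir,       // assumed temporary directory holding the input files
        parseSpec,    // assumed ParseSpec for those files
        null,         // use the default granularitySpec
        tuningConfig, // assumed IndexTuningConfig
        false         // overwrite rather than append
    );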
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
index c73b1b7..fae680a 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -143,7 +143,7 @@ public class UniformGranularitySpec implements GranularitySpec
 
     UniformGranularitySpec that = (UniformGranularitySpec) o;
 
-    if (segmentGranularity != that.segmentGranularity) {
+    if (!segmentGranularity.equals(that.segmentGranularity)) {
       return false;
     }
     if (!queryGranularity.equals(that.queryGranularity)) {
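The one-line equals() fix above matters because PeriodGranularity instances are ordinary value objects rather than shared constants, so reference comparison can report two value-equal granularities as unequal. A quick illustration, assuming PeriodGranularity implements value-based equals(), which the fix relies on:

    Granularity a = new PeriodGranularity(Period.months(3), null, null);
    Granularity b = new PeriodGranularity(Period.months(3), null, null);
    // a != b      is true: distinct objects, so the old check rejected them
    // a.equals(b) is true: value-equal, so the new check accepts them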


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
