This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 122caec  Add support targetCompactionSizeBytes for compactionTask 
(#6203)
122caec is described below

commit 122caec7b187820a0a7fc89e85eb8216dd57df21
Author: Jihoon Son <[email protected]>
AuthorDate: Fri Sep 28 11:16:35 2018 -0700

    Add support targetCompactionSizeBytes for compactionTask (#6203)
    
    * Add support targetCompactionSizeBytes for compactionTask
    
    * fix test
    
    * fix a bug in keepSegmentGranularity
    
    * fix wrong noinspection comment
    
    * address comments
---
 .../druid/timeline/partition/PartitionHolder.java  |   7 +
 .../druid/indexing/common/task/CompactionTask.java | 205 +++++++++--
 .../druid/indexing/common/task/IndexTask.java      | 182 +++++++---
 .../task/batch/parallel/ParallelIndexSubTask.java  |  20 +-
 .../indexing/common/task/CompactionTaskTest.java   | 387 +++++++++++++++++++--
 .../druid/indexing/common/task/TaskSerdeTest.java  |  23 +-
 .../coordinator/DataSourceCompactionConfig.java    |   3 +-
 7 files changed, 693 insertions(+), 134 deletions(-)

diff --git 
a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java 
b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index 1ed0ae6..5e5f676 100644
--- 
a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++ 
b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
 import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
+import java.util.Spliterator;
 import java.util.TreeSet;
 
 /**
@@ -130,6 +131,12 @@ public class PartitionHolder<T> implements 
Iterable<PartitionChunk<T>>
     return holderSet.iterator();
   }
 
+  @Override
+  public Spliterator<PartitionChunk<T>> spliterator()
+  {
+    return holderSet.spliterator();
+  }
+
   public Iterable<T> payloads()
   {
     return Iterables.transform(this, PartitionChunk::getObject);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index ea1186a..cce1a74 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -68,6 +68,7 @@ import 
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -86,6 +87,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -101,13 +103,15 @@ public class CompactionTask extends AbstractTask
   private final List<DataSegment> segments;
   private final DimensionsSpec dimensionsSpec;
   private final boolean keepSegmentGranularity;
+  @Nullable
+  private final Long targetCompactionSizeBytes;
+  @Nullable
   private final IndexTuningConfig tuningConfig;
   private final ObjectMapper jsonMapper;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
-
   @JsonIgnore
-  private List<IndexTask> indexTaskSpecs;
+  private final PartitionConfigurationManager partitionConfigurationManager;
 
   @JsonIgnore
   private final AuthorizerMapper authorizerMapper;
@@ -118,6 +122,9 @@ public class CompactionTask extends AbstractTask
   @JsonIgnore
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
 
+  @JsonIgnore
+  private List<IndexTask> indexTaskSpecs;
+
   @JsonCreator
   public CompactionTask(
       @JsonProperty("id") final String id,
@@ -127,6 +134,7 @@ public class CompactionTask extends AbstractTask
       @Nullable @JsonProperty("segments") final List<DataSegment> segments,
       @Nullable @JsonProperty("dimensions") final DimensionsSpec 
dimensionsSpec,
       @Nullable @JsonProperty("keepSegmentGranularity") final Boolean 
keepSegmentGranularity,
+      @Nullable @JsonProperty("targetCompactionSizeBytes") final Long 
targetCompactionSizeBytes,
       @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig 
tuningConfig,
       @Nullable @JsonProperty("context") final Map<String, Object> context,
       @JacksonInject ObjectMapper jsonMapper,
@@ -149,9 +157,11 @@ public class CompactionTask extends AbstractTask
     this.keepSegmentGranularity = keepSegmentGranularity == null
                                   ? DEFAULT_KEEP_SEGMENT_GRANULARITY
                                   : keepSegmentGranularity;
+    this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
     this.segmentProvider = segments == null ? new SegmentProvider(dataSource, 
interval) : new SegmentProvider(segments);
+    this.partitionConfigurationManager = new 
PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@@ -181,6 +191,14 @@ public class CompactionTask extends AbstractTask
     return keepSegmentGranularity;
   }
 
+  @Nullable
+  @JsonProperty
+  public Long getTargetCompactionSizeBytes()
+  {
+    return targetCompactionSizeBytes;
+  }
+
+  @Nullable
   @JsonProperty
   public IndexTuningConfig getTuningConfig()
   {
@@ -220,9 +238,9 @@ public class CompactionTask extends AbstractTask
       indexTaskSpecs = createIngestionSchema(
           toolbox,
           segmentProvider,
+          partitionConfigurationManager,
           dimensionsSpec,
           keepSegmentGranularity,
-          tuningConfig,
           jsonMapper
       ).stream()
       .map(spec -> new IndexTask(
@@ -271,12 +289,12 @@ public class CompactionTask extends AbstractTask
    */
   @VisibleForTesting
   static List<IndexIngestionSpec> createIngestionSchema(
-      TaskToolbox toolbox,
-      SegmentProvider segmentProvider,
-      DimensionsSpec dimensionsSpec,
-      boolean keepSegmentGranularity,
-      IndexTuningConfig tuningConfig,
-      ObjectMapper jsonMapper
+      final TaskToolbox toolbox,
+      final SegmentProvider segmentProvider,
+      final PartitionConfigurationManager partitionConfigurationManager,
+      final DimensionsSpec dimensionsSpec,
+      final boolean keepSegmentGranularity,
+      final ObjectMapper jsonMapper
   ) throws IOException, SegmentLoadingException
   {
     Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, 
DataSegment>>> pair = prepareSegments(
@@ -290,26 +308,52 @@ public class CompactionTask extends AbstractTask
       return Collections.emptyList();
     }
 
+    // find metadata for interval
+    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = 
loadSegments(
+        timelineSegments,
+        segmentFileMap,
+        toolbox.getIndexIO()
+    );
+
+    final IndexTuningConfig compactionTuningConfig = 
partitionConfigurationManager.computeTuningConfig(
+        queryableIndexAndSegments
+    );
+
     if (keepSegmentGranularity) {
-      // if keepSegmentGranularity = true, create indexIngestionSpec per 
segment interval, so that we can run an index
+      // If keepSegmentGranularity = true, create indexIngestionSpec per 
segment interval, so that we can run an index
       // task per segment interval.
-      final List<IndexIngestionSpec> specs = new 
ArrayList<>(timelineSegments.size());
-      for (TimelineObjectHolder<String, DataSegment> holder : 
timelineSegments) {
+
+      //noinspection unchecked,ConstantConditions
+      final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> 
intervalToSegments = queryableIndexAndSegments
+          .stream()
+          .collect(
+              Collectors.toMap(
+                  // rhs can't be null here so we skip null checking and suppress the warning with the above comment
+                  p -> p.rhs.getInterval(),
+                  Lists::newArrayList,
+                  (l1, l2) -> {
+                    l1.addAll(l2);
+                    return l1;
+                  }
+              )
+          );
+      final List<IndexIngestionSpec> specs = new 
ArrayList<>(intervalToSegments.size());
+      for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : 
intervalToSegments.entrySet()) {
+        final Interval interval = entry.getKey();
+        final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = 
entry.getValue();
         final DataSchema dataSchema = createDataSchema(
             segmentProvider.dataSource,
-            holder.getInterval(),
-            Collections.singletonList(holder),
+            interval,
+            segmentsToCompact,
             dimensionsSpec,
-            toolbox.getIndexIO(),
-            jsonMapper,
-            segmentFileMap
+            jsonMapper
         );
 
         specs.add(
             new IndexIngestionSpec(
                 dataSchema,
-                createIoConfig(toolbox, dataSchema, holder.getInterval()),
-                tuningConfig
+                createIoConfig(toolbox, dataSchema, interval),
+                compactionTuningConfig
             )
         );
       }
@@ -319,18 +363,16 @@ public class CompactionTask extends AbstractTask
       final DataSchema dataSchema = createDataSchema(
           segmentProvider.dataSource,
           segmentProvider.interval,
-          timelineSegments,
+          queryableIndexAndSegments,
           dimensionsSpec,
-          toolbox.getIndexIO(),
-          jsonMapper,
-          segmentFileMap
+          jsonMapper
       );
 
       return Collections.singletonList(
           new IndexIngestionSpec(
               dataSchema,
               createIoConfig(toolbox, dataSchema, segmentProvider.interval),
-              tuningConfig
+              compactionTuningConfig
           )
       );
     }
@@ -368,21 +410,11 @@ public class CompactionTask extends AbstractTask
   private static DataSchema createDataSchema(
       String dataSource,
       Interval totalInterval,
-      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
       DimensionsSpec dimensionsSpec,
-      IndexIO indexIO,
-      ObjectMapper jsonMapper,
-      Map<DataSegment, File> segmentFileMap
+      ObjectMapper jsonMapper
   )
-      throws IOException
   {
-    // find metadata for interval
-    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = 
loadSegments(
-        timelineObjectHolder,
-        segmentFileMap,
-        indexIO
-    );
-
     // find merged aggregators
     for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
       final QueryableIndex index = pair.lhs;
@@ -621,4 +653,105 @@ public class CompactionTask extends AbstractTask
       return usedSegments;
     }
   }
+
+  @VisibleForTesting
+  static class PartitionConfigurationManager
+  {
+    @Nullable
+    private final Long targetCompactionSizeBytes;
+    @Nullable
+    private final IndexTuningConfig tuningConfig;
+
+    PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes, 
@Nullable IndexTuningConfig tuningConfig)
+    {
+      this.targetCompactionSizeBytes = 
getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig);
+      this.tuningConfig = tuningConfig;
+    }
+
+    @Nullable
+    IndexTuningConfig computeTuningConfig(List<Pair<QueryableIndex, 
DataSegment>> queryableIndexAndSegments)
+    {
+      if (!hasPartitionConfig(tuningConfig)) {
+        final long nonNullTargetCompactionSizeBytes = 
Preconditions.checkNotNull(
+            targetCompactionSizeBytes,
+            "targetCompactionSizeBytes"
+        );
+        // Find IndexTuningConfig.targetPartitionSize which is the number of 
rows per segment.
+        // Assume that the segment size is proportional to the number of rows. 
We can improve this later.
+        final long totalNumRows = queryableIndexAndSegments
+            .stream()
+            .mapToLong(queryableIndexAndDataSegment -> 
queryableIndexAndDataSegment.lhs.getNumRows())
+            .sum();
+        final long totalSizeBytes = queryableIndexAndSegments
+            .stream()
+            .mapToLong(queryableIndexAndDataSegment -> 
queryableIndexAndDataSegment.rhs.getSize())
+            .sum();
+
+        if (totalSizeBytes == 0L) {
+          throw new ISE("Total input segment size is 0 byte");
+        }
+
+        final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
+        final int targetPartitionSize = 
Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes));
+        Preconditions.checkState(targetPartitionSize > 0, "Negative 
targetPartitionSize[%s]", targetPartitionSize);
+
+        log.info(
+            "Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] * 
targetCompactionSizeBytes[%d]",
+            targetPartitionSize,
+            avgRowsPerByte,
+            nonNullTargetCompactionSizeBytes
+        );
+        return (tuningConfig == null ? IndexTuningConfig.createDefault() : 
tuningConfig)
+            .withTargetPartitionSize(targetPartitionSize);
+      } else {
+        return tuningConfig;
+      }
+    }
+
+    /**
+     * Check the validity of {@link #targetCompactionSizeBytes} and return a 
valid value. Note that
+     * targetCompactionSizeBytes cannot be used with {@link 
IndexTuningConfig#targetPartitionSize},
+     * {@link IndexTuningConfig#maxTotalRows}, or {@link 
IndexTuningConfig#numShards} together.
+     * {@link #hasPartitionConfig} checks whether any of those configs is set.
+     *
+     * This throws an {@link IllegalArgumentException} if 
targetCompactionSizeBytes is set and hasPartitionConfig
+     * returns true. If targetCompactionSizeBytes is not set, this returns null when hasPartitionConfig
+     * returns true, and {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES}
+     * otherwise.
+     */
+    @Nullable
+    private static Long getValidTargetCompactionSizeBytes(
+        @Nullable Long targetCompactionSizeBytes,
+        @Nullable IndexTuningConfig tuningConfig
+    )
+    {
+      if (targetCompactionSizeBytes != null) {
+        Preconditions.checkArgument(
+            !hasPartitionConfig(tuningConfig),
+            "targetCompactionSizeBytes[%s] cannot be used with 
targetPartitionSize[%s], maxTotalRows[%s],"
+            + " or numShards[%s] of tuningConfig",
+            targetCompactionSizeBytes,
+            tuningConfig == null ? null : 
tuningConfig.getTargetPartitionSize(),
+            tuningConfig == null ? null : tuningConfig.getMaxTotalRows(),
+            tuningConfig == null ? null : tuningConfig.getNumShards()
+        );
+        return targetCompactionSizeBytes;
+      } else {
+        return hasPartitionConfig(tuningConfig)
+               ? null
+               : 
DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+      }
+    }
+
+    private static boolean hasPartitionConfig(@Nullable IndexTuningConfig 
tuningConfig)
+    {
+      if (tuningConfig != null) {
+        return tuningConfig.getTargetPartitionSize() != null
+               || tuningConfig.getMaxTotalRows() != null
+               || tuningConfig.getNumShards() != null;
+      } else {
+        return false;
+      }
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 5d6bef1..f00cd0a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -427,7 +427,12 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
       FileUtils.forceMkdir(firehoseTempDir);
 
       ingestionState = IngestionState.DETERMINE_PARTITIONS;
-      final ShardSpecs shardSpecs = determineShardSpecs(toolbox, 
firehoseFactory, firehoseTempDir);
+
+      // Initialize targetPartitionSize and maxTotalRows lazily
+      final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
+      @Nullable final Integer targetPartitionSize = 
getValidTargetPartitionSize(tuningConfig);
+      @Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig);
+      final ShardSpecs shardSpecs = determineShardSpecs(toolbox, 
firehoseFactory, firehoseTempDir, targetPartitionSize);
       final DataSchema dataSchema;
       final Map<Interval, String> versions;
       if (determineIntervals) {
@@ -457,7 +462,16 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
       }
 
       ingestionState = IngestionState.BUILD_SEGMENTS;
-      return generateAndPublishSegments(toolbox, dataSchema, shardSpecs, 
versions, firehoseFactory, firehoseTempDir);
+      return generateAndPublishSegments(
+          toolbox,
+          dataSchema,
+          shardSpecs,
+          versions,
+          firehoseFactory,
+          firehoseTempDir,
+          targetPartitionSize,
+          maxTotalRows
+      );
     }
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
@@ -583,7 +597,8 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
   private ShardSpecs determineShardSpecs(
       final TaskToolbox toolbox,
       final FirehoseFactory firehoseFactory,
-      final File firehoseTempDir
+      final File firehoseTempDir,
+      @Nullable final Integer targetPartitionSize
   ) throws IOException
   {
     final ObjectMapper jsonMapper = toolbox.getObjectMapper();
@@ -618,7 +633,8 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
           granularitySpec,
           tuningConfig,
           determineIntervals,
-          determineNumPartitions
+          determineNumPartitions,
+          targetPartitionSize
       );
     }
   }
@@ -666,7 +682,8 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
       GranularitySpec granularitySpec,
       IndexTuningConfig tuningConfig,
       boolean determineIntervals,
-      boolean determineNumPartitions
+      boolean determineNumPartitions,
+      @Nullable Integer targetPartitionSize
   ) throws IOException
   {
     log.info("Determining intervals and shardSpecs");
@@ -690,8 +707,10 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
 
       final int numShards;
       if (determineNumPartitions) {
-        final long numRows = collector.estimateCardinalityRound();
-        numShards = (int) Math.ceil((double) numRows / 
tuningConfig.getTargetPartitionSize());
+        final long numRows = Preconditions.checkNotNull(collector, "HLL 
collector").estimateCardinalityRound();
+        numShards = (int) Math.ceil(
+            (double) numRows / Preconditions.checkNotNull(targetPartitionSize, 
"targetPartitionSize")
+        );
         log.info("Estimated [%,d] rows of data for interval [%s], creating 
[%,d] shards", numRows, interval, numShards);
       } else {
         numShards = defaultNumShards;
@@ -856,7 +875,9 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
       final ShardSpecs shardSpecs,
       final Map<Interval, String> versions,
       final FirehoseFactory firehoseFactory,
-      final File firehoseTempDir
+      final File firehoseTempDir,
+      @Nullable final Integer targetPartitionSize,
+      @Nullable final Long maxTotalRows
   ) throws IOException, InterruptedException
   {
     final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
@@ -1004,8 +1025,8 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
           if (addResult.isOk()) {
             // incremental segment publishment is allowed only when rollup 
don't have to be perfect.
             if (!isGuaranteedRollup &&
-                (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), 
tuningConfig) ||
-                 
exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), 
tuningConfig))) {
+                (exceedMaxRowsInSegment(targetPartitionSize, 
addResult.getNumRowsInSegment()) ||
+                 exceedMaxRowsInAppenderator(maxTotalRows, 
addResult.getTotalNumRowsInAppenderator()))) {
               // There can be some segments waiting for being published even 
though any rows won't be added to them.
               // If those segments are not published here, the available space 
in appenderator will be kept to be small
               // which makes the size of segments smaller.
@@ -1069,6 +1090,40 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
     }
   }
 
+  /**
+   * Return the valid target partition size. If {@link 
IndexTuningConfig#numShards} is valid, this returns null.
+   * Otherwise, this returns {@link 
IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given
+   * {@link IndexTuningConfig#targetPartitionSize}.
+   */
+  public static Integer getValidTargetPartitionSize(IndexTuningConfig 
tuningConfig)
+  {
+    @Nullable final Integer numShards = tuningConfig.numShards;
+    @Nullable final Integer targetPartitionSize = 
tuningConfig.targetPartitionSize;
+    if (numShards == null || numShards == -1) {
+      return targetPartitionSize == null || targetPartitionSize.equals(-1)
+             ? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE
+             : targetPartitionSize;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Return the valid max total rows. If {@link IndexTuningConfig#numShards} is valid, this returns null.
+   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS} 
or the given
+   * {@link IndexTuningConfig#maxTotalRows}.
+   */
+  public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig)
+  {
+    @Nullable final Integer numShards = tuningConfig.numShards;
+    @Nullable final Long maxTotalRows = tuningConfig.maxTotalRows;
+    if (numShards == null || numShards == -1) {
+      return maxTotalRows == null ? IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS : 
maxTotalRows;
+    } else {
+      return null;
+    }
+  }
+
   private void handleParseException(ParseException e)
   {
     if (e.isFromPartiallyValidRow()) {
@@ -1092,17 +1147,20 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
     }
   }
 
-  private static boolean exceedMaxRowsInSegment(int numRowsInSegment, 
IndexTuningConfig indexTuningConfig)
+  private static boolean exceedMaxRowsInSegment(
+      @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if 
numShards is set in indexTuningConfig
+      int numRowsInSegment
+  )
   {
-    // maxRowsInSegment should be null if numShards is set in indexTuningConfig
-    final Integer maxRowsInSegment = 
indexTuningConfig.getTargetPartitionSize();
     return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
   }
 
-  private static boolean exceedMaxRowsInAppenderator(long 
numRowsInAppenderator, IndexTuningConfig indexTuningConfig)
+  private static boolean exceedMaxRowsInAppenderator(
+      // maxRowsInAppenderator can be null if numShards is set in 
indexTuningConfig
+      @Nullable final Long maxRowsInAppenderator,
+      long numRowsInAppenderator
+  )
   {
-    // maxRowsInAppenderator should be null if numShards is set in 
indexTuningConfig
-    final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
     return maxRowsInAppenderator != null && maxRowsInAppenderator <= 
numRowsInAppenderator;
   }
 
@@ -1271,7 +1329,9 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
   @JsonTypeName("index")
   public static class IndexTuningConfig implements TuningConfig, 
AppenderatorConfig
   {
-    private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+    static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000;
+    static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+
     private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
     private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
     private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
@@ -1279,12 +1339,13 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
     private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
     private static final long DEFAULT_PUSH_TIMEOUT = 0;
 
-    static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
-
+    @Nullable
     private final Integer targetPartitionSize;
     private final int maxRowsInMemory;
     private final long maxBytesInMemory;
+    @Nullable
     private final Long maxTotalRows;
+    @Nullable
     private final Integer numShards;
     private final IndexSpec indexSpec;
     private final File basePersistDirectory;
@@ -1312,6 +1373,11 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
     @Nullable
     private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
 
+    public static IndexTuningConfig createDefault()
+    {
+      return new IndexTuningConfig();
+    }
+
     @JsonCreator
     public IndexTuningConfig(
         @JsonProperty("targetPartitionSize") @Nullable Integer 
targetPartitionSize,
@@ -1385,12 +1451,14 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
           "targetPartitionSize and numShards cannot both be set"
       );
 
-      this.targetPartitionSize = initializeTargetPartitionSize(numShards, 
targetPartitionSize);
+      this.targetPartitionSize = (targetPartitionSize != null && 
targetPartitionSize == -1)
+                                 ? null
+                                 : targetPartitionSize;
       this.maxRowsInMemory = maxRowsInMemory == null ? 
TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see 
server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
       this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
-      this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows);
+      this.maxTotalRows = maxTotalRows;
       this.numShards = numShards == null || numShards.equals(-1) ? null : 
numShards;
       this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
       this.maxPendingPersists = maxPendingPersists == null ? 
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
@@ -1422,27 +1490,29 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
                                 : logParseExceptions;
     }
 
-    private static Integer initializeTargetPartitionSize(Integer numShards, 
Integer targetPartitionSize)
-    {
-      if (numShards == null || numShards == -1) {
-        return targetPartitionSize == null || targetPartitionSize.equals(-1)
-               ? DEFAULT_TARGET_PARTITION_SIZE
-               : targetPartitionSize;
-      } else {
-        return null;
-      }
-    }
-
-    private static Long initializeMaxTotalRows(Integer numShards, Long 
maxTotalRows)
+    public IndexTuningConfig withBasePersistDirectory(File dir)
     {
-      if (numShards == null || numShards == -1) {
-        return maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
-      } else {
-        return null;
-      }
+      return new IndexTuningConfig(
+          targetPartitionSize,
+          maxRowsInMemory,
+          maxBytesInMemory,
+          maxTotalRows,
+          numShards,
+          indexSpec,
+          maxPendingPersists,
+          forceExtendableShardSpecs,
+          forceGuaranteedRollup,
+          reportParseExceptions,
+          pushTimeout,
+          dir,
+          segmentWriteOutMediumFactory,
+          logParseExceptions,
+          maxParseExceptions,
+          maxSavedParseExceptions
+      );
     }
 
-    public IndexTuningConfig withBasePersistDirectory(File dir)
+    public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize)
     {
       return new IndexTuningConfig(
           targetPartitionSize,
@@ -1456,7 +1526,7 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
           forceGuaranteedRollup,
           reportParseExceptions,
           pushTimeout,
-          dir,
+          basePersistDirectory,
           segmentWriteOutMediumFactory,
           logParseExceptions,
           maxParseExceptions,
@@ -1464,6 +1534,11 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
       );
     }
 
+    /**
+     * Return the target number of rows per segment. This returns null if it's 
not specified in tuningConfig.
+     * Please use {@link IndexTask#getValidTargetPartitionSize} instead to get 
the valid value.
+     */
+    @Nullable
     @JsonProperty
     public Integer getTargetPartitionSize()
     {
@@ -1484,6 +1559,10 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
       return maxBytesInMemory;
     }
 
+    /**
+     * Return the max number of total rows in appenderator. This returns null 
if it's not specified in tuningConfig.
+     * Please use {@link IndexTask#getValidMaxTotalRows} instead to get the 
valid value.
+     */
     @JsonProperty
     @Override
     @Nullable
@@ -1633,5 +1712,28 @@ public class IndexTask extends AbstractTask implements 
ChatHandler
           maxSavedParseExceptions
       );
     }
+
+    @Override
+    public String toString()
+    {
+      return "IndexTuningConfig{" +
+             "targetPartitionSize=" + targetPartitionSize +
+             ", maxRowsInMemory=" + maxRowsInMemory +
+             ", maxBytesInMemory=" + maxBytesInMemory +
+             ", maxTotalRows=" + maxTotalRows +
+             ", numShards=" + numShards +
+             ", indexSpec=" + indexSpec +
+             ", basePersistDirectory=" + basePersistDirectory +
+             ", maxPendingPersists=" + maxPendingPersists +
+             ", forceExtendableShardSpecs=" + forceExtendableShardSpecs +
+             ", forceGuaranteedRollup=" + forceGuaranteedRollup +
+             ", reportParseExceptions=" + reportParseExceptions +
+             ", pushTimeout=" + pushTimeout +
+             ", logParseExceptions=" + logParseExceptions +
+             ", maxParseExceptions=" + maxParseExceptions +
+             ", maxSavedParseExceptions=" + maxSavedParseExceptions +
+             ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
+             '}';
+    }
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 46cf03e..20871af 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -303,7 +303,10 @@ public class ParallelIndexSubTask extends AbstractTask
       );
     }
 
+    // Initialize maxRowsPerSegment and maxTotalRows lazily
     final ParallelIndexTuningConfig tuningConfig = 
ingestionSchema.getTuningConfig();
+    @Nullable final Integer targetPartitionSize = 
IndexTask.getValidTargetPartitionSize(tuningConfig);
+    @Nullable final Long maxTotalRows = 
IndexTask.getValidMaxTotalRows(tuningConfig);
     final long pushTimeout = tuningConfig.getPushTimeout();
     final boolean explicitIntervals = 
granularitySpec.bucketIntervals().isPresent();
     final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, 
taskClient, ingestionSchema);
@@ -348,8 +351,8 @@ public class ParallelIndexSubTask extends AbstractTask
           final AppenderatorDriverAddResult addResult = driver.add(inputRow, 
sequenceName);
 
           if (addResult.isOk()) {
-            if (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), 
tuningConfig) ||
-                
exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), 
tuningConfig)) {
+            if (exceedMaxRowsInSegment(targetPartitionSize, 
addResult.getNumRowsInSegment()) ||
+                exceedMaxRowsInAppenderator(maxTotalRows, 
addResult.getTotalNumRowsInAppenderator())) {
               // There can be some segments waiting for being published even 
though any rows won't be added to them.
               // If those segments are not published here, the available space 
in appenderator will be kept to be small
               // which makes the size of segments smaller.
@@ -384,22 +387,19 @@ public class ParallelIndexSubTask extends AbstractTask
   }
 
   private static boolean exceedMaxRowsInSegment(
-      int numRowsInSegment,
-      ParallelIndexTuningConfig indexTuningConfig
+      @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if 
numShards is set in indexTuningConfig
+      int numRowsInSegment
   )
   {
-    // maxRowsInSegment should be null if numShards is set in indexTuningConfig
-    final Integer maxRowsInSegment = 
indexTuningConfig.getTargetPartitionSize();
     return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
   }
 
   private static boolean exceedMaxRowsInAppenderator(
-      long numRowsInAppenderator,
-      ParallelIndexTuningConfig indexTuningConfig
+      // maxRowsInAppenderator can be null if numShards is set in 
indexTuningConfig
+      @Nullable Long maxRowsInAppenderator,
+      long numRowsInAppenderator
   )
   {
-    // maxRowsInAppenderator should be null if numShards is set in 
indexTuningConfig
-    final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
     return maxRowsInAppenderator != null && maxRowsInAppenderator <= 
numRowsInAppenderator;
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 381b51c..5cd5f1f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -28,8 +28,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.inject.Binder;
-import com.google.inject.Module;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -49,6 +47,7 @@ import 
org.apache.druid.indexing.common.actions.SegmentListUsedAction;
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import 
org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
 import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
@@ -59,6 +58,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@@ -71,8 +71,14 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.SimpleQueryableIndex;
+import org.apache.druid.segment.column.BitmapIndex;
 import org.apache.druid.segment.column.Column;
-import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.GenericColumn;
+import org.apache.druid.segment.column.SpatialIndex;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
@@ -84,6 +90,7 @@ import 
org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import org.apache.druid.segment.transform.TransformingInputRowParser;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.security.AuthTestUtils;
@@ -120,6 +127,8 @@ import java.util.stream.IntStream;
 @RunWith(Parameterized.class)
 public class CompactionTaskTest
 {
+  private static final long SEGMENT_SIZE_BYTES = 100;
+  private static final int NUM_ROWS_PER_SEGMENT = 10;
   private static final String DATA_SOURCE = "dataSource";
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
@@ -204,7 +213,7 @@ public class CompactionTaskTest
               new ArrayList<>(AGGREGATORS.keySet()),
               new NumberedShardSpec(0, 1),
               0,
-              1
+              SEGMENT_SIZE_BYTES
           ),
           new File("file_" + i)
       );
@@ -225,16 +234,11 @@ public class CompactionTaskTest
     );
     GuiceInjectableValues injectableValues = new GuiceInjectableValues(
         GuiceInjectors.makeStartupInjectorWithModules(
-            ImmutableList.<Module>of(
-                new Module()
-                {
-                  @Override
-                  public void configure(Binder binder)
-                  {
-                    
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
-                    binder.bind(ChatHandlerProvider.class).toInstance(new 
NoopChatHandlerProvider());
-                    
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
-                  }
+            ImmutableList.of(
+                binder -> {
+                  
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+                  binder.bind(ChatHandlerProvider.class).toInstance(new 
NoopChatHandlerProvider());
+                  
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
                 }
             )
         )
@@ -265,7 +269,7 @@ public class CompactionTaskTest
   private static IndexTuningConfig createTuningConfig()
   {
     return new IndexTuningConfig(
-        5000000,
+        null, // null to compute targetPartitionSize automatically
         500000,
         1000000L,
         null,
@@ -329,6 +333,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -359,6 +364,7 @@ public class CompactionTaskTest
         SEGMENTS,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -373,19 +379,21 @@ public class CompactionTaskTest
     Assert.assertEquals(task.getInterval(), fromJson.getInterval());
     Assert.assertEquals(task.getSegments(), fromJson.getSegments());
     Assert.assertEquals(task.getDimensionsSpec(), 
fromJson.getDimensionsSpec());
+    Assert.assertEquals(task.isKeepSegmentGranularity(), 
fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(task.getTargetCompactionSizeBytes(), 
fromJson.getTargetCompactionSizeBytes());
     Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
     Assert.assertEquals(task.getContext(), fromJson.getContext());
   }
 
   @Test
-  public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws 
IOException, SegmentLoadingException
+  public void testCreateIngestionSchema() throws IOException, 
SegmentLoadingException
   {
     final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -393,6 +401,12 @@ public class CompactionTaskTest
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
       assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS);
     } else {
@@ -402,14 +416,39 @@ public class CompactionTaskTest
   }
 
   @Test
-  public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws 
IOException, SegmentLoadingException
+  public void testCreateIngestionSchemaWithTargetPartitionSize() throws 
IOException, SegmentLoadingException
   {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        5,
+        500000,
+        1000000L,
+        null,
+        null,
+        null,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
     final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, tuningConfig),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -417,11 +456,142 @@ public class CompactionTaskTest
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS);
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, tuningConfig);
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
Collections.singletonList(COMPACTION_INTERVAL));
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          tuningConfig
+      );
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, 
SegmentLoadingException
+  {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        null,
+        500000,
+        1000000L,
+        5L,
+        null,
+        null,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
+    final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, tuningConfig),
+        null,
+        keepSegmentGranularity,
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, tuningConfig);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          tuningConfig
+      );
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithNumShards() throws IOException, 
SegmentLoadingException
+  {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        null,
+        500000,
+        1000000L,
+        null,
+        null,
+        3,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
+    final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, tuningConfig),
+        null,
+        keepSegmentGranularity,
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS, tuningConfig);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          tuningConfig
+      );
     }
   }
 
@@ -458,13 +628,19 @@ public class CompactionTaskTest
     final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         customSpec,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
       final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
       IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
@@ -489,9 +665,9 @@ public class CompactionTaskTest
     final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(SEGMENTS),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = 
getExpectedDimensionsSpecForAutoGeneration(
@@ -499,6 +675,12 @@ public class CompactionTaskTest
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
       assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, 
SEGMENT_INTERVALS);
     } else {
@@ -518,9 +700,9 @@ public class CompactionTaskTest
     CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(segments),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
   }
@@ -537,9 +719,9 @@ public class CompactionTaskTest
     CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(segments),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
   }
@@ -560,6 +742,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         new NoopChatHandlerProvider(),
@@ -567,6 +750,46 @@ public class CompactionTaskTest
     );
   }
 
+  @Test
+  public void testTargetPartitionSizeWithPartitionConfig() throws IOException, 
SegmentLoadingException
+  {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        5,
+        500000,
+        1000000L,
+        null,
+        null,
+        null,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("targetCompactionSizeBytes[5] cannot be 
used with");
+    final List<IndexIngestionSpec> ingestionSpecs = 
CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(5L, tuningConfig),
+        null,
+        keepSegmentGranularity,
+        objectMapper
+    );
+  }
+
   private static List<DimensionsSpec> 
getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
   {
     if (keepSegmentGranularity) {
@@ -618,6 +841,45 @@ public class CompactionTaskTest
       List<Interval> expectedSegmentIntervals
   )
   {
+    assertIngestionSchema(
+        ingestionSchemas,
+        expectedDimensionsSpecs,
+        expectedSegmentIntervals,
+        new IndexTuningConfig(
+            41943040, // automatically computed targetPartitionSize
+            500000,
+            1000000L,
+            null,
+            null,
+            null,
+            new IndexSpec(
+                new RoaringBitmapSerdeFactory(true),
+                CompressionStrategy.LZ4,
+                CompressionStrategy.LZF,
+                LongEncodingStrategy.LONGS
+            ),
+            5000,
+            true,
+            false,
+            true,
+            false,
+            null,
+            100L,
+            null,
+            null,
+            null,
+            null
+        )
+    );
+  }
+
+  private static void assertIngestionSchema(
+      List<IndexIngestionSpec> ingestionSchemas,
+      List<DimensionsSpec> expectedDimensionsSpecs,
+      List<Interval> expectedSegmentIntervals,
+      IndexTuningConfig expectedTuningConfig
+  )
+  {
     Preconditions.checkArgument(
         ingestionSchemas.size() == expectedDimensionsSpecs.size(),
         "ingesionSchemas.size()[%s] should be same with 
expectedDimensionsSpecs.size()[%s]",
@@ -677,7 +939,7 @@ public class CompactionTaskTest
       );
 
       // assert tuningConfig
-      Assert.assertEquals(createTuningConfig(), 
ingestionSchema.getTuningConfig());
+      Assert.assertEquals(expectedTuningConfig, 
ingestionSchema.getTuningConfig());
     }
   }
 
@@ -841,17 +1103,74 @@ public class CompactionTaskTest
 
   private static Column createColumn(DimensionSchema dimensionSchema)
   {
-    return new ColumnBuilder()
-        .setType(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()))
-        .setDictionaryEncodedColumn(() -> null)
-        .setBitmapIndex(() -> null)
-        .build();
+    return new 
TestColumn(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()));
   }
 
   private static Column createColumn(AggregatorFactory aggregatorFactory)
   {
-    return new ColumnBuilder()
-        .setType(ValueType.fromString(aggregatorFactory.getTypeName()))
-        .build();
+    return new 
TestColumn(ValueType.fromString(aggregatorFactory.getTypeName()));
+  }
+
+  private static class TestColumn implements Column
+  {
+    private final ColumnCapabilities columnCapabilities;
+
+    TestColumn(ValueType type)
+    {
+      columnCapabilities = new ColumnCapabilitiesImpl()
+          .setType(type)
+          .setDictionaryEncoded(type == ValueType.STRING) // set a fake value 
to make string columns
+          .setHasBitmapIndexes(type == ValueType.STRING)
+          .setHasSpatialIndexes(false)
+          .setHasMultipleValues(false);
+    }
+
+    @Override
+    public ColumnCapabilities getCapabilities()
+    {
+      return columnCapabilities;
+    }
+
+    @Override
+    public int getLength()
+    {
+      return NUM_ROWS_PER_SEGMENT;
+    }
+
+    @Override
+    public DictionaryEncodedColumn getDictionaryEncoding()
+    {
+      return null;
+    }
+
+    @Override
+    public GenericColumn getGenericColumn()
+    {
+      return null;
+    }
+
+    @Override
+    public ComplexColumn getComplexColumn()
+    {
+      return null;
+    }
+
+    @Override
+    public BitmapIndex getBitmapIndex()
+    {
+      return null;
+    }
+
+    @Override
+    public SpatialIndex getSpatialIndex()
+    {
+      return null;
+    }
+
+    @Override
+    public SettableColumnValueSelector makeSettableColumnValueSelector()
+    {
+      return null;
+    }
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index a2c742e..31e2516 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -106,14 +106,14 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs());
-    Assert.assertEquals(false, tuningConfig.isReportParseExceptions());
+    Assert.assertFalse(tuningConfig.isForceExtendableShardSpecs());
+    Assert.assertFalse(tuningConfig.isReportParseExceptions());
     Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
     Assert.assertEquals(new Period(Integer.MAX_VALUE), 
tuningConfig.getIntermediatePersistPeriod());
     Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
     Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory());
-    Assert.assertEquals(null, tuningConfig.getNumShards());
-    Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
   }
 
   @Test
@@ -125,14 +125,14 @@ public class TaskSerdeTest
     );
 
     Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
-    Assert.assertEquals(null, tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
         "{\"type\":\"index\", \"numShards\":10}",
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -140,7 +140,7 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -148,7 +148,7 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getNumShards());
     Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
 
     tuningConfig = jsonMapper.readValue(
@@ -156,11 +156,8 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getNumShards());
-    Assert.assertEquals(
-        IndexTask.IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE,
-        (int) tuningConfig.getTargetPartitionSize()
-    );
+    Assert.assertNull(tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
   }
 
   @Test
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 1ddcc9e..6bc1336 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -31,9 +31,10 @@ import java.util.Objects;
 
 public class DataSourceCompactionConfig
 {
+  public static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 
1024; // 400MB
+
   // should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
   private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
-  private static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 
* 1024; // 400MB
   private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150;
   private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new 
Period("P1D");
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to