jihoonson closed pull request #6203: Add support targetCompactionSizeBytes for compactionTask
URL: https://github.com/apache/incubator-druid/pull/6203
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index 1ed0ae64bca..5e5f676e42f 100644
--- a/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++ b/common/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
+import java.util.Spliterator;
 import java.util.TreeSet;
 
 /**
@@ -130,6 +131,12 @@ public boolean isComplete()
     return holderSet.iterator();
   }
 
+  @Override
+  public Spliterator<PartitionChunk<T>> spliterator()
+  {
+    return holderSet.spliterator();
+  }
+
   public Iterable<T> payloads()
   {
     return Iterables.transform(this, PartitionChunk::getObject);
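A note on the change above: PartitionHolder implements Iterable, so it previously inherited the unsized default spliterator that Iterable derives from iterator(); delegating to the underlying TreeSet yields a SIZED, SORTED spliterator instead. A minimal consuming-side sketch, assuming a populated PartitionHolder<DataSegment> named holder (the variable is hypothetical):

    import java.util.stream.StreamSupport;

    // Streams over the holder now use holderSet.spliterator() rather than the
    // generic iterator-backed default, preserving size estimates and sort order.
    long numChunks = StreamSupport.stream(holder.spliterator(), false).count();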
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index ea1186aa3a7..cce1a743757 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -68,6 +68,7 @@
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
@@ -86,6 +87,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -101,13 +103,15 @@
   private final List<DataSegment> segments;
   private final DimensionsSpec dimensionsSpec;
   private final boolean keepSegmentGranularity;
+  @Nullable
+  private final Long targetCompactionSizeBytes;
+  @Nullable
   private final IndexTuningConfig tuningConfig;
   private final ObjectMapper jsonMapper;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
-
   @JsonIgnore
-  private List<IndexTask> indexTaskSpecs;
+  private final PartitionConfigurationManager partitionConfigurationManager;
 
   @JsonIgnore
   private final AuthorizerMapper authorizerMapper;
@@ -118,6 +122,9 @@
   @JsonIgnore
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
 
+  @JsonIgnore
+  private List<IndexTask> indexTaskSpecs;
+
   @JsonCreator
   public CompactionTask(
       @JsonProperty("id") final String id,
@@ -127,6 +134,7 @@ public CompactionTask(
       @Nullable @JsonProperty("segments") final List<DataSegment> segments,
      @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
      @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
+      @Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes,
      @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
       @Nullable @JsonProperty("context") final Map<String, Object> context,
       @JacksonInject ObjectMapper jsonMapper,
@@ -149,9 +157,11 @@ public CompactionTask(
     this.keepSegmentGranularity = keepSegmentGranularity == null
                                   ? DEFAULT_KEEP_SEGMENT_GRANULARITY
                                   : keepSegmentGranularity;
+    this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
    this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
+    this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@@ -181,6 +191,14 @@ public boolean isKeepSegmentGranularity()
     return keepSegmentGranularity;
   }
 
+  @Nullable
+  @JsonProperty
+  public Long getTargetCompactionSizeBytes()
+  {
+    return targetCompactionSizeBytes;
+  }
+
+  @Nullable
   @JsonProperty
   public IndexTuningConfig getTuningConfig()
   {
@@ -220,9 +238,9 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
       indexTaskSpecs = createIngestionSchema(
           toolbox,
           segmentProvider,
+          partitionConfigurationManager,
           dimensionsSpec,
           keepSegmentGranularity,
-          tuningConfig,
           jsonMapper
       ).stream()
       .map(spec -> new IndexTask(
@@ -271,12 +289,12 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
    */
   @VisibleForTesting
   static List<IndexIngestionSpec> createIngestionSchema(
-      TaskToolbox toolbox,
-      SegmentProvider segmentProvider,
-      DimensionsSpec dimensionsSpec,
-      boolean keepSegmentGranularity,
-      IndexTuningConfig tuningConfig,
-      ObjectMapper jsonMapper
+      final TaskToolbox toolbox,
+      final SegmentProvider segmentProvider,
+      final PartitionConfigurationManager partitionConfigurationManager,
+      final DimensionsSpec dimensionsSpec,
+      final boolean keepSegmentGranularity,
+      final ObjectMapper jsonMapper
   ) throws IOException, SegmentLoadingException
   {
    Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
@@ -290,26 +308,52 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
       return Collections.emptyList();
     }
 
+    // find metadata for interval
+    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
+        timelineSegments,
+        segmentFileMap,
+        toolbox.getIndexIO()
+    );
+
+    final IndexTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig(
+        queryableIndexAndSegments
+    );
+
     if (keepSegmentGranularity) {
-      // if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
+      // If keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
       // task per segment interval.
-      final List<IndexIngestionSpec> specs = new ArrayList<>(timelineSegments.size());
-      for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
+
+      //noinspection unchecked,ConstantConditions
+      final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = queryableIndexAndSegments
+          .stream()
+          .collect(
+              Collectors.toMap(
+                  // rhs can't be null here, so we skip null checking and suppress the warning with the above comment
+                  p -> p.rhs.getInterval(),
+                  Lists::newArrayList,
+                  (l1, l2) -> {
+                    l1.addAll(l2);
+                    return l1;
+                  }
+              )
+          );
+      final List<IndexIngestionSpec> specs = new ArrayList<>(intervalToSegments.size());
+      for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
+        final Interval interval = entry.getKey();
+        final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.getValue();
         final DataSchema dataSchema = createDataSchema(
             segmentProvider.dataSource,
-            holder.getInterval(),
-            Collections.singletonList(holder),
+            interval,
+            segmentsToCompact,
             dimensionsSpec,
-            toolbox.getIndexIO(),
-            jsonMapper,
-            segmentFileMap
+            jsonMapper
         );
 
         specs.add(
             new IndexIngestionSpec(
                 dataSchema,
-                createIoConfig(toolbox, dataSchema, holder.getInterval()),
-                tuningConfig
+                createIoConfig(toolbox, dataSchema, interval),
+                compactionTuningConfig
             )
         );
       }
@@ -319,18 +363,16 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
       final DataSchema dataSchema = createDataSchema(
           segmentProvider.dataSource,
           segmentProvider.interval,
-          timelineSegments,
+          queryableIndexAndSegments,
           dimensionsSpec,
-          toolbox.getIndexIO(),
-          jsonMapper,
-          segmentFileMap
+          jsonMapper
       );
 
       return Collections.singletonList(
           new IndexIngestionSpec(
               dataSchema,
               createIoConfig(toolbox, dataSchema, segmentProvider.interval),
-              tuningConfig
+              compactionTuningConfig
           )
       );
     }
@@ -368,21 +410,11 @@ private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema data
   private static DataSchema createDataSchema(
       String dataSource,
       Interval totalInterval,
-      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
       DimensionsSpec dimensionsSpec,
-      IndexIO indexIO,
-      ObjectMapper jsonMapper,
-      Map<DataSegment, File> segmentFileMap
+      ObjectMapper jsonMapper
   )
-      throws IOException
   {
-    // find metadata for interval
-    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
-        timelineObjectHolder,
-        segmentFileMap,
-        indexIO
-    );
-
     // find merged aggregators
     for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
       final QueryableIndex index = pair.lhs;
@@ -621,4 +653,105 @@ private static DimensionSchema createDimensionSchema(
       return usedSegments;
     }
   }
+
+  @VisibleForTesting
+  static class PartitionConfigurationManager
+  {
+    @Nullable
+    private final Long targetCompactionSizeBytes;
+    @Nullable
+    private final IndexTuningConfig tuningConfig;
+
+    PartitionConfigurationManager(@Nullable Long targetCompactionSizeBytes, @Nullable IndexTuningConfig tuningConfig)
+    {
+      this.targetCompactionSizeBytes = getValidTargetCompactionSizeBytes(targetCompactionSizeBytes, tuningConfig);
+      this.tuningConfig = tuningConfig;
+    }
+
+    @Nullable
+    IndexTuningConfig computeTuningConfig(List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments)
+    {
+      if (!hasPartitionConfig(tuningConfig)) {
+        final long nonNullTargetCompactionSizeBytes = Preconditions.checkNotNull(
+            targetCompactionSizeBytes,
+            "targetCompactionSizeBytes"
+        );
+        // Find IndexTuningConfig.targetPartitionSize which is the number of rows per segment.
+        // Assume that the segment size is proportional to the number of rows. We can improve this later.
+        final long totalNumRows = queryableIndexAndSegments
+            .stream()
+            .mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.lhs.getNumRows())
+            .sum();
+        final long totalSizeBytes = queryableIndexAndSegments
+            .stream()
+            .mapToLong(queryableIndexAndDataSegment -> queryableIndexAndDataSegment.rhs.getSize())
+            .sum();
+
+        if (totalSizeBytes == 0L) {
+          throw new ISE("Total input segment size is 0 byte");
+        }
+
+        final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
+        final int targetPartitionSize = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes));
+        Preconditions.checkState(targetPartitionSize > 0, "Negative targetPartitionSize[%s]", targetPartitionSize);
+
+        log.info(
+            "Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]",
+            targetPartitionSize,
+            avgRowsPerByte,
+            nonNullTargetCompactionSizeBytes
+        );
+        return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig)
+            .withTargetPartitionSize(targetPartitionSize);
+      } else {
+        return tuningConfig;
+      }
+    }
+
+    /**
+     * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that
+     * targetCompactionSizeBytes cannot be used together with {@link IndexTuningConfig#targetPartitionSize},
+     * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards}.
+     * {@link #hasPartitionConfig} checks whether one of those configs is set.
+     *
+     * This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
+     * returns true. If targetCompactionSizeBytes is not set, this returns null or
+     * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of
+     * hasPartitionConfig.
+     */
+    @Nullable
+    private static Long getValidTargetCompactionSizeBytes(
+        @Nullable Long targetCompactionSizeBytes,
+        @Nullable IndexTuningConfig tuningConfig
+    )
+    {
+      if (targetCompactionSizeBytes != null) {
+        Preconditions.checkArgument(
+            !hasPartitionConfig(tuningConfig),
+            "targetCompactionSizeBytes[%s] cannot be used with targetPartitionSize[%s], maxTotalRows[%s],"
+            + " or numShards[%s] of tuningConfig",
+            targetCompactionSizeBytes,
+            tuningConfig == null ? null : tuningConfig.getTargetPartitionSize(),
+            tuningConfig == null ? null : tuningConfig.getMaxTotalRows(),
+            tuningConfig == null ? null : tuningConfig.getNumShards()
+        );
+        return targetCompactionSizeBytes;
+      } else {
+        return hasPartitionConfig(tuningConfig)
+               ? null
+               : DataSourceCompactionConfig.DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+      }
+    }
+
+    private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig)
+    {
+      if (tuningConfig != null) {
+        return tuningConfig.getTargetPartitionSize() != null
+               || tuningConfig.getMaxTotalRows() != null
+               || tuningConfig.getNumShards() != null;
+      } else {
+        return false;
+      }
+    }
+  }
 }
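To make the estimation in computeTuningConfig() above concrete, here is the same arithmetic as a standalone sketch. The per-segment numbers mirror the constants in CompactionTaskTest further below (NUM_ROWS_PER_SEGMENT = 10, SEGMENT_SIZE_BYTES = 100) together with the 400MB default target; the five-segment count is illustrative:

    long totalNumRows = 5 * 10;                                      // 50 rows
    long totalSizeBytes = 5 * 100;                                   // 500 bytes
    double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;  // 0.1
    long targetCompactionSizeBytes = 400L * 1024 * 1024;             // 419430400, the default
    int targetPartitionSize = Math.toIntExact(
        Math.round(avgRowsPerByte * targetCompactionSizeBytes)       // -> 41943040
    );

This matches the 41943040 that assertIngestionSchema() in the test diff expects as the automatically computed targetPartitionSize.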
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 5d6bef1b859..f00cd0abc84 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -427,7 +427,12 @@ public TaskStatus run(final TaskToolbox toolbox)
       FileUtils.forceMkdir(firehoseTempDir);
 
       ingestionState = IngestionState.DETERMINE_PARTITIONS;
-      final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir);
+
+      // Initialize maxRowsPerSegment and maxTotalRows lazily
+      final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
+      @Nullable final Integer targetPartitionSize = getValidTargetPartitionSize(tuningConfig);
+      @Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig);
+      final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, targetPartitionSize);
       final DataSchema dataSchema;
       final Map<Interval, String> versions;
       if (determineIntervals) {
@@ -457,7 +462,16 @@ public TaskStatus run(final TaskToolbox toolbox)
       }
 
       ingestionState = IngestionState.BUILD_SEGMENTS;
-      return generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir);
+      return generateAndPublishSegments(
+          toolbox,
+          dataSchema,
+          shardSpecs,
+          versions,
+          firehoseFactory,
+          firehoseTempDir,
+          targetPartitionSize,
+          maxTotalRows
+      );
     }
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
@@ -583,7 +597,8 @@ private static boolean isExtendableShardSpecs(IndexIOConfig ioConfig, IndexTunin
   private ShardSpecs determineShardSpecs(
       final TaskToolbox toolbox,
       final FirehoseFactory firehoseFactory,
-      final File firehoseTempDir
+      final File firehoseTempDir,
+      @Nullable final Integer targetPartitionSize
   ) throws IOException
   {
     final ObjectMapper jsonMapper = toolbox.getObjectMapper();
@@ -618,7 +633,8 @@ private ShardSpecs determineShardSpecs(
           granularitySpec,
           tuningConfig,
           determineIntervals,
-          determineNumPartitions
+          determineNumPartitions,
+          targetPartitionSize
       );
     }
   }
@@ -666,7 +682,8 @@ private ShardSpecs createShardSpecsFromInput(
       GranularitySpec granularitySpec,
       IndexTuningConfig tuningConfig,
       boolean determineIntervals,
-      boolean determineNumPartitions
+      boolean determineNumPartitions,
+      @Nullable Integer targetPartitionSize
   ) throws IOException
   {
     log.info("Determining intervals and shardSpecs");
@@ -690,8 +707,10 @@ private ShardSpecs createShardSpecsFromInput(
 
       final int numShards;
       if (determineNumPartitions) {
-        final long numRows = collector.estimateCardinalityRound();
-        numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
+        final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound();
+        numShards = (int) Math.ceil(
+            (double) numRows / Preconditions.checkNotNull(targetPartitionSize, "targetPartitionSize")
+        );
         log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
       } else {
         numShards = defaultNumShards;
@@ -856,7 +875,9 @@ private TaskStatus generateAndPublishSegments(
       final ShardSpecs shardSpecs,
       final Map<Interval, String> versions,
       final FirehoseFactory firehoseFactory,
-      final File firehoseTempDir
+      final File firehoseTempDir,
+      @Nullable final Integer targetPartitionSize,
+      @Nullable final Long maxTotalRows
   ) throws IOException, InterruptedException
   {
     final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
@@ -1004,8 +1025,8 @@ private TaskStatus generateAndPublishSegments(
           if (addResult.isOk()) {
            // Incremental segment publishing is allowed only when rollup doesn't have to be perfect.
            if (!isGuaranteedRollup &&
-                (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) ||
-                 exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig))) {
+                (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) ||
+                 exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator()))) {
              // There can be segments still waiting to be published even though no more rows will be added to them.
              // If those segments are not published here, the available space in the appenderator stays small,
              // which makes the resulting segments smaller.
@@ -1069,6 +1090,40 @@ private TaskStatus generateAndPublishSegments(
     }
   }
 
+  /**
+   * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null.
+   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given
+   * {@link IndexTuningConfig#targetPartitionSize}.
+   */
+  public static Integer getValidTargetPartitionSize(IndexTuningConfig tuningConfig)
+  {
+    @Nullable final Integer numShards = tuningConfig.numShards;
+    @Nullable final Integer targetPartitionSize = tuningConfig.targetPartitionSize;
+    if (numShards == null || numShards == -1) {
+      return targetPartitionSize == null || targetPartitionSize.equals(-1)
+             ? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE
+             : targetPartitionSize;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Return the valid max total rows. If {@link IndexTuningConfig#numShards} is valid, this returns null.
+   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS} or the given
+   * {@link IndexTuningConfig#maxTotalRows}.
+   */
+  public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig)
+  {
+    @Nullable final Integer numShards = tuningConfig.numShards;
+    @Nullable final Long maxTotalRows = tuningConfig.maxTotalRows;
+    if (numShards == null || numShards == -1) {
+      return maxTotalRows == null ? IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
+    } else {
+      return null;
+    }
+  }
+
   private void handleParseException(ParseException e)
   {
     if (e.isFromPartiallyValidRow()) {
@@ -1092,17 +1147,20 @@ private void handleParseException(ParseException e)
     }
   }
 
-  private static boolean exceedMaxRowsInSegment(int numRowsInSegment, IndexTuningConfig indexTuningConfig)
+  private static boolean exceedMaxRowsInSegment(
+      @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig
+      int numRowsInSegment
+  )
   {
-    // maxRowsInSegment should be null if numShards is set in indexTuningConfig
-    final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize();
     return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
   }
 
-  private static boolean exceedMaxRowsInAppenderator(long numRowsInAppenderator, IndexTuningConfig indexTuningConfig)
+  private static boolean exceedMaxRowsInAppenderator(
+      // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig
+      @Nullable final Long maxRowsInAppenderator,
+      long numRowsInAppenderator
+  )
   {
-    // maxRowsInAppenderator should be null if numShards is set in indexTuningConfig
-    final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
     return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator;
   }
 
@@ -1271,7 +1329,9 @@ public boolean isAppendToExisting()
   @JsonTypeName("index")
  public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
   {
-    private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+    static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000;
+    static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+
     private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
     private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
     private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
@@ -1279,12 +1339,13 @@ public boolean isAppendToExisting()
     private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
     private static final long DEFAULT_PUSH_TIMEOUT = 0;
 
-    static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;
-
+    @Nullable
     private final Integer targetPartitionSize;
     private final int maxRowsInMemory;
     private final long maxBytesInMemory;
+    @Nullable
     private final Long maxTotalRows;
+    @Nullable
     private final Integer numShards;
     private final IndexSpec indexSpec;
     private final File basePersistDirectory;
@@ -1312,6 +1373,11 @@ public boolean isAppendToExisting()
     @Nullable
     private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
 
+    public static IndexTuningConfig createDefault()
+    {
+      return new IndexTuningConfig();
+    }
+
     @JsonCreator
     public IndexTuningConfig(
        @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
@@ -1385,12 +1451,14 @@ private IndexTuningConfig(
           "targetPartitionSize and numShards cannot both be set"
       );
 
-      this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize);
+      this.targetPartitionSize = (targetPartitionSize != null && targetPartitionSize == -1)
+                                 ? null
+                                 : targetPartitionSize;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
       this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
-      this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows);
+      this.maxTotalRows = maxTotalRows;
       this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
       this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
       this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
@@ -1422,27 +1490,29 @@ private IndexTuningConfig(
                                 : logParseExceptions;
     }
 
-    private static Integer initializeTargetPartitionSize(Integer numShards, Integer targetPartitionSize)
-    {
-      if (numShards == null || numShards == -1) {
-        return targetPartitionSize == null || targetPartitionSize.equals(-1)
-               ? DEFAULT_TARGET_PARTITION_SIZE
-               : targetPartitionSize;
-      } else {
-        return null;
-      }
-    }
-
-    private static Long initializeMaxTotalRows(Integer numShards, Long maxTotalRows)
+    public IndexTuningConfig withBasePersistDirectory(File dir)
     {
-      if (numShards == null || numShards == -1) {
-        return maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
-      } else {
-        return null;
-      }
+      return new IndexTuningConfig(
+          targetPartitionSize,
+          maxRowsInMemory,
+          maxBytesInMemory,
+          maxTotalRows,
+          numShards,
+          indexSpec,
+          maxPendingPersists,
+          forceExtendableShardSpecs,
+          forceGuaranteedRollup,
+          reportParseExceptions,
+          pushTimeout,
+          dir,
+          segmentWriteOutMediumFactory,
+          logParseExceptions,
+          maxParseExceptions,
+          maxSavedParseExceptions
+      );
     }
 
-    public IndexTuningConfig withBasePersistDirectory(File dir)
+    public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize)
     {
       return new IndexTuningConfig(
           targetPartitionSize,
@@ -1456,7 +1526,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir)
           forceGuaranteedRollup,
           reportParseExceptions,
           pushTimeout,
-          dir,
+          basePersistDirectory,
           segmentWriteOutMediumFactory,
           logParseExceptions,
           maxParseExceptions,
@@ -1464,6 +1534,11 @@ public IndexTuningConfig withBasePersistDirectory(File dir)
       );
     }
 
+    /**
+     * Return the target number of rows per segment. This returns null if it's not specified in tuningConfig.
+     * Please use {@link IndexTask#getValidTargetPartitionSize} instead to get the valid value.
+     */
+    @Nullable
     @JsonProperty
     public Integer getTargetPartitionSize()
     {
@@ -1484,6 +1559,10 @@ public long getMaxBytesInMemory()
       return maxBytesInMemory;
     }
 
+    /**
+     * Return the max number of total rows in appenderator. This returns null if it's not specified in tuningConfig.
+     * Please use {@link IndexTask#getValidMaxTotalRows} instead to get the valid value.
+     */
     @JsonProperty
     @Override
     @Nullable
@@ -1633,5 +1712,28 @@ public int hashCode()
           maxSavedParseExceptions
       );
     }
+
+    @Override
+    public String toString()
+    {
+      return "IndexTuningConfig{" +
+             "targetPartitionSize=" + targetPartitionSize +
+             ", maxRowsInMemory=" + maxRowsInMemory +
+             ", maxBytesInMemory=" + maxBytesInMemory +
+             ", maxTotalRows=" + maxTotalRows +
+             ", numShards=" + numShards +
+             ", indexSpec=" + indexSpec +
+             ", basePersistDirectory=" + basePersistDirectory +
+             ", maxPendingPersists=" + maxPendingPersists +
+             ", forceExtendableShardSpecs=" + forceExtendableShardSpecs +
+             ", forceGuaranteedRollup=" + forceGuaranteedRollup +
+             ", reportParseExceptions=" + reportParseExceptions +
+             ", pushTimeout=" + pushTimeout +
+             ", logParseExceptions=" + logParseExceptions +
+             ", maxParseExceptions=" + maxParseExceptions +
+             ", maxSavedParseExceptions=" + maxSavedParseExceptions +
+             ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory +
+             '}';
+    }
   }
 }
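The net effect of the two new helpers is that targetPartitionSize and maxTotalRows may stay null on the tuningConfig itself and are only resolved to defaults at run time, and only when numShards does not take precedence. An illustrative sketch of the resolution rules, following the code above:

    // A default config: neither numShards nor the row limits are set.
    IndexTuningConfig defaults = IndexTuningConfig.createDefault();
    IndexTask.getValidTargetPartitionSize(defaults); // -> 5_000_000 (DEFAULT_TARGET_PARTITION_SIZE)
    IndexTask.getValidMaxTotalRows(defaults);        // -> 20_000_000 (DEFAULT_MAX_TOTAL_ROWS)
    // If numShards is set (and not -1), both helpers return null and the
    // explicit shard count drives partitioning instead.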
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 46cf03e65e8..20871afc6d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -303,7 +303,10 @@ private SegmentAllocator createSegmentAllocator(
       );
     }
 
+    // Initialize maxRowsPerSegment and maxTotalRows lazily
    final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+    @Nullable final Integer targetPartitionSize = IndexTask.getValidTargetPartitionSize(tuningConfig);
+    @Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig);
     final long pushTimeout = tuningConfig.getPushTimeout();
     final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
     final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient, ingestionSchema);
@@ -348,8 +351,8 @@ private SegmentAllocator createSegmentAllocator(
          final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);

          if (addResult.isOk()) {
-            if (exceedMaxRowsInSegment(addResult.getNumRowsInSegment(), tuningConfig) ||
-                exceedMaxRowsInAppenderator(addResult.getTotalNumRowsInAppenderator(), tuningConfig)) {
+            if (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) ||
+                exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator())) {
              // There can be segments still waiting to be published even though no more rows will be added to them.
              // If those segments are not published here, the available space in the appenderator stays small,
              // which makes the resulting segments smaller.
@@ -384,22 +387,19 @@ private SegmentAllocator createSegmentAllocator(
   }
 
   private static boolean exceedMaxRowsInSegment(
-      int numRowsInSegment,
-      ParallelIndexTuningConfig indexTuningConfig
+      @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig
+      int numRowsInSegment
   )
   {
-    // maxRowsInSegment should be null if numShards is set in indexTuningConfig
-    final Integer maxRowsInSegment = indexTuningConfig.getTargetPartitionSize();
     return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
   }
 
   private static boolean exceedMaxRowsInAppenderator(
-      long numRowsInAppenderator,
-      ParallelIndexTuningConfig indexTuningConfig
+      // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig
+      @Nullable Long maxRowsInAppenderator,
+      long numRowsInAppenderator
   )
   {
-    // maxRowsInAppenderator should be null if numShards is set in indexTuningConfig
-    final Long maxRowsInAppenderator = indexTuningConfig.getMaxTotalRows();
     return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator;
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 381b51c549a..5cd5f1fbdbd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -28,8 +28,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.inject.Binder;
-import com.google.inject.Module;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -49,6 +47,7 @@
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
 import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
@@ -59,6 +58,7 @@
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@@ -71,8 +71,14 @@
 import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.SimpleQueryableIndex;
+import org.apache.druid.segment.column.BitmapIndex;
 import org.apache.druid.segment.column.Column;
-import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.GenericColumn;
+import org.apache.druid.segment.column.SpatialIndex;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
@@ -84,6 +90,7 @@
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import org.apache.druid.segment.transform.TransformingInputRowParser;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.security.AuthTestUtils;
@@ -120,6 +127,8 @@
 @RunWith(Parameterized.class)
 public class CompactionTaskTest
 {
+  private static final long SEGMENT_SIZE_BYTES = 100;
+  private static final int NUM_ROWS_PER_SEGMENT = 10;
   private static final String DATA_SOURCE = "dataSource";
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
@@ -204,7 +213,7 @@ public static void setupClass()
               new ArrayList<>(AGGREGATORS.keySet()),
               new NumberedShardSpec(0, 1),
               0,
-              1
+              SEGMENT_SIZE_BYTES
           ),
           new File("file_" + i)
       );
@@ -225,16 +234,11 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa
     );
     GuiceInjectableValues injectableValues = new GuiceInjectableValues(
         GuiceInjectors.makeStartupInjectorWithModules(
-            ImmutableList.<Module>of(
-                new Module()
-                {
-                  @Override
-                  public void configure(Binder binder)
-                  {
-                    binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
-                    binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
-                    binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
-                  }
+            ImmutableList.of(
+                binder -> {
+                  binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+                  binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
+                  binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
                 }
             )
         )
@@ -265,7 +269,7 @@ public void configure(Binder binder)
   private static IndexTuningConfig createTuningConfig()
   {
     return new IndexTuningConfig(
-        5000000,
+        null, // null to compute targetPartitionSize automatically
         500000,
         1000000L,
         null,
@@ -329,6 +333,7 @@ public void testSerdeWithInterval() throws IOException
         null,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -359,6 +364,7 @@ public void testSerdeWithSegments() throws IOException
         SEGMENTS,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -373,19 +379,21 @@ public void testSerdeWithSegments() throws IOException
     Assert.assertEquals(task.getInterval(), fromJson.getInterval());
     Assert.assertEquals(task.getSegments(), fromJson.getSegments());
    Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec());
+    Assert.assertEquals(task.isKeepSegmentGranularity(), fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(task.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
     Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
     Assert.assertEquals(task.getContext(), fromJson.getContext());
   }
 
   @Test
-  public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
+  public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
   {
     final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@@ -393,6 +401,12 @@ public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOExcep
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
     } else {
@@ -402,14 +416,39 @@ public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOExcep
   }
 
   @Test
-  public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException
+  public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException
   {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        5,
+        500000,
+        1000000L,
+        null,
+        null,
+        null,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, tuningConfig),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@@ -417,11 +456,142 @@ public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOExc
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig);
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          tuningConfig
+      );
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException
+  {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        null,
+        500000,
+        1000000L,
+        5L,
+        null,
+        null,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, tuningConfig),
+        null,
+        keepSegmentGranularity,
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          tuningConfig
+      );
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException
+  {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        null,
+        500000,
+        1000000L,
+        null,
+        null,
+        3,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, tuningConfig),
+        null,
+        keepSegmentGranularity,
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, tuningConfig);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          tuningConfig
+      );
     }
   }
 
@@ -458,13 +628,19 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti
     final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         customSpec,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
       final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
       IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
@@ -489,9 +665,9 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se
     final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(SEGMENTS),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@@ -499,6 +675,12 @@ public void testCreateIngestionSchemaWithCustomSegments() throws IOException, Se
     );
 
     if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
       Assert.assertEquals(5, ingestionSpecs.size());
      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
     } else {
@@ -518,9 +700,9 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
     CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(segments),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
   }
@@ -537,9 +719,9 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException
     CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(segments),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         keepSegmentGranularity,
-        TUNING_CONFIG,
         objectMapper
     );
   }
@@ -560,6 +742,7 @@ public void testEmptyInterval()
         null,
         null,
         null,
+        null,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         new NoopChatHandlerProvider(),
@@ -567,6 +750,46 @@ public void testEmptyInterval()
     );
   }
 
+  @Test
+  public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException
+  {
+    final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        5,
+        500000,
+        1000000L,
+        null,
+        null,
+        null,
+        new IndexSpec(
+            new RoaringBitmapSerdeFactory(true),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        5000,
+        true,
+        false,
+        true,
+        false,
+        null,
+        100L,
+        null,
+        null,
+        null,
+        null
+    );
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("targetCompactionSizeBytes[5] cannot be used with");
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(5L, tuningConfig),
+        null,
+        keepSegmentGranularity,
+        objectMapper
+    );
+  }
+
  private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
   {
     if (keepSegmentGranularity) {
@@ -617,6 +840,45 @@ private static void assertIngestionSchema(
       List<DimensionsSpec> expectedDimensionsSpecs,
       List<Interval> expectedSegmentIntervals
   )
+  {
+    assertIngestionSchema(
+        ingestionSchemas,
+        expectedDimensionsSpecs,
+        expectedSegmentIntervals,
+        new IndexTuningConfig(
+            41943040, // automatically computed targetPartitionSize = round((10 rows / 100 bytes) * 400MB default)
+            500000,
+            1000000L,
+            null,
+            null,
+            null,
+            new IndexSpec(
+                new RoaringBitmapSerdeFactory(true),
+                CompressionStrategy.LZ4,
+                CompressionStrategy.LZF,
+                LongEncodingStrategy.LONGS
+            ),
+            5000,
+            true,
+            false,
+            true,
+            false,
+            null,
+            100L,
+            null,
+            null,
+            null,
+            null
+        )
+    );
+  }
+
+  private static void assertIngestionSchema(
+      List<IndexIngestionSpec> ingestionSchemas,
+      List<DimensionsSpec> expectedDimensionsSpecs,
+      List<Interval> expectedSegmentIntervals,
+      IndexTuningConfig expectedTuningConfig
+  )
   {
     Preconditions.checkArgument(
         ingestionSchemas.size() == expectedDimensionsSpecs.size(),
@@ -677,7 +939,7 @@ private static void assertIngestionSchema(
       );
 
       // assert tuningConfig
-      Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+      Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
     }
   }
 
@@ -841,17 +1103,74 @@ void removeMetadata(File file)
 
   private static Column createColumn(DimensionSchema dimensionSchema)
   {
-    return new ColumnBuilder()
-        .setType(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()))
-        .setDictionaryEncodedColumn(() -> null)
-        .setBitmapIndex(() -> null)
-        .build();
+    return new TestColumn(IncrementalIndex.TYPE_MAP.get(dimensionSchema.getValueType()));
   }
 
   private static Column createColumn(AggregatorFactory aggregatorFactory)
   {
-    return new ColumnBuilder()
-        .setType(ValueType.fromString(aggregatorFactory.getTypeName()))
-        .build();
+    return new TestColumn(ValueType.fromString(aggregatorFactory.getTypeName()));
+  }
+
+  private static class TestColumn implements Column
+  {
+    private final ColumnCapabilities columnCapabilities;
+
+    TestColumn(ValueType type)
+    {
+      columnCapabilities = new ColumnCapabilitiesImpl()
+          .setType(type)
+          .setDictionaryEncoded(type == ValueType.STRING) // set a fake value to make string columns
+          .setHasBitmapIndexes(type == ValueType.STRING)
+          .setHasSpatialIndexes(false)
+          .setHasMultipleValues(false);
+    }
+
+    @Override
+    public ColumnCapabilities getCapabilities()
+    {
+      return columnCapabilities;
+    }
+
+    @Override
+    public int getLength()
+    {
+      return NUM_ROWS_PER_SEGMENT;
+    }
+
+    @Override
+    public DictionaryEncodedColumn getDictionaryEncoding()
+    {
+      return null;
+    }
+
+    @Override
+    public GenericColumn getGenericColumn()
+    {
+      return null;
+    }
+
+    @Override
+    public ComplexColumn getComplexColumn()
+    {
+      return null;
+    }
+
+    @Override
+    public BitmapIndex getBitmapIndex()
+    {
+      return null;
+    }
+
+    @Override
+    public SpatialIndex getSpatialIndex()
+    {
+      return null;
+    }
+
+    @Override
+    public SettableColumnValueSelector makeSettableColumnValueSelector()
+    {
+      return null;
+    }
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index a2c742e84f1..31e25168174 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -106,14 +106,14 @@ public void testIndexTaskTuningConfigDefaults() throws Exception
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs());
-    Assert.assertEquals(false, tuningConfig.isReportParseExceptions());
+    Assert.assertFalse(tuningConfig.isForceExtendableShardSpecs());
+    Assert.assertFalse(tuningConfig.isReportParseExceptions());
     Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
     Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod());
     Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
     Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory());
-    Assert.assertEquals(null, tuningConfig.getNumShards());
-    Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
   }
 
   @Test
@@ -125,14 +125,14 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc
     );
 
     Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
-    Assert.assertEquals(null, tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
         "{\"type\":\"index\", \"numShards\":10}",
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -140,7 +140,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -148,7 +148,7 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getNumShards());
     Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
 
     tuningConfig = jsonMapper.readValue(
@@ -156,11 +156,8 @@ public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exc
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(null, tuningConfig.getNumShards());
-    Assert.assertEquals(
-        IndexTask.IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE,
-        (int) tuningConfig.getTargetPartitionSize()
-    );
+    Assert.assertNull(tuningConfig.getNumShards());
+    Assert.assertNull(tuningConfig.getTargetPartitionSize());
   }
 
   @Test
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 1ddcc9e6e73..6bc13365e6b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -31,9 +31,10 @@
 
 public class DataSourceCompactionConfig
 {
+  public static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB
+
   // should be synchronized with Tasks.DEFAULT_MERGE_TASK_PRIORITY
   private static final int DEFAULT_COMPACTION_TASK_PRIORITY = 25;
-  private static final long DEFAULT_TARGET_COMPACTION_SIZE_BYTES = 400 * 1024 * 1024; // 400MB
   private static final int DEFAULT_NUM_TARGET_COMPACTION_SEGMENTS = 150;
   private static final Period DEFAULT_SKIP_OFFSET_FROM_LATEST = new Period("P1D");
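Making this constant public gives CompactionTask.PartitionConfigurationManager its fallback. A quick sketch of how getValidTargetCompactionSizeBytes() behaves across the possible inputs, where withPartitionConfig stands for any hypothetical tuningConfig whose targetPartitionSize, maxTotalRows, or numShards is set:

    new PartitionConfigurationManager(100L, null);                // target is 100 bytes
    new PartitionConfigurationManager(100L, withPartitionConfig); // throws IllegalArgumentException
    new PartitionConfigurationManager(null, withPartitionConfig); // target is null; tuningConfig wins
    new PartitionConfigurationManager(null, null);                // falls back to the 400MB default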
 


 
