This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7462b0b  Allow missing intervals for Parallel task with hash/range partitioning (#10592)
7462b0b is described below

commit 7462b0b953e87890a683a0bee7b7480465450342
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Nov 25 14:50:22 2020 -0800

    Allow missing intervals for Parallel task with hash/range partitioning (#10592)
    
    * Allow missing intervals for Parallel task
    
    * fix row filter
    
    * fix tests
    
    * fix log
---
 .../common/actions/SurrogateTaskActionClient.java  |  45 ++++++
 .../task/OverlordCoordinatingSegmentAllocator.java |  48 +++---
 .../batch/parallel/ParallelIndexIngestionSpec.java |   5 +
 .../parallel/ParallelIndexSupervisorTask.java      | 180 +++++++++++++--------
 .../parallel/PartialDimensionCardinalityTask.java  |  31 ++--
 .../parallel/PartialDimensionDistributionTask.java |  31 ++--
 ...HashSegmentGenerateParallelIndexTaskRunner.java |  11 +-
 .../parallel/PartialHashSegmentGenerateTask.java   |  47 ++++--
 .../parallel/PartialRangeSegmentGenerateTask.java  |   8 +-
 .../batch/parallel/PartialSegmentGenerateTask.java |   5 +
 .../batch/parallel/PartialSegmentMergeTask.java    |   4 +
 .../batch/parallel/PerfectRollupWorkerTask.java    |   7 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |   6 +-
 .../AbstractMultiPhaseParallelIndexingTest.java    |   4 +-
 .../parallel/DimensionCardinalityReportTest.java   |  60 +++++--
 ...ashPartitionMultiPhaseParallelIndexingTest.java |  99 +++++++++---
 .../ParallelIndexSupervisorTaskSerdeTest.java      |  19 ---
 .../PartialDimensionCardinalityTaskTest.java       |  13 --
 .../PartialDimensionDistributionTaskTest.java      |  13 --
 .../PartialGenericSegmentMergeTaskTest.java        |  28 ++++
 .../PartialHashSegmentGenerateTaskTest.java        |  65 ++++++++
 .../parallel/PerfectRollupWorkerTaskTest.java      |  11 --
 ...ngePartitionMultiPhaseParallelIndexingTest.java |  23 +--
 23 files changed, 521 insertions(+), 242 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
new file mode 100644
index 0000000..deafd20
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateTaskActionClient.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.actions;
+
+import java.io.IOException;
+
+/**
+ * A {@link TaskActionClient} that wraps a given {@link TaskAction} with {@link SurrogateAction}.
+ * All subtasks of {@link org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask} must
+ * use this client or wrap taskActions manually.
+ */
+public class SurrogateTaskActionClient implements TaskActionClient
+{
+  private final String supervisorTaskId;
+  private final TaskActionClient delegate;
+
+  public SurrogateTaskActionClient(String supervisorTaskId, TaskActionClient delegate)
+  {
+    this.supervisorTaskId = supervisorTaskId;
+    this.delegate = delegate;
+  }
+
+  @Override
+  public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+  {
+    return delegate.submit(new SurrogateAction<>(supervisorTaskId, taskAction));
+  }
+}
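
As a minimal usage sketch (identifiers such as supervisorTaskId and action are illustrative, not part of this commit): a subtask wraps the toolbox's client once, and every action it submits afterwards is automatically sent as a SurrogateAction on behalf of the supervisor.

    // Sketch: wrap once, then submit plain actions.
    TaskActionClient client = new SurrogateTaskActionClient(supervisorTaskId, toolbox.getTaskActionClient());
    client.submit(action); // delegates as new SurrogateAction<>(supervisorTaskId, action)
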
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index 87daaa8..6efd39a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -25,7 +25,8 @@ import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.actions.SurrogateAction;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.TaskLockHelper.OverwritingRootGenerationPartitions;
 import org.apache.druid.indexing.common.task.batch.parallel.SupervisorTaskAccess;
 import org.apache.druid.java.util.common.ISE;
@@ -58,8 +59,12 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
       final PartitionsSpec partitionsSpec
   )
   {
+    final TaskActionClient taskActionClient =
+        supervisorTaskAccess == null
+        ? toolbox.getTaskActionClient()
+        : new SurrogateTaskActionClient(supervisorTaskAccess.getSupervisorTaskId(), toolbox.getTaskActionClient());
     this.internalAllocator = new ActionBasedSegmentAllocator(
-        toolbox.getTaskActionClient(),
+        taskActionClient,
         dataSchema,
         (schema, row, sequenceName, previousSegmentId, skipSegmentLineageCheck) -> {
           final GranularitySpec granularitySpec = schema.getGranularitySpec();
@@ -72,34 +77,17 @@ public class OverlordCoordinatingSegmentAllocator implements SegmentAllocatorFor
               taskLockHelper,
               interval
           );
-          if (supervisorTaskAccess != null) {
-            return new SurrogateAction<>(
-                supervisorTaskAccess.getSupervisorTaskId(),
-                new SegmentAllocateAction(
-                    schema.getDataSource(),
-                    row.getTimestamp(),
-                    schema.getGranularitySpec().getQueryGranularity(),
-                    schema.getGranularitySpec().getSegmentGranularity(),
-                    sequenceName,
-                    previousSegmentId,
-                    skipSegmentLineageCheck,
-                    partialShardSpec,
-                    taskLockHelper.getLockGranularityToUse()
-                )
-            );
-          } else {
-            return new SegmentAllocateAction(
-                schema.getDataSource(),
-                row.getTimestamp(),
-                schema.getGranularitySpec().getQueryGranularity(),
-                schema.getGranularitySpec().getSegmentGranularity(),
-                sequenceName,
-                previousSegmentId,
-                skipSegmentLineageCheck,
-                partialShardSpec,
-                taskLockHelper.getLockGranularityToUse()
-            );
-          }
+          return new SegmentAllocateAction(
+              schema.getDataSource(),
+              row.getTimestamp(),
+              schema.getGranularitySpec().getQueryGranularity(),
+              schema.getGranularitySpec().getSegmentGranularity(),
+              sequenceName,
+              previousSegmentId,
+              skipSegmentLineageCheck,
+              partialShardSpec,
+              taskLockHelper.getLockGranularityToUse()
+          );
         }
     );
     this.sequenceNameFunction = new LinearlyPartitionedSequenceNameFunction(taskId);
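
In brief, the hunk above replaces per-action wrapping with choosing the client once; a condensed sketch of the resulting pattern (taken from the code in this hunk):

    // before: every allocate action was wrapped at the call site:
    //   new SurrogateAction<>(supervisorTaskAccess.getSupervisorTaskId(), new SegmentAllocateAction(...))
    // after: the client is chosen once and plain SegmentAllocateActions are submitted;
    // SurrogateTaskActionClient.submit() performs the wrapping.
    TaskActionClient client = supervisorTaskAccess == null
        ? toolbox.getTaskActionClient()
        : new SurrogateTaskActionClient(supervisorTaskAccess.getSupervisorTaskId(), toolbox.getTaskActionClient());
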
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
index 06baae4..5d057cd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.java
@@ -63,6 +63,11 @@ public class ParallelIndexIngestionSpec extends IngestionSpec<ParallelIndexIOCon
     this.tuningConfig = tuningConfig == null ? ParallelIndexTuningConfig.defaultConfig() : tuningConfig;
   }
 
+  public ParallelIndexIngestionSpec withDataSchema(DataSchema dataSchema)
+  {
+    return new ParallelIndexIngestionSpec(dataSchema, ioConfig, tuningConfig);
+  }
+
   @Override
   @JsonProperty("dataSchema")
   public DataSchema getDataSchema()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 5cbfc01..3d272b4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -188,10 +188,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
     if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
       checkPartitionsSpecForForceGuaranteedRollup(ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec());
-
-      if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
-        throw new ISE("forceGuaranteedRollup is set but intervals is missing in granularitySpec");
-      }
     }
 
     this.baseInputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
@@ -290,7 +286,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @VisibleForTesting
   PartialHashSegmentGenerateParallelIndexTaskRunner createPartialHashSegmentGenerateRunner(
       TaskToolbox toolbox,
-      Integer numShardsOverride
+      ParallelIndexIngestionSpec ingestionSchema,
+      @Nullable Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
     return new PartialHashSegmentGenerateParallelIndexTaskRunner(
@@ -299,7 +296,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
         getGroupId(),
         ingestionSchema,
         getContext(),
-        numShardsOverride
+        intervalToNumShardsOverride
     );
   }
 
@@ -318,7 +315,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @VisibleForTesting
   PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGenerateRunner(
       TaskToolbox toolbox,
-      Map<Interval, PartitionBoundaries> intervalToPartitions
+      Map<Interval, PartitionBoundaries> intervalToPartitions,
+      ParallelIndexIngestionSpec ingestionSchema
   )
   {
     return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
@@ -334,16 +332,17 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @VisibleForTesting
   PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
       TaskToolbox toolbox,
-      List<PartialGenericSegmentMergeIOConfig> ioConfigs
+      List<PartialGenericSegmentMergeIOConfig> ioConfigs,
+      ParallelIndexIngestionSpec ingestionSchema
   )
   {
     return new PartialGenericSegmentMergeParallelIndexTaskRunner(
         toolbox,
         getId(),
         getGroupId(),
-        getIngestionSchema().getDataSchema(),
+        ingestionSchema.getDataSchema(),
         ioConfigs,
-        getIngestionSchema().getTuningConfig(),
+        ingestionSchema.getTuningConfig(),
         getContext()
     );
   }
@@ -529,9 +528,30 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
            : runHashPartitionMultiPhaseParallel(toolbox);
   }
 
+  private static ParallelIndexIngestionSpec rewriteIngestionSpecWithIntervalsIfMissing(
+      ParallelIndexIngestionSpec ingestionSchema,
+      Collection<Interval> intervals
+  )
+  {
+    if (ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
+      return ingestionSchema
+          .withDataSchema(
+              ingestionSchema.getDataSchema().withGranularitySpec(
+                  ingestionSchema
+                      .getDataSchema()
+                      .getGranularitySpec()
+                      .withIntervals(new ArrayList<>(intervals))
+              )
+          );
+    } else {
+      return ingestionSchema;
+    }
+  }
+
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
     TaskState state;
+    ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
 
     if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
       // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
@@ -541,49 +561,64 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
       );
     }
 
-    final Integer numShardsOverride;
+    final Map<Interval, Integer> intervalToNumShards;
     HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
-    if (partitionsSpec.getNumShards() == null) {
-      // 0. need to determine numShards by scanning the data
-      LOG.info("numShards is unspecified, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
+    final boolean needsInputSampling =
+        partitionsSpec.getNumShards() == null
+        || ingestionSchemaToUse.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
+    if (needsInputSampling) {
+      // 0. need to determine intervals and numShards by scanning the data
+      LOG.info("Needs to determine intervals or numShards, beginning %s phase.", PartialDimensionCardinalityTask.TYPE);
       ParallelIndexTaskRunner<PartialDimensionCardinalityTask, DimensionCardinalityReport> cardinalityRunner =
           createRunner(
               toolbox,
               this::createPartialDimensionCardinalityRunner
           );
 
-      if (cardinalityRunner == null) {
-        throw new ISE("Could not create cardinality runner for hash 
partitioning.");
-      }
-
       state = runNextPhase(cardinalityRunner);
       if (state.isFailure()) {
         return TaskStatus.failure(getId());
       }
 
-      int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
-                                       ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
-                                       : partitionsSpec.getMaxRowsPerSegment();
-      LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
+      if (cardinalityRunner.getReports().isEmpty()) {
+        String msg = "No valid rows for hash partitioning."
+                     + " All rows may have invalid timestamps or have been filtered out.";
+        LOG.warn(msg);
+        return TaskStatus.success(getId(), msg);
+      }
+
+      if (partitionsSpec.getNumShards() == null) {
+        int effectiveMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
+                                         ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
+                                         : partitionsSpec.getMaxRowsPerSegment();
+        LOG.info("effective maxRowsPerSegment is: " + effectiveMaxRowsPerSegment);
 
-      if (cardinalityRunner.getReports() == null) {
-        throw new ISE("Could not determine cardinalities for hash partitioning.");
+        intervalToNumShards = determineNumShardsFromCardinalityReport(
+            cardinalityRunner.getReports().values(),
+            effectiveMaxRowsPerSegment
+        );
+      } else {
+        intervalToNumShards = CollectionUtils.mapValues(
+            mergeCardinalityReports(cardinalityRunner.getReports().values()),
+            k -> partitionsSpec.getNumShards()
+        );
       }
-      numShardsOverride = determineNumShardsFromCardinalityReport(
-          cardinalityRunner.getReports().values(),
-          effectiveMaxRowsPerSegment
-      );
 
-      LOG.info("Automatically determined numShards: " + numShardsOverride);
+      ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
+          ingestionSchemaToUse,
+          intervalToNumShards.keySet()
+      );
     } else {
-      numShardsOverride = null;
+      // numShards will be determined in PartialHashSegmentGenerateTask
+      intervalToNumShards = null;
     }
 
     // 1. Partial segment generation phase
+    final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
     ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
         createRunner(
             toolbox,
-            f -> createPartialHashSegmentGenerateRunner(toolbox, numShardsOverride)
+            f -> createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards)
         );
 
     state = runNextPhase(indexingRunner);
@@ -600,9 +635,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
         partitionToLocations
     );
 
+    final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
     final ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
         toolbox,
-        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
     );
     state = runNextPhase(mergeRunner);
     if (state.isSuccess()) {
@@ -615,6 +651,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
   private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    ParallelIndexIngestionSpec ingestionSchemaToUse = ingestionSchema;
     ParallelIndexTaskRunner<PartialDimensionDistributionTask, DimensionDistributionReport> distributionRunner =
         createRunner(
             toolbox,
@@ -631,13 +668,22 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
     if (intervalToPartitions.isEmpty()) {
       String msg = "No valid rows for single dimension partitioning."
-          + " All rows may have invalid timestamps or multiple dimension 
values.";
+                   + " All rows may have invalid timestamps or multiple 
dimension values.";
       LOG.warn(msg);
       return TaskStatus.success(getId(), msg);
     }
 
+    ingestionSchemaToUse = rewriteIngestionSpecWithIntervalsIfMissing(
+        ingestionSchemaToUse,
+        intervalToPartitions.keySet()
+    );
+
+    final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
     ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
-        createRunner(toolbox, tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions));
+        createRunner(
+            toolbox,
+            tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions, segmentCreateIngestionSpec)
+        );
 
     TaskState indexingState = runNextPhase(indexingRunner);
     if (indexingState.isFailure()) {
@@ -652,9 +698,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
         partitionToLocations
     );
 
+    final ParallelIndexIngestionSpec segmentMergeIngestionSpec = ingestionSchemaToUse;
     ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
         toolbox,
-        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs, segmentMergeIngestionSpec)
     );
     TaskState mergeState = runNextPhase(mergeRunner);
     if (mergeState.isSuccess()) {
@@ -664,48 +711,45 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     return TaskStatus.fromCode(getId(), mergeState);
   }
 
-  @VisibleForTesting
-  public static int determineNumShardsFromCardinalityReport(
-      Collection<DimensionCardinalityReport> reports,
-      int maxRowsPerSegment
-  )
+  private static Map<Interval, Union> mergeCardinalityReports(Collection<DimensionCardinalityReport> reports)
   {
-    // aggregate all the sub-reports
     Map<Interval, Union> finalCollectors = new HashMap<>();
     reports.forEach(report -> {
       Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
       for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
-        Union union = finalCollectors.computeIfAbsent(
-            entry.getKey(),
-            (key) -> {
-              return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
-            }
-        );
         HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
-        union.update(entryHll);
+        finalCollectors.computeIfAbsent(
+            entry.getKey(),
+            k -> new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
+        ).update(entryHll);
       }
     });
+    return finalCollectors;
+  }
 
-    // determine the highest cardinality in any interval
-    long maxCardinality = 0;
-    for (Union union : finalCollectors.values()) {
-      maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
-    }
-
-    LOG.info("Estimated max cardinality: " + maxCardinality);
-
-    // determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
-    long numShards = maxCardinality / maxRowsPerSegment;
-    if (maxCardinality % maxRowsPerSegment != 0) {
-      // if there's a remainder add 1 so we stay under maxRowsPerSegment
-      numShards += 1;
-    }
-    try {
-      return Math.toIntExact(numShards);
-    }
-    catch (ArithmeticException ae) {
-      throw new ISE("Estimated numShards [%s] exceeds integer bounds.", 
numShards);
-    }
+  @VisibleForTesting
+  public static Map<Interval, Integer> determineNumShardsFromCardinalityReport(
+      Collection<DimensionCardinalityReport> reports,
+      int maxRowsPerSegment
+  )
+  {
+    // aggregate all the sub-reports
+    Map<Interval, Union> finalCollectors = mergeCardinalityReports(reports);
+
+    return CollectionUtils.mapValues(
+        finalCollectors,
+        union -> {
+          final double estimatedCardinality = union.getEstimate();
+          // determine numShards based on maxRowsPerSegment and the cardinality
+          final long estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
+          try {
+            return Math.max(Math.toIntExact(estimatedNumShards), 1);
+          }
+          catch (ArithmeticException ae) {
+            throw new ISE("Estimated numShards [%s] exceeds integer bounds.", estimatedNumShards);
+          }
+        }
+    );
   }
 
   private Map<Interval, PartitionBoundaries> determineAllRangePartitions(Collection<DimensionDistributionReport> reports)
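
A quick worked example of the per-interval shard math introduced above (numbers illustrative): for each interval, numShards = round(estimatedCardinality / maxRowsPerSegment), clamped to at least 1.

    double estimatedCardinality = 9_500_000d;  // hypothetical HLL estimate for one interval
    int maxRowsPerSegment = 5_000_000;
    long estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment); // 1.9 rounds to 2
    int numShards = Math.max(Math.toIntExact(estimatedNumShards), 1); // an estimate rounding to 0 is clamped to 1
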
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index f6759ec..48be3e7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.datasketches.hll.HllSketch;
 import org.apache.druid.data.input.InputFormat;
@@ -32,12 +33,12 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -52,11 +53,11 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
 {
   public static final String TYPE = "partial_dimension_cardinality";
-  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
@@ -125,10 +126,14 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    return tryTimeChunkLock(
-        taskActionClient,
-        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
-    );
+    if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
+      return tryTimeChunkLock(
+          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+      );
+    } else {
+      return true;
+    }
   }
 
   @Override
@@ -159,6 +164,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.getMaxSavedParseExceptions()
     );
+    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
 
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
@@ -166,7 +172,7 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
             dataSchema,
             inputSource,
             inputFormat,
-            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
             buildSegmentsMeters,
             parseExceptionHandler
         );
@@ -197,8 +203,15 @@ public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
       InputRow inputRow = inputRowIterator.next();
       // null rows are filtered out by FilteringCloseableInputRowIterator
       DateTime timestamp = inputRow.getTimestamp();
-      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
-      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+      final Interval interval;
+      if (granularitySpec.inputIntervals().isEmpty()) {
+        interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+      } else {
+        final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+        // this interval must exist since it passed the rowFilter
+        assert optInterval.isPresent();
+        interval = optInterval.get();
+      }
       Granularity queryGranularity = granularitySpec.getQueryGranularity();
 
       HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
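
Condensed, the interval resolution added in this hunk amounts to the following (a sketch; row stands in for the current input row):

    // With no configured intervals, bucket the timestamp by segment granularity directly;
    // otherwise use bucketInterval(), which the row filter guarantees is present.
    Interval interval = granularitySpec.inputIntervals().isEmpty()
        ? granularitySpec.getSegmentGranularity().bucket(row.getTimestamp())
        : granularitySpec.bucketInterval(row.getTimestamp()).get();
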
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 53019dd..6bec35d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.hash.BloomFilter;
@@ -34,6 +35,7 @@ import org.apache.druid.data.input.Rows;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
@@ -55,6 +57,7 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 /**
@@ -163,10 +166,14 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    return tryTimeChunkLock(
-        taskActionClient,
-        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
-    );
+    if (!getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals().isEmpty()) {
+      return tryTimeChunkLock(
+          new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+          getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+      );
+    } else {
+      return true;
+    }
   }
 
   @Override
@@ -195,6 +202,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
         tuningConfig.getMaxParseExceptions(),
         tuningConfig.getMaxSavedParseExceptions()
     );
+    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
 
     try (
         final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
@@ -202,7 +210,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
             dataSchema,
             inputSource,
             inputFormat,
-            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            determineIntervals ? Objects::nonNull : AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
             buildSegmentsMeters,
             parseExceptionHandler
         );
@@ -243,10 +251,15 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
         continue;
       }
 
-      DateTime timestamp = inputRow.getTimestamp();
-
-      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
-      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+      final Interval interval;
+      if (granularitySpec.inputIntervals().isEmpty()) {
+        interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+      } else {
+        final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+        // this interval must exist since it passed the rowFilter
+        assert optInterval.isPresent();
+        interval = optInterval.get();
+      }
       String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
 
       if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
index 39024a1..8358606 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
@@ -21,7 +21,9 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 /**
@@ -32,7 +34,8 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
 {
   private static final String PHASE_NAME = "partial segment generation";
 
-  private Integer numShardsOverride;
+  @Nullable
+  private final Map<Interval, Integer> intervalToNumShardsOverride;
 
   PartialHashSegmentGenerateParallelIndexTaskRunner(
       TaskToolbox toolbox,
@@ -40,11 +43,11 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
       String groupId,
       ParallelIndexIngestionSpec ingestionSchema,
       Map<String, Object> context,
-      Integer numShardsOverride
+      @Nullable Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
     super(toolbox, taskId, groupId, ingestionSchema, context);
-    this.numShardsOverride = numShardsOverride;
+    this.intervalToNumShardsOverride = intervalToNumShardsOverride;
   }
 
   @Override
@@ -82,7 +85,7 @@ class PartialHashSegmentGenerateParallelIndexTaskRunner
             numAttempts,
             subTaskIngestionSpec,
             context,
-            numShardsOverride
+            intervalToNumShardsOverride
         );
       }
     };
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 3b1f7ba..b252e5d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -57,7 +58,8 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
   private final String supervisorTaskId;
-  private final Integer numShardsOverride;
+  @Nullable
+  private final Map<Interval, Integer> intervalToNumShardsOverride;
 
   @JsonCreator
   public PartialHashSegmentGenerateTask(
@@ -69,7 +71,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
       @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
       @JsonProperty(PROP_SPEC) final ParallelIndexIngestionSpec ingestionSchema,
       @JsonProperty("context") final Map<String, Object> context,
-      @Nullable @JsonProperty("numShardsOverride") final Integer numShardsOverride
+      @JsonProperty("intervalToNumShardsOverride") @Nullable final Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
     super(
@@ -85,7 +87,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
     this.numAttempts = numAttempts;
     this.ingestionSchema = ingestionSchema;
     this.supervisorTaskId = supervisorTaskId;
-    this.numShardsOverride = numShardsOverride;
+    this.intervalToNumShardsOverride = intervalToNumShardsOverride;
   }
 
   @JsonProperty
@@ -106,6 +108,13 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
     return supervisorTaskId;
   }
 
+  @Nullable
+  @JsonProperty
+  public Map<Interval, Integer> getIntervalToNumShardsOverride()
+  {
+    return intervalToNumShardsOverride;
+  }
+
   @Override
   public String getType()
   {
@@ -116,7 +125,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     return tryTimeChunkLock(
-        taskActionClient,
+        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
         getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
@@ -134,7 +143,11 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
         getId(),
         granularitySpec,
         new SupervisorTaskAccess(supervisorTaskId, taskClient),
-        createHashPartitionAnalysisFromPartitionsSpec(granularitySpec, partitionsSpec, numShardsOverride)
+        createHashPartitionAnalysisFromPartitionsSpec(
+            granularitySpec,
+            partitionsSpec,
+            intervalToNumShardsOverride
+        )
     );
   }
 
@@ -170,22 +183,24 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
   public static HashPartitionAnalysis createHashPartitionAnalysisFromPartitionsSpec(
       GranularitySpec granularitySpec,
       @Nonnull HashedPartitionsSpec partitionsSpec,
-      @Nullable Integer numShardsOverride
+      @Nullable Map<Interval, Integer> intervalToNumShardsOverride
   )
   {
-    final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
+    final HashPartitionAnalysis partitionAnalysis = new HashPartitionAnalysis(partitionsSpec);
 
-    final int numBucketsPerInterval;
-    if (numShardsOverride != null) {
-      numBucketsPerInterval = numShardsOverride;
+    if (intervalToNumShardsOverride != null) {
+      // Some intervals populated from granularitySpec can be missing in intervalToNumShardsOverride
+      // because intervalToNumShardsOverride contains only the intervals which exist in input data.
+      // We only care about the intervals in intervalToNumShardsOverride here.
+      intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
     } else {
-      numBucketsPerInterval = partitionsSpec.getNumShards() == null
-                              ? 1
-                              : partitionsSpec.getNumShards();
-    }
+      final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
+      final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
+                                        ? 1
+                                        : partitionsSpec.getNumShards();
 
-    final HashPartitionAnalysis partitionAnalysis = new HashPartitionAnalysis(partitionsSpec);
-    intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
+      intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
+    }
     return partitionAnalysis;
   }
 }
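
To illustrate the override path (values hypothetical): only intervals that actually appeared in the input data are registered as buckets, so granularitySpec intervals with no data simply get none.

    Map<Interval, Integer> intervalToNumShardsOverride = ImmutableMap.of(
        Intervals.of("2017-12-01/P1D"), 2,
        Intervals.of("2017-12-02/P1D"), 3
    );
    intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket); // exactly two buckets registered
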
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 57978f4..98c84cf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
 import org.apache.druid.indexing.common.task.SegmentAllocators;
@@ -135,9 +136,12 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
   }
 
   @Override
-  public boolean isReady(TaskActionClient taskActionClient)
+  public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
-    return true;
+    return tryTimeChunkLock(
+        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index ec8530b..a69827c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -81,6 +82,10 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
         context
     );
 
+    Preconditions.checkArgument(
+        !ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(),
+        "Missing intervals in granularitySpec"
+    );
     this.ingestionSchema = ingestionSchema;
     this.supervisorTaskId = supervisorTaskId;
     this.inputRowIteratorBuilder = inputRowIteratorBuilder;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 099b24a..fad7b07 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -101,6 +101,10 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
         context
     );
 
+    Preconditions.checkArgument(
+        !dataSchema.getGranularitySpec().inputIntervals().isEmpty(),
+        "Missing intervals in granularitySpec"
+    );
     this.ioConfig = ioConfig;
     this.numAttempts = numAttempts;
     this.supervisorTaskId = supervisorTaskId;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
index 7cce1d3..792182d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTask.java
@@ -62,12 +62,7 @@ abstract class PerfectRollupWorkerTask extends AbstractBatchIndexTask
 
     checkPartitionsSpec(tuningConfig.getGivenOrDefaultPartitionsSpec());
 
-    granularitySpec = dataSchema.getGranularitySpec();
-    Preconditions.checkArgument(
-        !granularitySpec.inputIntervals().isEmpty(),
-        "Missing intervals in granularitySpec"
-    );
-
+    this.granularitySpec = dataSchema.getGranularitySpec();
     this.dataSchema = dataSchema;
     this.tuningConfig = tuningConfig;
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 543de61..2cc9f72 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.BatchAppenderators;
@@ -149,7 +150,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
-    return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
+    return determineLockGranularityAndTryLock(
+        new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
+        ingestionSchema.getDataSchema().getGranularitySpec()
+    );
   }
 
   @JsonProperty
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a6e2806..b57cede 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
@@ -71,6 +72,7 @@ import java.util.Set;
 abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
 {
   protected static final String DATASOURCE = "dataSource";
+  protected static final Granularity SEGMENT_GRANULARITY = Granularities.DAY;
 
   private static final ScanQueryRunnerFactory SCAN_QUERY_RUNNER_FACTORY = new ScanQueryRunnerFactory(
       new ScanQueryQueryToolChest(
@@ -176,7 +178,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
   )
   {
     GranularitySpec granularitySpec = new UniformGranularitySpec(
-        Granularities.DAY,
+        SEGMENT_GRANULARITY,
         Granularities.MINUTE,
         interval == null ? null : Collections.singletonList(interval)
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
index 102b5f8..29bbe5c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
@@ -109,34 +109,74 @@ public class DimensionCardinalityReportTest
     reports.add(report3);
 
     // first interval in test has cardinality 4
-    int numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    Map<Interval, Integer> intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         1
     );
-    Assert.assertEquals(4L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            4,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         2
     );
-    Assert.assertEquals(2L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            2,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         3
     );
-    Assert.assertEquals(2L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         4
     );
-    Assert.assertEquals(1L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
 
-    numShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
         reports,
         5
     );
-    Assert.assertEquals(1L, numShards);
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
   }
 }
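
Worth noting in these expectations: the old code effectively rounded up (cardinality 4 with maxRowsPerSegment 3 gave 4 / 3 = 1, plus 1 for the remainder, so 2 shards), while the new per-interval code uses Math.round, so Math.round(4 / 3.0) = 1; that is why the expected shard count for the first interval at maxRowsPerSegment = 3 drops from 2 to 1.
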
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index 2689584..e725ab9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -31,8 +31,10 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -86,35 +88,44 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
   private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
 
   @Parameterized.Parameters(
-      name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, numShards={3}"
+      name = "lockGranularity={0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, intervalToIndex={3}, numShards={4}"
   )
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, false, 2, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 2, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 1, 2},
-        new Object[]{LockGranularity.SEGMENT, true, 2, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 2, null},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 1, null},
-        new Object[]{LockGranularity.SEGMENT, true, 2, null}
+        new Object[]{LockGranularity.TIME_CHUNK, false, 2, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, INTERVAL_TO_INDEX, null},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2, null, null},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 1, INTERVAL_TO_INDEX, null},
+        new Object[]{LockGranularity.SEGMENT, true, 2, INTERVAL_TO_INDEX, null}
     );
   }
 
   private final int maxNumConcurrentSubTasks;
+  @Nullable
+  private final Interval intervalToIndex;
+  @Nullable
   private final Integer numShards;
 
   private File inputDir;
+  // sorted input intervals
+  private List<Interval> inputIntervals;
 
   public HashPartitionMultiPhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
       int maxNumConcurrentSubTasks,
+      @Nullable Interval intervalToIndex,
       @Nullable Integer numShards
   )
   {
     super(lockGranularity, useInputFormatApi);
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+    this.intervalToIndex = intervalToIndex;
     this.numShards = numShards;
   }
 
@@ -122,6 +133,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
   public void setup() throws IOException
   {
     inputDir = temporaryFolder.newFolder("data");
+    final Set<Interval> intervals = new HashSet<>();
     // set up data
     for (int i = 0; i < 10; i++) {
       try (final Writer writer =
@@ -129,6 +141,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
        for (int j = 0; j < 10; j++) {
          writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
          writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 1))));
+          intervals.add(SEGMENT_GRANULARITY.bucket(DateTimes.of(StringUtils.format("2017-12-%d", j + 2))));
         }
       }
     }
@@ -139,33 +153,70 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
         writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
       }
     }
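+    // sorted order lets computeExpectedIntervalToNumSegments treat the first and last
+    // days specially: they receive half the rows of the middle days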
+    inputIntervals = new ArrayList<>(intervals);
+    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
   }
 
   @Test
   public void testRun() throws Exception
   {
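+    // when numShards is unset, cap maxRowsPerSegment instead so that the cardinality
+    // phase determines the bucket count per interval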
+    final Integer maxRowsPerSegment = numShards == null ? 10 : null;
     final Set<DataSegment> publishedSegments = runTestTask(
-        new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")),
+        new HashedPartitionsSpec(
+            maxRowsPerSegment,
+            numShards,
+            ImmutableList.of("dim1", "dim2")
+        ),
         TaskState.SUCCESS,
         false
     );
 
-    // we don't specify maxRowsPerSegment so it defaults to DEFAULT_MAX_ROWS_PER_SEGMENT,
-    // which is 5 million, so assume that there will only be 1 shard if numShards is not set.
-    int expectedSegmentCount = numShards != null ? numShards : 1;
-
-    assertHashedPartition(publishedSegments, expectedSegmentCount);
+    final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
+        maxRowsPerSegment,
+        numShards
+    );
+    assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
   }
 
   @Test
   public void testRunWithHashPartitionFunction() throws Exception
   {
+    final Integer maxRowsPerSegment = numShards == null ? 10 : null;
     final Set<DataSegment> publishedSegments = runTestTask(
-        new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"), HashPartitionFunction.MURMUR3_32_ABS),
+        new HashedPartitionsSpec(
+            maxRowsPerSegment,
+            numShards,
+            ImmutableList.of("dim1", "dim2"),
+            HashPartitionFunction.MURMUR3_32_ABS
+        ),
         TaskState.SUCCESS,
         false
     );
-    assertHashedPartition(publishedSegments, 2);
+    final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
+        maxRowsPerSegment,
+        numShards
+    );
+    assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
+  }
+
+  private Map<Interval, Integer> computeExpectedIntervalToNumSegments(
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Integer numShards
+  )
+  {
+    final Map<Interval, Integer> expectedIntervalToNumSegments = new HashMap<>();
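+    // each of the 10 input files writes one row per day per inner-loop pass; the first and last
+    // days appear in a single pass (10 rows in total) while every middle day appears in two (20 rows)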
+    for (int i = 0; i < inputIntervals.size(); i++) {
+      if (numShards == null) {
+        if (i == 0 || i == inputIntervals.size() - 1) {
+          expectedIntervalToNumSegments.put(inputIntervals.get(i), Math.round((float) 10 / maxRowsPerSegment));
+        } else {
+          expectedIntervalToNumSegments.put(inputIntervals.get(i), Math.round((float) 20 / maxRowsPerSegment));
+        }
+      } else {
+        expectedIntervalToNumSegments.put(inputIntervals.get(i), numShards);
+      }
+    }
+    return expectedIntervalToNumSegments;
   }
 
   @Test
@@ -236,7 +287,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
           DIMENSIONS_SPEC,
           INPUT_FORMAT,
           null,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           "test_*",
           partitionsSpec,
@@ -250,7 +301,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
           null,
           null,
           PARSE_SPEC,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           "test_*",
           partitionsSpec,
@@ -261,15 +312,21 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
     }
   }
 
-  private void assertHashedPartition(Set<DataSegment> publishedSegments, int expectedNumSegments) throws IOException
+  private void assertHashedPartition(
+      Set<DataSegment> publishedSegments,
+      Map<Interval, Integer> expectedIntervalToNumSegments
+  ) throws IOException
   {
     final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
     publishedSegments.forEach(
         segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>()).add(segment)
     );
+    Assert.assertEquals(new HashSet<>(inputIntervals), intervalToSegments.keySet());
     final File tempSegmentDir = temporaryFolder.newFolder();
-    for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
-      Assert.assertEquals(expectedNumSegments, segmentsInInterval.size());
+    for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
+      Interval interval = entry.getKey();
+      List<DataSegment> segmentsInInterval = entry.getValue();
+      Assert.assertEquals(expectedIntervalToNumSegments.get(interval).intValue(), segmentsInInterval.size());
       for (DataSegment segment : segmentsInInterval) {
         Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
         final HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) segment.getShardSpec();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index a80e3a8..b4a7125 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -92,25 +92,6 @@ public class ParallelIndexSupervisorTaskSerdeTest
   }
 
   @Test
-  public void forceGuaranteedRollupWithMissingIntervals()
-  {
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(
-        "forceGuaranteedRollup is set but intervals is missing in 
granularitySpec"
-    );
-
-    Integer numShards = 2;
-    new ParallelIndexSupervisorTaskBuilder()
-        .ingestionSpec(
-            new ParallelIndexIngestionSpecBuilder()
-                .forceGuaranteedRollup(true)
-                .partitionsSpec(new HashedPartitionsSpec(null, numShards, null))
-                .build()
-        )
-        .build();
-  }
-
-  @Test
   public void forceGuaranteedRollupWithHashPartitionsMissingNumShards()
   {
     Integer numShards = null;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index 0ad4dde..0d65f86 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -109,19 +109,6 @@ public class PartialDimensionCardinalityTaskTest
     }
 
     @Test
-    public void requiresGranularitySpecInputIntervals()
-    {
-      exception.expect(IllegalArgumentException.class);
-      exception.expectMessage("Missing intervals in granularitySpec");
-
-      DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
-
-      new PartialDimensionCardinalityTaskBuilder()
-          .dataSchema(dataSchema)
-          .build();
-    }
-
-    @Test
     public void serializesDeserializes()
     {
       PartialDimensionCardinalityTask task = new PartialDimensionCardinalityTaskBuilder()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 87de8c2..7dbb6e3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -112,19 +112,6 @@ public class PartialDimensionDistributionTaskTest
     }
 
     @Test
-    public void requiresGranularitySpecInputIntervals()
-    {
-      exception.expect(IllegalArgumentException.class);
-      exception.expectMessage("Missing intervals in granularitySpec");
-
-      DataSchema dataSchema = ParallelIndexTestingFactory.createDataSchema(Collections.emptyList());
-
-      new PartialDimensionDistributionTaskBuilder()
-          .dataSchema(dataSchema)
-          .build();
-    }
-
-    @Test
     public void serializesDeserializes()
     {
       PartialDimensionDistributionTask task = new PartialDimensionDistributionTaskBuilder()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
index 71a4858..a579c18 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
@@ -24,7 +24,9 @@ import org.apache.druid.segment.TestHelper;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
 
@@ -54,6 +56,9 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
               .build()
       );
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   private PartialGenericSegmentMergeTask target;
 
   @Before
@@ -82,4 +87,27 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
     String id = target.getId();
     Assert.assertThat(id, Matchers.startsWith(PartialGenericSegmentMergeTask.TYPE));
   }
+
+  @Test
+  public void requiresGranularitySpecInputIntervals()
+  {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Missing intervals in granularitySpec");
+
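+    // createDataSchema(null) leaves granularitySpec intervals unset, so constructing
+    // the merge task should fail fast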
+    new PartialGenericSegmentMergeTask(
+        ParallelIndexTestingFactory.AUTOMATIC_ID,
+        ParallelIndexTestingFactory.GROUP_ID,
+        ParallelIndexTestingFactory.TASK_RESOURCE,
+        ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
+        ParallelIndexTestingFactory.NUM_ATTEMPTS,
+        new PartialGenericSegmentMergeIngestionSpec(
+            ParallelIndexTestingFactory.createDataSchema(null),
+            IO_CONFIG,
+            new ParallelIndexTestingFactory.TuningConfigBuilder()
+                .partitionsSpec(PARTITIONS_SPEC)
+                .build()
+        ),
+        ParallelIndexTestingFactory.CONTEXT
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
index ac32c81..845fc44 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -33,10 +34,13 @@ import org.hamcrest.Matchers;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.util.List;
+import java.util.Map;
 
 public class PartialHashSegmentGenerateTaskTest
 {
@@ -48,6 +52,9 @@ public class PartialHashSegmentGenerateTaskTest
       ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
   );
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private PartialHashSegmentGenerateTask target;
 
   @Before
@@ -102,4 +109,62 @@ public class PartialHashSegmentGenerateTaskTest
       Assert.assertEquals(expectedNumBuckets, partitionAnalysis.getBucketAnalysis(interval).intValue());
     }
   }
+
+  @Test
+  public void testCreateHashPartitionAnalysisFromPartitionsSpecWithNumShardsMap()
+  {
+    final List<Interval> intervals = ImmutableList.of(
+        Intervals.of("2020-01-01/2020-01-02"),
+        Intervals.of("2020-01-02/2020-01-03"),
+        Intervals.of("2020-01-03/2020-01-04")
+    );
+    final Map<Interval, Integer> intervalToNumShards = ImmutableMap.of(
+        Intervals.of("2020-01-01/2020-01-02"),
+        1,
+        Intervals.of("2020-01-02/2020-01-03"),
+        2,
+        Intervals.of("2020-01-03/2020-01-04"),
+        3
+    );
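+    // the explicit per-interval shard counts should take precedence over the
+    // partitions spec, which sets none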
+    final HashPartitionAnalysis partitionAnalysis = PartialHashSegmentGenerateTask
+        .createHashPartitionAnalysisFromPartitionsSpec(
+            new UniformGranularitySpec(
+                Granularities.DAY,
+                Granularities.NONE,
+                intervals
+            ),
+            new HashedPartitionsSpec(null, null, null),
+            intervalToNumShards
+        );
+    Assert.assertEquals(intervals.size(), partitionAnalysis.getNumTimePartitions());
+    for (Interval interval : intervals) {
+      Assert.assertEquals(
+          intervalToNumShards.get(interval).intValue(),
+          partitionAnalysis.getBucketAnalysis(interval).intValue()
+      );
+    }
+  }
+
+  @Test
+  public void requiresGranularitySpecInputIntervals()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Missing intervals in granularitySpec");
+
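+    // unlike the cardinality and distribution tasks, this task still requires explicit
+    // intervals when no intervalToNumShards map is supplied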
+    new PartialHashSegmentGenerateTask(
+        ParallelIndexTestingFactory.AUTOMATIC_ID,
+        ParallelIndexTestingFactory.GROUP_ID,
+        ParallelIndexTestingFactory.TASK_RESOURCE,
+        ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
+        ParallelIndexTestingFactory.NUM_ATTEMPTS,
+        ParallelIndexTestingFactory.createIngestionSpec(
+            new LocalInputSource(new File("baseDir"), "filer"),
+            new JsonInputFormat(null, null, null),
+            new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
+            ParallelIndexTestingFactory.createDataSchema(null)
+        ),
+        ParallelIndexTestingFactory.CONTEXT,
+        null
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
index af4cdf3..783aadc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PerfectRollupWorkerTaskTest.java
@@ -64,17 +64,6 @@ public class PerfectRollupWorkerTaskTest
   }
 
   @Test
-  public void requiresGranularitySpecInputIntervals()
-  {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("Missing intervals in granularitySpec");
-
-    new PerfectRollupWorkerTaskBuilder()
-        .granularitySpecInputIntervals(Collections.emptyList())
-        .build();
-  }
-
-  @Test
   public void succeedsWithValidPartitionsSpec()
   {
     new PerfectRollupWorkerTaskBuilder().build();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 20e4f34..598aed9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -108,15 +108,16 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
       0
   );
 
-  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}, intervalToIndex={4}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
-        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
-        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
-        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM},  // will spawn subtask
-        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM}  // expected to fail
+        new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, null},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX},  // will spawn subtask
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM, INTERVAL_TO_INDEX}  // expected to fail
     );
   }
 
@@ -132,17 +133,21 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
 
   private final int maxNumConcurrentSubTasks;
   private final boolean useMultivalueDim;
+  @Nullable
+  private final Interval intervalToIndex;
 
   public RangePartitionMultiPhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
       int maxNumConcurrentSubTasks,
-      boolean useMultivalueDim
+      boolean useMultivalueDim,
+      @Nullable Interval intervalToIndex
   )
   {
     super(lockGranularity, useInputFormatApi);
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
     this.useMultivalueDim = useMultivalueDim;
+    this.intervalToIndex = intervalToIndex;
   }
 
   @Before
@@ -309,7 +314,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
           DIMENSIONS_SPEC,
           INPUT_FORMAT,
           null,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           TEST_FILE_NAME_PREFIX + "*",
           partitionsSpec,
@@ -323,7 +328,7 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
           null,
           null,
           PARSE_SPEC,
-          INTERVAL_TO_INDEX,
+          intervalToIndex,
           inputDir,
           TEST_FILE_NAME_PREFIX + "*",
           partitionsSpec,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
