ccaominh commented on a change in pull request #8925: Parallel indexing single dim partitions
URL: https://github.com/apache/incubator-druid/pull/8925#discussion_r352804004
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 ##########
 @@ -519,6 +574,101 @@ private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throw
     return TaskStatus.fromCode(getId(), state);
   }
 
+  private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) 
throws Exception
+  {
+    assertDataSketchesAvailable();
+
+    ParallelIndexTaskRunner<PartialDimensionDistributionTask, 
DimensionDistributionReport> distributionRunner =
+        createRunner(
+            toolbox,
+            this::createPartialDimensionDistributionRunner
+        );
+
+    TaskState distributionState = runNextPhase(distributionRunner);
+    if (distributionState.isFailure()) {
+      return TaskStatus.failure(getId());
+    }
+
+    Map<Interval, String[]> intervalToPartitions =
+        determineAllRangePartitions(distributionRunner.getReports().values());
+
+    if (intervalToPartitions.isEmpty()) {
+      String msg = "No valid rows for single dimension partitioning."
+          + " All rows may have invalid timestamps or multiple dimension values.";
+      LOG.warn(msg);
+      return TaskStatus.success(getId(), msg);
+    }
+
+    ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, 
GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
+        createRunner(toolbox, tb -> 
createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions));
+
+    TaskState indexingState = runNextPhase(indexingRunner);
+    if (indexingState.isFailure()) {
+      return TaskStatus.failure(getId());
+    }
+
+    // partition (interval, partitionId) -> partition locations
+    Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> 
partitionToLocations =
+        
groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
+    final List<PartialGenericSegmentMergeIOConfig> ioConfigs = 
createGenericMergeIOConfigs(
+        ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
+        partitionToLocations
+    );
+
+    ParallelIndexTaskRunner<PartialGenericSegmentMergeTask, 
PushedSegmentsReport> mergeRunner = createRunner(
+        toolbox,
+        tb -> createPartialGenericSegmentMergeRunner(tb, ioConfigs)
+    );
+    TaskState mergeState = runNextPhase(mergeRunner);
+    if (mergeState.isSuccess()) {
+      publishSegments(toolbox, mergeRunner.getReports());
+    }
+
+    return TaskStatus.fromCode(getId(), mergeState);
+  }
+
+  private static void assertDataSketchesAvailable()
+  {
+    try {
+      //noinspection ResultOfObjectAllocationIgnored
+      new StringSketch();
+    }
+    catch (Throwable t) {
 
 Review comment:
   Changed to catch `NoClassDefFoundError`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to