jihoonson commented on a change in pull request #8257: Add support for parallel native indexing with shuffle for perfect rollup
URL: https://github.com/apache/incubator-druid/pull/8257#discussion_r314482631
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 ##########
 @@ -324,31 +412,206 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     }
   }
 
-  private boolean isParallelMode()
+  private void initializeSubTaskCleaner()
   {
-    if (baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1) {
-      return true;
+    if (isParallelMode()) {
+      currentSubTaskHolder = new CurrentSubTaskHolder((currentRunnerObject, taskConfig) -> {
+        final ParallelIndexTaskRunner runner = (ParallelIndexTaskRunner) currentRunnerObject;
+        runner.stopGracefully();
+      });
     } else {
-      return false;
+      currentSubTaskHolder = new CurrentSubTaskHolder((taskObject, taskConfig) -> {
+        final IndexTask task = (IndexTask) taskObject;
+        task.stopGracefully(taskConfig);
+      });
     }
+    registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
   }
 
-  @VisibleForTesting
-  void setToolbox(TaskToolbox toolbox)
+  private boolean isParallelMode()
+  {
+    return baseFirehoseFactory.isSplittable() && ingestionSchema.getTuningConfig().getMaxNumSubTasks() > 1;
+  }
+
+  /**
+   * Run the single phase parallel indexing for best-effort rollup. In this mode, each sub task created by
+   * the supervisor task reads data and generates segments individually.
+   */
+  private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
+  {
+    final ParallelIndexTaskRunner<SinglePhaseSubTask, PushedSegmentsReport> runner = createRunner(
+        toolbox,
+        this::createSinglePhaseTaskRunner
+    );
+
+    final TaskState state = runNextPhase(runner);
+    if (state.isSuccess()) {
+      //noinspection ConstantConditions
+      publishSegments(toolbox, runner.getReports());
+    }
+    return TaskStatus.fromCode(getId(), state);
+  }
+
+  /**
+   * Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently
+   * executed in two phases.
+   *
+   * - In the first phase, each task partitions input data and stores those partitions in local storage.
+   *   - The partition is created based on the segment granularity (primary partition key) and the partition dimension
+   *     values in {@link org.apache.druid.indexer.partitions.PartitionsSpec} (secondary partition key).
+   *   - Partitioned data is maintained by {@link org.apache.druid.indexing.worker.IntermediaryDataManager}.
+   * - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
+   *   or indexer) and merges them to create the final segments.
+   */
+  private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
+  {
+    // 1. Partial segment generation phase
+    final ParallelIndexTaskRunner<PartialSegmentGenerateTask, GeneratedPartitionsReport> indexingRunner = createRunner(
+        toolbox,
+        this::createPartialSegmentGenerateRunner
+    );
+
+    TaskState state = runNextPhase(indexingRunner);
+    if (state.isFailure()) {
+      return TaskStatus.failure(getId());
+    }
+
+    // 2. Partial segment merge phase
+
+    // partition (interval, partitionId) -> partition locations
+    //noinspection ConstantConditions
+    Map<Pair<Interval, Integer>, List<PartitionLocation>> partitionToLocations = groupPartitionLocationsPerPartition(
+        indexingRunner.getReports()
+    );
+    final List<PartialSegmentMergeIOConfig> ioConfigs = createMergeIOConfigs(
+        ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
+        partitionToLocations
+    );
+
+    final ParallelIndexTaskRunner<PartialSegmentMergeTask, PushedSegmentsReport> mergeRunner = createRunner(
+        toolbox,
+        tb -> createPartialSegmentMergeRunner(tb, ioConfigs)
+    );
+    state = runNextPhase(mergeRunner);
+    if (state.isSuccess()) {
+      //noinspection ConstantConditions
+      publishSegments(toolbox, mergeRunner.getReports());
+    }
+
+    return TaskStatus.fromCode(getId(), state);
+  }
+
+  private static Map<Pair<Interval, Integer>, List<PartitionLocation>> groupPartitionLocationsPerPartition(
+      // subTaskId -> report
+      Map<String, GeneratedPartitionsReport> reports
+  )
   {
-    this.toolbox = toolbox;
+    // partition (interval, partitionId) -> partition locations
+    final Map<Pair<Interval, Integer>, List<PartitionLocation>> partitionToLocations = new HashMap<>();
+    for (Entry<String, GeneratedPartitionsReport> entry : reports.entrySet()) {
+      final String subTaskId = entry.getKey();
+      final GeneratedPartitionsReport report = entry.getValue();
+      for (PartitionStat partitionStat : report.getPartitionStats()) {
+        final List<PartitionLocation> locationsOfSamePartition = partitionToLocations.computeIfAbsent(
+            Pair.of(partitionStat.getInterval(), partitionStat.getPartitionId()),
+            k -> new ArrayList<>()
+        );
+        locationsOfSamePartition.add(
+            new PartitionLocation(
+                partitionStat.getTaskExecutorHost(),
+                partitionStat.getTaskExecutorPort(),
+                subTaskId,
+                partitionStat.getInterval(),
+                partitionStat.getPartitionId()
+            )
+        );
+      }
+    }
+
+    return partitionToLocations;
+  }
+
+  private static List<PartialSegmentMergeIOConfig> createMergeIOConfigs(
+      int totalNumMergeTasks,
+      Map<Pair<Interval, Integer>, List<PartitionLocation>> partitionToLocations
+  )
+  {
+    final int numMergeTasks = Math.min(totalNumMergeTasks, partitionToLocations.size());
+    LOG.info(
+        "Number of merge tasks is set to [%d] based on totalNumMergeTasks[%d] 
and number of partitions[%d]",
+        numMergeTasks,
+        totalNumMergeTasks,
+        partitionToLocations.size()
+    );
+    // Randomly shuffle partitionIds to evenly distribute partitions of potentially different sizes
+    // This will be improved once we collect partition stats properly.
+    // See PartitionStat in GeneratedPartitionsReport.
+    final List<Pair<Interval, Integer>> partitions = new ArrayList<>(partitionToLocations.keySet());
+    Collections.shuffle(partitions, ThreadLocalRandom.current());
+    final int numPartitionsPerTask = (int) Math.round(partitions.size() / (double) numMergeTasks);
+
+    final List<PartialSegmentMergeIOConfig> assignedPartitionLocations = new ArrayList<>(numMergeTasks);
+    for (int i = 0; i < numMergeTasks - 1; i++) {
+      final List<PartitionLocation> assingedToSameTask = partitions
 
 Review comment:
   Thanks, fixed.
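
For readers following from the archive: the javadoc on runMultiPhaseParallel above describes partitioning by segment granularity (primary partition key) and by the partition dimension values (secondary partition key). Below is a minimal, hypothetical sketch of that two-level keying, assuming DAY segment granularity and a simple hash-modulo secondary partitioning; it is not the PR's actual shard-spec code, and the names PartitionKeySketch, intervalStart, and partitionId are illustrative only.

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified illustration of the two-level partition key.
public class PartitionKeySketch
{
  // Primary partition key: the segment interval start, derived here from DAY granularity.
  static Instant intervalStart(Instant timestamp)
  {
    return timestamp.truncatedTo(ChronoUnit.DAYS);
  }

  // Secondary partition key: a hash of the partition dimension values modulo the
  // number of shards, standing in for hash-based secondary partitioning.
  static int partitionId(List<String> partitionDimensionValues, int numShards)
  {
    return Math.abs(partitionDimensionValues.hashCode() % numShards);
  }

  public static void main(String[] args)
  {
    Instant rowTimestamp = Instant.parse("2019-08-15T13:45:00Z");
    List<String> partitionDimensionValues = Arrays.asList("US", "android");
    System.out.println("interval start = " + intervalStart(rowTimestamp));
    System.out.println("partitionId    = " + partitionId(partitionDimensionValues, 4));
  }
}

Rows sharing the same (interval, partitionId) pair land in the same partition, which is why the merge phase groups partition locations by exactly that pair.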
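Similarly, the merge-task assignment in createMergeIOConfigs is cut off at the end of the hunk above. The following is a rough, self-contained sketch of the arithmetic it sets up (min of totalNumMergeTasks and the partition count, a shuffle, then roughly equal chunks), assuming plain String partition keys in place of Pair<Interval, Integer> and a clamped sublist split that may not match the PR's exact continuation.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: partition keys are strings, locations are host:port strings.
public class MergeAssignmentSketch
{
  static List<List<String>> assignPartitions(
      Map<String, List<String>> partitionToLocations,
      int totalNumMergeTasks
  )
  {
    // Never create more merge tasks than there are partitions.
    final int numMergeTasks = Math.min(totalNumMergeTasks, partitionToLocations.size());

    // Shuffle so partitions of potentially different sizes spread evenly across tasks.
    final List<String> partitions = new ArrayList<>(partitionToLocations.keySet());
    Collections.shuffle(partitions, ThreadLocalRandom.current());

    // Roughly equal-sized chunks; indices are clamped so they never run past the list,
    // and the last task takes whatever remains.
    final int numPartitionsPerTask = (int) Math.round(partitions.size() / (double) numMergeTasks);
    final List<List<String>> assignments = new ArrayList<>(numMergeTasks);
    for (int i = 0; i < numMergeTasks - 1; i++) {
      final int start = Math.min(i * numPartitionsPerTask, partitions.size());
      final int end = Math.min(start + numPartitionsPerTask, partitions.size());
      assignments.add(new ArrayList<>(partitions.subList(start, end)));
    }
    final int lastStart = Math.min((numMergeTasks - 1) * numPartitionsPerTask, partitions.size());
    assignments.add(new ArrayList<>(partitions.subList(lastStart, partitions.size())));
    return assignments;
  }

  public static void main(String[] args)
  {
    final Map<String, List<String>> partitionToLocations = new HashMap<>();
    for (int i = 0; i < 10; i++) {
      partitionToLocations.put("2019-08-15/" + i, Collections.singletonList("host" + i + ":8091"));
    }
    assignPartitions(partitionToLocations, 3)
        .forEach(chunk -> System.out.println(chunk.size() + " partitions -> " + chunk));
  }
}

In the PR, each such chunk backs one PartialSegmentMergeIOConfig, so a single merge task fetches all locations of the partitions assigned to it.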

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
