kfaraz commented on a change in pull request #12331:
URL: https://github.com/apache/druid/pull/12331#discussion_r831171244



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionParallelIndexTaskRunner.java
##########
@@ -58,6 +84,188 @@ public String getName()
     return PHASE_NAME;
   }
 
+  @Override
+  public void collectReport(DimensionDistributionReport report)
+  {
+    // Send an empty report to TaskMonitor
+    super.collectReport(new DimensionDistributionReport(report.getTaskId(), 
null));
+
+    // Extract the distributions in a separate thread to unblock HTTP requests
+    if (executor.isShutdown()) {
+      // this should never happen
+      throw new ISE("Executor is already shutdown. Cannot process more 
reports.");
+    }
+    executor.submit(() -> extractDistributionsFromReport(report));
+  }
+
+  /**
+   * Map from an interval to PartitionBoundaries calculated by applying the 
target
+   * row size on the final StringDistribution. The final distribution for an
+   * interval is obtained by merging the distributions reported by all the
+   * sub-tasks for that interval.
+   */
+  public Map<Interval, PartitionBoundaries> getIntervalToPartitionBoundaries(
+      DimensionRangePartitionsSpec partitionsSpec
+  )
+  {
+    waitToProcessPendingReports();
+
+    // Do not proceed if a shutdown has been requested
+    if (getStopReason() != null) {
+      throw new ISE("DimensionDistributionPhaseRunner has been stopped. %s", 
getStopReason());
+    }
+
+    // Merge distributions only from succeeded sub-tasks
+    final Set<String> succeededTaskIds = super.getReports().keySet();
+    final Map<Interval, PartitionBoundaries> intervalToPartitions = new 
HashMap<>();
+    intervalToTaskIds.forEach(
+        (interval, subTaskIds) -> {
+          final File intervalDir = getIntervalDistributionDir(interval);
+          final StringDistributionMerger merger = new StringSketchMerger();
+          subTaskIds
+              .stream()
+              .filter(succeededTaskIds::contains)
+              .map(subTaskId -> readDistributionFromFile(intervalDir, 
subTaskId))
+              .forEach(merger::merge);
+          final StringDistribution mergedDistribution = merger.getResult();
+
+          final PartitionBoundaries partitions;
+          Integer targetRowsPerSegment = 
partitionsSpec.getTargetRowsPerSegment();
+          if (targetRowsPerSegment == null) {
+            partitions = 
mergedDistribution.getEvenPartitionsByMaxSize(partitionsSpec.getMaxRowsPerSegment());
+          } else {
+            partitions = 
mergedDistribution.getEvenPartitionsByTargetSize(targetRowsPerSegment);
+          }
+
+          intervalToPartitions.put(interval, partitions);
+        }
+    );
+
+    return intervalToPartitions;
+  }
+
+  /**
+   * Extracts the distributions from the given report and writes them to the 
task
+   * temp directory.
+   * <p>
+   * This method is not thread-safe as the reports are processed by a 
single-threaded executor.
+   */
+  private void extractDistributionsFromReport(DimensionDistributionReport 
report)
+  {
+    log.debug("Started writing distributions from Task ID [%s]", 
report.getTaskId());
+
+    if (report.getIntervalToDistribution() == null) {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to