imply-cheddar commented on a change in pull request #12331:
URL: https://github.com/apache/druid/pull/12331#discussion_r830775165
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionParallelIndexTaskRunner.java
##########
@@ -58,6 +84,188 @@ public String getName()
return PHASE_NAME;
}
+ @Override
+ public void collectReport(DimensionDistributionReport report)
+ {
+ // Send an empty report to TaskMonitor
+ super.collectReport(new DimensionDistributionReport(report.getTaskId(),
null));
+
+ // Extract the distributions in a separate thread to unblock HTTP requests
+ if (executor.isShutdown()) {
+ // this should never happen
+ throw new ISE("Executor is already shutdown. Cannot process more
reports.");
+ }
+ executor.submit(() -> extractDistributionsFromReport(report));
+ }
+
+ /**
+ * Map from an interval to PartitionBoundaries calculated by applying the
target
+ * row size on the final StringDistribution. The final distribution for an
+ * interval is obtained by merging the distributions reported by all the
+ * sub-tasks for that interval.
+ */
+ public Map<Interval, PartitionBoundaries> getIntervalToPartitionBoundaries(
+ DimensionRangePartitionsSpec partitionsSpec
+ )
+ {
+ waitToProcessPendingReports();
+
+ // Do not proceed if a shutdown has been requested
+ if (getStopReason() != null) {
+ throw new ISE("DimensionDistributionPhaseRunner has been stopped. %s",
getStopReason());
+ }
+
+ // Merge distributions only from succeeded sub-tasks
+ final Set<String> succeededTaskIds = super.getReports().keySet();
+ final Map<Interval, PartitionBoundaries> intervalToPartitions = new
HashMap<>();
+ intervalToTaskIds.forEach(
+ (interval, subTaskIds) -> {
+ final File intervalDir = getIntervalDistributionDir(interval);
+ final StringDistributionMerger merger = new StringSketchMerger();
+ subTaskIds
+ .stream()
+ .filter(succeededTaskIds::contains)
+ .map(subTaskId -> readDistributionFromFile(intervalDir,
subTaskId))
+ .forEach(merger::merge);
+ final StringDistribution mergedDistribution = merger.getResult();
+
+ final PartitionBoundaries partitions;
+ Integer targetRowsPerSegment =
partitionsSpec.getTargetRowsPerSegment();
+ if (targetRowsPerSegment == null) {
+ partitions =
mergedDistribution.getEvenPartitionsByMaxSize(partitionsSpec.getMaxRowsPerSegment());
+ } else {
+ partitions =
mergedDistribution.getEvenPartitionsByTargetSize(targetRowsPerSegment);
+ }
+
+ intervalToPartitions.put(interval, partitions);
+ }
+ );
+
+ return intervalToPartitions;
+ }
+
+ /**
+ * Extracts the distributions from the given report and writes them to the
task
+ * temp directory.
+ * <p>
+ * This method is not thread-safe as the reports are processed by a
single-threaded executor.
+ */
+ private void extractDistributionsFromReport(DimensionDistributionReport
report)
+ {
+ log.debug("Started writing distributions from Task ID [%s]",
report.getTaskId());
+
+ if (report.getIntervalToDistribution() == null) {
+ return;
+ }
+ report.getIntervalToDistribution().forEach(
+ (interval, distribution) -> {
+ Set<String> taskIds = intervalToTaskIds.computeIfAbsent(interval, i
-> new HashSet<>());
+ final String subTaskId = report.getTaskId();
+ if (!taskIds.contains(subTaskId)) {
Review comment:
Is there a reason this should ever happen? It seems like if this actually
occurred, it would indicate a problem.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionParallelIndexTaskRunner.java
##########
@@ -58,6 +84,188 @@ public String getName()
return PHASE_NAME;
}
+ @Override
+ public void collectReport(DimensionDistributionReport report)
+ {
+ // Send an empty report to TaskMonitor
+ super.collectReport(new DimensionDistributionReport(report.getTaskId(),
null));
+
+ // Extract the distributions in a separate thread to unblock HTTP requests
+ if (executor.isShutdown()) {
+ // this should never happen
+ throw new ISE("Executor is already shutdown. Cannot process more
reports.");
+ }
+ executor.submit(() -> extractDistributionsFromReport(report));
+ }
+
+ /**
+ * Map from an interval to PartitionBoundaries calculated by applying the
target
+ * row size on the final StringDistribution. The final distribution for an
+ * interval is obtained by merging the distributions reported by all the
+ * sub-tasks for that interval.
+ */
+ public Map<Interval, PartitionBoundaries> getIntervalToPartitionBoundaries(
+ DimensionRangePartitionsSpec partitionsSpec
+ )
+ {
+ waitToProcessPendingReports();
+
+ // Do not proceed if a shutdown has been requested
+ if (getStopReason() != null) {
+ throw new ISE("DimensionDistributionPhaseRunner has been stopped. %s",
getStopReason());
+ }
+
+ // Merge distributions only from succeeded sub-tasks
+ final Set<String> succeededTaskIds = super.getReports().keySet();
+ final Map<Interval, PartitionBoundaries> intervalToPartitions = new
HashMap<>();
+ intervalToTaskIds.forEach(
+ (interval, subTaskIds) -> {
+ final File intervalDir = getIntervalDistributionDir(interval);
+ final StringDistributionMerger merger = new StringSketchMerger();
+ subTaskIds
+ .stream()
+ .filter(succeededTaskIds::contains)
+ .map(subTaskId -> readDistributionFromFile(intervalDir,
subTaskId))
+ .forEach(merger::merge);
+ final StringDistribution mergedDistribution = merger.getResult();
+
+ final PartitionBoundaries partitions;
+ Integer targetRowsPerSegment =
partitionsSpec.getTargetRowsPerSegment();
+ if (targetRowsPerSegment == null) {
+ partitions =
mergedDistribution.getEvenPartitionsByMaxSize(partitionsSpec.getMaxRowsPerSegment());
+ } else {
+ partitions =
mergedDistribution.getEvenPartitionsByTargetSize(targetRowsPerSegment);
+ }
+
+ intervalToPartitions.put(interval, partitions);
+ }
+ );
+
+ return intervalToPartitions;
+ }
+
+ /**
+ * Extracts the distributions from the given report and writes them to the
task
+ * temp directory.
+ * <p>
+ * This method is not thread-safe as the reports are processed by a
single-threaded executor.
+ */
+ private void extractDistributionsFromReport(DimensionDistributionReport
report)
+ {
+ log.debug("Started writing distributions from Task ID [%s]",
report.getTaskId());
+
+ if (report.getIntervalToDistribution() == null) {
Review comment:
nit: you are calling the same getter twice; grab the reference and only
call it once. Every once in a while, someone chooses to throw some
side effects into a getter, and these "call twice" sites then become sad. A
valid argument could be made that the person who added the side effect is to
blame, but that's not necessarily a reason not to be a bit defensive here too.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionParallelIndexTaskRunner.java
##########
@@ -58,6 +84,188 @@ public String getName()
return PHASE_NAME;
}
+ @Override
+ public void collectReport(DimensionDistributionReport report)
+ {
+ // Send an empty report to TaskMonitor
+ super.collectReport(new DimensionDistributionReport(report.getTaskId(),
null));
+
+ // Extract the distributions in a separate thread to unblock HTTP requests
+ if (executor.isShutdown()) {
+ // this should never happen
+ throw new ISE("Executor is already shutdown. Cannot process more
reports.");
+ }
+ executor.submit(() -> extractDistributionsFromReport(report));
+ }
+
+ /**
+ * Map from an interval to PartitionBoundaries calculated by applying the
target
+ * row size on the final StringDistribution. The final distribution for an
+ * interval is obtained by merging the distributions reported by all the
+ * sub-tasks for that interval.
+ */
+ public Map<Interval, PartitionBoundaries> getIntervalToPartitionBoundaries(
+ DimensionRangePartitionsSpec partitionsSpec
+ )
+ {
+ waitToProcessPendingReports();
+
+ // Do not proceed if a shutdown has been requested
+ if (getStopReason() != null) {
+ throw new ISE("DimensionDistributionPhaseRunner has been stopped. %s",
getStopReason());
+ }
+
+ // Merge distributions only from succeeded sub-tasks
+ final Set<String> succeededTaskIds = super.getReports().keySet();
+ final Map<Interval, PartitionBoundaries> intervalToPartitions = new
HashMap<>();
+ intervalToTaskIds.forEach(
+ (interval, subTaskIds) -> {
+ final File intervalDir = getIntervalDistributionDir(interval);
+ final StringDistributionMerger merger = new StringSketchMerger();
+ subTaskIds
+ .stream()
+ .filter(succeededTaskIds::contains)
+ .map(subTaskId -> readDistributionFromFile(intervalDir,
subTaskId))
+ .forEach(merger::merge);
+ final StringDistribution mergedDistribution = merger.getResult();
+
+ final PartitionBoundaries partitions;
+ Integer targetRowsPerSegment =
partitionsSpec.getTargetRowsPerSegment();
+ if (targetRowsPerSegment == null) {
+ partitions =
mergedDistribution.getEvenPartitionsByMaxSize(partitionsSpec.getMaxRowsPerSegment());
+ } else {
+ partitions =
mergedDistribution.getEvenPartitionsByTargetSize(targetRowsPerSegment);
+ }
+
+ intervalToPartitions.put(interval, partitions);
+ }
+ );
+
+ return intervalToPartitions;
+ }
+
+ /**
+ * Extracts the distributions from the given report and writes them to the
task
+ * temp directory.
+ * <p>
+ * This method is not thread-safe as the reports are processed by a
single-threaded executor.
+ */
+ private void extractDistributionsFromReport(DimensionDistributionReport
report)
+ {
+ log.debug("Started writing distributions from Task ID [%s]",
report.getTaskId());
+
+ if (report.getIntervalToDistribution() == null) {
+ return;
+ }
+ report.getIntervalToDistribution().forEach(
+ (interval, distribution) -> {
+ Set<String> taskIds = intervalToTaskIds.computeIfAbsent(interval, i
-> new HashSet<>());
+ final String subTaskId = report.getTaskId();
+ if (!taskIds.contains(subTaskId)) {
+ writeDistributionToFile(interval, subTaskId, distribution);
+ taskIds.add(subTaskId);
+ }
+ }
+ );
+
+ log.debug("Finished writing distributions from Task ID [%s]",
report.getTaskId());
Review comment:
This debug log won't be emitted when the intervalToDistribution map is null,
because of the early return above. Perhaps add a debug line to that branch too?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionParallelIndexTaskRunner.java
##########
@@ -58,6 +84,188 @@ public String getName()
return PHASE_NAME;
}
+ @Override
+ public void collectReport(DimensionDistributionReport report)
+ {
+ // Send an empty report to TaskMonitor
+ super.collectReport(new DimensionDistributionReport(report.getTaskId(),
null));
+
+ // Extract the distributions in a separate thread to unblock HTTP requests
+ if (executor.isShutdown()) {
+ // this should never happen
+ throw new ISE("Executor is already shutdown. Cannot process more
reports.");
+ }
+ executor.submit(() -> extractDistributionsFromReport(report));
+ }
+
+ /**
+ * Map from an interval to PartitionBoundaries calculated by applying the
target
+ * row size on the final StringDistribution. The final distribution for an
+ * interval is obtained by merging the distributions reported by all the
+ * sub-tasks for that interval.
+ */
+ public Map<Interval, PartitionBoundaries> getIntervalToPartitionBoundaries(
+ DimensionRangePartitionsSpec partitionsSpec
+ )
+ {
+ waitToProcessPendingReports();
+
+ // Do not proceed if a shutdown has been requested
+ if (getStopReason() != null) {
+ throw new ISE("DimensionDistributionPhaseRunner has been stopped. %s",
getStopReason());
+ }
+
+ // Merge distributions only from succeeded sub-tasks
+ final Set<String> succeededTaskIds = super.getReports().keySet();
+ final Map<Interval, PartitionBoundaries> intervalToPartitions = new
HashMap<>();
+ intervalToTaskIds.forEach(
+ (interval, subTaskIds) -> {
+ final File intervalDir = getIntervalDistributionDir(interval);
+ final StringDistributionMerger merger = new StringSketchMerger();
+ subTaskIds
+ .stream()
+ .filter(succeededTaskIds::contains)
+ .map(subTaskId -> readDistributionFromFile(intervalDir,
subTaskId))
+ .forEach(merger::merge);
+ final StringDistribution mergedDistribution = merger.getResult();
+
+ final PartitionBoundaries partitions;
+ Integer targetRowsPerSegment =
partitionsSpec.getTargetRowsPerSegment();
+ if (targetRowsPerSegment == null) {
+ partitions =
mergedDistribution.getEvenPartitionsByMaxSize(partitionsSpec.getMaxRowsPerSegment());
+ } else {
+ partitions =
mergedDistribution.getEvenPartitionsByTargetSize(targetRowsPerSegment);
+ }
+
+ intervalToPartitions.put(interval, partitions);
+ }
+ );
+
+ return intervalToPartitions;
+ }
+
+ /**
+ * Extracts the distributions from the given report and writes them to the
task
+ * temp directory.
+ * <p>
+ * This method is not thread-safe as the reports are processed by a
single-threaded executor.
+ */
+ private void extractDistributionsFromReport(DimensionDistributionReport
report)
+ {
+ log.debug("Started writing distributions from Task ID [%s]",
report.getTaskId());
+
+ if (report.getIntervalToDistribution() == null) {
+ return;
+ }
+ report.getIntervalToDistribution().forEach(
+ (interval, distribution) -> {
+ Set<String> taskIds = intervalToTaskIds.computeIfAbsent(interval, i
-> new HashSet<>());
+ final String subTaskId = report.getTaskId();
+ if (!taskIds.contains(subTaskId)) {
+ writeDistributionToFile(interval, subTaskId, distribution);
+ taskIds.add(subTaskId);
+ }
+ }
+ );
+
+ log.debug("Finished writing distributions from Task ID [%s]",
report.getTaskId());
+ }
+
+ /**
+ * Writes the given distribution to the task temp directory.
+ * <p>
+ * If this operation fails, it requests a graceful shutdown of the runner via
+ * {@link #stopGracefully(String)}.
+ */
+ private void writeDistributionToFile(
+ Interval interval,
+ String subTaskId,
+ StringDistribution distribution
+ )
+ {
+ try {
+ File intervalDir = getIntervalDistributionDir(interval);
+ FileUtils.mkdirp(intervalDir);
+
+ File distributionJsonFile = getDistributionJsonFile(intervalDir,
subTaskId);
+ getToolbox().getJsonMapper().writeValue(distributionJsonFile,
distribution);
+ }
+ catch (IOException e) {
+ String errorMsg = StringUtils.format(
+ "Exception while writing distribution file for Interval [%s], Task
ID [%s]",
+ interval,
+ subTaskId
+ );
+ stopGracefully(errorMsg);
+ throw new ISE(e, errorMsg);
+ }
+ }
+
+ private StringDistribution readDistributionFromFile(File intervalDir, String
subTaskId)
+ {
+ try {
+ File distributionJsonFile = getDistributionJsonFile(intervalDir,
subTaskId);
+ return getToolbox().getJsonMapper().readValue(distributionJsonFile,
StringDistribution.class);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "Error while reading distribution for Interval [%s],
Task ID [%s]",
+ intervalDir.getName(), subTaskId
+ );
+ }
+ }
+
+ private File getIntervalDistributionDir(Interval interval)
+ {
+ return new File(tempDistributionsDir, toIntervalString(interval));
+ }
+
+ private File getDistributionJsonFile(File intervalDir, String subTaskId)
+ {
+ return new File(intervalDir, subTaskId);
+ }
+
+ /**
+ * Waits for distributions from pending reports (if any) to be extracted.
+ */
+ private void waitToProcessPendingReports()
+ {
+ log.info("Waiting to extract distributions from sub-task reports.");
+ try {
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
Review comment:
If I just read this logic, I wouldn't read it as "oh, it's waiting for
all task reports to be processed"; I'd read it as "this is a shutdown routine".
Given that the task knows how many sub-tasks are running, it might be nicer to
have a CountDownLatch initialized with the number of sub-tasks that this waits on.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]