gianm commented on code in PR #16911:
URL: https://github.com/apache/druid/pull/16911#discussion_r1722765106
##########
processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java:
##########
@@ -382,29 +388,42 @@ private void runWorkersIfPossible()
@GuardedBy("runWorkersLock")
private void setAllDoneIfPossible()
{
- if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
- // No input data -- generate empty output channels.
- final ClusterByPartitions partitions = getOutputPartitions();
- final List<OutputChannel> channels = new ArrayList<>(partitions.size());
+ try {
+ if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
+ // No input data -- generate empty output channels.
+ final ClusterByPartitions partitions = getOutputPartitions();
+ final List<OutputChannel> channels = new ArrayList<>(partitions.size());
- for (int partitionNum = 0; partitionNum < partitions.size(); partitionNum++) {
- channels.add(outputChannelFactory.openNilChannel(partitionNum));
- }
+ for (int partitionNum = 0; partitionNum < partitions.size(); partitionNum++) {
+ channels.add(outputChannelFactory.openNilChannel(partitionNum));
+ }
+
+ // OK to use wrap, not wrapReadOnly, because nil channels are already read-only.
+ allDone.set(OutputChannels.wrap(channels));
+ } else if (rowLimit == 0 && activeProcessors == 0) {
+ // We had a row limit, and got it all the way down to zero.
+ // Generate empty output channels for any partitions that we haven't written yet.
+ superSorterProgressTracker.markTriviallyComplete();
Review Comment:
Oh, yeah, you're right. I changed this to call
`addMergedBatchesForLevel` instead, to "fill in" the empty channels.
##########
processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java:
##########
@@ -382,29 +388,42 @@ private void runWorkersIfPossible()
@GuardedBy("runWorkersLock")
private void setAllDoneIfPossible()
{
- if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
- // No input data -- generate empty output channels.
- final ClusterByPartitions partitions = getOutputPartitions();
- final List<OutputChannel> channels = new ArrayList<>(partitions.size());
+ try {
+ if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
+ // No input data -- generate empty output channels.
+ final ClusterByPartitions partitions = getOutputPartitions();
+ final List<OutputChannel> channels = new ArrayList<>(partitions.size());
- for (int partitionNum = 0; partitionNum < partitions.size(); partitionNum++) {
- channels.add(outputChannelFactory.openNilChannel(partitionNum));
- }
+ for (int partitionNum = 0; partitionNum < partitions.size(); partitionNum++) {
+ channels.add(outputChannelFactory.openNilChannel(partitionNum));
+ }
+
+ // OK to use wrap, not wrapReadOnly, because nil channels are already read-only.
+ allDone.set(OutputChannels.wrap(channels));
+ } else if (rowLimit == 0 && activeProcessors == 0) {
+ // We had a row limit, and got it all the way down to zero.
+ // Generate empty output channels for any partitions that we haven't written yet.
+ superSorterProgressTracker.markTriviallyComplete();
Review Comment:
Oh, yeah, you're right. I changed this to instead call
`addMergedBatchesForLevel` to "fill in" the empty channels.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]