paul-rogers commented on code in PR #12848:
URL: https://github.com/apache/druid/pull/12848#discussion_r937150038


##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableFrameChannel.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+/**
+ * Interface for reading a sequence of frames. Supports nonblocking reads 
through the {@link #canRead()} and
+ * {@link #readabilityFuture()} methods.
+ *
+ * May be implemented using an in-memory queue, disk file, stream, etc.
+ */
+public interface ReadableFrameChannel
+{
+  /**
+   * Returns whether this channel is finished. Finished channels will not 
generate any further frames or errors.
+   *
+   * Generally, once you discover that a channel is finished, you should call 
{@link #doneReading()} and then
+   * discard it.
+   *
+   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   */
+  boolean isFinished();
+
+  /**
+   * Returns whether this channel has a frame or error condition currently 
available. If this method returns true, then
+   * you can call {@link #read()} to retrieve the frame or error.
+   *
+   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   */
+  boolean canRead();
+
+  /**
+   * Returns the next available frame from this channel.
+   *
+   * Before calling this method, you should check {@link #canRead()} to ensure 
there is a frame or
+   * error available.
+   *
+   * @throws java.util.NoSuchElementException if there is no frame currently 
available
+   */
+  Frame read();

Review Comment:
   I see the point, and the logic is clear. Perhaps the enum is overkill. 
Still, the minimize-steps-in-the-inner-loop code I had in mind is:
   
   ```java
     data = channel.read();
     if (data != null) {
       // handle next frame
     } else if (channel.isFinished()) { 
       // handle eof 
     } else {
       // no data available, hang out for a while
     }
   ```
   
   But, I suppose I'm guilty of premature optimization: we don't know if this actually _is_ an inner loop, so it's fine as is.
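   For contrast, a loop shaped around the interface as written (`canRead()` plus `isFinished()`, with the "no data yet" case falling through to `readabilityFuture()`) might look like the sketch below. This is a minimal, self-contained illustration using a hypothetical queue-backed stand-in for `ReadableFrameChannel`, not the PR's classes:
   
   ```java
   import java.util.ArrayDeque;
   import java.util.NoSuchElementException;
   import java.util.Queue;
   
   // Hypothetical stand-in mirroring the interface in this PR.
   interface MiniChannel
   {
     boolean isFinished();
     boolean canRead();
     String read();
   }
   
   // Simple queue-backed implementation for illustration only.
   class QueueChannel implements MiniChannel
   {
     private final Queue<String> frames = new ArrayDeque<>();
     private boolean done;
   
     void add(String frame) { frames.add(frame); }
     void markDone() { done = true; }
   
     @Override public boolean isFinished() { return done && frames.isEmpty(); }
     @Override public boolean canRead() { return !frames.isEmpty(); }
   
     @Override public String read()
     {
       if (frames.isEmpty()) {
         throw new NoSuchElementException("no frame available");
       }
       return frames.poll();
     }
   }
   
   public class Main
   {
     // Drains whatever is currently available and returns the count handled.
     static int drain(MiniChannel channel)
     {
       int handled = 0;
       while (true) {
         if (channel.canRead()) {
           channel.read();     // handle next frame
           handled++;
         } else if (channel.isFinished()) {
           return handled;     // handle eof
         } else {
           return handled;     // no data yet; caller would await readabilityFuture()
         }
       }
     }
   
     public static void main(String[] args)
     {
       QueueChannel channel = new QueueChannel();
       channel.add("frame-1");
       channel.add("frame-2");
       channel.markDone();
       System.out.println(drain(channel)); // prints 2
     }
   }
   ```
   
   The difference from the null-returning sketch above is only where the "no data" case lands: here it is the fall-through branch rather than a sentinel return value.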



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A FrameProcessor is like an incremental version of Runnable that operates 
on {@link ReadableFrameChannel} and
+ * {@link WritableFrameChannel}.
+ *
+ * It is designed to enable interleaved non-blocking work on a fixed-size 
thread pool. Typically, this is done using
+ * an instance of {@link FrameProcessorExecutor}.
+ */
+public interface FrameProcessor<T>
+{
+  /**
+   * List of input channels. The positions of channels in this list are used 
to build the {@code readableInputs} set
+   * provided to {@link #runIncrementally}.
+   */
+  List<ReadableFrameChannel> inputChannels();
+
+  /**
+   * List of output channels.
+   */
+  List<WritableFrameChannel> outputChannels();
+
+  /**
+   * Runs some of the algorithm, without blocking, and either returns a value 
or a set of input channels
+   * to wait for. This method is called by {@link 
FrameProcessorExecutor#runFully} when all output channels are
+   * writable. Therefore, it is guaranteed that each output channel can accept 
at least one frame.
+   *
+   * This method must not read more than one frame from each readable input 
channel, and must not write more than one
+   * frame to each output channel.
+   *
+   * @param readableInputs channels from {@link #inputChannels()} that are 
either finished or ready to read.
+   *
+   * @return either a final return value or a set of input channels to wait 
for. Must be nonnull.
+   */
+  ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException;
+
+  /**
+   * Cleans up resources used by this worker, including signalling to input 
and output channels that we are
+   * done reading and writing, via {@link ReadableFrameChannel#doneReading()} 
and
+   * {@link WritableFrameChannel#doneWriting()}.
+   *
+   * This method may be called before the worker reports completion via {@link 
#runIncrementally}, especially in
+   * cases of cancellation.
+   */
+  void cleanup() throws IOException;
+}

Review Comment:
   We are free to call a method `close()` without implementing `Closeable`. 
Implementing `Closeable` is really only needed when we want to do the closing 
generically, as in a `Closer`. Given the special naming, my guess is we don't 
close these objects generically?
   
   Rather, the thought here was that we can all predict what `close()` does, so it is a handy way to communicate the purpose of the function to readers.
   
   At least in Eclipse, if I ask to "show all references", I get references to only this one class -- provided the class doesn't implement `Closeable`.
   
   Again, just a minor point; the code works either way and we can always 
change things later, if there is a reason.
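   To make the distinction concrete (hypothetical classes, not the PR's types): a conventionally-named `close()` works fine when called directly, while implementing `Closeable` is what enables generic closing, e.g. by a `Closer` or try-with-resources:
   
   ```java
   import java.io.Closeable;
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Conventionally-named close(), no interface: callers must invoke it directly.
   class PlainChannel
   {
     boolean closed;
   
     void close()
     {
       closed = true;
     }
   }
   
   // Implementing Closeable enables generic closing by type.
   class CloseableChannel implements Closeable
   {
     boolean closed;
   
     @Override
     public void close()
     {
       closed = true;
     }
   }
   
   public class Main
   {
     public static void main(String[] args) throws Exception
     {
       final PlainChannel plain = new PlainChannel();
       plain.close(); // direct call: works without any interface
   
       // A Closer-like utility can only hold things typed as Closeable:
       final Deque<Closeable> closer = new ArrayDeque<>();
       final CloseableChannel generic = new CloseableChannel();
       closer.push(generic);
       while (!closer.isEmpty()) {
         closer.pop().close();
       }
   
       System.out.println(plain.closed && generic.closed); // prints true
     }
   }
   ```
   
   So the interface matters only at sites that close objects generically; the method name alone carries the readability benefit either way.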



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Channel backed by an {@link InputStream}.
+ *
+ * Frame channels are expected to be nonblocking, but InputStreams cannot be 
read in nonblocking fashion.
+ * This implementation deals with that by using an {@link ExecutorService} to 
read from the stream in a
+ * separate thread.
+ */
+public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
+{
+  private final InputStream inputStream;
+  private final ReadableByteChunksFrameChannel delegate;
+  private final ExecutorService executorService;
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final byte[] buffer = new byte[8 * 1024];
+
+  @GuardedBy("lock")
+  private long totalInputStreamBytesRead = 0;
+
+  @GuardedBy("lock")
+  private boolean inputStreamFinished = false;
+
+  @GuardedBy("lock")
+  private boolean inputStreamError = false;

Review Comment:
   Sure. I just dislike the tedium of watching the debugger step over those lines. And, when optimizing the heck out of an inner loop, I tend to chuck out the nice-to-haves. But it is obviously a trivial point.



##########
processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.file;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Response object for {@link FrameFileHttpResponseHandler}.
+ */
+public class FrameFilePartialFetch
+{
+  private final AtomicLong bytesRead = new AtomicLong(0L);

Review Comment:
   Let's see if I'm clear. The items are mutable by the thread doing the 
reading, but then immutable when returned to (passed to) the consumer. Correct?
   
   If that is true, then the "Atomic" form isn't strictly necessary, since only one thread at a time accesses them: the object ping-pongs back and forth between the two.
   
   The question is whether the producer mutates these items while the consumer 
reads. If so, then the consumer will have a very hard time of it.
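   A sketch of that hand-off pattern (hypothetical `PartialFetch` stand-in, not the PR's class): a plain field is safe so long as the producer's last write happens-before the consumer's first read, which any hand-off point with memory-visibility guarantees provides -- here a `CountDownLatch`, but resolving a `ListenableFuture` gives the same guarantee:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Hypothetical response object: a plain long instead of an AtomicLong.
   class PartialFetch
   {
     long bytesRead; // plain field: only one thread "owns" it at a time
   }
   
   public class Main
   {
     public static void main(String[] args) throws Exception
     {
       final PartialFetch fetch = new PartialFetch();
       final CountDownLatch handedOff = new CountDownLatch(1);
   
       Thread producer = new Thread(() -> {
         fetch.bytesRead += 1024; // mutate while owned by the producer...
         handedOff.countDown();   // ...then publish: happens-before the await()
       });
       producer.start();
   
       handedOff.await();         // consumer takes ownership here
       System.out.println(fetch.bytesRead); // prints 1024
     }
   }
   ```
   
   If, instead, the producer keeps mutating after the hand-off, no choice of field type saves the consumer: `AtomicLong` only makes individual reads and writes atomic, not the consumer's view of the object consistent.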



##########
processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java:
##########
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.LongMath;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
+import it.unimi.dsi.fastutil.longs.LongSortedSet;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFileFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.channel.WritableStreamFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+/**
+ * Sorts and partitions a dataset using parallel external merge sort.
+ *
+ * Input is provided as a set of {@link ReadableFrameChannel} and output is 
provided as {@link OutputChannels}.
+ * Work is performed on a provided {@link FrameProcessorExecutor}.
+ *
+ * The most central point for SuperSorter logic is the {@link 
#runWorkersIfPossible} method, which determines what
+ * needs to be done next based on the current state of the SuperSorter. The 
logic is:
+ *
+ * 1) Read input channels into {@link #inputBuffer} using {@link 
FrameChannelBatcher}, launched via
+ * {@link #runNextBatcher()}, up to a limit of {@link 
#maxChannelsPerProcessor} per batcher.
+ *
+ * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} 
scratch files using
+ * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}.
+ *
+ * 3a) Merge level 0 scratch files into level 1 scratch files using {@link 
FrameChannelMerger} launched from
+ * {@link #runNextMiddleMerger()}, processing up to {@link 
#maxChannelsPerProcessor} files per merger.
+ * Continue this process through increasing level numbers, with the size of 
scratch files increasing by a factor
+ * of {@link #maxChannelsPerProcessor} each level.
+ *
+ * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by 
{@link #runNextMiddleMerger()} writes
+ * partitioned {@link FrameFile} scratch files. The penultimate level cannot 
be written until
+ * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by 
this point, the SuperSorter pauses.
+ * The SuperSorter resumes and writes the penultimate level's files when the 
future resolves.
+ *
+ * 4) Write the final level using {@link FrameChannelMerger} launched from 
{@link #runNextUltimateMerger()}.
+ * Outputs for this level are written to channels provided by {@link 
#outputChannelFactory}, rather than scratch files.
+ *
+ * At all points, higher level processing is preferred over lower-level 
processing. Writing to final output files
+ * is preferred over intermediate, and writing to intermediate files is 
preferred over reading inputs. These
+ * preferences ensure that the amount of data buffered up in memory does not 
grow too large.
+ *
+ * Potential future work (things we could optimize if necessary):
+ *
+ * - Collapse merging to a single level if level zero has one merger, and we 
want to write one output partition.
+ * - Skip batching, and inject directly into level 0, if input channels are 
already individually fully-sorted.
+ * - Combine (for example: aggregate) while merging.
+ */
+public class SuperSorter
+{
+  private static final Logger log = new Logger(SuperSorter.class);
+
+  public static final int UNKNOWN_LEVEL = -1;
+  public static final long UNKNOWN_TOTAL = -1;
+
+  private final List<ReadableFrameChannel> inputChannels;
+  private final FrameReader frameReader;
+  private final ClusterBy clusterBy;
+  private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
+  private final FrameProcessorExecutor exec;
+  private final File directory;
+  private final OutputChannelFactory outputChannelFactory;
+  private final Supplier<MemoryAllocator> innerFrameAllocatorMaker;
+  private final int maxChannelsPerProcessor;
+  private final int maxActiveProcessors;
+  private final long rowLimit;
+  private final String cancellationId;
+
+  private final Object runWorkersLock = new Object();
+
+  @GuardedBy("runWorkersLock")
+  private boolean batcherIsRunning = false;
+
+  @GuardedBy("runWorkersLock")
+  private IntSet inputChannelsToRead = new IntOpenHashSet();
+
+  @GuardedBy("runWorkersLock")
+  private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new 
Int2ObjectArrayMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private List<OutputChannel> outputChannels = null;
+
+  @GuardedBy("runWorkersLock")
+  private int activeProcessors = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long totalInputFrames = UNKNOWN_TOTAL;
+
+  @GuardedBy("runWorkersLock")
+  private int totalMergingLevels = UNKNOWN_LEVEL;
+
+  @GuardedBy("runWorkersLock")
+  private final Queue<Frame> inputBuffer = new ArrayDeque<>();
+
+  @GuardedBy("runWorkersLock")
+  private long inputFramesReadSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long levelZeroMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private int ultimateMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private final Map<File, FrameFile> penultimateFrameFileCache = new 
HashMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private SettableFuture<OutputChannels> allDone = null;
+
+  @GuardedBy("runWorkersLock")
+  SuperSorterProgressTracker superSorterProgressTracker;
+
+  /**
+   * See {@link #setNoWorkRunnable}.
+   */
+  @GuardedBy("runWorkersLock")
+  private Runnable noWorkRunnable = null;
+
+  /**
+   * Initializes a SuperSorter.
+   *
+   * @param inputChannels              input channels. All frames in these 
channels must be sorted according to the
+   *                                   {@link ClusterBy#getColumns()}, or else 
sorting will not produce correct
+   *                                   output.
+   * @param frameReader                frame reader for the input channels
+   * @param clusterBy                  desired sorting order
+   * @param outputPartitionsFuture     a future that resolves to the desired 
output partitions. Sorting will block
+   *                                   prior to writing out final outputs 
until this future resolves. However, the
+   *                                   sorter will be able to read all inputs 
even if this future is unresolved.
+   *                                   If output need not be partitioned, use
+   *                                   {@link 
ClusterByPartitions#oneUniversalPartition()}. In this case a single
+   *                                   sorted channel is generated.
+   * @param exec                       executor to perform work in
+   * @param temporaryDirectory         directory to use for scratch files. 
This must have enough space to store at
+   *                                   least two copies of the dataset in 
{@link FrameFile} format.
+   * @param outputChannelFactory       factory for partitioned, sorted output 
channels
+   * @param innerFrameAllocatorMaker   supplier for allocators that are used 
to make merged frames for intermediate
+   *                                   levels of merging, prior to the final 
output. Final output frame allocation is
+   *                                   controlled by outputChannelFactory. One 
allocator is created per intermediate
+   *                                   scratch file.
+   * @param maxActiveProcessors        maximum number of merging processors to 
execute at once in the provided
+   *                                   {@link FrameProcessorExecutor}
+   * @param maxChannelsPerProcessor    maximum number of channels to merge at 
once per merging processor
+   * @param rowLimit                   limit to apply during sorting. The 
limit is merely advisory: the actual number
+   *                                   of rows returned may be larger than the 
limit. The limit is applied across
+   *                                   all partitions, not to each partition 
individually.
+   * @param cancellationId             cancellation id to use when running 
processors in the provided
+   *                                   {@link FrameProcessorExecutor}.
+   * @param superSorterProgressTracker progress tracker
+   */
+  public SuperSorter(
+      final List<ReadableFrameChannel> inputChannels,
+      final FrameReader frameReader,
+      final ClusterBy clusterBy,
+      final ListenableFuture<ClusterByPartitions> outputPartitionsFuture,
+      final FrameProcessorExecutor exec,
+      final File temporaryDirectory,
+      final OutputChannelFactory outputChannelFactory,
+      final Supplier<MemoryAllocator> innerFrameAllocatorMaker,
+      final int maxActiveProcessors,
+      final int maxChannelsPerProcessor,
+      final long rowLimit,
+      @Nullable final String cancellationId,
+      final SuperSorterProgressTracker superSorterProgressTracker
+  )
+  {
+    this.inputChannels = inputChannels;
+    this.frameReader = frameReader;
+    this.clusterBy = clusterBy;
+    this.outputPartitionsFuture = outputPartitionsFuture;
+    this.exec = exec;
+    this.directory = temporaryDirectory;
+    this.outputChannelFactory = outputChannelFactory;
+    this.innerFrameAllocatorMaker = innerFrameAllocatorMaker;
+    this.maxChannelsPerProcessor = maxChannelsPerProcessor;
+    this.maxActiveProcessors = maxActiveProcessors;
+    this.rowLimit = rowLimit;
+    this.cancellationId = cancellationId;
+    this.superSorterProgressTracker = superSorterProgressTracker;
+
+    for (int i = 0; i < inputChannels.size(); i++) {
+      inputChannelsToRead.add(i);
+    }
+
+    if (maxActiveProcessors < 1) {
+      throw new IAE("maxActiveProcessors[%d] < 1", maxActiveProcessors);
+    }
+
+    if (maxChannelsPerProcessor < 2) {
+      throw new IAE("maxChannelsPerProcessor[%d] < 2", 
maxChannelsPerProcessor);
+    }
+  }
+
+  /**
+   * Starts sorting. Can only be called once. Work is performed in the {@link 
FrameProcessorExecutor} that was
+   * passed to the constructor.
+   *
+   * Returns a future containing partitioned sorted output channels.
+   */
+  public ListenableFuture<OutputChannels> run()
+  {
+    synchronized (runWorkersLock) {
+      if (allDone != null) {
+        throw new ISE("Cannot run() more than once.");
+      }
+
+      allDone = SettableFuture.create();
+      runWorkersIfPossible();
+
+      // When output partitions become known, that may unblock some additional 
layers of merging.
+      outputPartitionsFuture.addListener(
+          () -> {
+            synchronized (runWorkersLock) {
+              if (outputPartitionsFuture.isDone()) { // Update the progress 
tracker
+                
superSorterProgressTracker.setTotalMergersForUltimateLevel(getOutputPartitions().size());
+              }
+              runWorkersIfPossible();
+              setAllDoneIfPossible();
+            }
+          },
+          exec.getExecutorService()
+      );
+
+      return FutureUtils.futureWithBaggage(
+          allDone,
+          () -> {
+            synchronized (runWorkersLock) {
+              if (activeProcessors == 0) {
+                cleanUp();
+              }
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Sets a callback that enables tests to see when this SuperSorter cannot do 
any work. Only used for testing.
+   */
+  @VisibleForTesting
+  void setNoWorkRunnable(final Runnable runnable)
+  {
+    synchronized (runWorkersLock) {
+      this.noWorkRunnable = runnable;
+    }
+  }
+
+  /**
+   * Called when a worker finishes.
+   */
+  @GuardedBy("runWorkersLock")
+  private void workerFinished()
+  {
+    activeProcessors -= 1;
+
+    if (log.isDebugEnabled()) {
+      log.debug(stateString());
+    }
+
+    runWorkersIfPossible();
+    setAllDoneIfPossible();
+
+    if (isAllDone() && activeProcessors == 0) {
+      cleanUp();
+    }
+  }
+
+  /**
+   * Tries to launch a new worker, and returns whether it was doable.
+   *
+   * Later workers have priority, i.e., those responsible for merging higher 
levels of the merge tree. Workers that
+   * read the original input channels have the lowest priority. This priority 
order ensures that we don't build up
+   * too much unmerged data.
+   */
+  @GuardedBy("runWorkersLock")
+  private void runWorkersIfPossible()
+  {
+    if (isAllDone()) {
+      // Do nothing if the instance is all done. This can happen in case of 
error or cancellation.
+      return;
+    }
+
+    try {
+      while (activeProcessors < maxActiveProcessors &&
+             (runNextUltimateMerger() || runNextMiddleMerger() || 
runNextLevelZeroMerger() || runNextBatcher())) {
+        activeProcessors += 1;
+
+        if (log.isDebugEnabled()) {
+          log.debug(stateString());
+        }
+      }
+
+      if (activeProcessors == 0 && noWorkRunnable != null) {
+        log.debug("No active workers and no work left to start.");
+
+        // Only called in tests. No need to bother with try/catch and such.
+        noWorkRunnable.run();
+      }
+    }
+    catch (Throwable e) {
+      allDone.setException(e);
+    }
+  }
+
+  @GuardedBy("runWorkersLock")
+  private void setAllDoneIfPossible()
+  {
+    if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
+      // No input data -- generate empty output channels.
+      final ClusterByPartitions partitions = getOutputPartitions();
+      final List<OutputChannel> channels = new ArrayList<>(partitions.size());
+
+      for (int partitionNum = 0; partitionNum < partitions.size(); 
partitionNum++) {
+        channels.add(outputChannelFactory.openNilChannel(partitionNum));
+      }
+
+      // OK to use wrap, not wrapReadOnly, because nil channels are already 
read-only.
+      allDone.set(OutputChannels.wrap(channels));
+    } else if (totalMergingLevels != UNKNOWN_LEVEL
+               && outputsReadyByLevel.containsKey(totalMergingLevels - 1)
+               && outputsReadyByLevel.get(totalMergingLevels - 1).size() == 
getOutputPartitions().size()) {
+      // We're done!!
+      try {
+        // OK to use wrap, not wrapReadOnly, because all channels in this list 
are already read-only.
+        allDone.set(OutputChannels.wrap(outputChannels));
+      }
+      catch (Throwable e) {
+        allDone.setException(e);
+      }
+    }
+  }
+
+  @GuardedBy("runWorkersLock")
+  private boolean runNextBatcher()
+  {
+    if (batcherIsRunning || inputChannelsToRead.isEmpty()) {
+      return false;
+    } else {
+      batcherIsRunning = true;
+
+      runWorker(
+          new FrameChannelBatcher(inputChannels, maxChannelsPerProcessor),
+          result -> {
+            final List<Frame> batch = result.lhs;
+            final IntSet keepReading = result.rhs;
+
+            synchronized (runWorkersLock) {
+              inputBuffer.addAll(batch);
+              inputFramesReadSoFar += batch.size();
+              inputChannelsToRead = keepReading;
+
+              if (inputChannelsToRead.isEmpty()) {
+                inputChannels.forEach(ReadableFrameChannel::doneReading);
+                setTotalInputFrames(inputFramesReadSoFar);
+                runWorkersIfPossible();
+              } else if (inputBuffer.size() >= maxChannelsPerProcessor) {
+                runWorkersIfPossible();
+              }
+
+              batcherIsRunning = false;
+            }
+          }
+      );
+
+      return true;
+    }
+  }
+
+  /**
+   * Level zero mergers read batches of frames from the "inputBuffer". These 
frames are individually sorted, but there
+   * is no ordering between the frames. Their output is a sorted sequence of 
frames.
+   */
+  @GuardedBy("runWorkersLock")
+  private boolean runNextLevelZeroMerger()
+  {
+    if (inputBuffer.isEmpty() || (inputBuffer.size() < maxChannelsPerProcessor 
&& !allInputRead())) {
+      return false;
+    }
+
+    final List<ReadableFrameChannel> in = new ArrayList<>();
+
+    while (in.size() < maxChannelsPerProcessor) {
+      final Frame frame = inputBuffer.poll();
+
+      if (frame == null) {
+        break;
+      }
+
+      in.add(singleReadableFrameChannel(new FrameWithPartition(frame, 
FrameWithPartition.NO_PARTITION)));
+    }
+
+    runMerger(0, levelZeroMergersRunSoFar++, in, null);
+    return true;
+  }
+
+  @GuardedBy("runWorkersLock")
+  private boolean runNextMiddleMerger()

Review Comment:
   Ah! Glad to hear that the "guts" are in a separate bit we can test well.
   
   I wouldn't block a commit for restructuring; we can tackle it later as 
needed. I find that, for things like this, it's hard to know a better structure 
until we have to change something: then struggles with tests (or understanding) 
suggest the joints at which the code should be carved.



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Manages execution of {@link FrameProcessor} in an {@link ExecutorService}.
+ *
+ * If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a
+ * same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks.
+ */
+public class FrameProcessorExecutor

Review Comment:
   Thanks for the explanation. Since there is nothing available, and we need to 
roll our own, we can generalize when we find a need to do so, no need to do 
extra work now.



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Manages execution of {@link FrameProcessor} in an {@link ExecutorService}.

Review Comment:
   That makes sense. So, to run the "fragment" you just described, we'd have 
something like this?
   
   ```text
   reader processor --> channel --> main logic processor --> channel --> output processor
   ```
   
   In this case, there are three concurrent frame processors communicating over 
in-memory channels? If so, it is very Go-like! This leads to two questions.
   
   First, how does the "runner" know what to run? A comment in the sorter 
suggests we'd prefer to do the output, then do the main logic that can produce 
more output, and finally do a read only if we've used up whatever data we have. 
Clear enough.
   
   How does the runner coordinate? What is the protocol that each frame processor uses to say "I can run" or "I could run, if you gave me more input" or "Sorry, EOF, I'm fresh out of data"?
   
   And, in the main logic, what structure, if any, do we provide if that logic 
is, say, a filter, projection and frame-repackaging? How do the various steps 
coordinate?
   
   This is the kind of nitty-gritty detail that each frame processor writer 
will need to know. Not sure where that info should go, but it would be handy if 
we could explain it.



##########
processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableNilFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Represents an output channel for some frame processor. Composed of a pair of {@link WritableFrameChannel}, which the
+ * processor writes to, along with a supplier of a {@link ReadableFrameChannel}, which readers can read from.
+ *
+ * At the time an instance of this class is created, the writable channel is already open, but the readable channel
+ * has not yet been created. It is created upon the first call to {@link #getReadableChannel()}.
+ */
+public class OutputChannel
+{
+  @Nullable
+  private final WritableFrameChannel writableChannel;
+  @Nullable
+  private final MemoryAllocator frameMemoryAllocator;
+  private final Supplier<ReadableFrameChannel> readableChannelSupplier;
+  private final int partitionNumber;
+
+  private OutputChannel(
+      @Nullable final WritableFrameChannel writableChannel,
+      @Nullable final MemoryAllocator frameMemoryAllocator,
+      final Supplier<ReadableFrameChannel> readableChannelSupplier,
+      final int partitionNumber
+  )
+  {
+    this.writableChannel = writableChannel;
+    this.frameMemoryAllocator = frameMemoryAllocator;
+    this.readableChannelSupplier = readableChannelSupplier;
+    this.partitionNumber = partitionNumber;
+
+    if (partitionNumber < 0 && partitionNumber != FrameWithPartition.NO_PARTITION) {
+      throw new IAE("Invalid partition number [%d]", partitionNumber);
+    }
+  }
+
+  /**
+   * Creates an output channel pair.
+   *
+   * @param writableChannel         writable channel for producer
+   * @param frameMemoryAllocator    memory allocator for producer to use while writing frames to the channel
+   * @param readableChannelSupplier readable channel for consumer. May be called multiple times, so you should wrap this
+   *                                in {@link Suppliers#memoize} if needed.
+   * @param partitionNumber         partition number, if any; may be {@link FrameWithPartition#NO_PARTITION} if unknown
+   */
+  public static OutputChannel pair(
+      final WritableFrameChannel writableChannel,
+      final MemoryAllocator frameMemoryAllocator,
+      final Supplier<ReadableFrameChannel> readableChannelSupplier,
+      final int partitionNumber
+  )
+  {
+    return new OutputChannel(
+        Preconditions.checkNotNull(writableChannel, "writableChannel"),
+        Preconditions.checkNotNull(frameMemoryAllocator, "frameMemoryAllocator"),
+        readableChannelSupplier,
+        partitionNumber
+    );
+  }
+
+  /**
+   * Create a nil output channel, representing a processor that writes nothing. It is not actually writable, but
+   * provides a way for downstream processors to read nothing.
+   */
+  public static OutputChannel nil(final int partitionNumber)
+  {
+    return new OutputChannel(null, null, () -> ReadableNilFrameChannel.INSTANCE, partitionNumber);
+  }
+
+  /**
+   * Returns the writable channel of this pair. The producer writes to this channel.
+   */
+  public WritableFrameChannel getWritableChannel()
+  {
+    if (writableChannel == null) {
+      throw new ISE("Writable channel is not available");
+    } else {
+      return writableChannel;
+    }
+  }
+
+  /**
+   * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
+   */
+  public MemoryAllocator getFrameMemoryAllocator()
+  {
+    if (frameMemoryAllocator == null) {
+      throw new ISE("Frame memory allocator is not available");
+    } else {
+      return frameMemoryAllocator;
+    }
+  }
+
+  /**
+   * Returns the readable channel of this pair. This readable channel may, or may not, be usable before the
+   * writable channel is closed. It depends on whether the channel pair was created in a stream-capable manner or not.
+   */
+  public ReadableFrameChannel getReadableChannel()
+  {
+    return readableChannelSupplier.get();
+  }
+
+  public int getPartitionNumber()
+  {
+    return partitionNumber;
+  }
+
+  public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
+  {
+    if (writableChannel == null) {
+      return this;
+    } else {
+      return new OutputChannel(
+          mapFn.apply(writableChannel),
+          frameMemoryAllocator,
+          readableChannelSupplier,
+          partitionNumber
+      );
+    }
+  }
+
+  /**
+   * Returns a read-only version of this instance. Read-only versions have neither {@link #getWritableChannel()} nor
+   * {@link #getFrameMemoryAllocator()}, and therefore require substantially less memory.
+   */
+  public OutputChannel readOnly()

Review Comment:
   I kind of like the Go approach: there is a channel, then a channel can 
provide a channel reader and a channel writer. In Go, only the writer can close 
the channel (to indicate EOF), the reader doesn't have to. That seems like a 
wise choice: there can be multiple readers, and the Go approach avoids the need 
to track them to detect when the last reader has closed its channel reader.
   
   So, one thought would be to replicate that idea here. A "channel" is the 
pipe. A "channel reader" is the output and a "channel writer" is the input. A 
channel with only readers is expressed simply in this model.
   
   This is another of those "make it easy for developers to understand" things, 
not functional, so it's fine to defer that kind of refactoring to later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

