paul-rogers commented on code in PR #12848:
URL: https://github.com/apache/druid/pull/12848#discussion_r936141405


##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * Channel backed by a byte stream that is continuously streamed in using 
{@link #addChunk}. The byte stream
+ * must be in the format of a {@link org.apache.druid.frame.file.FrameFile}.
+ *
+ * This class is used by {@link 
org.apache.druid.frame.file.FrameFileHttpResponseHandler} to provide nonblocking
+ * reads from a remote http server.
+ */
+public class ReadableByteChunksFrameChannel implements ReadableFrameChannel
+{
+  private static final Logger log = new 
Logger(ReadableByteChunksFrameChannel.class);
+
+  /**
+   * Largest supported frame. Limit exists as a safeguard against streams with 
huge frame sizes. It is not expected
+   * that any legitimate frame will be this large: typical usage involves 
frames an order of magnitude smaller.
+   */
+  private static final long MAX_FRAME_SIZE = 100_000_000;
+
+  private static final int UNKNOWN_LENGTH = -1;
+  private static final int FRAME_MARKER_BYTES = Byte.BYTES;
+  private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES =
+      Byte.BYTES + Frame.COMPRESSED_FRAME_ENVELOPE_SIZE;
+
+  private enum StreamPart
+  {
+    MAGIC,
+    FRAMES,
+    FOOTER
+  }
+
+  private final Object lock = new Object();
+  private final String id;
+  private final long bytesLimit;
+
+  @GuardedBy("lock")
+  private final List<Either<Throwable, byte[]>> chunks = new ArrayList<>();
+
+  @GuardedBy("lock")
+  private SettableFuture<?> addChunkBackpressureFuture = null;
+
+  @GuardedBy("lock")
+  private SettableFuture<?> readyForReadingFuture = null;
+
+  @GuardedBy("lock")
+  private boolean noMoreWrites = false;
+
+  @GuardedBy("lock")
+  private int positionInFirstChunk = 0;
+
+  @GuardedBy("lock")
+  private long bytesBuffered = 0;
+
+  @GuardedBy("lock")
+  private long bytesAdded = 0;
+
+  @GuardedBy("lock")
+  private long nextCompressedFrameLength = UNKNOWN_LENGTH;
+
+  @GuardedBy("lock")
+  private StreamPart streamPart = StreamPart.MAGIC;
+
+  private ReadableByteChunksFrameChannel(String id, long bytesLimit)
+  {
+    this.id = Preconditions.checkNotNull(id, "id");
+    this.bytesLimit = bytesLimit;
+  }
+
+  /**
+   * Create a channel that aims to limit its memory footprint to one frame. 
The channel exerts backpressure
+   * from {@link #addChunk} immediately once a full frame has been buffered.
+   */
+  public static ReadableByteChunksFrameChannel create(final String id)
+  {
+    // Set byte limit to 1, so backpressure will be exerted as soon as we have 
a full frame buffered.
+    // (The bytesLimit is soft: it will be exceeded if needed to store a 
complete frame.)
+    return new ReadableByteChunksFrameChannel(id, 1);
+  }
+
+  /**
+   * Adds a chunk of bytes. If this chunk forms a full frame, it will 
immediately become available for reading.
+   * Otherwise, the bytes will be buffered until a full frame is encountered.
+   *
+   * Returns an Optional that is absent if the amount of queued bytes is below 
this channel's limit accept, or present
+   * if the amount of queued bytes is at or above this channel's limit. If the 
Optional is present, you are politely
+   * requested to wait for the future to resolve before adding additional 
chunks. (This is not enforced; addChunk will
+   * continue to accept new chunks even if the channel is over its limit.)
+   *
+   * When done adding chunks call {@code doneWriting}.
+   */
+  public Optional<ListenableFuture<?>> addChunk(final byte[] chunk)

Review Comment:
   A general rule is to try to limit garbage in performance-sensitive code. 
`Optionals` are wonderful, but they create garbage. Should we just return a 
pointer, and let `null` indicate nothing exists?



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableFrameChannel.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+/**
+ * Interface for reading a sequence of frames. Supports nonblocking reads 
through the {@link #canRead()} and
+ * {@link #readabilityFuture()} methods.
+ *
+ * May be implemented using an in-memory queue, disk file, stream, etc.
+ */
+public interface ReadableFrameChannel
+{
+  /**
+   * Returns whether this channel is finished. Finished channels will not 
generate any further frames or errors.
+   *
+   * Generally, once you discover that a channel is finished, you should call 
{@link #doneReading()} and then
+   * discard it.
+   *
+   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   */
+  boolean isFinished();
+
+  /**
+   * Returns whether this channel has a frame or error condition currently 
available. If this method returns true, then
+   * you can call {@link #read()} to retrieve the frame or error.
+   *
+   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   */
+  boolean canRead();
+
+  /**
+   * Returns the next available frame from this channel.
+   *
+   * Before calling this method, you should check {@link #canRead()} to ensure 
there is a frame or
+   * error available.
+   *
+   * @throws java.util.NoSuchElementException if there is no frame currently 
available
+   */
+  Frame read();
+
+  /**
+   * Returns a future that will resolve when either {@link #isFinished()} or 
{@link #canRead()} would
+   * return true. The future will never resolve to an exception. If something 
exceptional has happened, the exception
+   * can be retrieved from {@link #read()}.
+   */
+  ListenableFuture<?> readabilityFuture();
+
+  /**
+   * Releases any resources associated with this readable channel. After 
calling this, you should not call any other
+   * methods on the channel.
+   */
+  void doneReading();

Review Comment:
   Can we just call this good old `close()` to reduce cognitive load on us poor 
bit-stained coding wretches?



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * Channel backed by a byte stream that is continuously streamed in using 
{@link #addChunk}. The byte stream
+ * must be in the format of a {@link org.apache.druid.frame.file.FrameFile}.
+ *
+ * This class is used by {@link 
org.apache.druid.frame.file.FrameFileHttpResponseHandler} to provide nonblocking
+ * reads from a remote http server.
+ */
+public class ReadableByteChunksFrameChannel implements ReadableFrameChannel
+{
+  private static final Logger log = new 
Logger(ReadableByteChunksFrameChannel.class);
+
+  /**
+   * Largest supported frame. Limit exists as a safeguard against streams with 
huge frame sizes. It is not expected
+   * that any legitimate frame will be this large: typical usage involves 
frames an order of magnitude smaller.
+   */
+  private static final long MAX_FRAME_SIZE = 100_000_000;

Review Comment:
   Nit: add units: `MAX_FRAME_SIZE_BYTES`.



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Channel backed by an {@link InputStream}.
+ *
+ * Frame channels are expected to be nonblocking, but InputStreams cannot be 
read in nonblocking fashion.
+ * This implementation deals with that by using an {@link ExecutorService} to 
read from the stream in a
+ * separate thread.
+ */
+public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
+{
+  private final InputStream inputStream;
+  private final ReadableByteChunksFrameChannel delegate;
+  private final ExecutorService executorService;
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final byte[] buffer = new byte[8 * 1024];
+
+  @GuardedBy("lock")
+  private long totalInputStreamBytesRead = 0;
+
+  @GuardedBy("lock")
+  private boolean inputStreamFinished = false;
+
+  @GuardedBy("lock")
+  private boolean inputStreamError = false;

Review Comment:
   Nit: no real reason to explicitly initialize a variable to its default 
value. Trust the JVM.



##########
processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.ArrayDeque;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * In-memory channel backed by a limited-capacity {@link java.util.Deque}.
+ */

Review Comment:
   A queue makes sense when the producer and consumer run in different threads; 
it doesn't add much when run in a single thread. Given this, can we add an 
explanation of the concurrency semantics, here or in the parent class? For 
example, is it thread-safe for concurrent producers and consumers? Do these 
block waiting for the other, or are they supposed to poll?
   
   What is the expected setup order? Should the channel be initialized before 
either the producer or consumer works with it?
   
   It probably does not make sense to have multiple consumers. Can there be 
multiple producers writing to the same channel?



##########
processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.file;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Response object for {@link FrameFileHttpResponseHandler}.
+ */
+public class FrameFilePartialFetch
+{
+  private final AtomicLong bytesRead = new AtomicLong(0L);

Review Comment:
   Add a bit more description? The term "Response object" suggests that this is 
the return value from a function, indicating a point in time. In that case, I'd 
expect the values here to be `final` simple values that hold a snapshot of the 
atomic values maintained in the underlying class.
   
   Given that these are atomic, I suspect that this class is not a simple 
"response object". Explain a bit?



##########
processing/src/main/java/org/apache/druid/frame/key/ClusterBy.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Describes a key used for sorting or partitioning.
+ *
+ * Keys have columns, and some of those columns may comprise a "bucket key". 
See {@link #getBucketByCount()} for
+ * details about bucket keys.
+ */
+public class ClusterBy
+{
+  private final List<SortColumn> columns;
+  private final int bucketByCount;
+
+  @JsonCreator
+  public ClusterBy(
+      @JsonProperty("columns") List<SortColumn> columns,
+      @JsonProperty("bucketByCount") int bucketByCount
+  )
+  {
+    this.columns = Preconditions.checkNotNull(columns, "columns");
+    this.bucketByCount = bucketByCount;
+
+    if (bucketByCount < 0 || bucketByCount > columns.size()) {
+      throw new IAE("Invalid bucketByCount [%d]", bucketByCount);
+    }
+  }
+
+  /**
+   * Create an empty key.
+   */
+  public static ClusterBy none()
+  {
+    return new ClusterBy(Collections.emptyList(), 0);
+  }
+
+  /**
+   * The columns that comprise this key, in order.
+   */
+  @JsonProperty
+  public List<SortColumn> getColumns()
+  {
+    return columns;
+  }
+
+  /**
+   * How many fields from {@link #getColumns()} comprise the "bucket key". 
Bucketing is like strict partitioning: all

Review Comment:
   Presumably this is the length of the prefix of columns that comprise the 
bucket key?



##########
processing/src/main/java/org/apache/druid/frame/processor/BlockingQueueOutputChannelFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+
+/**
+ * An {@link OutputChannelFactory} that generates {@link 
BlockingQueueFrameChannel}.
+ */
+public class BlockingQueueOutputChannelFactory implements OutputChannelFactory
+{
+  private final int frameSize;
+
+  public BlockingQueueOutputChannelFactory(final int frameSize)
+  {
+    this.frameSize = frameSize;
+  }
+
+  @Override
+  public OutputChannel openChannel(final int partitionNumber)
+  {
+    final BlockingQueueFrameChannel channel = 
BlockingQueueFrameChannel.minimal();
+    return OutputChannel.pair(channel, 
ArenaMemoryAllocator.createOnHeap(frameSize), () -> channel, partitionNumber);
+  }
+
+  @Override
+  public OutputChannel openNilChannel(final int partitionNumber)
+  {
+    return OutputChannel.nil(partitionNumber);
+  }

Review Comment:
   Explain this a bit? I get that a nil channel can be handy. I also get that 
we want to open a channel for a partition. I even get that there may be times 
when a channel is empty, and, for convenience, we just return a nil channel to 
the caller.
   
   But, when would the caller itself decide to ask for a nil channel rather 
than one with data?



##########
processing/src/main/java/org/apache/druid/frame/key/ClusterByPartition.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Boundaries of a partition marked by start and end keys. The keys are 
generally described by a
+ * {@link ClusterBy} instance that is not referenced here. (It is generally 
provided contextually.)
+ *
+ * Often, this object is part of a full partition set represented by {@link 
ClusterByPartitions}.
+ */
+public class ClusterByPartition
+{
+  @Nullable
+  private final RowKey start;
+  @Nullable
+  private final RowKey end;
+
+  @JsonCreator
+  public ClusterByPartition(
+      @JsonProperty("start") @Nullable RowKey start,
+      @JsonProperty("end") @Nullable RowKey end
+  )
+  {
+    this.start = start;
+    this.end = end;
+  }
+
+  /**
+   * Get the starting key for this range. It is inclusive (the range *does* 
contain this key).
+   *
+   * Null means the range is unbounded at the start.
+   */
+  @JsonProperty
+  @Nullable
+  public RowKey getStart()
+  {
+    return start;
+  }
+
+  /**
+   * Get the ending key for this range. It is exclusive (the range *does not* 
contain this key).
+   *
+   * Null means the range is unbounded at the end.
+   */
+  @JsonProperty
+  @Nullable
+  public RowKey getEnd()
+  {
+    return end;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClusterByPartition that = (ClusterByPartition) o;
+    return Objects.equals(start, that.start) && Objects.equals(end, that.end);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(start, end);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "{" + start + " -> " + end + "}";

Review Comment:
   `[start, end)`? Just to remind the reader that the range is inclusive, 
exclusive.



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Manages execution of {@link FrameProcessor} in an {@link ExecutorService}.

Review Comment:
   Explain a bit more?
   
   This seems to assume that each "fragment" of execution is handled within a 
single frame processor: reads from (input | network), writes to (destination | 
output).
   
   Yet, most pipelines are made up of multiple operations: scan, filter and 
project, say. Or, aggregate & filter. Etc.
   
   If each "fragment" must do multiple operations, does that mean that every 
combination of pipeline elements has to be code up as a different frame 
processor? And that the developer has to know to route the one-frame-per-call 
logic to the step in that pipeline? (That is, route to the scan operation, then 
do a filter, and stop. Next call, pick up that frame and write to output and 
stop, etc.)
   
   Or, is the intent here that a "fragment" (unit of pipeline execution running 
on a node) is a combination of frame processors, and this class handles 
multiplexing calls to the elements of the pipeline? Call the scan. When it has 
a frame in its output buffer, call the filter. If it has an output frame, call 
write. Circle back around to the scan otherwise.
   
   Would be great if we could explain this pipeline-level architecture 
somewhere.



##########
processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing a sequence of frames. Supports nonblocking writes 
through the {@link #writabilityFuture()}
+ * method.
+ *
+ * May be implemented using an in-memory queue, disk file, stream, etc.
+ */
+public interface WritableFrameChannel
+{
+  /**
+   * Writes a frame with an attached partition number.
+   *
+   * May throw an exception if {@link #writabilityFuture()} is unresolved.
+   */
+  void write(FrameWithPartition frameWithPartition) throws IOException;
+
+  /**
+   * Writes a frame without an attached partition number.
+   *
+   * May throw an exception if {@link #writabilityFuture()} is unresolved.
+   */
+  default void write(Frame frame) throws IOException
+  {
+    write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
+  }
+
+  /**
+   * Finish writing to this channel, unsuccessfully. Must be followed by a 
call to {@link #doneWriting()}.
+   */
+  void abort() throws IOException;
+
+  /**
+   * Finish writing to this channel.
+   *
+   * After calling this method, no additional calls to {@link #write}, {@link 
#abort()}, or this method are permitted.
+   */
+  void doneWriting() throws IOException;

Review Comment:
   `close()`?



##########
processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableNilFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Represents an output channel for some frame processor. Composed of a pair 
of {@link WritableFrameChannel}, which the
+ * processor writes to, along with a supplier of a {@link 
ReadableFrameChannel}, which readers can read from.
+ *
+ * At the time an instance of this class is created, the writable channel is 
already open, but the readable channel
+ * has not yet been created. It is created upon the first call to {@link 
#getReadableChannel()}.
+ */
+public class OutputChannel
+{
+  @Nullable
+  private final WritableFrameChannel writableChannel;
+  @Nullable
+  private final MemoryAllocator frameMemoryAllocator;
+  private final Supplier<ReadableFrameChannel> readableChannelSupplier;
+  private final int partitionNumber;
+
+  private OutputChannel(
+      @Nullable final WritableFrameChannel writableChannel,
+      @Nullable final MemoryAllocator frameMemoryAllocator,
+      final Supplier<ReadableFrameChannel> readableChannelSupplier,
+      final int partitionNumber
+  )
+  {
+    this.writableChannel = writableChannel;
+    this.frameMemoryAllocator = frameMemoryAllocator;
+    this.readableChannelSupplier = readableChannelSupplier;
+    this.partitionNumber = partitionNumber;
+
+    if (partitionNumber < 0 && partitionNumber != 
FrameWithPartition.NO_PARTITION) {
+      throw new IAE("Invalid partition number [%d]", partitionNumber);
+    }
+  }
+
+  /**
+   * Creates an output channel pair.
+   *
+   * @param writableChannel         writable channel for producer
+   * @param frameMemoryAllocator    memory allocator for producer to use while 
writing frames to the channel
+   * @param readableChannelSupplier readable channel for consumer. May be 
called multiple times, so you should wrap this
+   *                                in {@link Suppliers#memoize} if needed.
+   * @param partitionNumber         partition number, if any; may be {@link 
FrameWithPartition#NO_PARTITION} if unknown
+   */
+  public static OutputChannel pair(
+      final WritableFrameChannel writableChannel,
+      final MemoryAllocator frameMemoryAllocator,
+      final Supplier<ReadableFrameChannel> readableChannelSupplier,
+      final int partitionNumber
+  )
+  {
+    return new OutputChannel(
+        Preconditions.checkNotNull(writableChannel, "writableChannel"),
+        Preconditions.checkNotNull(frameMemoryAllocator, 
"frameMemoryAllocator"),
+        readableChannelSupplier,
+        partitionNumber
+    );
+  }
+
+  /**
+   * Create a nil output channel, representing a processor that writes 
nothing. It is not actually writable, but
+   * provides a way for downstream processors to read nothing.
+   */
+  public static OutputChannel nil(final int partitionNumber)
+  {
+    return new OutputChannel(null, null, () -> 
ReadableNilFrameChannel.INSTANCE, partitionNumber);
+  }
+
+  /**
+   * Returns the writable channel of this pair. The producer writes to this 
channel.
+   */
+  public WritableFrameChannel getWritableChannel()
+  {
+    if (writableChannel == null) {
+      throw new ISE("Writable channel is not available");
+    } else {
+      return writableChannel;
+    }
+  }
+
+  /**
+   * Returns the memory allocator for the writable channel. The producer uses 
this to generate frames for the channel.
+   */
+  public MemoryAllocator getFrameMemoryAllocator()
+  {
+    if (frameMemoryAllocator == null) {
+      throw new ISE("Writable channel is not available");
+    } else {
+      return frameMemoryAllocator;
+    }
+  }
+
+  /**
+   * Returns the readable channel of this pair. This readable channel may, or 
may not, be usable before the
+   * writable channel is closed. It depends on whether the channel pair was 
created in a stream-capable manner or not.
+   */
+  public ReadableFrameChannel getReadableChannel()
+  {
+    return readableChannelSupplier.get();
+  }
+
+  public int getPartitionNumber()
+  {
+    return partitionNumber;
+  }
+
+  public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, 
WritableFrameChannel> mapFn)
+  {
+    if (writableChannel == null) {
+      return this;
+    } else {
+      return new OutputChannel(
+          mapFn.apply(writableChannel),
+          frameMemoryAllocator,
+          readableChannelSupplier,
+          partitionNumber
+      );
+    }
+  }
+
+  /**
+   * Returns a read-only version of this instance. Read-only versions have 
neither {@link #getWritableChannel()} nor
+   * {@link #getFrameMemoryAllocator()}, and therefore require substantially 
less memory.
+   */
+  public OutputChannel readOnly()

Review Comment:
   OK, I'll bite. What does "read only output channel" even mean? Doesn't 
output kinda imply we're doing some writing and not doing any reading?



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A FrameProcessor is like an incremental version of Runnable that operates 
on {@link ReadableFrameChannel} and
+ * {@link WritableFrameChannel}.
+ *
+ * It is designed to enable interleaved non-blocking work on a fixed-size 
thread pool. Typically, this is done using
+ * an instance of {@link FrameProcessorExecutor}.
+ */
+public interface FrameProcessor<T>
+{
+  /**
+   * List of input channels. The positions of channels in this list are used 
to build the {@code readableInputs} set
+   * provided to {@link #runIncrementally}.
+   */
+  List<ReadableFrameChannel> inputChannels();
+
+  /**
+   * List of output channels.
+   */
+  List<WritableFrameChannel> outputChannels();
+
+  /**
+   * Runs some of the algorithm, without blocking, and either returns a value 
or a set of input channels
+   * to wait for. This method is called by {@link 
FrameProcessorExecutor#runFully} when all output channels are
+   * writable. Therefore, it is guaranteed that each output channel can accept 
at least one frame.
+   *
+   * This method must not read more than one frame from each readable input 
channel, and must not write more than one
+   * frame to each output channel.
+   *
+   * @param readableInputs channels from {@link #inputChannels()} that are 
either finished or ready to read.
+   *
+   * @return either a final return value or a set of input channels to wait 
for. Must be nonnull.
+   */
+  ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException;

Review Comment:
   The semantics seem simple on the surface, but complex in details. Take a 
simple filter. It reads one frame. It may filter out zero or more rows, and 
writes the resulting frame. In this model, it is driven by the input: we write 
the input to the output, with filtered rows removed, omitting the output step 
if all rows are filtered.
   
   The above can result in sub-optimally sized frames. So, maybe we want to 
combine frames to get the ideal size: 1000-row frames in, filter 60%, and 
combine 2 1/2 filtered frames for output. Here, the frame processor must buffer 
that remaining half frame so it can continue where it left off.
   
   A sort will read one input per call, gradually building up value. When all 
are read, it will do the sort. But, shouldn't it limit its processing to no 
more than x ms? Then, on output, it must write one frame and stop.
   
   I wonder, do we intend to provide some kind of helper classes to manage the 
complex state machine otherwise required to keep track of this state? For the 
sort, I could imagine a "gatherer", a "sorter", and a "writer", where we 
transition from one to the next as the lifecycle proceeds.
   
   Otherwise, I worry that implementations will become very complex. (We saw 
this in Drill when its operators had to do too much: the code got too dense for 
any of us to easily understand. Later revisions provided more structure to make 
the task more manageable.) I'm sure we could learn from Presto here since it 
also has, I believe, non-blocking operators.
   
   Also, might the semantics be better if each call did as much work as it can 
without blocking? For the sort we discussed, if five inputs are ready, why not 
just buffer them all rather than churning the CPU just to return, reschedule, 
and re-call the same processor?



##########
processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntSets;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+public class ReturnOrAwait<T>

Review Comment:
   Explanation? On the surface, in an async system, one normally does one or 
the other. (Return what's available, or wait to fulfill the request.) Trying to 
sort out where this fits in...
   
   Might be easier to just identify the problem this solves. Why, for example, 
do I want to wait for *all* channels rather than *any* channel?



##########
processing/src/main/java/org/apache/druid/frame/processor/MultiColumnSelectorFactory.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class MultiColumnSelectorFactory implements ColumnSelectorFactory
+{
+  private final List<Supplier<ColumnSelectorFactory>> factorySuppliers;
+  private final ColumnInspector columnInspector;
+
+  private int currentFactory = 0;

Review Comment:
   Nit about trusting the JVM to initialize fields.



##########
processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java:
##########
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.LongMath;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
+import it.unimi.dsi.fastutil.longs.LongSortedSet;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFileFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.channel.WritableStreamFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+/**
+ * Sorts and partitions a dataset using parallel external merge sort.
+ *
+ * Input is provided as a set of {@link ReadableFrameChannel} and output is 
provided as {@link OutputChannels}.
+ * Work is performed on a provided {@link FrameProcessorExecutor}.
+ *
+ * The most central point for SuperSorter logic is the {@link 
#runWorkersIfPossible} method, which determines what
+ * needs to be done next based on the current state of the SuperSorter. The 
logic is:
+ *
+ * 1) Read input channels into {@link #inputBuffer} using {@link 
FrameChannelBatcher}, launched via
+ * {@link #runNextBatcher()}, up to a limit of {@link 
#maxChannelsPerProcessor} per batcher.
+ *
+ * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} 
scratch files using
+ * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}.
+ *
+ * 3a) Merge level 0 scratch files into level 1 scratch files using {@link 
FrameChannelMerger} launched from
+ * {@link #runNextMiddleMerger()}, processing up to {@link 
#maxChannelsPerProcessor} files per merger.
+ * Continue this process through increasing level numbers, with the size of 
scratch files increasing by a factor
+ * of {@link #maxChannelsPerProcessor} each level.
+ *
+ * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by 
{@link #runNextMiddleMerger()} writes
+ * partitioned {@link FrameFile} scratch files. The penultimate level cannot 
be written until
+ * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by 
this point, the SuperSorter pauses.
+ * The SuperSorter resumes and writes the penultimate level's files when the 
future resolves.
+ *
+ * 4) Write the final level using {@link FrameChannelMerger} launched from 
{@link #runNextUltimateMerger()}.
+ * Outputs for this level are written to channels provided by {@link 
#outputChannelFactory}, rather than scratch files.
+ *
+ * At all points, higher level processing is preferred over lower-level 
processing. Writing to final output files
+ * is preferred over intermediate, and writing to intermediate files is 
preferred over reading inputs. These
+ * preferences ensure that the amount of data buffered up in memory does not 
grow too large.
+ *
+ * Potential future work (things we could optimize if necessary):
+ *
+ * - Collapse merging to a single level if level zero has one merger, and we 
want to write one output partition.
+ * - Skip batching, and inject directly into level 0, if input channels are 
already individually fully-sorted.
+ * - Combine (for example: aggregate) while merging.
+ */
+public class SuperSorter
+{
+  private static final Logger log = new Logger(SuperSorter.class);
+
+  public static final int UNKNOWN_LEVEL = -1;
+  public static final long UNKNOWN_TOTAL = -1;
+
+  private final List<ReadableFrameChannel> inputChannels;
+  private final FrameReader frameReader;
+  private final ClusterBy clusterBy;
+  private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
+  private final FrameProcessorExecutor exec;
+  private final File directory;
+  private final OutputChannelFactory outputChannelFactory;
+  private final Supplier<MemoryAllocator> innerFrameAllocatorMaker;
+  private final int maxChannelsPerProcessor;
+  private final int maxActiveProcessors;
+  private final long rowLimit;
+  private final String cancellationId;
+
+  private final Object runWorkersLock = new Object();
+
+  @GuardedBy("runWorkersLock")
+  private boolean batcherIsRunning = false;
+
+  @GuardedBy("runWorkersLock")
+  private IntSet inputChannelsToRead = new IntOpenHashSet();
+
+  @GuardedBy("runWorkersLock")
+  private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new 
Int2ObjectArrayMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private List<OutputChannel> outputChannels = null;
+
+  @GuardedBy("runWorkersLock")
+  private int activeProcessors = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long totalInputFrames = UNKNOWN_TOTAL;
+
+  @GuardedBy("runWorkersLock")
+  private int totalMergingLevels = UNKNOWN_LEVEL;
+
+  @GuardedBy("runWorkersLock")
+  private final Queue<Frame> inputBuffer = new ArrayDeque<>();
+
+  @GuardedBy("runWorkersLock")
+  private long inputFramesReadSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long levelZeroMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private int ultimateMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private final Map<File, FrameFile> penultimateFrameFileCache = new 
HashMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private SettableFuture<OutputChannels> allDone = null;
+
+  @GuardedBy("runWorkersLock")
+  SuperSorterProgressTracker superSorterProgressTracker;
+
+  /**
+   * See {@link #setNoWorkRunnable}.
+   */
+  @GuardedBy("runWorkersLock")
+  private Runnable noWorkRunnable = null;
+
+  /**
+   * Initializes a SuperSorter.
+   *
+   * @param inputChannels              input channels. All frames in these 
channels must be sorted according to the
+   *                                   {@link ClusterBy#getColumns()}, or else 
sorting will not produce correct
+   *                                   output.
+   * @param frameReader                frame reader for the input channels
+   * @param clusterBy                  desired sorting order
+   * @param outputPartitionsFuture     a future that resolves to the desired 
output partitions. Sorting will block
+   *                                   prior to writing out final outputs 
until this future resolves. However, the
+   *                                   sorter will be able to read all inputs 
even if this future is unresolved.
+   *                                   If output need not be partitioned, use
+   *                                   {@link 
ClusterByPartitions#oneUniversalPartition()}. In this case a single
+   *                                   sorted channel is generated.
+   * @param exec                       executor to perform work in
+   * @param temporaryDirectory         directory to use for scratch files. 
This must have enough space to store at
+   *                                   least two copies of the dataset in 
{@link FrameFile} format.
+   * @param outputChannelFactory       factory for partitioned, sorted output 
channels
+   * @param innerFrameAllocatorMaker   supplier for allocators that are used 
to make merged frames for intermediate
+   *                                   levels of merging, prior to the final 
output. Final output frame allocation is
+   *                                   controlled by outputChannelFactory. One 
allocator is created per intermediate
+   *                                   scratch file.
+   * @param maxActiveProcessors        maximum number of merging processors to 
execute at once in the provided
+   *                                   {@link FrameProcessorExecutor}
+   * @param maxChannelsPerProcessor    maximum number of channels to merge at 
once per merging processor
+   * @param rowLimit                   limit to apply during sorting. The 
limit is merely advisory: the actual number
+   *                                   of rows returned may be larger than the 
limit. The limit is applied across
+   *                                   all partitions, not to each partition 
individually.
+   * @param cancellationId             cancellation id to use when running 
processors in the provided
+   *                                   {@link FrameProcessorExecutor}.
+   * @param superSorterProgressTracker progress tracker
+   */
+  public SuperSorter(
+      final List<ReadableFrameChannel> inputChannels,
+      final FrameReader frameReader,
+      final ClusterBy clusterBy,
+      final ListenableFuture<ClusterByPartitions> outputPartitionsFuture,
+      final FrameProcessorExecutor exec,
+      final File temporaryDirectory,
+      final OutputChannelFactory outputChannelFactory,
+      final Supplier<MemoryAllocator> innerFrameAllocatorMaker,
+      final int maxActiveProcessors,
+      final int maxChannelsPerProcessor,
+      final long rowLimit,
+      @Nullable final String cancellationId,
+      final SuperSorterProgressTracker superSorterProgressTracker
+  )
+  {
+    this.inputChannels = inputChannels;
+    this.frameReader = frameReader;
+    this.clusterBy = clusterBy;
+    this.outputPartitionsFuture = outputPartitionsFuture;
+    this.exec = exec;
+    this.directory = temporaryDirectory;
+    this.outputChannelFactory = outputChannelFactory;
+    this.innerFrameAllocatorMaker = innerFrameAllocatorMaker;
+    this.maxChannelsPerProcessor = maxChannelsPerProcessor;
+    this.maxActiveProcessors = maxActiveProcessors;
+    this.rowLimit = rowLimit;
+    this.cancellationId = cancellationId;
+    this.superSorterProgressTracker = superSorterProgressTracker;
+
+    for (int i = 0; i < inputChannels.size(); i++) {
+      inputChannelsToRead.add(i);
+    }
+
+    if (maxActiveProcessors < 1) {
+      throw new IAE("maxActiveProcessors[%d] < 1", maxActiveProcessors);
+    }
+
+    if (maxChannelsPerProcessor < 2) {
+      throw new IAE("maxChannelsPerProcessor[%d] < 2", 
maxChannelsPerProcessor);
+    }
+  }
+
+  /**
+   * Starts sorting. Can only be called once. Work is performed in the {@link 
FrameProcessorExecutor} that was
+   * passed to the constructor.
+   *
+   * Returns a future containing partitioned sorted output channels.
+   */
+  public ListenableFuture<OutputChannels> run()
+  {
+    synchronized (runWorkersLock) {
+      if (allDone != null) {
+        throw new ISE("Cannot run() more than once.");
+      }
+
+      allDone = SettableFuture.create();
+      runWorkersIfPossible();
+
+      // When output partitions become known, that may unblock some additional 
layers of merging.
+      outputPartitionsFuture.addListener(
+          () -> {
+            synchronized (runWorkersLock) {
+              if (outputPartitionsFuture.isDone()) { // Update the progress 
tracker
+                
superSorterProgressTracker.setTotalMergersForUltimateLevel(getOutputPartitions().size());
+              }
+              runWorkersIfPossible();
+              setAllDoneIfPossible();
+            }
+          },
+          exec.getExecutorService()
+      );
+
+      return FutureUtils.futureWithBaggage(
+          allDone,
+          () -> {
+            synchronized (runWorkersLock) {
+              if (activeProcessors == 0) {
+                cleanUp();
+              }
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Sets a callback that enables tests to see when this SuperSorter cannot do 
any work. Only used for testing.
+   */
+  @VisibleForTesting
+  void setNoWorkRunnable(final Runnable runnable)
+  {
+    synchronized (runWorkersLock) {
+      this.noWorkRunnable = runnable;
+    }
+  }
+
+  /**
+   * Called when a worker finishes.
+   */
+  @GuardedBy("runWorkersLock")
+  private void workerFinished()
+  {
+    activeProcessors -= 1;
+
+    if (log.isDebugEnabled()) {
+      log.debug(stateString());
+    }
+
+    runWorkersIfPossible();
+    setAllDoneIfPossible();
+
+    if (isAllDone() && activeProcessors == 0) {
+      cleanUp();
+    }
+  }
+
+  /**
+   * Tries to launch a new worker, and returns whether it was doable.
+   *
+   * Later workers have priority, i.e., those responsible for merging higher 
levels of the merge tree. Workers that
+   * read the original input channels have the lowest priority. This priority 
order ensures that we don't build up
+   * too much unmerged data.
+   */
+  @GuardedBy("runWorkersLock")
+  private void runWorkersIfPossible()
+  {
+    if (isAllDone()) {
+      // Do nothing if the instance is all done. This can happen in case of 
error or cancellation.
+      return;
+    }
+
+    try {
+      while (activeProcessors < maxActiveProcessors &&
+             (runNextUltimateMerger() || runNextMiddleMerger() || 
runNextLevelZeroMerger() || runNextBatcher())) {
+        activeProcessors += 1;
+
+        if (log.isDebugEnabled()) {
+          log.debug(stateString());
+        }
+      }
+
+      if (activeProcessors == 0 && noWorkRunnable != null) {
+        log.debug("No active workers and no work left to start.");
+
+        // Only called in tests. No need to bother with try/catch and such.
+        noWorkRunnable.run();
+      }
+    }
+    catch (Throwable e) {
+      allDone.setException(e);
+    }
+  }
+
+  @GuardedBy("runWorkersLock")
+  private void setAllDoneIfPossible()
+  {
+    if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
+      // No input data -- generate empty output channels.
+      final ClusterByPartitions partitions = getOutputPartitions();
+      final List<OutputChannel> channels = new ArrayList<>(partitions.size());
+
+      for (int partitionNum = 0; partitionNum < partitions.size(); 
partitionNum++) {
+        channels.add(outputChannelFactory.openNilChannel(partitionNum));
+      }
+
+      // OK to use wrap, not wrapReadOnly, because nil channels are already 
read-only.
+      allDone.set(OutputChannels.wrap(channels));
+    } else if (totalMergingLevels != UNKNOWN_LEVEL
+               && outputsReadyByLevel.containsKey(totalMergingLevels - 1)
+               && outputsReadyByLevel.get(totalMergingLevels - 1).size() == 
getOutputPartitions().size()) {
+      // We're done!!
+      try {
+        // OK to use wrap, not wrapReadOnly, because all channels in this list 
are already read-only.
+        allDone.set(OutputChannels.wrap(outputChannels));
+      }
+      catch (Throwable e) {
+        allDone.setException(e);
+      }
+    }
+  }
+
+  @GuardedBy("runWorkersLock")
+  private boolean runNextBatcher()
+  {
+    if (batcherIsRunning || inputChannelsToRead.isEmpty()) {
+      return false;
+    } else {
+      batcherIsRunning = true;
+
+      runWorker(
+          new FrameChannelBatcher(inputChannels, maxChannelsPerProcessor),
+          result -> {
+            final List<Frame> batch = result.lhs;
+            final IntSet keepReading = result.rhs;
+
+            synchronized (runWorkersLock) {
+              inputBuffer.addAll(batch);
+              inputFramesReadSoFar += batch.size();
+              inputChannelsToRead = keepReading;
+
+              if (inputChannelsToRead.isEmpty()) {
+                inputChannels.forEach(ReadableFrameChannel::doneReading);
+                setTotalInputFrames(inputFramesReadSoFar);
+                runWorkersIfPossible();
+              } else if (inputBuffer.size() >= maxChannelsPerProcessor) {
+                runWorkersIfPossible();
+              }
+
+              batcherIsRunning = false;
+            }
+          }
+      );
+
+      return true;
+    }
+  }
+
+  /**
+   * Level zero mergers read batches of frames from the "inputBuffer". These 
frames are individually sorted, but there
+   * is no ordering between the frames. Their output is a sorted sequence of 
frames.
+   */
+  @GuardedBy("runWorkersLock")
+  private boolean runNextLevelZeroMerger()
+  {
+    if (inputBuffer.isEmpty() || (inputBuffer.size() < maxChannelsPerProcessor 
&& !allInputRead())) {
+      return false;
+    }
+
+    final List<ReadableFrameChannel> in = new ArrayList<>();
+
+    while (in.size() < maxChannelsPerProcessor) {
+      final Frame frame = inputBuffer.poll();
+
+      if (frame == null) {
+        break;
+      }
+
+      in.add(singleReadableFrameChannel(new FrameWithPartition(frame, 
FrameWithPartition.NO_PARTITION)));
+    }
+
+    runMerger(0, levelZeroMergersRunSoFar++, in, null);
+    return true;
+  }
+
+  @GuardedBy("runWorkersLock")
+  private boolean runNextMiddleMerger()

Review Comment:
   Very cool implementation, great use of concurrency.
   
   The first impression is that this one class does too much: it will be very 
hard to test thoroughly because of the amount of things that happen at once. 
From the school of hard knocks, I've found it best to split up a sort into its 
component pieces, test each to death, then combine them into the overall sort.
   
   Here, for example, the actual mid-level merger could be a separate class (or 
function) so that it can be tested in isolation (two empty inputs, n empty 
inputs, all inputs the same value, lopsided inputs, already-sorted inputs, 
chunks of inputs that are in ascending order but don't overlap, etc.)
   
   The complex code to orchestrate "threads" seems similar to that used 
elsewhere: that is multiple copies to review and test. Can there be a single 
runner that handles generic tasks? We test the heck out of that once, and trust 
it in other uses.
   
   In each "concurrent aware" operation, there is a simple sequential operation 
at its core. Separating the two (very different) concerns will allow easier 
testing of each. That is, for this function, each merge is single-threaded, but 
coordinates with other workers at the boundaries.
   
   And so on.



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java:
##########
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Manages execution of {@link FrameProcessor} in an {@link ExecutorService}.
+ *
+ * If you want single threaded execution, use {@code Execs.singleThreaded()}. 
It is not a good idea to use this with a
+ * same-thread executor like {@code Execs.directExecutor()}, because it will 
lead to deep call stacks.
+ */
+public class FrameProcessorExecutor

Review Comment:
   This seems much like a bespoke implementation of Go's go-routines. (I once 
wrote a highly concurrent data pipeline using Go: the runtime handled all this 
fancy synchronization. Such a breeze!)
   
   If so, then a) is there some library that exists that does this already? 
And, b) can this be generalized so it works not just with frame processors, but 
with all manner of "go-routine-like" objects?
   
   The thought is that, in a busy node, there may be many concurrent fragments 
(queries) running, and the ideal is that a single runtime manages all of them 
to ensure fair scheduling across routines, rather than each clump of routines 
doing its own scheduling.



##########
processing/src/main/java/org/apache/druid/frame/channel/WritableFrameChannel.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing a sequence of frames. Supports nonblocking writes 
through the {@link #writabilityFuture()}
+ * method.
+ *
+ * May be implemented using an in-memory queue, disk file, stream, etc.
+ */
+public interface WritableFrameChannel
+{
+  /**
+   * Writes a frame with an attached partition number.
+   *
+   * May throw an exception if {@link #writabilityFuture()} is unresolved.
+   */
+  void write(FrameWithPartition frameWithPartition) throws IOException;
+
+  /**
+   * Writes a frame without an attached partition number.
+   *
+   * May throw an exception if {@link #writabilityFuture()} is unresolved.
+   */
+  default void write(Frame frame) throws IOException
+  {
+    write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
+  }
+
+  /**
+   * Finish writing to this channel, unsuccessfully. Must be followed by a 
call to {@link #doneWriting()}.
+   */
+  void abort() throws IOException;

Review Comment:
   Given our fraught times, maybe `cancel()` is a bit less of a loaded term?



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Channel backed by an {@link InputStream}.
+ *
+ * Frame channels are expected to be nonblocking, but InputStreams cannot be 
read in nonblocking fashion.
+ * This implementation deals with that by using an {@link ExecutorService} to 
read from the stream in a
+ * separate thread.
+ */
+public class ReadableInputStreamFrameChannel implements ReadableFrameChannel
+{
+  private final InputStream inputStream;
+  private final ReadableByteChunksFrameChannel delegate;
+  private final ExecutorService executorService;
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final byte[] buffer = new byte[8 * 1024];
+
+  @GuardedBy("lock")
+  private long totalInputStreamBytesRead = 0;
+
+  @GuardedBy("lock")
+  private boolean inputStreamFinished = false;
+
+  @GuardedBy("lock")
+  private boolean inputStreamError = false;
+
+  private volatile boolean readingStarted = false;
+  private volatile boolean keepReading = true;
+
+  public ReadableInputStreamFrameChannel(InputStream inputStream, String id, 
ExecutorService executorService)
+  {
+    this.inputStream = inputStream;
+    this.delegate = ReadableByteChunksFrameChannel.create(id);
+    this.executorService = executorService;
+  }
+
+  /**
+   * Method needs to be called for reading of input streams into 
ByteChunksFrameChannel
+   */
+  public void startReading()

Review Comment:
   `open()`?
   
   Is there a good reason to separate construction in time from 
`startReading()`? (There are often such good reasons: create the object at the 
start of execution, but open it as late as possible, say.)
   
   Otherwise, should the class follow the usual pattern to do the open 
functionality in the constructor so the client doesn't have to deal with the 
created/opened/closed states?



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableFrameChannel.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+/**
+ * Interface for reading a sequence of frames. Supports nonblocking reads 
through the {@link #canRead()} and
+ * {@link #readabilityFuture()} methods.
+ *
+ * May be implemented using an in-memory queue, disk file, stream, etc.
+ */
+public interface ReadableFrameChannel
+{
+  /**
+   * Returns whether this channel is finished. Finished channels will not 
generate any further frames or errors.
+   *
+   * Generally, once you discover that a channel is finished, you should call 
{@link #doneReading()} and then
+   * discard it.
+   *
+   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   */
+  boolean isFinished();
+
+  /**
+   * Returns whether this channel has a frame or error condition currently 
available. If this method returns true, then
+   * you can call {@link #read()} to retrieve the frame or error.
+   *
+   * Note that it is possible for a channel to be unfinished and also have no 
available frames or errors. This happens
+   * when it is not in a ready-for-reading state. See {@link 
#readabilityFuture()} for details.
+   */
+  boolean canRead();
+
+  /**
+   * Returns the next available frame from this channel.
+   *
+   * Before calling this method, you should check {@link #canRead()} to ensure 
there is a frame or
+   * error available.
+   *
+   * @throws java.util.NoSuchElementException if there is no frame currently 
available
+   */
+  Frame read();

Review Comment:
   Would it be simpler to combine the methods a different way?
   
   ```java
     enum Status { READY, PENDING, EOF }
   
     Status status();
   ```
   
   And `read()` returns null if nothing is available to read. The caller 
can then call `read()` and, only if it gets `null`, do the work of calling 
`status()` to find out why.



##########
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessor.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A FrameProcessor is like an incremental version of Runnable that operates 
on {@link ReadableFrameChannel} and
+ * {@link WritableFrameChannel}.
+ *
+ * It is designed to enable interleaved non-blocking work on a fixed-size 
thread pool. Typically, this is done using
+ * an instance of {@link FrameProcessorExecutor}.
+ */
+public interface FrameProcessor<T>
+{
+  /**
+   * List of input channels. The positions of channels in this list are used 
to build the {@code readableInputs} set
+   * provided to {@link #runIncrementally}.
+   */
+  List<ReadableFrameChannel> inputChannels();
+
+  /**
+   * List of output channels.
+   */
+  List<WritableFrameChannel> outputChannels();
+
+  /**
+   * Runs some of the algorithm, without blocking, and either returns a value 
or a set of input channels
+   * to wait for. This method is called by {@link 
FrameProcessorExecutor#runFully} when all output channels are
+   * writable. Therefore, it is guaranteed that each output channel can accept 
at least one frame.
+   *
+   * This method must not read more than one frame from each readable input 
channel, and must not write more than one
+   * frame to each output channel.
+   *
+   * @param readableInputs channels from {@link #inputChannels()} that are 
either finished or ready to read.
+   *
+   * @return either a final return value or a set of input channels to wait 
for. Must be nonnull.
+   */
+  ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException;
+
+  /**
+   * Cleans up resources used by this worker, including signalling to input 
and output channels that we are
+   * done reading and writing, via {@link ReadableFrameChannel#doneReading()} 
and
+   * {@link WritableFrameChannel#doneWriting()}.
+   *
+   * This method may be called before the worker reports completion via {@link 
#runIncrementally}, especially in
+   * cases of cancellation.
+   */
+  void cleanup() throws IOException;
+}

Review Comment:
   `close()`?



##########
processing/src/main/java/org/apache/druid/frame/processor/MultiColumnSelectorFactory.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class MultiColumnSelectorFactory implements ColumnSelectorFactory

Review Comment:
   Comment to explain what a multi-column selector might be?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to