gianm commented on code in PR #12848:
URL: https://github.com/apache/druid/pull/12848#discussion_r935978411


##########
processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntPriorityQueue;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.key.FrameComparisonWidget;
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.row.FrameColumnSelectorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Supplier;
+
+/**
+ * Processor that merges already-sorted inputChannels and writes a 
fully-sorted stream to a single outputChannel.
+ *
+ * Frames from input channels must be {@link 
org.apache.druid.frame.FrameType#ROW_BASED}. Output frames will
+ * be row-based as well.
+ *
+ * For unsorted output, use {@link FrameChannelMuxer} instead.
+ */
+public class FrameChannelMerger implements FrameProcessor<Long>
+{
+  static final long UNLIMITED = -1;

Review Comment:
   Yes, it can be! I changed it.



##########
processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+
+/**
+ * Helper for {@link FrameProcessorExecutor#runAllFully}.

Review Comment:
   I added a few comments. Please let me know if there's more info you were 
looking for that I didn't include.



##########
processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java:
##########
@@ -0,0 +1,867 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.LongMath;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
+import it.unimi.dsi.fastutil.longs.LongSortedSet;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFileFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.channel.WritableStreamFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+/**
+ * Sorts and partitions a dataset using parallel external merge sort.
+ *
+ * Input is provided as a set of {@link ReadableFrameChannel} and output is 
provided as {@link OutputChannels}.
+ * Work is performed on a provided {@link FrameProcessorExecutor}.
+ *
+ * The most central point for SuperSorter logic is the {@link 
#runWorkersIfPossible} method, which determines what
+ * needs to be done next based on the current state of the SuperSorter. The 
logic is:
+ *
+ * 1) Read input channels into {@link #inputBuffer} using {@link 
FrameChannelBatcher}, launched via
+ * {@link #runNextBatcher()}, up to a limit of {@link 
#maxChannelsPerProcessor} per batcher.
+ *
+ * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} 
scratch files using
+ * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}.
+ *
+ * 3a) Merge level 0 scratch files into level 1 scratch files using {@link 
FrameChannelMerger} launched from
+ * {@link #runNextMiddleMerger()}, processing up to {@link 
#maxChannelsPerProcessor} files per merger.
+ * Continue this process through increasing level numbers, with the size of 
scratch files increasing by a factor
+ * of {@link #maxChannelsPerProcessor} each level.
+ *
+ * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by 
{@link #runNextMiddleMerger()} writes
+ * partitioned {@link FrameFile} scratch files. The penultimate level cannot 
be written until
+ * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by 
this point, the SuperSorter pauses.
+ * The SuperSorter resumes and writes the penultimate level's files when the 
future resolves.
+ *
+ * 4) Write the final level using {@link FrameChannelMerger} launched from 
{@link #runNextUltimateMerger()}.
+ * Outputs for this level are written to channels provided by {@link 
#outputChannelFactory}, rather than scratch files.
+ *
+ * At all points, higher level processing is preferred over lower-level 
processing. Writing to final output files
+ * is preferred over intermediate, and writing to intermediate files is 
preferred over reading inputs. These
+ * preferences ensure that the amount of data buffered up in memory does not 
grow too large.
+ *
+ * Potential future work (things we could optimize if necessary):
+ *
+ * - Collapse merging to a single level if level zero has one merger, and we 
want to write one output partition.
+ * - Skip batching, and inject directly into level 0, if input channels are 
already individually fully-sorted.
+ * - Combine (for example: aggregate) while merging.
+ */
+public class SuperSorter
+{
+  private static final Logger log = new Logger(SuperSorter.class);
+
+  public static final int UNKNOWN_LEVEL = -1;
+  public static final long UNKNOWN_TOTAL = -1;
+
+  private final List<ReadableFrameChannel> inputChannels;
+  private final FrameReader frameReader;
+  private final ClusterBy clusterBy;
+  private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
+  private final FrameProcessorExecutor exec;
+  private final File directory;
+  private final OutputChannelFactory outputChannelFactory;
+  private final Supplier<MemoryAllocator> innerFrameAllocatorMaker;
+  private final int maxChannelsPerProcessor;
+  private final int maxActiveProcessors;
+  private final long rowLimit;
+  private final String cancellationId;
+
+  private final Object runWorkersLock = new Object();
+
+  @GuardedBy("runWorkersLock")
+  private boolean batcherIsRunning = false;
+
+  @GuardedBy("runWorkersLock")
+  private IntSet inputChannelsToRead = new IntOpenHashSet();
+
+  @GuardedBy("runWorkersLock")
+  private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new 
Int2ObjectArrayMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private List<OutputChannel> outputChannels = null;
+
+  @GuardedBy("runWorkersLock")
+  private int activeProcessors = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long totalInputFrames = UNKNOWN_TOTAL;
+
+  @GuardedBy("runWorkersLock")
+  private int totalMergingLevels = UNKNOWN_LEVEL;
+
+  @GuardedBy("runWorkersLock")
+  private final Queue<Frame> inputBuffer = new ArrayDeque<>();
+
+  @GuardedBy("runWorkersLock")
+  private long inputFramesReadSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long levelZeroMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private int ultimateMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private final Map<File, FrameFile> penultimateFrameFileCache = new 
HashMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private SettableFuture<OutputChannels> allDone = null;
+
+  @GuardedBy("runWorkersLock")
+  SuperSorterProgressTracker superSorterProgressTracker;
+
+  /**
+   * See {@link #setNoWorkRunnable}.
+   */
+  @GuardedBy("runWorkersLock")
+  private Runnable noWorkRunnable = null;
+
+  /**
+   * Initializes a SuperSorter.
+   *
+   * @param inputChannels              input channels. All frames in these 
channels must be sorted according to the
+   *                                   {@link ClusterBy#getColumns()}, or else 
sorting will not produce correct
+   *                                   output.
+   * @param frameReader                frame reader for the input channels
+   * @param clusterBy                  desired sorting order
+   * @param outputPartitionsFuture     a future that resolves to the desired 
output partitions. Sorting will block
+   *                                   prior to writing out final outputs 
until this future resolves. However, the
+   *                                   sorter will be able to read all inputs 
even if this future is unresolved.
+   *                                   If output need not be partitioned, use
+   *                                   {@link 
ClusterByPartitions#oneUniversalPartition()}. In this case a single
+   *                                   sorted channel is generated.
+   * @param exec                       executor to perform work in
+   * @param temporaryDirectory         directory to use for scratch files. 
This must have enough space to store at
+   *                                   least two copies of the dataset in 
{@link FrameFile} format.
+   * @param outputChannelFactory       factory for partitioned, sorted output 
channels
+   * @param innerFrameAllocatorMaker   supplier for allocators that are used 
to make merged frames for intermediate
+   *                                   levels of merging, prior to the final 
output. Final output frame allocation is
+   *                                   controlled by outputChannelFactory. One 
allocator is created per intermediate
+   *                                   scratch file.
+   * @param maxActiveProcessors        maximum number of merging processors to 
execute at once in the provided
+   *                                   {@link FrameProcessorExecutor}
+   * @param maxChannelsPerProcessor    maximum number of channels to merge at 
once per merging processor
+   * @param rowLimit                   limit to apply during sorting. The 
limit is merely advisory: the actual number

Review Comment:
   It's applied across all partitions, not to each partition individually. I 
added that sentence to the javadoc to hopefully clarify it.



##########
processing/src/main/java/org/apache/druid/frame/key/ClusterByPartitions.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Holder object for a set of {@link ClusterByPartition}. There are no 
preconditions put upon the partitions, except
+ * that there is at least one of them.
+ *
+ * In particular, they are not required to abut each other. See {@link 
#allAbutting()} to check if this particular list
+ * of partitions is in fact all abutting.
+ */
+public class ClusterByPartitions implements Iterable<ClusterByPartition>
+{
+  private static final ClusterByPartitions ONE_UNIVERSAL_PARTITION =

Review Comment:
   Ah, I did it this way because in practice, we don't ever need a single 
universal `ClusterByPartition` by itself. We always need it wrapped up in a 
`ClusterByPartitions` instance.



##########
processing/src/main/java/org/apache/druid/frame/key/ClusterByPartitions.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.key;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Holder object for a set of {@link ClusterByPartition}. There are no 
preconditions put upon the partitions, except
+ * that there is at least one of them.
+ *
+ * In particular, they are not required to abut each other. See {@link 
#allAbutting()} to check if this particular list

Review Comment:
   The class doesn't enforce anything, including anything related to 
overlappingness. `allAbutting` can be used to check that the partitions are all 
abutting and non-overlapping (because abutting implies non-overlapping). I 
added some detail to the comment to hopefully clarify this.



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableConcatFrameChannel.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Channel that concatenates a sequence of other channels that are provided by 
an iterator. The iterator is
+ * walked just-in-time, meaning that {@link Iterator#next()} is not called 
until the channel is actually ready
+ * to be used.
+ *
+ * The first channel is pulled from the iterator immediately upon construction.
+ */
+public class ReadableConcatFrameChannel implements ReadableFrameChannel
+{
+  private final Iterator<ReadableFrameChannel> channelIterator;
+
+  // Null means there were never any channels to begin with.
+  @Nullable
+  private ReadableFrameChannel currentChannel;
+
+  private ReadableConcatFrameChannel(Iterator<ReadableFrameChannel> 
channelIterator)
+  {
+    this.channelIterator = channelIterator;
+    this.currentChannel = channelIterator.hasNext() ? channelIterator.next() : 
null;
+  }
+
+  /**
+   * Creates a new concatenated channel. The first channel is pulled from the 
provided iterator immediately.
+   * Other channels are pulled on-demand, when they are ready to be used.
+   */
+  public static ReadableConcatFrameChannel open(final 
Iterator<ReadableFrameChannel> channelIterator)
+  {
+    return new ReadableConcatFrameChannel(channelIterator);
+  }
+
+  @Override
+  public boolean isFinished()
+  {
+    advanceCurrentChannelIfFinished();
+    return currentChannel == null || currentChannel.isFinished();
+  }
+
+  @Override
+  public boolean canRead()
+  {
+    advanceCurrentChannelIfFinished();
+    return currentChannel != null && currentChannel.canRead();
+  }
+
+  @Override
+  public Frame read()
+  {
+    if (!canRead() || currentChannel == null) {

Review Comment:
   Hmm, I think you're right. I removed the `currentChannel == null` part of 
the check.



##########
processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Limiter for access to some resource.
+ *
+ * Used by {@link FrameProcessorExecutor#runAllFully} to limit the number of 
outstanding processors.
+ */
+public class Bouncer

Review Comment:
   Hmm. I had it here because its purpose is to be an argument to 
`FrameProcessorExecutor#runAllFully`, which is part of the processor package. 
So it makes sense to me in this package.



##########
processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * Channel backed by a byte stream that is continuously streamed in using 
{@link #addChunk}. The byte stream
+ * must be in the format of a {@link org.apache.druid.frame.file.FrameFile}.
+ *
+ * This class is used by {@link 
org.apache.druid.frame.file.FrameFileHttpResponseHandler} to provide nonblocking
+ * reads from a remote http server.
+ */
+public class ReadableByteChunksFrameChannel implements ReadableFrameChannel
+{
+  private static final Logger log = new 
Logger(ReadableByteChunksFrameChannel.class);
+
+  /**
+   * Largest supported frame. Limit exists as a safeguard against streams with 
huge frame sizes. It is not expected
+   * that any legitimate frame will be this large: typical usage involves 
frames an order of magnitude smaller.
+   */
+  private static final long MAX_FRAME_SIZE = 100_000_000;
+
+  private static final int UNKNOWN_LENGTH = -1;
+  private static final int FRAME_MARKER_BYTES = Byte.BYTES;
+  private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES =
+      Byte.BYTES + Frame.COMPRESSED_FRAME_ENVELOPE_SIZE;
+
+  private enum StreamPart
+  {
+    MAGIC,
+    FRAMES,
+    FOOTER
+  }
+
+  private final Object lock = new Object();
+  private final String id;
+  private final long bytesLimit;
+
+  @GuardedBy("lock")
+  private final List<Either<Throwable, byte[]>> chunks = new ArrayList<>();
+
+  @GuardedBy("lock")
+  private SettableFuture<?> addChunkBackpressureFuture = null;
+
+  @GuardedBy("lock")
+  private SettableFuture<?> readyForReadingFuture = null;
+
+  @GuardedBy("lock")
+  private boolean noMoreWrites = false;
+
+  @GuardedBy("lock")
+  private int positionInFirstChunk = 0;
+
+  @GuardedBy("lock")
+  private long bytesBuffered = 0;
+
+  @GuardedBy("lock")
+  private long bytesAdded = 0;
+
+  @GuardedBy("lock")
+  private long nextCompressedFrameLength = UNKNOWN_LENGTH;
+
+  @GuardedBy("lock")
+  private StreamPart streamPart = StreamPart.MAGIC;
+
+  private ReadableByteChunksFrameChannel(String id, long bytesLimit)
+  {
+    this.id = Preconditions.checkNotNull(id, "id");
+    this.bytesLimit = bytesLimit;
+  }
+
+  /**
+   * Create a channel that aims to limit its memory footprint to one frame. 
The channel exerts backpressure

Review Comment:
   Ah, it works because the limit is soft. I added a comment explaining it:
   
   ```
       // Set byte limit to 1, so backpressure will be exerted as soon as we 
have a full frame buffered.
       // (The bytesLimit is soft: it will be exceeded if needed to store a 
complete frame.)
   ```



##########
processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
+import it.unimi.dsi.fastutil.ints.IntSortedSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ *

Review Comment:
   Added Javadoc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to