gianm commented on code in PR #12848: URL: https://github.com/apache/druid/pull/12848#discussion_r935978411
########## processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntPriorityQueue; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.key.FrameComparisonWidget; +import org.apache.druid.frame.key.RowKey; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.row.FrameColumnSelectorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnSelectorFactory; 
+import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +/** + * Processor that merges already-sorted inputChannels and writes a fully-sorted stream to a single outputChannel. + * + * Frames from input channels must be {@link org.apache.druid.frame.FrameType#ROW_BASED}. Output frames will + * be row-based as well. + * + * For unsorted output, use {@link FrameChannelMuxer} instead. + */ +public class FrameChannelMerger implements FrameProcessor<Long> +{ + static final long UNLIMITED = -1; Review Comment: Yes, it can be! I changed it. ########## processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java: ########## @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; + +/** + * Helper for {@link FrameProcessorExecutor#runAllFully}. Review Comment: I added a few comments. Please let me know if there's more info you were looking for that I didn't include. ########## processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java: ########## @@ -0,0 +1,867 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.longs.LongRBTreeSet; +import it.unimi.dsi.fastutil.longs.LongSortedSet; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.frame.channel.BlockingQueueFrameChannel; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFileFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.channel.WritableStreamFrameChannel; +import org.apache.druid.frame.file.FrameFile; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.frame.key.ClusterBy; +import 
org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +/** + * Sorts and partitions a dataset using parallel external merge sort. + * + * Input is provided as a set of {@link ReadableFrameChannel} and output is provided as {@link OutputChannels}. + * Work is performed on a provided {@link FrameProcessorExecutor}. + * + * The most central point for SuperSorter logic is the {@link #runWorkersIfPossible} method, which determines what + * needs to be done next based on the current state of the SuperSorter. The logic is: + * + * 1) Read input channels into {@link #inputBuffer} using {@link FrameChannelBatcher}, launched via + * {@link #runNextBatcher()}, up to a limit of {@link #maxChannelsPerProcessor} per batcher. + * + * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} scratch files using + * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}. 
+ * + * 3a) Merge level 0 scratch files into level 1 scratch files using {@link FrameChannelMerger} launched from + * {@link #runNextMiddleMerger()}, processing up to {@link #maxChannelsPerProcessor} files per merger. + * Continue this process through increasing level numbers, with the size of scratch files increasing by a factor + * of {@link #maxChannelsPerProcessor} each level. + * + * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by {@link #runNextMiddleMerger()} writes + * partitioned {@link FrameFile} scratch files. The penultimate level cannot be written until + * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by this point, the SuperSorter pauses. + * The SuperSorter resumes and writes the penultimate level's files when the future resolves. + * + * 4) Write the final level using {@link FrameChannelMerger} launched from {@link #runNextUltimateMerger()}. + * Outputs for this level are written to channels provided by {@link #outputChannelFactory}, rather than scratch files. + * + * At all points, higher level processing is preferred over lower-level processing. Writing to final output files + * is preferred over intermediate, and writing to intermediate files is preferred over reading inputs. These + * preferences ensure that the amount of data buffered up in memory does not grow too large. + * + * Potential future work (things we could optimize if necessary): + * + * - Collapse merging to a single level if level zero has one merger, and we want to write one output partition. + * - Skip batching, and inject directly into level 0, if input channels are already individually fully-sorted. + * - Combine (for example: aggregate) while merging. 
+ */ +public class SuperSorter +{ + private static final Logger log = new Logger(SuperSorter.class); + + public static final int UNKNOWN_LEVEL = -1; + public static final long UNKNOWN_TOTAL = -1; + + private final List<ReadableFrameChannel> inputChannels; + private final FrameReader frameReader; + private final ClusterBy clusterBy; + private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture; + private final FrameProcessorExecutor exec; + private final File directory; + private final OutputChannelFactory outputChannelFactory; + private final Supplier<MemoryAllocator> innerFrameAllocatorMaker; + private final int maxChannelsPerProcessor; + private final int maxActiveProcessors; + private final long rowLimit; + private final String cancellationId; + + private final Object runWorkersLock = new Object(); + + @GuardedBy("runWorkersLock") + private boolean batcherIsRunning = false; + + @GuardedBy("runWorkersLock") + private IntSet inputChannelsToRead = new IntOpenHashSet(); + + @GuardedBy("runWorkersLock") + private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap<>(); + + @GuardedBy("runWorkersLock") + private List<OutputChannel> outputChannels = null; + + @GuardedBy("runWorkersLock") + private int activeProcessors = 0; + + @GuardedBy("runWorkersLock") + private long totalInputFrames = UNKNOWN_TOTAL; + + @GuardedBy("runWorkersLock") + private int totalMergingLevels = UNKNOWN_LEVEL; + + @GuardedBy("runWorkersLock") + private final Queue<Frame> inputBuffer = new ArrayDeque<>(); + + @GuardedBy("runWorkersLock") + private long inputFramesReadSoFar = 0; + + @GuardedBy("runWorkersLock") + private long levelZeroMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private int ultimateMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private final Map<File, FrameFile> penultimateFrameFileCache = new HashMap<>(); + + @GuardedBy("runWorkersLock") + private SettableFuture<OutputChannels> allDone = null; + + 
@GuardedBy("runWorkersLock") + SuperSorterProgressTracker superSorterProgressTracker; + + /** + * See {@link #setNoWorkRunnable}. + */ + @GuardedBy("runWorkersLock") + private Runnable noWorkRunnable = null; + + /** + * Initializes a SuperSorter. + * + * @param inputChannels input channels. All frames in these channels must be sorted according to the + * {@link ClusterBy#getColumns()}, or else sorting will not produce correct + * output. + * @param frameReader frame reader for the input channels + * @param clusterBy desired sorting order + * @param outputPartitionsFuture a future that resolves to the desired output partitions. Sorting will block + * prior to writing out final outputs until this future resolves. However, the + * sorter will be able to read all inputs even if this future is unresolved. + * If output need not be partitioned, use + * {@link ClusterByPartitions#oneUniversalPartition()}. In this case a single + * sorted channel is generated. + * @param exec executor to perform work in + * @param temporaryDirectory directory to use for scratch files. This must have enough space to store at + * least two copies of the dataset in {@link FrameFile} format. + * @param outputChannelFactory factory for partitioned, sorted output channels + * @param innerFrameAllocatorMaker supplier for allocators that are used to make merged frames for intermediate + * levels of merging, prior to the final output. Final output frame allocation is + * controlled by outputChannelFactory. One allocator is created per intermediate + * scratch file. + * @param maxActiveProcessors maximum number of merging processors to execute at once in the provided + * {@link FrameProcessorExecutor} + * @param maxChannelsPerProcessor maximum number of channels to merge at once per merging processor + * @param rowLimit limit to apply during sorting. The limit is merely advisory: the actual number Review Comment: It's applied across all partitions, not to each partition individually. 
I added that sentence to the javadoc to hopefully clarify it. ########## processing/src/main/java/org/apache/druid/frame/key/ClusterByPartitions.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.key; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * Holder object for a set of {@link ClusterByPartition}. There are no preconditions put upon the partitions, except + * that there is at least one of them. + * + * In particular, they are not required to abut each other. See {@link #allAbutting()} to check if this particular list + * of partitions is in fact all abutting. + */ +public class ClusterByPartitions implements Iterable<ClusterByPartition> +{ + private static final ClusterByPartitions ONE_UNIVERSAL_PARTITION = Review Comment: Ah, I did it this way because in practice, we don't ever need a single universal `ClusterByPartition` by itself. We always need it wrapped up in a `ClusterByPartitions` instance. 
########## processing/src/main/java/org/apache/druid/frame/key/ClusterByPartitions.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.key; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * Holder object for a set of {@link ClusterByPartition}. There are no preconditions put upon the partitions, except + * that there is at least one of them. + * + * In particular, they are not required to abut each other. See {@link #allAbutting()} to check if this particular list Review Comment: The class doesn't enforce anything, including anything related to overlappingness. `allAbutting` can be used to check that the partitions are all abutting and non-overlapping (because abutting implies non-overlapping). I added some detail to the comment to hopefully clarify this. 
########## processing/src/main/java/org/apache/druid/frame/channel/ReadableConcatFrameChannel.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Channel that concatenates a sequence of other channels that are provided by an iterator. The iterator is + * walked just-in-time, meaning that {@link Iterator#next()} is not called until the channel is actually ready + * to be used. + * + * The first channel is pulled from the iterator immediately upon construction. + */ +public class ReadableConcatFrameChannel implements ReadableFrameChannel +{ + private final Iterator<ReadableFrameChannel> channelIterator; + + // Null means there were never any channels to begin with. 
+ @Nullable + private ReadableFrameChannel currentChannel; + + private ReadableConcatFrameChannel(Iterator<ReadableFrameChannel> channelIterator) + { + this.channelIterator = channelIterator; + this.currentChannel = channelIterator.hasNext() ? channelIterator.next() : null; + } + + /** + * Creates a new concatenated channel. The first channel is pulled from the provided iterator immediately. + * Other channels are pulled on-demand, when they are ready to be used. + */ + public static ReadableConcatFrameChannel open(final Iterator<ReadableFrameChannel> channelIterator) + { + return new ReadableConcatFrameChannel(channelIterator); + } + + @Override + public boolean isFinished() + { + advanceCurrentChannelIfFinished(); + return currentChannel == null || currentChannel.isFinished(); + } + + @Override + public boolean canRead() + { + advanceCurrentChannelIfFinished(); + return currentChannel != null && currentChannel.canRead(); + } + + @Override + public Frame read() + { + if (!canRead() || currentChannel == null) { Review Comment: Hmm, I think you're right. I removed the `currentChannel == null` part of the check. ########## processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.java.util.common.ISE; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Limiter for access to some resource. + * + * Used by {@link FrameProcessorExecutor#runAllFully} to limit the number of outstanding processors. + */ +public class Bouncer Review Comment: Hmm. I had it here because its purpose is to be an argument to `FrameProcessorExecutor#runAllFully`, which is part of the processor package. So it makes sense to me in this package. ########## processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java: ########## @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.channel; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; + +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * Channel backed by a byte stream that is continuously streamed in using {@link #addChunk}. The byte stream + * must be in the format of a {@link org.apache.druid.frame.file.FrameFile}. + * + * This class is used by {@link org.apache.druid.frame.file.FrameFileHttpResponseHandler} to provide nonblocking + * reads from a remote http server. + */ +public class ReadableByteChunksFrameChannel implements ReadableFrameChannel +{ + private static final Logger log = new Logger(ReadableByteChunksFrameChannel.class); + + /** + * Largest supported frame. Limit exists as a safeguard against streams with huge frame sizes. It is not expected + * that any legitimate frame will be this large: typical usage involves frames an order of magnitude smaller. 
+ */ + private static final long MAX_FRAME_SIZE = 100_000_000; + + private static final int UNKNOWN_LENGTH = -1; + private static final int FRAME_MARKER_BYTES = Byte.BYTES; + private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES = + Byte.BYTES + Frame.COMPRESSED_FRAME_ENVELOPE_SIZE; + + private enum StreamPart + { + MAGIC, + FRAMES, + FOOTER + } + + private final Object lock = new Object(); + private final String id; + private final long bytesLimit; + + @GuardedBy("lock") + private final List<Either<Throwable, byte[]>> chunks = new ArrayList<>(); + + @GuardedBy("lock") + private SettableFuture<?> addChunkBackpressureFuture = null; + + @GuardedBy("lock") + private SettableFuture<?> readyForReadingFuture = null; + + @GuardedBy("lock") + private boolean noMoreWrites = false; + + @GuardedBy("lock") + private int positionInFirstChunk = 0; + + @GuardedBy("lock") + private long bytesBuffered = 0; + + @GuardedBy("lock") + private long bytesAdded = 0; + + @GuardedBy("lock") + private long nextCompressedFrameLength = UNKNOWN_LENGTH; + + @GuardedBy("lock") + private StreamPart streamPart = StreamPart.MAGIC; + + private ReadableByteChunksFrameChannel(String id, long bytesLimit) + { + this.id = Preconditions.checkNotNull(id, "id"); + this.bytesLimit = bytesLimit; + } + + /** + * Create a channel that aims to limit its memory footprint to one frame. The channel exerts backpressure Review Comment: Ah, it works because the limit is soft. I added a comment explaining it: ``` // Set byte limit to 1, so backpressure will be exerted as soon as we have a full frame buffered. // (The bytesLimit is soft: it will be exceeded if needed to store a complete frame.) ``` ########## processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap; +import it.unimi.dsi.fastutil.ints.IntSortedSet; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Review Comment: Added Javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
