LakshSingla commented on code in PR #12848: URL: https://github.com/apache/druid/pull/12848#discussion_r935340716
########## processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap; +import it.unimi.dsi.fastutil.ints.IntSortedSet; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Review Comment: nit: Javadoc would be helpful ########## processing/src/main/java/org/apache/druid/frame/channel/ReadableConcatFrameChannel.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; + +import javax.annotation.Nullable; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Channel that concatenates a sequence of other channels that are provided by an iterator. The iterator is + * walked just-in-time, meaning that {@link Iterator#next()} is not called until the channel is actually ready + * to be used. + * + * The first channel is pulled from the iterator immediately upon construction. + */ +public class ReadableConcatFrameChannel implements ReadableFrameChannel +{ + private final Iterator<ReadableFrameChannel> channelIterator; + + // Null means there were never any channels to begin with. + @Nullable + private ReadableFrameChannel currentChannel; + + private ReadableConcatFrameChannel(Iterator<ReadableFrameChannel> channelIterator) + { + this.channelIterator = channelIterator; + this.currentChannel = channelIterator.hasNext() ? channelIterator.next() : null; + } + + /** + * Creates a new concatenated channel. The first channel is pulled from the provided iterator immediately. + * Other channels are pulled on-demand, when they are ready to be used. 
+ */ + public static ReadableConcatFrameChannel open(final Iterator<ReadableFrameChannel> channelIterator) + { + return new ReadableConcatFrameChannel(channelIterator); + } + + @Override + public boolean isFinished() + { + advanceCurrentChannelIfFinished(); + return currentChannel == null || currentChannel.isFinished(); + } + + @Override + public boolean canRead() + { + advanceCurrentChannelIfFinished(); + return currentChannel != null && currentChannel.canRead(); + } + + @Override + public Frame read() + { + if (!canRead() || currentChannel == null) { Review Comment: Is the `currentChannel == null` check required? `canRead()` being false means either: a) `currentChannel == null`, in which case this check is redundant, or b) `currentChannel.canRead()` is `false`, in which case the `return currentChannel.read()` statement seems incorrect. I might be wrong, but there seems to be some redundancy here. ########## processing/src/main/java/org/apache/druid/frame/key/ClusterByPartitions.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.key; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * Holder object for a set of {@link ClusterByPartition}. There are no preconditions put upon the partitions, except + * that there is at least one of them. + * + * In particular, they are not required to abut each other. See {@link #allAbutting()} to check if this particular list + * of partitions is in fact all abutting. + */ +public class ClusterByPartitions implements Iterable<ClusterByPartition> +{ + private static final ClusterByPartitions ONE_UNIVERSAL_PARTITION = Review Comment: nit: Can we create a special variable `new ClusterByPartition(null, null)` in `ClusterByPartition` class named something like `ONE_UNIVERSAL_PARTITION` which we can reference here (if that seems cleaner)? ########## processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java: ########## @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; + +/** + * Helper for {@link FrameProcessorExecutor#runAllFully}. Review Comment: Can this be expanded upon further? ########## processing/src/main/java/org/apache/druid/frame/processor/FrameChannelMerger.java: ########## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntPriorityQueue; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.key.FrameComparisonWidget; +import org.apache.druid.frame.key.RowKey; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.row.FrameColumnSelectorFactory; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Supplier; + +/** + * Processor that merges already-sorted inputChannels and writes a fully-sorted stream to a single 
outputChannel. + * + * Frames from input channels must be {@link org.apache.druid.frame.FrameType#ROW_BASED}. Output frames will + * be row-based as well. + * + * For unsorted output, use {@link FrameChannelMuxer} instead. + */ +public class FrameChannelMerger implements FrameProcessor<Long> +{ + static final long UNLIMITED = -1; Review Comment: nit: Can this be made private? ########## processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java: ########## @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.channel; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; + +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * Channel backed by a byte stream that is continuously streamed in using {@link #addChunk}. The byte stream + * must be in the format of a {@link org.apache.druid.frame.file.FrameFile}. + * + * This class is used by {@link org.apache.druid.frame.file.FrameFileHttpResponseHandler} to provide nonblocking + * reads from a remote http server. + */ +public class ReadableByteChunksFrameChannel implements ReadableFrameChannel +{ + private static final Logger log = new Logger(ReadableByteChunksFrameChannel.class); + + /** + * Largest supported frame. Limit exists as a safeguard against streams with huge frame sizes. It is not expected + * that any legitimate frame will be this large: typical usage involves frames an order of magnitude smaller. 
+ */ + private static final long MAX_FRAME_SIZE = 100_000_000; + + private static final int UNKNOWN_LENGTH = -1; + private static final int FRAME_MARKER_BYTES = Byte.BYTES; + private static final int FRAME_MARKER_AND_COMPRESSED_ENVELOPE_BYTES = + Byte.BYTES + Frame.COMPRESSED_FRAME_ENVELOPE_SIZE; + + private enum StreamPart + { + MAGIC, + FRAMES, + FOOTER + } + + private final Object lock = new Object(); + private final String id; + private final long bytesLimit; + + @GuardedBy("lock") + private final List<Either<Throwable, byte[]>> chunks = new ArrayList<>(); + + @GuardedBy("lock") + private SettableFuture<?> addChunkBackpressureFuture = null; + + @GuardedBy("lock") + private SettableFuture<?> readyForReadingFuture = null; + + @GuardedBy("lock") + private boolean noMoreWrites = false; + + @GuardedBy("lock") + private int positionInFirstChunk = 0; + + @GuardedBy("lock") + private long bytesBuffered = 0; + + @GuardedBy("lock") + private long bytesAdded = 0; + + @GuardedBy("lock") + private long nextCompressedFrameLength = UNKNOWN_LENGTH; + + @GuardedBy("lock") + private StreamPart streamPart = StreamPart.MAGIC; + + private ReadableByteChunksFrameChannel(String id, long bytesLimit) + { + this.id = Preconditions.checkNotNull(id, "id"); + this.bytesLimit = bytesLimit; + } + + /** + * Create a channel that aims to limit its memory footprint to one frame. The channel exerts backpressure Review Comment: I am confused because Javadoc mentions that it aims to limit memory footprint to one frame, although it returns a `ReadableByteChunksFrameChannel` whose `bytesLimit` is set to 1. ########## processing/src/main/java/org/apache/druid/frame/key/ClusterByPartitions.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.key; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.java.util.common.IAE; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +/** + * Holder object for a set of {@link ClusterByPartition}. There are no preconditions put upon the partitions, except + * that there is at least one of them. + * + * In particular, they are not required to abut each other. See {@link #allAbutting()} to check if this particular list Review Comment: While the partitions may not be abutting, is there any implicit restriction on overlapping partitions? ########## processing/src/main/java/org/apache/druid/frame/processor/BlockingQueueOutputChannelFactory.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import org.apache.druid.frame.allocation.ArenaMemoryAllocator; +import org.apache.druid.frame.channel.BlockingQueueFrameChannel; + +public class BlockingQueueOutputChannelFactory implements OutputChannelFactory Review Comment: ```suggestion /** * Output channel factory implementation backed by a BlockingQueueFrameChannel */ public class BlockingQueueOutputChannelFactory implements OutputChannelFactory ``` ########## processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java: ########## @@ -0,0 +1,867 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.longs.LongRBTreeSet; +import it.unimi.dsi.fastutil.longs.LongSortedSet; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.frame.channel.BlockingQueueFrameChannel; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFileFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.channel.WritableStreamFrameChannel; +import org.apache.druid.frame.file.FrameFile; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; 
+import java.io.File; +import java.io.IOException; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +/** + * Sorts and partitions a dataset using parallel external merge sort. + * + * Input is provided as a set of {@link ReadableFrameChannel} and output is provided as {@link OutputChannels}. + * Work is performed on a provided {@link FrameProcessorExecutor}. + * + * The most central point for SuperSorter logic is the {@link #runWorkersIfPossible} method, which determines what + * needs to be done next based on the current state of the SuperSorter. The logic is: + * + * 1) Read input channels into {@link #inputBuffer} using {@link FrameChannelBatcher}, launched via + * {@link #runNextBatcher()}, up to a limit of {@link #maxChannelsPerProcessor} per batcher. + * + * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} scratch files using + * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}. + * + * 3a) Merge level 0 scratch files into level 1 scratch files using {@link FrameChannelMerger} launched from + * {@link #runNextMiddleMerger()}, processing up to {@link #maxChannelsPerProcessor} files per merger. + * Continue this process through increasing level numbers, with the size of scratch files increasing by a factor + * of {@link #maxChannelsPerProcessor} each level. + * + * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by {@link #runNextMiddleMerger()} writes + * partitioned {@link FrameFile} scratch files. 
The penultimate level cannot be written until + * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by this point, the SuperSorter pauses. + * The SuperSorter resumes and writes the penultimate level's files when the future resolves. + * + * 4) Write the final level using {@link FrameChannelMerger} launched from {@link #runNextUltimateMerger()}. + * Outputs for this level are written to channels provided by {@link #outputChannelFactory}, rather than scratch files. + * + * At all points, higher level processing is preferred over lower-level processing. Writing to final output files + * is preferred over intermediate, and writing to intermediate files is preferred over reading inputs. These + * preferences ensure that the amount of data buffered up in memory does not grow too large. + * + * Potential future work (things we could optimize if necessary): + * + * - Collapse merging to a single level if level zero has one merger, and we want to write one output partition. + * - Skip batching, and inject directly into level 0, if input channels are already individually fully-sorted. + * - Combine (for example: aggregate) while merging. 
+ */ +public class SuperSorter +{ + private static final Logger log = new Logger(SuperSorter.class); + + public static final int UNKNOWN_LEVEL = -1; + public static final long UNKNOWN_TOTAL = -1; + + private final List<ReadableFrameChannel> inputChannels; + private final FrameReader frameReader; + private final ClusterBy clusterBy; + private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture; + private final FrameProcessorExecutor exec; + private final File directory; + private final OutputChannelFactory outputChannelFactory; + private final Supplier<MemoryAllocator> innerFrameAllocatorMaker; + private final int maxChannelsPerProcessor; + private final int maxActiveProcessors; + private final long rowLimit; + private final String cancellationId; + + private final Object runWorkersLock = new Object(); + + @GuardedBy("runWorkersLock") + private boolean batcherIsRunning = false; + + @GuardedBy("runWorkersLock") + private IntSet inputChannelsToRead = new IntOpenHashSet(); + + @GuardedBy("runWorkersLock") + private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap<>(); + + @GuardedBy("runWorkersLock") + private List<OutputChannel> outputChannels = null; + + @GuardedBy("runWorkersLock") + private int activeProcessors = 0; + + @GuardedBy("runWorkersLock") + private long totalInputFrames = UNKNOWN_TOTAL; + + @GuardedBy("runWorkersLock") + private int totalMergingLevels = UNKNOWN_LEVEL; + + @GuardedBy("runWorkersLock") + private final Queue<Frame> inputBuffer = new ArrayDeque<>(); + + @GuardedBy("runWorkersLock") + private long inputFramesReadSoFar = 0; + + @GuardedBy("runWorkersLock") + private long levelZeroMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private int ultimateMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private final Map<File, FrameFile> penultimateFrameFileCache = new HashMap<>(); + + @GuardedBy("runWorkersLock") + private SettableFuture<OutputChannels> allDone = null; + + 
@GuardedBy("runWorkersLock") + SuperSorterProgressTracker superSorterProgressTracker; + + /** + * See {@link #setNoWorkRunnable}. + */ + @GuardedBy("runWorkersLock") + private Runnable noWorkRunnable = null; + + /** + * Initializes a SuperSorter. + * + * @param inputChannels input channels. All frames in these channels must be sorted according to the + * {@link ClusterBy#getColumns()}, or else sorting will not produce correct + * output. + * @param frameReader frame reader for the input channels + * @param clusterBy desired sorting order + * @param outputPartitionsFuture a future that resolves to the desired output partitions. Sorting will block + * prior to writing out final outputs until this future resolves. However, the + * sorter will be able to read all inputs even if this future is unresolved. + * If output need not be partitioned, use + * {@link ClusterByPartitions#oneUniversalPartition()}. In this case a single + * sorted channel is generated. + * @param exec executor to perform work in + * @param temporaryDirectory directory to use for scratch files. This must have enough space to store at + * least two copies of the dataset in {@link FrameFile} format. + * @param outputChannelFactory factory for partitioned, sorted output channels + * @param innerFrameAllocatorMaker supplier for allocators that are used to make merged frames for intermediate + * levels of merging, prior to the final output. Final output frame allocation is + * controlled by outputChannelFactory. One allocator is created per intermediate + * scratch file. + * @param maxActiveProcessors maximum number of merging processors to execute at once in the provided + * {@link FrameProcessorExecutor} + * @param maxChannelsPerProcessor maximum number of channels to merge at once per merging processor + * @param rowLimit limit to apply during sorting. The limit is merely advisory: the actual number Review Comment: Is this a soft limit on the final number of rows that _get output post sorting_? 
Or is this a limit on the number of rows per partition? ########## processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.java.util.common.ISE; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Limiter for access to some resource. + * + * Used by {@link FrameProcessorExecutor#runAllFully} to limit the number of outstanding processors. + */ +public class Bouncer Review Comment: Would it be more appropriate to move this class into `org.apache.druid.frame.util`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
