paul-rogers commented on code in PR #12848: URL: https://github.com/apache/druid/pull/12848#discussion_r937150038
########## processing/src/main/java/org/apache/druid/frame/channel/ReadableFrameChannel.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.frame.Frame; + +/** + * Interface for reading a sequence of frames. Supports nonblocking reads through the {@link #canRead()} and + * {@link #readabilityFuture()} methods. + * + * May be implemented using an in-memory queue, disk file, stream, etc. + */ +public interface ReadableFrameChannel +{ + /** + * Returns whether this channel is finished. Finished channels will not generate any further frames or errors. + * + * Generally, once you discover that a channel is finished, you should call {@link #doneReading()} and then + * discard it. + * + * Note that it is possible for a channel to be unfinished and also have no available frames or errors. This happens + * when it is not in a ready-for-reading state. See {@link #readabilityFuture()} for details. + */ + boolean isFinished(); + + /** + * Returns whether this channel has a frame or error condition currently available. 
If this method returns true, then + * you can call {@link #read()} to retrieve the frame or error. + * + * Note that it is possible for a channel to be unfinished and also have no available frames or errors. This happens + * when it is not in a ready-for-reading state. See {@link #readabilityFuture()} for details. + */ + boolean canRead(); + + /** + * Returns the next available frame from this channel. + * + * Before calling this method, you should check {@link #canRead()} to ensure there is a frame or + * error available. + * + * @throws java.util.NoSuchElementException if there is no frame currently available + */ + Frame read(); Review Comment: I see the point, and the logic is clear. Perhaps the enum is overkill. Still, the minimize-steps-in-the-inner-loop code I had in mind is:

```java
data = channel.read();
if (data != null) {
  // handle next frame
} else if (channel.isFinished()) {
  // handle eof
} else {
  // no data available, hang out for a while
}
```

But, I suppose I'm guilty of premature optimization: we don't know if this actually _is_ an inner loop, so it's fine as is. ########## processing/src/main/java/org/apache/druid/frame/processor/FrameProcessor.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; + +import java.io.IOException; +import java.util.List; + +/** + * A FrameProcessor is like an incremental version of Runnable that operates on {@link ReadableFrameChannel} and + * {@link WritableFrameChannel}. + * + * It is designed to enable interleaved non-blocking work on a fixed-size thread pool. Typically, this is done using + * an instance of {@link FrameProcessorExecutor}. + */ +public interface FrameProcessor<T> +{ + /** + * List of input channels. The positions of channels in this list are used to build the {@code readableInputs} set + * provided to {@link #runIncrementally}. + */ + List<ReadableFrameChannel> inputChannels(); + + /** + * List of output channels. + */ + List<WritableFrameChannel> outputChannels(); + + /** + * Runs some of the algorithm, without blocking, and either returns a value or a set of input channels + * to wait for. This method is called by {@link FrameProcessorExecutor#runFully} when all output channels are + * writable. Therefore, it is guaranteed that each output channel can accept at least one frame. + * + * This method must not read more than one frame from each readable input channel, and must not write more than one + * frame to each output channel. + * + * @param readableInputs channels from {@link #inputChannels()} that are either finished or ready to read. + * + * @return either a final return value or a set of input channels to wait for. Must be nonnull. 
+ */ + ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws IOException; + + /** + * Cleans up resources used by this worker, including signalling to input and output channels that we are + * done reading and writing, via {@link ReadableFrameChannel#doneReading()} and + * {@link WritableFrameChannel#doneWriting()}. + * + * This method may be called before the worker reports completion via {@link #runIncrementally}, especially in + * cases of cancellation. + */ + void cleanup() throws IOException; +} Review Comment: We are free to call a method `close()` without implementing `Closeable`. Implementing `Closeable` is really only needed when we want to do the closing generically, as in a `Closer`. Given the special naming, my guess is we don't close these objects generically? Rather, the thought here was that we all can predict what `close()` does, so it is a handy way to communicate to readers the purpose of the function. At least in Eclipse, if I ask to "show all references", I get the references to only this one class -- if the class doesn't implement `Closeable`. Again, just a minor point; the code works either way and we can always change things later, if there is a reason. ########## processing/src/main/java/org/apache/druid/frame/channel/ReadableInputStreamFrameChannel.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.channel; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.druid.frame.Frame; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.utils.CloseableUtils; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +/** + * Channel backed by an {@link InputStream}. + * + * Frame channels are expected to be nonblocking, but InputStreams cannot be read in nonblocking fashion. + * This implementation deals with that by using an {@link ExecutorService} to read from the stream in a + * separate thread. + */ +public class ReadableInputStreamFrameChannel implements ReadableFrameChannel +{ + private final InputStream inputStream; + private final ReadableByteChunksFrameChannel delegate; + private final ExecutorService executorService; + private final Object lock = new Object(); + + @GuardedBy("lock") + private final byte[] buffer = new byte[8 * 1024]; + + @GuardedBy("lock") + private long totalInputStreamBytesRead = 0; + + @GuardedBy("lock") + private boolean inputStreamFinished = false; + + @GuardedBy("lock") + private boolean inputStreamError = false; Review Comment: Sure. I just dislike the tedium of watching the debugger step over those lines when debugging. And, when optimizing the heck out of an inner loop, I tend to chuck out the nice-to-haves. 
But, it is obviously a trivial point. ########## processing/src/main/java/org/apache/druid/frame/file/FrameFilePartialFetch.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.file; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.java.util.common.ISE; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Response object for {@link FrameFileHttpResponseHandler}. + */ +public class FrameFilePartialFetch +{ + private final AtomicLong bytesRead = new AtomicLong(0L); Review Comment: Let's see if I'm clear. The items are mutable by the thread doing the reading, but then immutable when returned to (passed to) the consumer. Correct? If that is true, then the "Atomic" form isn't strictly necessary, since only one thread at a time accesses them: the object ping-pongs back and forth between the two. The question is whether the producer mutates these items while the consumer reads. If so, then the consumer will have a very hard time of it.
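To illustrate the ping-pong I have in mind, here is a sketch (with made-up names, not Druid's actual classes): plain fields are enough, provided every handoff between producer and consumer goes through some synchronizing action that establishes a happens-before edge, such as a latch, a resolved future, or an executor submission.

```java
import java.util.concurrent.CountDownLatch;

// Sketch of the "ping-pong" ownership pattern: the producer thread mutates a
// plain (non-atomic) field, then publishes the object through a synchronizing
// handoff. The consumer reads only after the handoff, so no Atomic* is needed.
public class PartialFetchSketch
{
  private long bytesRead = 0;   // plain field: one thread at a time owns this object

  public void addBytesRead(final long n)
  {
    bytesRead += n;             // called only by the producer, before handoff
  }

  public long getBytesRead()
  {
    return bytesRead;           // called only by the consumer, after handoff
  }

  public static void main(String[] args) throws InterruptedException
  {
    final PartialFetchSketch fetch = new PartialFetchSketch();
    final CountDownLatch handoff = new CountDownLatch(1);

    final Thread producer = new Thread(() -> {
      fetch.addBytesRead(100);
      fetch.addBytesRead(24);
      handoff.countDown();      // the latch provides the happens-before edge
    });

    producer.start();
    handoff.await();            // consumer blocks until the producer hands off
    System.out.println(fetch.getBytesRead()); // prints 124
    producer.join();
  }
}
```

The caveat, of course, is that every handoff really must go through such a synchronizing action; if the consumer can poll while the producer is still writing, the Atomic form is the safer choice.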
########## processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java: ########## @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.longs.LongRBTreeSet; +import it.unimi.dsi.fastutil.longs.LongSortedSet; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocator; +import 
org.apache.druid.frame.channel.BlockingQueueFrameChannel; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFileFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.channel.WritableStreamFrameChannel; +import org.apache.druid.frame.file.FrameFile; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +/** + * Sorts and partitions a dataset using parallel external merge sort. + * + * Input is provided as a set of {@link ReadableFrameChannel} and output is provided as {@link OutputChannels}. + * Work is performed on a provided {@link FrameProcessorExecutor}. + * + * The most central point for SuperSorter logic is the {@link #runWorkersIfPossible} method, which determines what + * needs to be done next based on the current state of the SuperSorter. 
The logic is: + * + * 1) Read input channels into {@link #inputBuffer} using {@link FrameChannelBatcher}, launched via + * {@link #runNextBatcher()}, up to a limit of {@link #maxChannelsPerProcessor} per batcher. + * + * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} scratch files using + * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}. + * + * 3a) Merge level 0 scratch files into level 1 scratch files using {@link FrameChannelMerger} launched from + * {@link #runNextMiddleMerger()}, processing up to {@link #maxChannelsPerProcessor} files per merger. + * Continue this process through increasing level numbers, with the size of scratch files increasing by a factor + * of {@link #maxChannelsPerProcessor} each level. + * + * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by {@link #runNextMiddleMerger()} writes + * partitioned {@link FrameFile} scratch files. The penultimate level cannot be written until + * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by this point, the SuperSorter pauses. + * The SuperSorter resumes and writes the penultimate level's files when the future resolves. + * + * 4) Write the final level using {@link FrameChannelMerger} launched from {@link #runNextUltimateMerger()}. + * Outputs for this level are written to channels provided by {@link #outputChannelFactory}, rather than scratch files. + * + * At all points, higher level processing is preferred over lower-level processing. Writing to final output files + * is preferred over intermediate, and writing to intermediate files is preferred over reading inputs. These + * preferences ensure that the amount of data buffered up in memory does not grow too large. + * + * Potential future work (things we could optimize if necessary): + * + * - Collapse merging to a single level if level zero has one merger, and we want to write one output partition. 
+ * - Skip batching, and inject directly into level 0, if input channels are already individually fully-sorted. + * - Combine (for example: aggregate) while merging. + */ +public class SuperSorter +{ + private static final Logger log = new Logger(SuperSorter.class); + + public static final int UNKNOWN_LEVEL = -1; + public static final long UNKNOWN_TOTAL = -1; + + private final List<ReadableFrameChannel> inputChannels; + private final FrameReader frameReader; + private final ClusterBy clusterBy; + private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture; + private final FrameProcessorExecutor exec; + private final File directory; + private final OutputChannelFactory outputChannelFactory; + private final Supplier<MemoryAllocator> innerFrameAllocatorMaker; + private final int maxChannelsPerProcessor; + private final int maxActiveProcessors; + private final long rowLimit; + private final String cancellationId; + + private final Object runWorkersLock = new Object(); + + @GuardedBy("runWorkersLock") + private boolean batcherIsRunning = false; + + @GuardedBy("runWorkersLock") + private IntSet inputChannelsToRead = new IntOpenHashSet(); + + @GuardedBy("runWorkersLock") + private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap<>(); + + @GuardedBy("runWorkersLock") + private List<OutputChannel> outputChannels = null; + + @GuardedBy("runWorkersLock") + private int activeProcessors = 0; + + @GuardedBy("runWorkersLock") + private long totalInputFrames = UNKNOWN_TOTAL; + + @GuardedBy("runWorkersLock") + private int totalMergingLevels = UNKNOWN_LEVEL; + + @GuardedBy("runWorkersLock") + private final Queue<Frame> inputBuffer = new ArrayDeque<>(); + + @GuardedBy("runWorkersLock") + private long inputFramesReadSoFar = 0; + + @GuardedBy("runWorkersLock") + private long levelZeroMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private int ultimateMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private final Map<File, 
FrameFile> penultimateFrameFileCache = new HashMap<>(); + + @GuardedBy("runWorkersLock") + private SettableFuture<OutputChannels> allDone = null; + + @GuardedBy("runWorkersLock") + SuperSorterProgressTracker superSorterProgressTracker; + + /** + * See {@link #setNoWorkRunnable}. + */ + @GuardedBy("runWorkersLock") + private Runnable noWorkRunnable = null; + + /** + * Initializes a SuperSorter. + * + * @param inputChannels input channels. All frames in these channels must be sorted according to the + * {@link ClusterBy#getColumns()}, or else sorting will not produce correct + * output. + * @param frameReader frame reader for the input channels + * @param clusterBy desired sorting order + * @param outputPartitionsFuture a future that resolves to the desired output partitions. Sorting will block + * prior to writing out final outputs until this future resolves. However, the + * sorter will be able to read all inputs even if this future is unresolved. + * If output need not be partitioned, use + * {@link ClusterByPartitions#oneUniversalPartition()}. In this case a single + * sorted channel is generated. + * @param exec executor to perform work in + * @param temporaryDirectory directory to use for scratch files. This must have enough space to store at + * least two copies of the dataset in {@link FrameFile} format. + * @param outputChannelFactory factory for partitioned, sorted output channels + * @param innerFrameAllocatorMaker supplier for allocators that are used to make merged frames for intermediate + * levels of merging, prior to the final output. Final output frame allocation is + * controlled by outputChannelFactory. One allocator is created per intermediate + * scratch file. + * @param maxActiveProcessors maximum number of merging processors to execute at once in the provided + * {@link FrameProcessorExecutor} + * @param maxChannelsPerProcessor maximum number of channels to merge at once per merging processor + * @param rowLimit limit to apply during sorting. 
The limit is merely advisory: the actual number + * of rows returned may be larger than the limit. The limit is applied across + * all partitions, not to each partition individually. + * @param cancellationId cancellation id to use when running processors in the provided + * {@link FrameProcessorExecutor}. + * @param superSorterProgressTracker progress tracker + */ + public SuperSorter( + final List<ReadableFrameChannel> inputChannels, + final FrameReader frameReader, + final ClusterBy clusterBy, + final ListenableFuture<ClusterByPartitions> outputPartitionsFuture, + final FrameProcessorExecutor exec, + final File temporaryDirectory, + final OutputChannelFactory outputChannelFactory, + final Supplier<MemoryAllocator> innerFrameAllocatorMaker, + final int maxActiveProcessors, + final int maxChannelsPerProcessor, + final long rowLimit, + @Nullable final String cancellationId, + final SuperSorterProgressTracker superSorterProgressTracker + ) + { + this.inputChannels = inputChannels; + this.frameReader = frameReader; + this.clusterBy = clusterBy; + this.outputPartitionsFuture = outputPartitionsFuture; + this.exec = exec; + this.directory = temporaryDirectory; + this.outputChannelFactory = outputChannelFactory; + this.innerFrameAllocatorMaker = innerFrameAllocatorMaker; + this.maxChannelsPerProcessor = maxChannelsPerProcessor; + this.maxActiveProcessors = maxActiveProcessors; + this.rowLimit = rowLimit; + this.cancellationId = cancellationId; + this.superSorterProgressTracker = superSorterProgressTracker; + + for (int i = 0; i < inputChannels.size(); i++) { + inputChannelsToRead.add(i); + } + + if (maxActiveProcessors < 1) { + throw new IAE("maxActiveProcessors[%d] < 1", maxActiveProcessors); + } + + if (maxChannelsPerProcessor < 2) { + throw new IAE("maxChannelsPerProcessor[%d] < 2", maxChannelsPerProcessor); + } + } + + /** + * Starts sorting. Can only be called once. Work is performed in the {@link FrameProcessorExecutor} that was + * passed to the constructor. 
+ * + * Returns a future containing partitioned sorted output channels. + */ + public ListenableFuture<OutputChannels> run() + { + synchronized (runWorkersLock) { + if (allDone != null) { + throw new ISE("Cannot run() more than once."); + } + + allDone = SettableFuture.create(); + runWorkersIfPossible(); + + // When output partitions become known, that may unblock some additional layers of merging. + outputPartitionsFuture.addListener( + () -> { + synchronized (runWorkersLock) { + if (outputPartitionsFuture.isDone()) { // Update the progress tracker + superSorterProgressTracker.setTotalMergersForUltimateLevel(getOutputPartitions().size()); + } + runWorkersIfPossible(); + setAllDoneIfPossible(); + } + }, + exec.getExecutorService() + ); + + return FutureUtils.futureWithBaggage( + allDone, + () -> { + synchronized (runWorkersLock) { + if (activeProcessors == 0) { + cleanUp(); + } + } + } + ); + } + } + + /** + * Sets a callback that enables tests to see when this SuperSorter cannot do any work. Only used for testing. + */ + @VisibleForTesting + void setNoWorkRunnable(final Runnable runnable) + { + synchronized (runWorkersLock) { + this.noWorkRunnable = runnable; + } + } + + /** + * Called when a worker finishes. + */ + @GuardedBy("runWorkersLock") + private void workerFinished() + { + activeProcessors -= 1; + + if (log.isDebugEnabled()) { + log.debug(stateString()); + } + + runWorkersIfPossible(); + setAllDoneIfPossible(); + + if (isAllDone() && activeProcessors == 0) { + cleanUp(); + } + } + + /** + * Tries to launch a new worker, and returns whether it was doable. + * + * Later workers have priority, i.e., those responsible for merging higher levels of the merge tree. Workers that + * read the original input channels have the lowest priority. This priority order ensures that we don't build up + * too much unmerged data. + */ + @GuardedBy("runWorkersLock") + private void runWorkersIfPossible() + { + if (isAllDone()) { + // Do nothing if the instance is all done. 
This can happen in case of error or cancellation. + return; + } + + try { + while (activeProcessors < maxActiveProcessors && + (runNextUltimateMerger() || runNextMiddleMerger() || runNextLevelZeroMerger() || runNextBatcher())) { + activeProcessors += 1; + + if (log.isDebugEnabled()) { + log.debug(stateString()); + } + } + + if (activeProcessors == 0 && noWorkRunnable != null) { + log.debug("No active workers and no work left to start."); + + // Only called in tests. No need to bother with try/catch and such. + noWorkRunnable.run(); + } + } + catch (Throwable e) { + allDone.setException(e); + } + } + + @GuardedBy("runWorkersLock") + private void setAllDoneIfPossible() + { + if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) { + // No input data -- generate empty output channels. + final ClusterByPartitions partitions = getOutputPartitions(); + final List<OutputChannel> channels = new ArrayList<>(partitions.size()); + + for (int partitionNum = 0; partitionNum < partitions.size(); partitionNum++) { + channels.add(outputChannelFactory.openNilChannel(partitionNum)); + } + + // OK to use wrap, not wrapReadOnly, because nil channels are already read-only. + allDone.set(OutputChannels.wrap(channels)); + } else if (totalMergingLevels != UNKNOWN_LEVEL + && outputsReadyByLevel.containsKey(totalMergingLevels - 1) + && outputsReadyByLevel.get(totalMergingLevels - 1).size() == getOutputPartitions().size()) { + // We're done!! + try { + // OK to use wrap, not wrapReadOnly, because all channels in this list are already read-only. 
+ allDone.set(OutputChannels.wrap(outputChannels)); + } + catch (Throwable e) { + allDone.setException(e); + } + } + } + + @GuardedBy("runWorkersLock") + private boolean runNextBatcher() + { + if (batcherIsRunning || inputChannelsToRead.isEmpty()) { + return false; + } else { + batcherIsRunning = true; + + runWorker( + new FrameChannelBatcher(inputChannels, maxChannelsPerProcessor), + result -> { + final List<Frame> batch = result.lhs; + final IntSet keepReading = result.rhs; + + synchronized (runWorkersLock) { + inputBuffer.addAll(batch); + inputFramesReadSoFar += batch.size(); + inputChannelsToRead = keepReading; + + if (inputChannelsToRead.isEmpty()) { + inputChannels.forEach(ReadableFrameChannel::doneReading); + setTotalInputFrames(inputFramesReadSoFar); + runWorkersIfPossible(); + } else if (inputBuffer.size() >= maxChannelsPerProcessor) { + runWorkersIfPossible(); + } + + batcherIsRunning = false; + } + } + ); + + return true; + } + } + + /** + * Level zero mergers read batches of frames from the "inputBuffer". These frames are individually sorted, but there + * is no ordering between the frames. Their output is a sorted sequence of frames. + */ + @GuardedBy("runWorkersLock") + private boolean runNextLevelZeroMerger() + { + if (inputBuffer.isEmpty() || (inputBuffer.size() < maxChannelsPerProcessor && !allInputRead())) { + return false; + } + + final List<ReadableFrameChannel> in = new ArrayList<>(); + + while (in.size() < maxChannelsPerProcessor) { + final Frame frame = inputBuffer.poll(); + + if (frame == null) { + break; + } + + in.add(singleReadableFrameChannel(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION))); + } + + runMerger(0, levelZeroMergersRunSoFar++, in, null); + return true; + } + + @GuardedBy("runWorkersLock") + private boolean runNextMiddleMerger() Review Comment: Ah! Glad to hear that the "guts" are in a separate bit we can test well. I wouldn't block a commit for restructuring; we can tackle it later as needed. 
I find that, for things like this, it's hard to know a better structure until we have to change something: then struggles with tests (or understanding) suggest the joints at which the code should be carved. ########## processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java: ########## @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Manages execution of {@link FrameProcessor} in an {@link ExecutorService}. + * + * If you want single threaded execution, use {@code Execs.singleThreaded()}. It is not a good idea to use this with a + * same-thread executor like {@code Execs.directExecutor()}, because it will lead to deep call stacks. 
+ */ +public class FrameProcessorExecutor Review Comment: Thanks for the explanation. Since there is nothing available, and we need to roll our own, we can generalize when we find a need to do so, no need to do extra work now. ########## processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java: ########## @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Manages execution of {@link FrameProcessor} in an {@link ExecutorService}. Review Comment: That makes sense. So, to run the "fragment" you just described, we'd have something like this? ```text reader processor --> channel --> main logic processor --> channel --> output processor ``` In this case, there are three concurrent frame processors communicating over in-memory channels? 
If so, it is very Go-like! This leads to two questions. First, how does the "runner" know what to run? A comment in the sorter suggests we'd prefer to do the output, then do the main logic that can produce more output, and finally do a read only if we've used up whatever data we have. Clear enough. How does the runner coordinate? What is the protocol that each frame processor uses to say "I can run", "I could run, if you gave me more input", or "Sorry, EOF, I'm fresh out of data"? Second, in the main logic, what structure, if any, do we provide if that logic is, say, a filter, a projection, and frame repackaging? How do the various steps coordinate? This is the kind of nitty-gritty detail that each frame processor writer will need to know. Not sure where that info should go, but it would be handy if we could explain it. ########## processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.frame.processor; + +import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; +import org.apache.druid.frame.allocation.MemoryAllocator; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.ReadableNilFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; + +import javax.annotation.Nullable; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Represents an output channel for some frame processor. Composed of a pair of {@link WritableFrameChannel}, which the + * processor writes to, along with a supplier of a {@link ReadableFrameChannel}, which readers can read from. + * + * At the time an instance of this class is created, the writable channel is already open, but the readable channel + * has not yet been created. It is created upon the first call to {@link #getReadableChannel()}. 
+ */ +public class OutputChannel +{ + @Nullable + private final WritableFrameChannel writableChannel; + @Nullable + private final MemoryAllocator frameMemoryAllocator; + private final Supplier<ReadableFrameChannel> readableChannelSupplier; + private final int partitionNumber; + + private OutputChannel( + @Nullable final WritableFrameChannel writableChannel, + @Nullable final MemoryAllocator frameMemoryAllocator, + final Supplier<ReadableFrameChannel> readableChannelSupplier, + final int partitionNumber + ) + { + this.writableChannel = writableChannel; + this.frameMemoryAllocator = frameMemoryAllocator; + this.readableChannelSupplier = readableChannelSupplier; + this.partitionNumber = partitionNumber; + + if (partitionNumber < 0 && partitionNumber != FrameWithPartition.NO_PARTITION) { + throw new IAE("Invalid partition number [%d]", partitionNumber); + } + } + + /** + * Creates an output channel pair. + * + * @param writableChannel writable channel for producer + * @param frameMemoryAllocator memory allocator for producer to use while writing frames to the channel + * @param readableChannelSupplier readable channel for consumer. May be called multiple times, so you should wrap this + * in {@link Suppliers#memoize} if needed. + * @param partitionNumber partition number, if any; may be {@link FrameWithPartition#NO_PARTITION} if unknown + */ + public static OutputChannel pair( + final WritableFrameChannel writableChannel, + final MemoryAllocator frameMemoryAllocator, + final Supplier<ReadableFrameChannel> readableChannelSupplier, + final int partitionNumber + ) + { + return new OutputChannel( + Preconditions.checkNotNull(writableChannel, "writableChannel"), + Preconditions.checkNotNull(frameMemoryAllocator, "frameMemoryAllocator"), + readableChannelSupplier, + partitionNumber + ); + } + + /** + * Create a nil output channel, representing a processor that writes nothing. It is not actually writable, but + * provides a way for downstream processors to read nothing. 
+ */ + public static OutputChannel nil(final int partitionNumber) + { + return new OutputChannel(null, null, () -> ReadableNilFrameChannel.INSTANCE, partitionNumber); + } + + /** + * Returns the writable channel of this pair. The producer writes to this channel. + */ + public WritableFrameChannel getWritableChannel() + { + if (writableChannel == null) { + throw new ISE("Writable channel is not available"); + } else { + return writableChannel; + } + } + + /** + * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel. + */ + public MemoryAllocator getFrameMemoryAllocator() + { + if (frameMemoryAllocator == null) { + throw new ISE("Writable channel is not available"); + } else { + return frameMemoryAllocator; + } + } + + /** + * Returns the readable channel of this pair. This readable channel may, or may not, be usable before the + * writable channel is closed. It depends on whether the channel pair was created in a stream-capable manner or not. + */ + public ReadableFrameChannel getReadableChannel() + { + return readableChannelSupplier.get(); + } + + public int getPartitionNumber() + { + return partitionNumber; + } + + public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn) + { + if (writableChannel == null) { + return this; + } else { + return new OutputChannel( + mapFn.apply(writableChannel), + frameMemoryAllocator, + readableChannelSupplier, + partitionNumber + ); + } + } + + /** + * Returns a read-only version of this instance. Read-only versions have neither {@link #getWritableChannel()} nor + * {@link #getFrameMemoryAllocator()}, and therefore require substantially less memory. + */ + public OutputChannel readOnly() Review Comment: I kind of like the Go approach: there is a channel, then a channel can provide a channel reader and a channel writer. In Go, only the writer can close the channel (to indicate EOF), the reader doesn't have to. 
That seems like a wise choice: there can be multiple readers, and the Go approach avoids the need to track them to detect when the last reader has closed its channel reader. So, one thought would be to replicate that idea here. A "channel" is the pipe. A "channel reader" is the output and a "channel writer" is the input. A channel with only readers is expressed simply in this model. This is another of those "make it easy for developers to understand" things rather than a functional issue, so it's fine to defer that kind of refactoring to later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
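To make the coordination question in the FrameProcessorExecutor discussion concrete, here is a toy sketch of one possible cooperative protocol: each call to a processor's `runIncrementally()` reports either "EOF, I'm done" or "I could run, if you gave me more input", and the runner delivers input between rounds. All names here (`ToyRunner`, `ToyProcessor`, `Status`, `Doubler`) are illustrative assumptions, not the PR's actual API, and the synchronous refill loop stands in for readability futures on the input channels.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of a cooperative frame-processor protocol. Names are
// illustrative assumptions, not Druid's actual API: a real runner would
// wait on readability futures instead of the synchronous loop below.
class ToyRunner
{
  // RETURN = "EOF, I'm done"; AWAIT = "I could run, if you gave me more input".
  enum Status { RETURN, AWAIT }

  interface ToyProcessor
  {
    Status runIncrementally();
  }

  // A "main logic" processor that doubles each input value. It drains
  // whatever is currently in its input channel, then reports its state.
  static class Doubler implements ToyProcessor
  {
    final Queue<Integer> input;
    final List<Integer> output = new ArrayList<>();
    final int expectedCount;
    int seen = 0;

    Doubler(final Queue<Integer> input, final int expectedCount)
    {
      this.input = input;
      this.expectedCount = expectedCount;
    }

    @Override
    public Status runIncrementally()
    {
      while (!input.isEmpty()) {
        output.add(input.poll() * 2);
        seen++;
      }
      return seen >= expectedCount ? Status.RETURN : Status.AWAIT;
    }
  }

  // The runner drives the processor until it returns, feeding one batch of
  // input per AWAIT (standing in for an upstream channel becoming readable).
  static List<Integer> run(final Queue<Integer> channel, final List<List<Integer>> batches)
  {
    final int total = batches.stream().mapToInt(List::size).sum();
    final Doubler processor = new Doubler(channel, total);
    int nextBatch = 0;
    while (true) {
      if (processor.runIncrementally() == Status.RETURN) {
        return processor.output;
      }
      if (nextBatch >= batches.size()) {
        throw new IllegalStateException("Processor is awaiting input, but none remains");
      }
      channel.addAll(batches.get(nextBatch++));
    }
  }
}
```

In this sketch, `ToyRunner.run(new ArrayDeque<>(), List.of(List.of(1, 2), List.of(3)))` produces `[2, 4, 6]` after two AWAIT rounds; the point is only the handshake shape, not the scheduling policy (output-first, then main logic, then read) described above.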

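The reader/writer split suggested in the OutputChannel discussion could look something like the following toy sketch, where a single channel hands out a writer handle (which alone may close the channel to signal EOF) and reader handles (which only observe frames and the finished flag). All names here (`ToyChannel`, `Writer`, `Reader`) are hypothetical, not the PR's API.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Toy sketch of a Go-style channel split: the Writer alone can close the
// channel; Readers can only poll frames and check for EOF. Names are
// illustrative, not the PR's actual API.
class ToyChannel<T>
{
  private final Queue<T> queue = new ArrayDeque<>();
  private boolean closed = false;

  class Writer
  {
    void write(final T value)
    {
      if (closed) {
        throw new IllegalStateException("channel is closed");
      }
      queue.add(value);
    }

    void close()
    {
      closed = true;   // only the writer signals EOF
    }
  }

  class Reader
  {
    boolean canRead()
    {
      return !queue.isEmpty();
    }

    boolean isFinished()
    {
      // Finished = writer closed the channel and all frames are consumed.
      return closed && queue.isEmpty();
    }

    T read()
    {
      if (queue.isEmpty()) {
        throw new NoSuchElementException("no frame available");
      }
      return queue.poll();
    }
  }

  Writer writer()
  {
    return new Writer();
  }

  Reader reader()
  {
    return new Reader();   // any number of readers; none can close the channel
  }
}
```

Note the simplification: these readers share one queue, so they would consume each other's frames; a real multi-reader channel would need per-reader cursors. The sketch only shows the ownership split, which avoids tracking when the last reader goes away.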