gianm commented on code in PR #12848: URL: https://github.com/apache/druid/pull/12848#discussion_r936328373
########## processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java: ########## @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.processor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.longs.LongIterator; +import it.unimi.dsi.fastutil.longs.LongRBTreeSet; +import it.unimi.dsi.fastutil.longs.LongSortedSet; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocator; +import 
org.apache.druid.frame.channel.BlockingQueueFrameChannel; +import org.apache.druid.frame.channel.FrameWithPartition; +import org.apache.druid.frame.channel.ReadableFileFrameChannel; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.channel.WritableStreamFrameChannel; +import org.apache.druid.frame.file.FrameFile; +import org.apache.druid.frame.file.FrameFileWriter; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.utils.CloseableUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +/** + * Sorts and partitions a dataset using parallel external merge sort. + * + * Input is provided as a set of {@link ReadableFrameChannel} and output is provided as {@link OutputChannels}. + * Work is performed on a provided {@link FrameProcessorExecutor}. + * + * The most central point for SuperSorter logic is the {@link #runWorkersIfPossible} method, which determines what + * needs to be done next based on the current state of the SuperSorter. 
The logic is: + * + * 1) Read input channels into {@link #inputBuffer} using {@link FrameChannelBatcher}, launched via + * {@link #runNextBatcher()}, up to a limit of {@link #maxChannelsPerProcessor} per batcher. + * + * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} scratch files using + * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}. + * + * 3a) Merge level 0 scratch files into level 1 scratch files using {@link FrameChannelMerger} launched from + * {@link #runNextMiddleMerger()}, processing up to {@link #maxChannelsPerProcessor} files per merger. + * Continue this process through increasing level numbers, with the size of scratch files increasing by a factor + * of {@link #maxChannelsPerProcessor} each level. + * + * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by {@link #runNextMiddleMerger()} writes + * partitioned {@link FrameFile} scratch files. The penultimate level cannot be written until + * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by this point, the SuperSorter pauses. + * The SuperSorter resumes and writes the penultimate level's files when the future resolves. + * + * 4) Write the final level using {@link FrameChannelMerger} launched from {@link #runNextUltimateMerger()}. + * Outputs for this level are written to channels provided by {@link #outputChannelFactory}, rather than scratch files. + * + * At all points, higher level processing is preferred over lower-level processing. Writing to final output files + * is preferred over intermediate, and writing to intermediate files is preferred over reading inputs. These + * preferences ensure that the amount of data buffered up in memory does not grow too large. + * + * Potential future work (things we could optimize if necessary): + * + * - Collapse merging to a single level if level zero has one merger, and we want to write one output partition. 
+ * - Skip batching, and inject directly into level 0, if input channels are already individually fully-sorted. + * - Combine (for example: aggregate) while merging. + */ +public class SuperSorter +{ + private static final Logger log = new Logger(SuperSorter.class); + + public static final int UNKNOWN_LEVEL = -1; + public static final long UNKNOWN_TOTAL = -1; + + private final List<ReadableFrameChannel> inputChannels; + private final FrameReader frameReader; + private final ClusterBy clusterBy; + private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture; + private final FrameProcessorExecutor exec; + private final File directory; + private final OutputChannelFactory outputChannelFactory; + private final Supplier<MemoryAllocator> innerFrameAllocatorMaker; + private final int maxChannelsPerProcessor; + private final int maxActiveProcessors; + private final long rowLimit; + private final String cancellationId; + + private final Object runWorkersLock = new Object(); + + @GuardedBy("runWorkersLock") + private boolean batcherIsRunning = false; + + @GuardedBy("runWorkersLock") + private IntSet inputChannelsToRead = new IntOpenHashSet(); + + @GuardedBy("runWorkersLock") + private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new Int2ObjectArrayMap<>(); + + @GuardedBy("runWorkersLock") + private List<OutputChannel> outputChannels = null; + + @GuardedBy("runWorkersLock") + private int activeProcessors = 0; + + @GuardedBy("runWorkersLock") + private long totalInputFrames = UNKNOWN_TOTAL; + + @GuardedBy("runWorkersLock") + private int totalMergingLevels = UNKNOWN_LEVEL; + + @GuardedBy("runWorkersLock") + private final Queue<Frame> inputBuffer = new ArrayDeque<>(); + + @GuardedBy("runWorkersLock") + private long inputFramesReadSoFar = 0; + + @GuardedBy("runWorkersLock") + private long levelZeroMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private int ultimateMergersRunSoFar = 0; + + @GuardedBy("runWorkersLock") + private final Map<File, 
FrameFile> penultimateFrameFileCache = new HashMap<>(); + + @GuardedBy("runWorkersLock") + private SettableFuture<OutputChannels> allDone = null; + + @GuardedBy("runWorkersLock") + SuperSorterProgressTracker superSorterProgressTracker; + + /** + * See {@link #setNoWorkRunnable}. + */ + @GuardedBy("runWorkersLock") + private Runnable noWorkRunnable = null; + + /** + * Initializes a SuperSorter. + * + * @param inputChannels input channels. All frames in these channels must be sorted according to the + * {@link ClusterBy#getColumns()}, or else sorting will not produce correct + * output. + * @param frameReader frame reader for the input channels + * @param clusterBy desired sorting order + * @param outputPartitionsFuture a future that resolves to the desired output partitions. Sorting will block + * prior to writing out final outputs until this future resolves. However, the + * sorter will be able to read all inputs even if this future is unresolved. + * If output need not be partitioned, use + * {@link ClusterByPartitions#oneUniversalPartition()}. In this case a single + * sorted channel is generated. + * @param exec executor to perform work in + * @param temporaryDirectory directory to use for scratch files. This must have enough space to store at + * least two copies of the dataset in {@link FrameFile} format. + * @param outputChannelFactory factory for partitioned, sorted output channels + * @param innerFrameAllocatorMaker supplier for allocators that are used to make merged frames for intermediate + * levels of merging, prior to the final output. Final output frame allocation is + * controlled by outputChannelFactory. One allocator is created per intermediate + * scratch file. + * @param maxActiveProcessors maximum number of merging processors to execute at once in the provided + * {@link FrameProcessorExecutor} + * @param maxChannelsPerProcessor maximum number of channels to merge at once per merging processor + * @param rowLimit limit to apply during sorting. 
The limit is merely advisory: the actual number + * of rows returned may be larger than the limit. The limit is applied across + * all partitions, not to each partition individually. + * @param cancellationId cancellation id to use when running processors in the provided + * {@link FrameProcessorExecutor}. + * @param superSorterProgressTracker progress tracker + */ + public SuperSorter( + final List<ReadableFrameChannel> inputChannels, + final FrameReader frameReader, + final ClusterBy clusterBy, + final ListenableFuture<ClusterByPartitions> outputPartitionsFuture, + final FrameProcessorExecutor exec, + final File temporaryDirectory, + final OutputChannelFactory outputChannelFactory, + final Supplier<MemoryAllocator> innerFrameAllocatorMaker, + final int maxActiveProcessors, + final int maxChannelsPerProcessor, + final long rowLimit, + @Nullable final String cancellationId, + final SuperSorterProgressTracker superSorterProgressTracker + ) + { + this.inputChannels = inputChannels; + this.frameReader = frameReader; + this.clusterBy = clusterBy; + this.outputPartitionsFuture = outputPartitionsFuture; + this.exec = exec; + this.directory = temporaryDirectory; + this.outputChannelFactory = outputChannelFactory; + this.innerFrameAllocatorMaker = innerFrameAllocatorMaker; + this.maxChannelsPerProcessor = maxChannelsPerProcessor; + this.maxActiveProcessors = maxActiveProcessors; + this.rowLimit = rowLimit; + this.cancellationId = cancellationId; + this.superSorterProgressTracker = superSorterProgressTracker; + + for (int i = 0; i < inputChannels.size(); i++) { + inputChannelsToRead.add(i); + } + + if (maxActiveProcessors < 1) { + throw new IAE("maxActiveProcessors[%d] < 1", maxActiveProcessors); + } + + if (maxChannelsPerProcessor < 2) { + throw new IAE("maxChannelsPerProcessor[%d] < 2", maxChannelsPerProcessor); + } + } + + /** + * Starts sorting. Can only be called once. Work is performed in the {@link FrameProcessorExecutor} that was + * passed to the constructor. 
+ * + * Returns a future containing partitioned sorted output channels. + */ + public ListenableFuture<OutputChannels> run() + { + synchronized (runWorkersLock) { + if (allDone != null) { + throw new ISE("Cannot run() more than once."); + } + + allDone = SettableFuture.create(); + runWorkersIfPossible(); + + // When output partitions become known, that may unblock some additional layers of merging. + outputPartitionsFuture.addListener( + () -> { + synchronized (runWorkersLock) { + if (outputPartitionsFuture.isDone()) { // Update the progress tracker + superSorterProgressTracker.setTotalMergersForUltimateLevel(getOutputPartitions().size()); + } + runWorkersIfPossible(); + setAllDoneIfPossible(); + } + }, + exec.getExecutorService() + ); + + return FutureUtils.futureWithBaggage( + allDone, + () -> { + synchronized (runWorkersLock) { + if (activeProcessors == 0) { + cleanUp(); + } + } + } + ); + } + } + + /** + * Sets a callback that enables tests to see when this SuperSorter cannot do any work. Only used for testing. + */ + @VisibleForTesting + void setNoWorkRunnable(final Runnable runnable) + { + synchronized (runWorkersLock) { + this.noWorkRunnable = runnable; + } + } + + /** + * Called when a worker finishes. + */ + @GuardedBy("runWorkersLock") + private void workerFinished() + { + activeProcessors -= 1; + + if (log.isDebugEnabled()) { + log.debug(stateString()); + } + + runWorkersIfPossible(); + setAllDoneIfPossible(); + + if (isAllDone() && activeProcessors == 0) { + cleanUp(); + } + } + + /** + * Tries to launch a new worker, and returns whether it was doable. + * + * Later workers have priority, i.e., those responsible for merging higher levels of the merge tree. Workers that + * read the original input channels have the lowest priority. This priority order ensures that we don't build up + * too much unmerged data. + */ + @GuardedBy("runWorkersLock") + private void runWorkersIfPossible() + { + if (isAllDone()) { + // Do nothing if the instance is all done. 
This can happen in case of error or cancellation. + return; + } + + try { + while (activeProcessors < maxActiveProcessors && + (runNextUltimateMerger() || runNextMiddleMerger() || runNextLevelZeroMerger() || runNextBatcher())) { + activeProcessors += 1; + + if (log.isDebugEnabled()) { + log.debug(stateString()); + } + } + + if (activeProcessors == 0 && noWorkRunnable != null) { + log.debug("No active workers and no work left to start."); + + // Only called in tests. No need to bother with try/catch and such. + noWorkRunnable.run(); + } + } + catch (Throwable e) { + allDone.setException(e); + } + } + + @GuardedBy("runWorkersLock") + private void setAllDoneIfPossible() + { + if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) { + // No input data -- generate empty output channels. + final ClusterByPartitions partitions = getOutputPartitions(); + final List<OutputChannel> channels = new ArrayList<>(partitions.size()); + + for (int partitionNum = 0; partitionNum < partitions.size(); partitionNum++) { + channels.add(outputChannelFactory.openNilChannel(partitionNum)); + } + + // OK to use wrap, not wrapReadOnly, because nil channels are already read-only. + allDone.set(OutputChannels.wrap(channels)); + } else if (totalMergingLevels != UNKNOWN_LEVEL + && outputsReadyByLevel.containsKey(totalMergingLevels - 1) + && outputsReadyByLevel.get(totalMergingLevels - 1).size() == getOutputPartitions().size()) { + // We're done!! + try { + // OK to use wrap, not wrapReadOnly, because all channels in this list are already read-only. 
+ allDone.set(OutputChannels.wrap(outputChannels)); + } + catch (Throwable e) { + allDone.setException(e); + } + } + } + + @GuardedBy("runWorkersLock") + private boolean runNextBatcher() + { + if (batcherIsRunning || inputChannelsToRead.isEmpty()) { + return false; + } else { + batcherIsRunning = true; + + runWorker( + new FrameChannelBatcher(inputChannels, maxChannelsPerProcessor), + result -> { + final List<Frame> batch = result.lhs; + final IntSet keepReading = result.rhs; + + synchronized (runWorkersLock) { + inputBuffer.addAll(batch); + inputFramesReadSoFar += batch.size(); + inputChannelsToRead = keepReading; + + if (inputChannelsToRead.isEmpty()) { + inputChannels.forEach(ReadableFrameChannel::doneReading); + setTotalInputFrames(inputFramesReadSoFar); + runWorkersIfPossible(); + } else if (inputBuffer.size() >= maxChannelsPerProcessor) { + runWorkersIfPossible(); + } + + batcherIsRunning = false; + } + } + ); + + return true; + } + } + + /** + * Level zero mergers read batches of frames from the "inputBuffer". These frames are individually sorted, but there + * is no ordering between the frames. Their output is a sorted sequence of frames. + */ + @GuardedBy("runWorkersLock") + private boolean runNextLevelZeroMerger() + { + if (inputBuffer.isEmpty() || (inputBuffer.size() < maxChannelsPerProcessor && !allInputRead())) { + return false; + } + + final List<ReadableFrameChannel> in = new ArrayList<>(); + + while (in.size() < maxChannelsPerProcessor) { + final Frame frame = inputBuffer.poll(); + + if (frame == null) { + break; + } + + in.add(singleReadableFrameChannel(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION))); + } + + runMerger(0, levelZeroMergersRunSoFar++, in, null); + return true; + } + + @GuardedBy("runWorkersLock") + private boolean runNextMiddleMerger() Review Comment: The actual merger is indeed a separate class: FrameChannelMerger (single-threaded frame processor that does an N-way merge). 
The logic in SuperSorter is mainly concerned with creating FrameChannelMergers and connecting them to each other such that the right thing happens. I'm definitely open to suggestions about how it could be made simpler, or which parts might be ripe for extraction elsewhere.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
