gianm commented on code in PR #12848:
URL: https://github.com/apache/druid/pull/12848#discussion_r936328373


##########
processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java:
##########
@@ -0,0 +1,868 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.math.LongMath;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongRBTreeSet;
+import it.unimi.dsi.fastutil.longs.LongSortedSet;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.MemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFileFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.channel.WritableStreamFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.ClusterByPartitions;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+/**
+ * Sorts and partitions a dataset using parallel external merge sort.
+ *
+ * Input is provided as a set of {@link ReadableFrameChannel} and output is 
provided as {@link OutputChannels}.
+ * Work is performed on a provided {@link FrameProcessorExecutor}.
+ *
+ * The most central point for SuperSorter logic is the {@link 
#runWorkersIfPossible} method, which determines what
+ * needs to be done next based on the current state of the SuperSorter. The 
logic is:
+ *
+ * 1) Read input channels into {@link #inputBuffer} using {@link 
FrameChannelBatcher}, launched via
+ * {@link #runNextBatcher()}, up to a limit of {@link 
#maxChannelsPerProcessor} per batcher.
+ *
+ * 2) Merge and write frames from {@link #inputBuffer} into {@link FrameFile} 
scratch files using
+ * {@link FrameChannelMerger} launched via {@link #runNextLevelZeroMerger()}.
+ *
+ * 3a) Merge level 0 scratch files into level 1 scratch files using {@link 
FrameChannelMerger} launched from
+ * {@link #runNextMiddleMerger()}, processing up to {@link 
#maxChannelsPerProcessor} files per merger.
+ * Continue this process through increasing level numbers, with the size of 
scratch files increasing by a factor
+ * of {@link #maxChannelsPerProcessor} each level.
+ *
+ * 3b) For the penultimate level, the {@link FrameChannelMerger} launched by 
{@link #runNextMiddleMerger()} writes
+ * partitioned {@link FrameFile} scratch files. The penultimate level cannot 
be written until
+ * {@link #outputPartitionsFuture} resolves, so if it has not resolved yet by 
this point, the SuperSorter pauses.
+ * The SuperSorter resumes and writes the penultimate level's files when the 
future resolves.
+ *
+ * 4) Write the final level using {@link FrameChannelMerger} launched from 
{@link #runNextUltimateMerger()}.
+ * Outputs for this level are written to channels provided by {@link 
#outputChannelFactory}, rather than scratch files.
+ *
+ * At all points, higher level processing is preferred over lower-level 
processing. Writing to final output files
+ * is preferred over intermediate, and writing to intermediate files is 
preferred over reading inputs. These
+ * preferences ensure that the amount of data buffered up in memory does not 
grow too large.
+ *
+ * Potential future work (things we could optimize if necessary):
+ *
+ * - Collapse merging to a single level if level zero has one merger, and we 
want to write one output partition.
+ * - Skip batching, and inject directly into level 0, if input channels are 
already individually fully-sorted.
+ * - Combine (for example: aggregate) while merging.
+ */
+public class SuperSorter
+{
+  private static final Logger log = new Logger(SuperSorter.class);
+
+  public static final int UNKNOWN_LEVEL = -1;
+  public static final long UNKNOWN_TOTAL = -1;
+
+  private final List<ReadableFrameChannel> inputChannels;
+  private final FrameReader frameReader;
+  private final ClusterBy clusterBy;
+  private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
+  private final FrameProcessorExecutor exec;
+  private final File directory;
+  private final OutputChannelFactory outputChannelFactory;
+  private final Supplier<MemoryAllocator> innerFrameAllocatorMaker;
+  private final int maxChannelsPerProcessor;
+  private final int maxActiveProcessors;
+  private final long rowLimit;
+  private final String cancellationId;
+
+  private final Object runWorkersLock = new Object();
+
+  @GuardedBy("runWorkersLock")
+  private boolean batcherIsRunning = false;
+
+  @GuardedBy("runWorkersLock")
+  private IntSet inputChannelsToRead = new IntOpenHashSet();
+
+  @GuardedBy("runWorkersLock")
+  private final Int2ObjectMap<LongSortedSet> outputsReadyByLevel = new 
Int2ObjectArrayMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private List<OutputChannel> outputChannels = null;
+
+  @GuardedBy("runWorkersLock")
+  private int activeProcessors = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long totalInputFrames = UNKNOWN_TOTAL;
+
+  @GuardedBy("runWorkersLock")
+  private int totalMergingLevels = UNKNOWN_LEVEL;
+
+  @GuardedBy("runWorkersLock")
+  private final Queue<Frame> inputBuffer = new ArrayDeque<>();
+
+  @GuardedBy("runWorkersLock")
+  private long inputFramesReadSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private long levelZeroMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private int ultimateMergersRunSoFar = 0;
+
+  @GuardedBy("runWorkersLock")
+  private final Map<File, FrameFile> penultimateFrameFileCache = new 
HashMap<>();
+
+  @GuardedBy("runWorkersLock")
+  private SettableFuture<OutputChannels> allDone = null;
+
+  @GuardedBy("runWorkersLock")
+  SuperSorterProgressTracker superSorterProgressTracker;
+
+  /**
+   * See {@link #setNoWorkRunnable}.
+   */
+  @GuardedBy("runWorkersLock")
+  private Runnable noWorkRunnable = null;
+
+  /**
+   * Initializes a SuperSorter.
+   *
+   * @param inputChannels              input channels. All frames in these 
channels must be sorted according to the
+   *                                   {@link ClusterBy#getColumns()}, or else 
sorting will not produce correct
+   *                                   output.
+   * @param frameReader                frame reader for the input channels
+   * @param clusterBy                  desired sorting order
+   * @param outputPartitionsFuture     a future that resolves to the desired 
output partitions. Sorting will block
+   *                                   prior to writing out final outputs 
until this future resolves. However, the
+   *                                   sorter will be able to read all inputs 
even if this future is unresolved.
+   *                                   If output need not be partitioned, use
+   *                                   {@link 
ClusterByPartitions#oneUniversalPartition()}. In this case a single
+   *                                   sorted channel is generated.
+   * @param exec                       executor to perform work in
+   * @param temporaryDirectory         directory to use for scratch files. 
This must have enough space to store at
+   *                                   least two copies of the dataset in 
{@link FrameFile} format.
+   * @param outputChannelFactory       factory for partitioned, sorted output 
channels
+   * @param innerFrameAllocatorMaker   supplier for allocators that are used 
to make merged frames for intermediate
+   *                                   levels of merging, prior to the final 
output. Final output frame allocation is
+   *                                   controlled by outputChannelFactory. One 
allocator is created per intermediate
+   *                                   scratch file.
+   * @param maxActiveProcessors        maximum number of merging processors to 
execute at once in the provided
+   *                                   {@link FrameProcessorExecutor}
+   * @param maxChannelsPerProcessor    maximum number of channels to merge at 
once per merging processor
+   * @param rowLimit                   limit to apply during sorting. The 
limit is merely advisory: the actual number
+   *                                   of rows returned may be larger than the 
limit. The limit is applied across
+   *                                   all partitions, not to each partition 
individually.
+   * @param cancellationId             cancellation id to use when running 
processors in the provided
+   *                                   {@link FrameProcessorExecutor}.
+   * @param superSorterProgressTracker progress tracker
+   */
+  public SuperSorter(
+      final List<ReadableFrameChannel> inputChannels,
+      final FrameReader frameReader,
+      final ClusterBy clusterBy,
+      final ListenableFuture<ClusterByPartitions> outputPartitionsFuture,
+      final FrameProcessorExecutor exec,
+      final File temporaryDirectory,
+      final OutputChannelFactory outputChannelFactory,
+      final Supplier<MemoryAllocator> innerFrameAllocatorMaker,
+      final int maxActiveProcessors,
+      final int maxChannelsPerProcessor,
+      final long rowLimit,
+      @Nullable final String cancellationId,
+      final SuperSorterProgressTracker superSorterProgressTracker
+  )
+  {
+    // Fail fast on invalid sizing parameters before initializing any state, rather than
+    // validating after all fields have already been assigned.
+    if (maxActiveProcessors < 1) {
+      throw new IAE("maxActiveProcessors[%d] < 1", maxActiveProcessors);
+    }
+
+    if (maxChannelsPerProcessor < 2) {
+      throw new IAE("maxChannelsPerProcessor[%d] < 2", maxChannelsPerProcessor);
+    }
+
+    this.inputChannels = inputChannels;
+    this.frameReader = frameReader;
+    this.clusterBy = clusterBy;
+    this.outputPartitionsFuture = outputPartitionsFuture;
+    this.exec = exec;
+    this.directory = temporaryDirectory;
+    this.outputChannelFactory = outputChannelFactory;
+    this.innerFrameAllocatorMaker = innerFrameAllocatorMaker;
+    this.maxChannelsPerProcessor = maxChannelsPerProcessor;
+    this.maxActiveProcessors = maxActiveProcessors;
+    this.rowLimit = rowLimit;
+    this.cancellationId = cancellationId;
+    this.superSorterProgressTracker = superSorterProgressTracker;
+
+    // Initially, every input channel has frames left to read.
+    for (int i = 0; i < inputChannels.size(); i++) {
+      inputChannelsToRead.add(i);
+    }
+  }
+
+  /**
+   * Starts sorting. Can only be called once. Work is performed in the {@link FrameProcessorExecutor}
+   * that was passed to the constructor.
+   *
+   * Returns a future containing partitioned sorted output channels. Throws ISE if called more
+   * than once.
+   */
+  public ListenableFuture<OutputChannels> run()
+  {
+    synchronized (runWorkersLock) {
+      if (allDone != null) {
+        throw new ISE("Cannot run() more than once.");
+      }
+
+      // Creating "allDone" marks the sorter as started; a later run() call hits the check above.
+      allDone = SettableFuture.create();
+      runWorkersIfPossible();
+
+      // When output partitions become known, that may unblock some additional layers of merging.
+      outputPartitionsFuture.addListener(
+          () -> {
+            synchronized (runWorkersLock) {
+              if (outputPartitionsFuture.isDone()) {
+                // Update the progress tracker: the number of ultimate-level mergers equals the
+                // number of output partitions.
+                superSorterProgressTracker.setTotalMergersForUltimateLevel(getOutputPartitions().size());
+              }
+              runWorkersIfPossible();
+              // With partitions known, a zero-input sort can now complete trivially.
+              setAllDoneIfPossible();
+            }
+          },
+          exec.getExecutorService()
+      );
+
+      // Attach cleanup to the returned future (presumably fired on resolution -- see
+      // FutureUtils.futureWithBaggage): if no processors are still active, release scratch
+      // resources now; otherwise workerFinished() cleans up when the last processor exits.
+      return FutureUtils.futureWithBaggage(
+          allDone,
+          () -> {
+            synchronized (runWorkersLock) {
+              if (activeProcessors == 0) {
+                cleanUp();
+              }
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Registers a hook that {@link #runWorkersIfPossible} fires when there are no active workers
+   * and nothing left to start. Testing aid only.
+   */
+  @VisibleForTesting
+  void setNoWorkRunnable(final Runnable runnable)
+  {
+    // Guarded by the same lock that runWorkersIfPossible holds when reading the field.
+    synchronized (runWorkersLock) {
+      noWorkRunnable = runnable;
+    }
+  }
+
+  /**
+   * Called when a worker finishes: decrements the active-processor count, tries to launch more
+   * work, and -- once the sort is complete and the last processor has exited -- releases
+   * scratch resources.
+   */
+  @GuardedBy("runWorkersLock")
+  private void workerFinished()
+  {
+    activeProcessors -= 1;
+
+    if (log.isDebugEnabled()) {
+      log.debug(stateString());
+    }
+
+    // Freed capacity may allow another worker to start; this worker's output may also have
+    // completed the final merging level, so check for overall completion too.
+    runWorkersIfPossible();
+    setAllDoneIfPossible();
+
+    // Clean up only after the final worker exits, so scratch files are not removed out from
+    // under a still-running processor.
+    if (isAllDone() && activeProcessors == 0) {
+      cleanUp();
+    }
+  }
+
+  /**
+   * Tries to launch new workers until either capacity is reached or nothing can be started.
+   *
+   * Later workers have priority, i.e., those responsible for merging higher levels of the merge
+   * tree. Workers that read the original input channels have the lowest priority. This priority
+   * order ensures that we don't build up too much unmerged data.
+   */
+  @GuardedBy("runWorkersLock")
+  private void runWorkersIfPossible()
+  {
+    if (isAllDone()) {
+      // Do nothing if the instance is all done. This can happen in case of error or cancellation.
+      return;
+    }
+
+    try {
+      // Short-circuit evaluation encodes the priority order: ultimate merger first, then middle
+      // mergers, then level-zero mergers, then input batching. Each runNextXxx() call starts at
+      // most one worker and returns whether it did.
+      while (activeProcessors < maxActiveProcessors &&
+             (runNextUltimateMerger() || runNextMiddleMerger() || runNextLevelZeroMerger() || runNextBatcher())) {
+        activeProcessors += 1;
+
+        if (log.isDebugEnabled()) {
+          log.debug(stateString());
+        }
+      }
+
+      if (activeProcessors == 0 && noWorkRunnable != null) {
+        log.debug("No active workers and no work left to start.");
+
+        // Only called in tests. No need to bother with try/catch and such.
+        noWorkRunnable.run();
+      }
+    }
+    catch (Throwable e) {
+      // Any failure while launching work fails the entire sort; the result future carries
+      // the error to the caller of run().
+      allDone.setException(e);
+    }
+  }
+
+  @GuardedBy("runWorkersLock")
+  private void setAllDoneIfPossible()
+  {
+    if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
+      // No input data -- generate empty output channels.
+      final ClusterByPartitions partitions = getOutputPartitions();
+      final List<OutputChannel> channels = new ArrayList<>(partitions.size());
+
+      for (int partitionNum = 0; partitionNum < partitions.size(); 
partitionNum++) {
+        channels.add(outputChannelFactory.openNilChannel(partitionNum));
+      }
+
+      // OK to use wrap, not wrapReadOnly, because nil channels are already 
read-only.
+      allDone.set(OutputChannels.wrap(channels));
+    } else if (totalMergingLevels != UNKNOWN_LEVEL
+               && outputsReadyByLevel.containsKey(totalMergingLevels - 1)
+               && outputsReadyByLevel.get(totalMergingLevels - 1).size() == 
getOutputPartitions().size()) {
+      // We're done!!
+      try {
+        // OK to use wrap, not wrapReadOnly, because all channels in this list 
are already read-only.
+        allDone.set(OutputChannels.wrap(outputChannels));
+      }
+      catch (Throwable e) {
+        allDone.setException(e);
+      }
+    }
+  }
+
+  @GuardedBy("runWorkersLock")
+  private boolean runNextBatcher()
+  {
+    if (batcherIsRunning || inputChannelsToRead.isEmpty()) {
+      return false;
+    } else {
+      batcherIsRunning = true;
+
+      runWorker(
+          new FrameChannelBatcher(inputChannels, maxChannelsPerProcessor),
+          result -> {
+            final List<Frame> batch = result.lhs;
+            final IntSet keepReading = result.rhs;
+
+            synchronized (runWorkersLock) {
+              inputBuffer.addAll(batch);
+              inputFramesReadSoFar += batch.size();
+              inputChannelsToRead = keepReading;
+
+              if (inputChannelsToRead.isEmpty()) {
+                inputChannels.forEach(ReadableFrameChannel::doneReading);
+                setTotalInputFrames(inputFramesReadSoFar);
+                runWorkersIfPossible();
+              } else if (inputBuffer.size() >= maxChannelsPerProcessor) {
+                runWorkersIfPossible();
+              }
+
+              batcherIsRunning = false;
+            }
+          }
+      );
+
+      return true;
+    }
+  }
+
+  /**
+   * Level zero mergers read batches of frames from the "inputBuffer". These frames are
+   * individually sorted, but there is no ordering between the frames. Their output is a sorted
+   * sequence of frames. Returns whether a merger was started.
+   */
+  @GuardedBy("runWorkersLock")
+  private boolean runNextLevelZeroMerger()
+  {
+    // Run only with a full batch, or with a partial (nonempty) batch once all input is read.
+    final boolean batchReady =
+        !inputBuffer.isEmpty() && (inputBuffer.size() >= maxChannelsPerProcessor || allInputRead());
+
+    if (!batchReady) {
+      return false;
+    }
+
+    // Wrap up to maxChannelsPerProcessor buffered frames as single-frame input channels.
+    final List<ReadableFrameChannel> mergerInputs = new ArrayList<>();
+
+    for (int i = 0; i < maxChannelsPerProcessor; i++) {
+      final Frame frame = inputBuffer.poll();
+
+      if (frame == null) {
+        break;
+      }
+
+      mergerInputs.add(singleReadableFrameChannel(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION)));
+    }
+
+    runMerger(0, levelZeroMergersRunSoFar++, mergerInputs, null);
+    return true;
+  }
+
+  @GuardedBy("runWorkersLock")
+  private boolean runNextMiddleMerger()

Review Comment:
   The actual merger is indeed a separate class: FrameChannelMerger 
(single-threaded frame processor that does an N-way merge). The logic in 
SuperSorter is mainly concerned with creating FrameChannelMergers and 
connecting them to each other such that the right thing happens. Definitely 
open to suggestions about how it could be made simpler, or which parts might be 
ripe for extraction elsewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to