This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fac267f17d MSQ: Improved worker cancellation. (#17046)
6fac267f17d is described below

commit 6fac267f17d745cf6aac9419ae06b306a8898f09
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Sep 15 01:22:28 2024 -0700

    MSQ: Improved worker cancellation. (#17046)
    
    * MSQ: Improved worker cancellation.
    
    Four changes:
    
    1) FrameProcessorExecutor now requires that cancellationIds be registered
       with "registerCancellationId" prior to being used in "runFully" or
       "runAllFully".
    
    2) FrameProcessorExecutor gains an "asExecutor" method, which allows that
       executor to be used as an executor for future callbacks in such a way
       that respects cancellationId.
    
    3) RunWorkOrder gains a "stop" method, which cancels the current
       cancellationId and closes the current FrameContext. It blocks until
       both operations are complete.
    
    4) Fixes a bug in RunAllFullyWidget where "processorManager.result()" was
       called outside "runAllFullyLock", which could cause it to be called
       out-of-order with "cleanup()" in case of cancellation or other error.
    
    Together, these changes help ensure cancellation does not have races.
    Once "cancel" is called for a given cancellationId, all existing processors
    and running callbacks are canceled and exit in an orderly manner. Future
    processors and callbacks with the same cancellationId are rejected
    before being executed.
    
    * Fix test.
    
    * Use execute, which doesn't return, to avoid errorprone complaints.
    
    * Fix some style stuff.
    
    * Further enhancements.
    
    * Fix style.
---
 .../frame/FrameChannelMergerBenchmark.java         |   8 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  54 +++++---
 .../org/apache/druid/msq/exec/RunWorkOrder.java    | 137 ++++++++++++++++++---
 .../druid/msq/exec/RunWorkOrderListener.java       |   2 +-
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |  45 +++----
 .../druid/msq/querykit/FrameProcessorTestBase.java |   9 +-
 .../frame/processor/FrameProcessorExecutor.java    |  83 ++++++++++---
 .../druid/frame/processor/RunAllFullyWidget.java   |   6 +-
 .../frame/processor/RunnableFrameProcessor.java    |  65 ++++++++++
 .../apache/druid/frame/processor/SuperSorter.java  |   4 +-
 .../processor/FrameProcessorExecutorTest.java      |  23 ++++
 .../frame/processor/RunAllFullyWidgetTest.java     |   4 +-
 12 files changed, 353 insertions(+), 87 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java
index a57b7a116c4..25f9015de2b 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/frame/FrameChannelMergerBenchmark.java
@@ -21,6 +21,7 @@ package org.apache.druid.benchmark.frame;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.guava.FutureUtils;
@@ -203,6 +204,7 @@ public class FrameChannelMergerBenchmark
   private final List<KeyColumn> sortKey = ImmutableList.of(new KeyColumn(KEY, 
KeyOrder.ASCENDING));
 
   private List<List<Frame>> channelFrames;
+  private ListeningExecutorService innerExec;
   private FrameProcessorExecutor exec;
   private List<BlockingQueueFrameChannel> channels;
 
@@ -226,7 +228,7 @@ public class FrameChannelMergerBenchmark
     frameReader = FrameReader.create(signature);
 
     exec = new FrameProcessorExecutor(
-        MoreExecutors.listeningDecorator(
+        innerExec = MoreExecutors.listeningDecorator(
             
Execs.singleThreaded(StringUtils.encodeForFormat(getClass().getSimpleName()))
         )
     );
@@ -335,8 +337,8 @@ public class FrameChannelMergerBenchmark
   @TearDown(Level.Trial)
   public void tearDown() throws Exception
   {
-    exec.getExecutorService().shutdownNow();
-    if (!exec.getExecutorService().awaitTermination(1, TimeUnit.MINUTES)) {
+    innerExec.shutdownNow();
+    if (!innerExec.awaitTermination(1, TimeUnit.MINUTES)) {
       throw new ISE("Could not terminate executor after 1 minute");
     }
   }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 4b63d85cda7..6d1ef21abbf 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -224,6 +224,7 @@ import java.util.stream.StreamSupport;
 public class ControllerImpl implements Controller
 {
   private static final Logger log = new Logger(ControllerImpl.class);
+  private static final String RESULT_READER_CANCELLATION_ID = "result-reader";
 
   private final String queryId;
   private final MSQSpec querySpec;
@@ -2189,6 +2190,34 @@ public class ControllerImpl implements Controller
     }
   }
 
+  /**
+   * Create a result-reader executor for {@link 
RunQueryUntilDone#readQueryResults()}.
+   */
+  private static FrameProcessorExecutor createResultReaderExec(final String 
queryId)
+  {
+    return new FrameProcessorExecutor(
+        MoreExecutors.listeningDecorator(
+            
Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + queryId 
+ "]")))
+    );
+  }
+
+  /**
+   * Cancel any currently-running work and shut down a result-reader executor, 
like one created by
+   * {@link #createResultReaderExec(String)}.
+   */
+  private static void closeResultReaderExec(final FrameProcessorExecutor exec)
+  {
+    try {
+      exec.cancel(RESULT_READER_CANCELLATION_ID);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    finally {
+      exec.shutdownNow();
+    }
+  }
+
   private void stopExternalFetchers()
   {
     if (workerSketchFetcher != null) {
@@ -2698,12 +2727,9 @@ public class ControllerImpl implements Controller
         inputChannelFactory = new WorkerInputChannelFactory(netClient, () -> 
taskIds);
       }
 
-      final FrameProcessorExecutor resultReaderExec = new 
FrameProcessorExecutor(
-          MoreExecutors.listeningDecorator(
-              
Execs.singleThreaded(StringUtils.encodeForFormat("msq-result-reader[" + 
queryId() + "]")))
-      );
+      final FrameProcessorExecutor resultReaderExec = 
createResultReaderExec(queryId());
+      resultReaderExec.registerCancellationId(RESULT_READER_CANCELLATION_ID);
 
-      final String cancellationId = "results-reader";
       ReadableConcatFrameChannel resultsChannel = null;
 
       try {
@@ -2713,7 +2739,7 @@ public class ControllerImpl implements Controller
             inputChannelFactory,
             () -> ArenaMemoryAllocator.createOnHeap(5_000_000),
             resultReaderExec,
-            cancellationId,
+            RESULT_READER_CANCELLATION_ID,
             null,
             
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context())
         );
@@ -2747,7 +2773,7 @@ public class ControllerImpl implements Controller
             queryListener
         );
 
-        queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, 
cancellationId);
+        queryResultsReaderFuture = resultReaderExec.runFully(resultsReader, 
RESULT_READER_CANCELLATION_ID);
 
         // When results are done being read, kick the main thread.
         // Important: don't use FutureUtils.futureWithBaggage, because we need 
queryResultsReaderFuture to resolve
@@ -2764,23 +2790,13 @@ public class ControllerImpl implements Controller
             e,
             () -> CloseableUtils.closeAll(
                 finalResultsChannel,
-                () -> resultReaderExec.getExecutorService().shutdownNow()
+                () -> closeResultReaderExec(resultReaderExec)
             )
         );
       }
 
       // Result reader is set up. Register with the query-wide closer.
-      closer.register(() -> {
-        try {
-          resultReaderExec.cancel(cancellationId);
-        }
-        catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-        finally {
-          resultReaderExec.getExecutorService().shutdownNow();
-        }
-      });
+      closer.register(() -> closeResultReaderExec(resultReaderExec));
     }
 
     /**
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 4d028147af0..3d31d7e2c3e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.exec;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -56,6 +57,7 @@ import 
org.apache.druid.frame.processor.manager.ProcessorManager;
 import org.apache.druid.frame.processor.manager.ProcessorManagers;
 import org.apache.druid.frame.util.DurableStorageUtils;
 import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -67,6 +69,8 @@ import org.apache.druid.msq.counters.CpuCounters;
 import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.indexing.InputChannelsImpl;
+import org.apache.druid.msq.indexing.error.CanceledFault;
+import org.apache.druid.msq.indexing.error.MSQException;
 import 
org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSliceReader;
@@ -94,7 +98,6 @@ import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
-import org.apache.druid.utils.CloseableUtils;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.annotation.Nullable;
@@ -104,7 +107,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -112,7 +116,29 @@ import java.util.stream.Collectors;
  */
 public class RunWorkOrder
 {
-  private final String controllerTaskId;
+  enum State
+  {
+    /**
+     * Initial state. Must be in this state to call {@link #startAsync()}.
+     */
+    INIT,
+
+    /**
+     * State entered upon calling {@link #startAsync()}.
+     */
+    STARTED,
+
+    /**
+     * State entered upon calling {@link #stop()}.
+     */
+    STOPPING,
+
+    /**
+     * State entered when a call to {@link #stop()} concludes.
+     */
+    STOPPED
+  }
+
   private final WorkOrder workOrder;
   private final InputChannelFactory inputChannelFactory;
   private final CounterTracker counterTracker;
@@ -125,7 +151,9 @@ public class RunWorkOrder
   private final boolean reindex;
   private final boolean removeNullBytes;
   private final ByteTracker intermediateSuperSorterLocalStorageTracker;
-  private final AtomicBoolean started = new AtomicBoolean();
+  private final AtomicReference<State> state = new 
AtomicReference<>(State.INIT);
+  private final CountDownLatch stopLatch = new CountDownLatch(1);
+  private final AtomicReference<Either<Throwable, Object>> resultForListener = 
new AtomicReference<>();
 
   @MonotonicNonNull
   private InputSliceReader inputSliceReader;
@@ -141,7 +169,6 @@ public class RunWorkOrder
   private ListenableFuture<OutputChannels> stageOutputChannelsFuture;
 
   public RunWorkOrder(
-      final String controllerTaskId,
       final WorkOrder workOrder,
       final InputChannelFactory inputChannelFactory,
       final CounterTracker counterTracker,
@@ -154,7 +181,6 @@ public class RunWorkOrder
       final boolean removeNullBytes
   )
   {
-    this.controllerTaskId = controllerTaskId;
     this.workOrder = workOrder;
     this.inputChannelFactory = inputChannelFactory;
     this.counterTracker = counterTracker;
@@ -180,15 +206,16 @@ public class RunWorkOrder
    * Execution proceeds asynchronously after this method returns. The {@link 
RunWorkOrderListener} passed to the
    * constructor of this instance can be used to track progress.
    */
-  public void start() throws IOException
+  public void startAsync()
   {
-    if (started.getAndSet(true)) {
-      throw new ISE("Already started");
+    if (!state.compareAndSet(State.INIT, State.STARTED)) {
+      throw new ISE("Cannot start from state[%s]", state);
     }
 
     final StageDefinition stageDef = workOrder.getStageDefinition();
 
     try {
+      exec.registerCancellationId(cancellationId);
       makeInputSliceReader();
       makeWorkOutputChannelFactory();
       makeShuffleOutputChannelFactory();
@@ -205,16 +232,78 @@ public class RunWorkOrder
       setUpCompletionCallbacks();
     }
     catch (Throwable t) {
-      // If start() has problems, cancel anything that was already kicked off, 
and close the FrameContext.
+      stopUnchecked();
+    }
+  }
+
+  /**
+   * Stops an execution that was previously initiated through {@link 
#startAsync()} and closes the {@link FrameContext}.
+   * May be called to cancel execution. Must also be called after successful 
execution in order to ensure that resources
+   * are all properly cleaned up.
+   *
+   * Blocks until execution is fully stopped.
+   */
+  public void stop() throws InterruptedException
+  {
+    if (state.compareAndSet(State.INIT, State.STOPPING)
+        || state.compareAndSet(State.STARTED, State.STOPPING)) {
+      // Initiate stopping.
+      Throwable e = null;
+
       try {
         exec.cancel(cancellationId);
       }
-      catch (Throwable t2) {
-        t.addSuppressed(t2);
+      catch (Throwable e2) {
+        e = e2;
       }
 
-      CloseableUtils.closeAndSuppressExceptions(frameContext, 
t::addSuppressed);
-      throw t;
+      try {
+        frameContext.close();
+      }
+      catch (Throwable e2) {
+        if (e == null) {
+          e = e2;
+        } else {
+          e.addSuppressed(e2);
+        }
+      }
+
+      try {
+        // notifyListener will ignore this cancellation error if work has 
already succeeded.
+        notifyListener(Either.error(new 
MSQException(CanceledFault.instance())));
+      }
+      catch (Throwable e2) {
+        if (e == null) {
+          e = e2;
+        } else {
+          e.addSuppressed(e2);
+        }
+      }
+
+      stopLatch.countDown();
+
+      if (e != null) {
+        Throwables.throwIfInstanceOf(e, InterruptedException.class);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
+      }
+    }
+
+    stopLatch.await();
+  }
+
+  /**
+   * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link 
InterruptedException}, this method sets
+   * the interrupt flag and throws an unchecked exception.
+   */
+  public void stopUnchecked()
+  {
+    try {
+      stop();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
     }
   }
 
@@ -459,19 +548,33 @@ public class RunWorkOrder
               writeDurableStorageSuccessFile();
             }
 
-            listener.onSuccess(resultObject);
+            notifyListener(Either.value(resultObject));
           }
 
           @Override
           public void onFailure(final Throwable t)
           {
-            listener.onFailure(t);
+            notifyListener(Either.error(t));
           }
         },
         Execs.directExecutor()
     );
   }
 
+  /**
+   * Notify {@link RunWorkOrderListener} that the job is done, if not already 
notified.
+   */
+  private void notifyListener(final Either<Throwable, Object> result)
+  {
+    if (resultForListener.compareAndSet(null, result)) {
+      if (result.isError()) {
+        listener.onFailure(result.error());
+      } else {
+        listener.onSuccess(result.valueOrThrow());
+      }
+    }
+  }
+
   /**
    * Write {@link DurableStorageUtils#SUCCESS_MARKER_FILENAME} for a 
particular stage, if durable storage is enabled.
    */
@@ -561,7 +664,7 @@ public class RunWorkOrder
   )
   {
     return DurableStorageOutputChannelFactory.createStandardImplementation(
-        controllerTaskId,
+        workerContext.queryId(),
         workOrder.getWorkerNumber(),
         workOrder.getStageNumber(),
         workerContext.workerId(),
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java
index 19c3c6570fe..8bffd6f8179 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrderListener.java
@@ -25,7 +25,7 @@ import 
org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 import javax.annotation.Nullable;
 
 /**
- * Listener for various things that may happen during execution of {@link 
RunWorkOrder#start()}. Listener methods are
+ * Listener for various things that may happen during execution of {@link 
RunWorkOrder#startAsync()}. Listener methods are
  * fired in processing threads, so they must be thread-safe, and it is 
important that they run quickly.
  */
 public interface RunWorkOrderListener
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 5d9f9b9db54..74e3850c6e9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -367,28 +367,19 @@ public class WorkerImpl implements Worker
     final WorkerStageKernel kernel = kernelHolder.kernel;
     final WorkOrder workOrder = kernel.getWorkOrder();
     final StageDefinition stageDefinition = workOrder.getStageDefinition();
-    final String cancellationId = cancellationIdFor(stageDefinition.getId());
+    final String cancellationId = cancellationIdFor(stageDefinition.getId(), 
workOrder.getWorkerNumber());
 
     log.info(
-        "Processing work order for stage[%s]%s",
+        "Starting work order for stage[%s], workerNumber[%d]%s",
         stageDefinition.getId(),
+        workOrder.getWorkerNumber(),
         (log.isDebugEnabled()
          ? StringUtils.format(", payload[%s]", 
context.jsonMapper().writeValueAsString(workOrder)) : "")
     );
 
-    final FrameContext frameContext = 
kernelHolder.processorCloser.register(context.frameContext(workOrder));
-    kernelHolder.processorCloser.register(() -> {
-      try {
-        workerExec.cancel(cancellationId);
-      }
-      catch (InterruptedException e) {
-        // Strange that cancellation would itself be interrupted. Log and 
suppress.
-        log.warn(e, "Cancellation interrupted for stage[%s]", 
stageDefinition.getId());
-        Thread.currentThread().interrupt();
-      }
-    });
+    final FrameContext frameContext = context.frameContext(workOrder);
 
-    // Set up cleanup functions for this work order.
+    // Set up resultsCloser (called when we are done reading results).
     kernelHolder.resultsCloser.register(() -> 
FileUtils.deleteDirectory(frameContext.tempDir()));
     kernelHolder.resultsCloser.register(() -> 
removeStageOutputChannels(stageDefinition.getId()));
 
@@ -397,13 +388,9 @@ public class WorkerImpl implements Worker
     final InputChannelFactory inputChannelFactory =
         makeBaseInputChannelFactory(workOrder, controllerClient, 
kernelHolder.processorCloser);
 
-    // Start working on this stage immediately.
-    kernel.startReading();
-
     final QueryContext queryContext = task != null ? 
QueryContext.of(task.getContext()) : QueryContext.empty();
     final boolean includeAllCounters = context.includeAllCounters();
     final RunWorkOrder runWorkOrder = new RunWorkOrder(
-        task.getControllerTaskId(),
         workOrder,
         inputChannelFactory,
         stageCounters.computeIfAbsent(
@@ -419,7 +406,12 @@ public class WorkerImpl implements Worker
         MultiStageQueryContext.removeNullBytes(queryContext)
     );
 
-    runWorkOrder.start();
+    // Set up processorCloser (called when processing is done).
+    kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
+
+    // Start working on this stage immediately.
+    kernel.startReading();
+    runWorkOrder.startAsync();
     kernelHolder.partitionBoundariesFuture = 
runWorkOrder.getStagePartitionBoundariesFuture();
   }
 
@@ -987,9 +979,9 @@ public class WorkerImpl implements Worker
   /**
    * Returns cancellation ID for a particular stage, to be used in {@link 
FrameProcessorExecutor#cancel(String)}.
    */
-  private static String cancellationIdFor(final StageId stageId)
+  private static String cancellationIdFor(final StageId stageId, final int 
workerNumber)
   {
-    return stageId.toString();
+    return StringUtils.format("%s_%s", stageId, workerNumber);
   }
 
   /**
@@ -1244,9 +1236,18 @@ public class WorkerImpl implements Worker
   private static class KernelHolder
   {
     private final WorkerStageKernel kernel;
+    private SettableFuture<ClusterByPartitions> partitionBoundariesFuture;
+
+    /**
+     * Closer for processing. This is closed when all processing for a stage 
has completed.
+     */
     private final Closer processorCloser;
+
+    /**
+     * Closer for results. This is closed when results for a stage are no 
longer needed. Always closed
+     * *after* {@link #processorCloser} is done closing.
+     */
     private final Closer resultsCloser;
-    private SettableFuture<ClusterByPartitions> partitionBoundariesFuture;
 
     public KernelHolder(WorkerStageKernel kernel)
     {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
index 439aa148a84..cde2b0ea4e9 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/FrameProcessorTestBase.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.querykit;
 
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
@@ -47,19 +48,21 @@ public class FrameProcessorTestBase extends 
InitializedNullHandlingTest
 {
   protected static final StagePartition STAGE_PARTITION = new 
StagePartition(new StageId("q", 0), 0);
 
+  private ListeningExecutorService innerExec;
   protected FrameProcessorExecutor exec;
 
   @Before
   public void setUp()
   {
-    exec = new 
FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
+    innerExec = 
MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec"));
+    exec = new FrameProcessorExecutor(innerExec);
   }
 
   @After
   public void tearDown() throws Exception
   {
-    exec.getExecutorService().shutdownNow();
-    exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
+    innerExec.shutdownNow();
+    innerExec.awaitTermination(10, TimeUnit.MINUTES);
   }
 
   protected ReadableInput makeChannelFromCursorFactory(
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index c0f79d30e58..f255fbe13a6 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -46,12 +46,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
@@ -61,7 +63,6 @@ import java.util.stream.Collectors;
  * If you want single threaded execution, use {@code Execs.singleThreaded()}. 
It is not a good idea to use this with a
  * same-thread executor like {@code Execs.directExecutor()}, because it will 
lead to deep call stacks.
  */
-@SuppressWarnings("CheckReturnValue")
 public class FrameProcessorExecutor
 {
   private static final Logger log = new Logger(FrameProcessorExecutor.class);
@@ -70,6 +71,10 @@ public class FrameProcessorExecutor
 
   private final Object lock = new Object();
 
+  // Currently-active cancellationIds.
+  @GuardedBy("lock")
+  private final Set<String> activeCancellationIds = new HashSet<>();
+
   // Futures that are active and therefore cancelable.
   // Does not include return futures: those are in cancelableReturnFutures.
   @GuardedBy("lock")
@@ -96,19 +101,12 @@ public class FrameProcessorExecutor
     this.exec = exec;
   }
 
-  /**
-   * Returns the underlying executor service used by this executor.
-   */
-  public ListeningExecutorService getExecutorService()
-  {
-    return exec;
-  }
-
   /**
    * Runs a processor until it is done, and returns a future that resolves 
when execution is complete.
    *
-   * If "cancellationId" is provided, it can be used with the {@link 
#cancel(String)} method to cancel all processors
-   * currently running with the same cancellationId.
+   * If "cancellationId" is provided, it must have previously been registered 
with {@link #registerCancellationId}.
+   * Then, it can be used with the {@link #cancel(String)} method to cancel 
all processors with that
+   * same cancellationId.
    */
   public <T> ListenableFuture<T> runFully(final FrameProcessor<T> processor, 
@Nullable final String cancellationId)
   {
@@ -116,6 +114,11 @@ public class FrameProcessorExecutor
     final List<WritableFrameChannel> outputChannels = 
processor.outputChannels();
     final SettableFuture<T> finished = 
registerCancelableFuture(SettableFuture.create(), true, cancellationId);
 
+    if (finished.isDone()) {
+      // Possibly due to starting life out being canceled.
+      return finished;
+    }
+
     class ExecutorRunnable implements Runnable
     {
       private final AwaitAnyWidget awaitAnyWidget = new 
AwaitAnyWidget(inputChannels);
@@ -152,7 +155,7 @@ public class FrameProcessorExecutor
             final IntSet await = result.awaitSet();
 
             if (await.isEmpty()) {
-              exec.submit(ExecutorRunnable.this);
+              exec.execute(ExecutorRunnable.this);
             } else if (result.isAwaitAll() || await.size() == 1) {
               final List<ListenableFuture<?>> readabilityFutures = new 
ArrayList<>();
 
@@ -164,7 +167,7 @@ public class FrameProcessorExecutor
               }
 
               if (readabilityFutures.isEmpty()) {
-                exec.submit(ExecutorRunnable.this);
+                exec.execute(ExecutorRunnable.this);
               } else {
                 
runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
               }
@@ -272,7 +275,7 @@ public class FrameProcessorExecutor
               public void onSuccess(final V ignored)
               {
                 try {
-                  exec.submit(ExecutorRunnable.this);
+                  exec.execute(ExecutorRunnable.this);
                 }
                 catch (Throwable e) {
                   fail(e);
@@ -390,7 +393,7 @@ public class FrameProcessorExecutor
 
     logProcessorStatusString(processor, finished, null);
     registerCancelableProcessor(processor, cancellationId);
-    exec.submit(runnable);
+    exec.execute(runnable);
     return finished;
   }
 
@@ -423,8 +426,20 @@ public class FrameProcessorExecutor
   }
 
   /**
-   * Cancels all processors associated with a given cancellationId. Waits for 
the processors to exit before
-   * returning.
+   * Registers a cancellationId, so it can be provided to {@link #runFully} or 
{@link #runAllFully}. To avoid the
+   * set of active cancellationIds growing without bound, callers must also 
call {@link #cancel(String)} on the
+   * same cancellationId when done using it.
+   */
+  public void registerCancellationId(final String cancellationId)
+  {
+    synchronized (lock) {
+      activeCancellationIds.add(cancellationId);
+    }
+  }
+
+  /**
+   * Deregisters a cancellationId and cancels any currently-running processors 
associated with that cancellationId.
+   * Waits for any canceled processors to exit before returning.
    */
   public void cancel(final String cancellationId) throws InterruptedException
   {
@@ -435,6 +450,7 @@ public class FrameProcessorExecutor
     final Set<ListenableFuture<?>> returnFuturesToCancel;
 
     synchronized (lock) {
+      activeCancellationIds.remove(cancellationId);
       futuresToCancel = cancelableFutures.removeAll(cancellationId);
       processorsToCancel = cancelableProcessors.removeAll(cancellationId);
       returnFuturesToCancel = 
cancelableReturnFutures.removeAll(cancellationId);
@@ -457,6 +473,33 @@ public class FrameProcessorExecutor
     }
   }
 
+  /**
+   * Returns an {@link Executor} that executes using the same underlying 
service, and that is also connected to
+   * cancellation through {@link #cancel(String)}.
+   *
+   * @param cancellationId cancellation ID for the executor
+   */
+  public Executor asExecutor(@Nullable final String cancellationId)
+  {
+    return command -> runFully(new RunnableFrameProcessor(command), 
cancellationId);
+  }
+
+  /**
+   * Shuts down the underlying executor service immediately.
+   */
+  public void shutdownNow()
+  {
+    exec.shutdownNow();
+  }
+
+  /**
+   * Returns the underlying executor service used by this executor.
+   */
+  ListeningExecutorService getExecutorService()
+  {
+    return exec;
+  }
+
   /**
    * Register a future that will be canceled when the provided {@code 
cancellationId} is canceled.
    *
@@ -472,6 +515,12 @@ public class FrameProcessorExecutor
   {
     if (cancellationId != null) {
       synchronized (lock) {
+        if (!activeCancellationIds.contains(cancellationId)) {
+          // Cancel and return immediately.
+          future.cancel(true);
+          return future;
+        }
+
         final SetMultimap<String, ListenableFuture<?>> map = isReturn ? 
cancelableReturnFutures : cancelableFutures;
         map.put(cancellationId, future);
         future.addListener(
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java
index a1a1c0f8712..7f79a319c28 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/RunAllFullyWidget.java
@@ -306,9 +306,11 @@ public class RunAllFullyWidget<T, ResultType>
                 }
 
                 if (isDone) {
-                  finished.compareAndSet(null, 
Either.value(processorManager.result()));
-
                   synchronized (runAllFullyLock) {
+                    if (finished.get() == null) {
+                      finished.compareAndSet(null, 
Either.value(processorManager.result()));
+                    }
+
                     cleanupIfNoMoreProcessors();
                   }
                 } else {
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java b/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java
new file mode 100644
index 00000000000..697879490e1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/frame/processor/RunnableFrameProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Frame processor that simply runs a {@link Runnable}, once.
+ */
+public class RunnableFrameProcessor implements FrameProcessor<Void>
+{
+  private final Runnable runnable;
+
+  public RunnableFrameProcessor(Runnable runnable)
+  {
+    this.runnable = runnable;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<Void> runIncrementally(IntSet readableInputs)
+  {
+    runnable.run();
+    return ReturnOrAwait.returnObject(null);
+  }
+
+  @Override
+  public void cleanup()
+  {
+    // Nothing to do.
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
index e30f2e77b02..b8b74a2b797 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
@@ -297,7 +297,7 @@ public class SuperSorter
               setAllDoneIfPossible();
             }
           },
-          exec.getExecutorService()
+          exec.asExecutor(cancellationId)
       );
 
       return FutureUtils.futureWithBaggage(
@@ -813,7 +813,7 @@ public class SuperSorter
         },
         // Must run in exec, instead of in the same thread, to avoid running callback immediately if the
         // worker happens to finish super-quickly.
-        exec.getExecutorService()
+        exec.asExecutor(cancellationId)
     );
   }
 
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
index 0f50624078b..4ed2c610525 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
@@ -222,6 +222,7 @@ public class FrameProcessorExecutorTest
       final SettableFuture<Object> future = SettableFuture.create();
       final String cancellationId = "xyzzy";
 
+      exec.registerCancellationId(cancellationId);
       Assert.assertSame(future, exec.registerCancelableFuture(future, false, 
cancellationId));
       exec.cancel(cancellationId);
 
@@ -236,6 +237,8 @@ public class FrameProcessorExecutorTest
     {
       final SleepyFrameProcessor processor = new SleepyFrameProcessor();
       final String cancellationId = "xyzzy";
+
+      exec.registerCancellationId(cancellationId);
       final ListenableFuture<Long> future = exec.runFully(processor, 
cancellationId);
 
       processor.awaitRun();
@@ -254,6 +257,8 @@ public class FrameProcessorExecutorTest
     {
       final SleepyFrameProcessor processor = new SleepyFrameProcessor();
       final String cancellationId = "xyzzy";
+
+      exec.registerCancellationId(cancellationId);
       final ListenableFuture<Long> future = exec.runFully(processor, 
cancellationId);
 
       processor.awaitRun();
@@ -314,6 +319,8 @@ public class FrameProcessorExecutorTest
 
       // Start up all systems at once.
       for (final String systemId : systemGeneratorsMap.keySet()) {
+        exec.registerCancellationId(systemId);
+
         for (InfiniteFrameProcessor generator : 
systemGeneratorsMap.get(systemId)) {
           processorFutureMap.put(generator, exec.runFully(generator, 
systemId));
         }
@@ -391,6 +398,22 @@ public class FrameProcessorExecutorTest
       // Just making sure no error is thrown when we refer to a nonexistent cancellationId.
       exec.cancel("nonexistent");
     }
+
+    @Test
+    public void test_runFully_nonexistentCancellationId()
+    {
+      final SleepyFrameProcessor processor = new SleepyFrameProcessor();
+      final String cancellationId = "xyzzy";
+
+      // Don't registerCancellationId(cancellationId).
+      final ListenableFuture<Long> future = exec.runFully(processor, 
cancellationId);
+
+      // Future should be immediately canceled, without running the processor.
+      Assert.assertTrue(future.isDone());
+      Assert.assertTrue(future.isCancelled());
+      Assert.assertFalse(processor.didGetInterrupt());
+      Assert.assertFalse(processor.didCleanup());
+    }
   }
 
   public abstract static class BaseFrameProcessorExecutorTestSuite extends 
InitializedNullHandlingTest
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
index 7cd1e980428..d0ae5a986a0 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/RunAllFullyWidgetTest.java
@@ -409,6 +409,8 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP
                  .mapToObj(i -> new SleepyFrameProcessor())
                  .collect(Collectors.toList());
 
+    final String cancellationId = "xyzzy";
+    exec.registerCancellationId(cancellationId);
     final ListenableFuture<Long> future = exec.runAllFully(
         possiblyDelay(
             ensureClose(
@@ -418,7 +420,7 @@ public class RunAllFullyWidgetTest extends FrameProcessorExecutorTest.BaseFrameP
         ),
         maxOutstandingProcessors,
         bouncer,
-        "xyzzy"
+        cancellationId
     );
 
     for (int i = 0; i < expectedRunningProcessors; i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to