This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/31.0.0 by this push:
     new f27a1dc651d [Backport] Dart: Smoother handling of stage early exit (#17228) (#17069) (#17256)
f27a1dc651d is described below

commit f27a1dc651d0ee607337ac6f0483643b32ace66a
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Oct 5 17:29:34 2024 +0530

    [Backport] Dart: Smoother handling of stage early exit (#17228) (#17069) (#17256)
    
    * MSQ: Properly report errors that occur when starting up RunWorkOrder. (#17069)
    * Dart: Smoother handling of stage early-exit. (#17228)
    ---------
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../msq/dart/controller/DartWorkerManager.java     |  40 +++---
 .../druid/msq/dart/worker/DartWorkerRunner.java    |   6 +-
 .../org/apache/druid/msq/exec/RunWorkOrder.java    |  72 +++++++----
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |   4 +-
 .../druid/msq/indexing/error/MSQFaultUtils.java    |   7 ++
 .../druid/msq/kernel/worker/WorkerStageKernel.java |  19 ++-
 .../druid/msq/kernel/worker/WorkerStagePhase.java  |   2 +-
 .../apache/druid/msq/exec/RunWorkOrderTest.java    | 139 +++++++++++++++++++++
 .../channel/ReadableByteChunksFrameChannel.java    |   6 +-
 9 files changed, 234 insertions(+), 61 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 54e163862d6..c49e0b98aed 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -172,25 +172,27 @@ public class DartWorkerManager implements WorkerManager
   public void stop(boolean interrupt)
   {
     if (state.compareAndSet(State.STARTED, State.STOPPED)) {
-      final List<ListenableFuture<?>> futures = new ArrayList<>();
-
-      // Send stop commands to all workers. This ensures they exit promptly, 
and do not get left in a zombie state.
-      // For this reason, the workerClient uses an unlimited retry policy. If 
a stop command is lost, a worker
-      // could get stuck in a zombie state without its controller. This state 
would persist until the server that
-      // ran the controller shuts down or restarts. At that time, the listener 
in DartWorkerRunner.BrokerListener calls
-      // "controllerFailed()" on the Worker, and the zombie worker would exit.
-
-      for (final String workerId : workerIds) {
-        futures.add(workerClient.stopWorker(workerId));
-      }
-
-      // Block until messages are acknowledged, or until the worker we're 
communicating with has failed.
-
-      try {
-        FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
-      }
-      catch (Throwable ignored) {
-        // Suppress errors.
+      if (interrupt) {
+        final List<ListenableFuture<?>> futures = new ArrayList<>();
+
+        // Send stop commands to all workers. This ensures they exit promptly, 
and do not get left in a zombie state.
+        // For this reason, the workerClient uses an unlimited retry policy. 
If a stop command is lost, a worker
+        // could get stuck in a zombie state without its controller. This 
state would persist until the server that
+        // ran the controller shuts down or restarts. At that time, the 
listener in DartWorkerRunner.BrokerListener
+        // calls "controllerFailed()" on the Worker, and the zombie worker 
would exit.
+
+        for (final String workerId : workerIds) {
+          futures.add(workerClient.stopWorker(workerId));
+        }
+
+        // Block until messages are acknowledged, or until the worker we're 
communicating with has failed.
+
+        try {
+          FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
+        }
+        catch (Throwable ignored) {
+          // Suppress errors.
+        }
       }
 
       CloseableUtils.closeAndSuppressExceptions(workerClient, e -> log.warn(e, 
"Failed to close workerClient"));
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
index ae136196a0f..d51a410fbb3 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
@@ -36,8 +36,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.dart.worker.http.DartWorkerInfo;
 import org.apache.druid.msq.dart.worker.http.GetWorkersResponse;
 import org.apache.druid.msq.exec.Worker;
-import org.apache.druid.msq.indexing.error.CanceledFault;
-import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFaultUtils;
 import org.apache.druid.msq.rpc.ResourcePermissionMapper;
 import org.apache.druid.msq.rpc.WorkerResource;
 import org.apache.druid.query.QueryContext;
@@ -142,8 +141,7 @@ public class DartWorkerRunner
           holder.worker.run();
         }
         catch (Throwable t) {
-          if (Thread.interrupted()
-              || t instanceof MSQException && ((MSQException) 
t).getFault().getErrorCode().equals(CanceledFault.CODE)) {
+          if (Thread.interrupted() || MSQFaultUtils.isCanceledException(t)) {
             log.debug(t, "Canceled, exiting thread.");
           } else {
             log.warn(t, "Worker for query[%s] failed and stopped.", queryId);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index e92d310cdde..8b6216861a1 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -129,12 +129,17 @@ public class RunWorkOrder
     STARTED,
 
     /**
-     * State entered upon calling {@link #stop()}.
+     * State entered upon failure of some work.
+     */
+    FAILED,
+
+    /**
+     * State entered upon calling {@link #stop(Throwable)}.
      */
     STOPPING,
 
     /**
-     * State entered when a call to {@link #stop()} concludes.
+     * State entered when a call to {@link #stop(Throwable)} concludes.
      */
     STOPPED
   }
@@ -232,7 +237,7 @@ public class RunWorkOrder
       setUpCompletionCallbacks();
     }
     catch (Throwable t) {
-      stopUnchecked();
+      stopUnchecked(t);
     }
   }
 
@@ -242,64 +247,73 @@ public class RunWorkOrder
    * are all properly cleaned up.
    *
    * Blocks until execution is fully stopped.
+   *
+   * @param t error to send to {@link RunWorkOrderListener#onFailure}, if 
success/failure has not already been sent.
+   *          Will also be thrown at the end of this method.
    */
-  public void stop() throws InterruptedException
+  public void stop(@Nullable Throwable t) throws InterruptedException
   {
     if (state.compareAndSet(State.INIT, State.STOPPING)
-        || state.compareAndSet(State.STARTED, State.STOPPING)) {
+        || state.compareAndSet(State.STARTED, State.STOPPING)
+        || state.compareAndSet(State.FAILED, State.STOPPING)) {
       // Initiate stopping.
-      Throwable e = null;
-
       try {
         exec.cancel(cancellationId);
       }
       catch (Throwable e2) {
-        e = e2;
+        if (t == null) {
+          t = e2;
+        } else {
+          t.addSuppressed(e2);
+        }
       }
 
       try {
         frameContext.close();
       }
       catch (Throwable e2) {
-        if (e == null) {
-          e = e2;
+        if (t == null) {
+          t = e2;
         } else {
-          e.addSuppressed(e2);
+          t.addSuppressed(e2);
         }
       }
 
       try {
-        // notifyListener will ignore this cancellation error if work has 
already succeeded.
-        notifyListener(Either.error(new 
MSQException(CanceledFault.instance())));
+        // notifyListener will ignore this error if work has already succeeded.
+        notifyListener(Either.error(t != null ? t : new 
MSQException(CanceledFault.instance())));
       }
       catch (Throwable e2) {
-        if (e == null) {
-          e = e2;
+        if (t == null) {
+          t = e2;
         } else {
-          e.addSuppressed(e2);
+          t.addSuppressed(e2);
         }
       }
 
       stopLatch.countDown();
-
-      if (e != null) {
-        Throwables.throwIfInstanceOf(e, InterruptedException.class);
-        Throwables.throwIfUnchecked(e);
-        throw new RuntimeException(e);
-      }
     }
 
     stopLatch.await();
+
+    if (t != null) {
+      Throwables.throwIfInstanceOf(t, InterruptedException.class);
+      Throwables.throwIfUnchecked(t);
+      throw new RuntimeException(t);
+    }
   }
 
   /**
-   * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link 
InterruptedException}, this method sets
-   * the interrupt flag and throws an unchecked exception.
+   * Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)} 
throws {@link InterruptedException},
+   * this method sets the interrupt flag and throws an unchecked exception.
+   *
+   * @param t error to send to {@link RunWorkOrderListener#onFailure}, if 
success/failure has not already been sent.
+   *          Will also be thrown at the end of this method.
    */
-  public void stopUnchecked()
+  public void stopUnchecked(@Nullable final Throwable t)
   {
     try {
-      stop();
+      stop(t);
     }
     catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -554,7 +568,11 @@ public class RunWorkOrder
           @Override
           public void onFailure(final Throwable t)
           {
-            notifyListener(Either.error(t));
+            if (state.compareAndSet(State.STARTED, State.FAILED)) {
+              // Call notifyListener only if we were STARTED. In particular, 
if we were STOPPING, skip this and allow
+              // the stop() method to set its own Canceled error.
+              notifyListener(Either.error(t));
+            }
           }
         },
         Execs.directExecutor()
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index c2bd4ec450a..46e7e4a1449 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -406,7 +406,7 @@ public class WorkerImpl implements Worker
     );
 
     // Set up processorCloser (called when processing is done).
-    kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
+    kernelHolder.processorCloser.register(() -> 
runWorkOrder.stopUnchecked(null));
 
     // Start working on this stage immediately.
     kernel.startReading();
@@ -625,6 +625,8 @@ public class WorkerImpl implements Worker
       holder.finishProcessing(stageId);
       final WorkerStageKernel kernel = holder.getKernelFor(stageId);
       if (kernel != null) {
+        // Calling setStageFinished places the kernel into FINISHED state, 
which also means we'll ignore any
+        // "Canceled" errors generated by "holder.finishProcessing(stageId)". 
(See WorkerStageKernel.fail)
         kernel.setStageFinished();
       }
     });
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
index 781639b17ad..5385645d5eb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
@@ -50,4 +50,11 @@ public class MSQFaultUtils
     return message.split(ERROR_CODE_DELIMITER, 2)[0];
   }
 
+  /**
+   * Returns whether the provided throwable is a {@link MSQException} with 
{@link CanceledFault}.
+   */
+  public static boolean isCanceledException(final Throwable t)
+  {
+    return t instanceof MSQException && ((MSQException) 
t).getFault().getErrorCode().equals(CanceledFault.CODE);
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
index b838092ca71..5745e9b75af 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.indexing.error.MSQFaultUtils;
 import org.apache.druid.msq.kernel.ShuffleKind;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageId;
@@ -195,12 +196,18 @@ public class WorkerStageKernel
   {
     Preconditions.checkNotNull(t, "t");
 
-    transitionTo(WorkerStagePhase.FAILED);
-    resultKeyStatisticsSnapshot = null;
-    resultPartitionBoundaries = null;
-
-    if (exceptionFromFail == null) {
-      exceptionFromFail = t;
+    if (WorkerStagePhase.FAILED.canTransitionFrom(phase)) {
+      transitionTo(WorkerStagePhase.FAILED);
+      resultKeyStatisticsSnapshot = null;
+      resultPartitionBoundaries = null;
+
+      if (exceptionFromFail == null) {
+        exceptionFromFail = t;
+      }
+    } else if (!MSQFaultUtils.isCanceledException(t)) {
+      // Current phase is already terminal. Log and suppress this error. It 
likely happened during cleanup.
+      // (Don't log CanceledFault though. Ignore those if they come after the 
kernel is in a terminal phase.)
+      log.warn(t, "Stage[%s] failed while in phase[%s]", 
getStageDefinition().getId(), phase);
     }
   }
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
index 10543beeb06..797dd8d9f8a 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
@@ -84,7 +84,7 @@ public enum WorkerStagePhase
     @Override
     public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
     {
-      return true;
+      return !priorPhase.isTerminal();
     }
   };
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
new file mode 100644
index 00000000000..dbd6857b272
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class RunWorkOrderTest
+{
+  private static final String CANCELLATION_ID = "my-cancellation-id";
+
+  @Test
+  public void test_stopUnchecked() throws InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    runWorkOrder.stopUnchecked(null);
+
+    // Calling a second time doesn't do anything special.
+    runWorkOrder.stopUnchecked(null);
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    
Mockito.verify(listener).onFailure(ArgumentMatchers.any(MSQException.class));
+  }
+
+  @Test
+  public void test_stopUnchecked_error() throws InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    final ISE exception = new ISE("oops");
+
+    Assert.assertThrows(
+        IllegalStateException.class,
+        () -> runWorkOrder.stopUnchecked(exception)
+    );
+
+    // Calling a second time doesn't do anything special. We already tried our 
best.
+    runWorkOrder.stopUnchecked(null);
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+  }
+
+  @Test
+  public void test_stopUnchecked_errorDuringExecCancel() throws 
InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    final ISE exception = new ISE("oops");
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+    Mockito.doThrow(exception).when(exec).cancel(CANCELLATION_ID);
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    Assert.assertThrows(
+        IllegalStateException.class,
+        () -> runWorkOrder.stopUnchecked(null)
+    );
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+  }
+
+  @Test
+  public void test_stopUnchecked_errorDuringFrameContextClose() throws 
InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    final ISE exception = new ISE("oops");
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+    Mockito.doThrow(exception).when(frameContext).close();
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    Assert.assertThrows(
+        IllegalStateException.class,
+        () -> runWorkOrder.stopUnchecked(null)
+    );
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
index a4a40d70a38..db20c609422 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
@@ -180,9 +180,9 @@ public class ReadableByteChunksFrameChannel implements 
ReadableFrameChannel
   public void setError(final Throwable t)
   {
     synchronized (lock) {
-      if (noMoreWrites) {
-        log.noStackTrace().warn(t, "Channel is no longer accepting writes, 
cannot propagate exception");
-      } else {
+      // Write error to the channel, unless "noMoreWrites" is set. If that's 
set, suppress errors, so regular channel
+      // shutdown doesn't trigger warnings in the log.
+      if (!noMoreWrites) {
         chunks.clear();
         chunks.add(Either.error(t));
         nextCompressedFrameLength = UNKNOWN_LENGTH;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to