This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 50503fe0ef0 MSQ: Properly report errors that occur when starting up 
RunWorkOrder. (#17069)
50503fe0ef0 is described below

commit 50503fe0ef0bb422e957ad0507b826dc80cd8b9f
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 17 08:02:02 2024 -0700

    MSQ: Properly report errors that occur when starting up RunWorkOrder. 
(#17069)
    
    * MSQ: Properly report errors that occur when starting up RunWorkOrder.
    
    In #17046, an exception thrown by RunWorkOrder#startAsync would be ignored
    and replaced with a generic CanceledFault. This patch fixes it by retaining
    the original error.
---
 .../org/apache/druid/msq/exec/RunWorkOrder.java    |  58 +++++----
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |   2 +-
 .../apache/druid/msq/exec/RunWorkOrderTest.java    | 139 +++++++++++++++++++++
 3 files changed, 173 insertions(+), 26 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 3d31d7e2c3e..3ad8bf1f29a 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -129,12 +129,12 @@ public class RunWorkOrder
     STARTED,
 
     /**
-     * State entered upon calling {@link #stop()}.
+     * State entered upon calling {@link #stop(Throwable)}.
      */
     STOPPING,
 
     /**
-     * State entered when a call to {@link #stop()} concludes.
+     * State entered when a call to {@link #stop(Throwable)} concludes.
      */
     STOPPED
   }
@@ -232,7 +232,7 @@ public class RunWorkOrder
       setUpCompletionCallbacks();
     }
     catch (Throwable t) {
-      stopUnchecked();
+      stopUnchecked(t);
     }
   }
 
@@ -242,64 +242,72 @@ public class RunWorkOrder
    * are all properly cleaned up.
    *
    * Blocks until execution is fully stopped.
+   *
+   * @param t error to send to {@link RunWorkOrderListener#onFailure}, if 
success/failure has not already been sent.
+   *          Will also be thrown at the end of this method.
    */
-  public void stop() throws InterruptedException
+  public void stop(@Nullable Throwable t) throws InterruptedException
   {
     if (state.compareAndSet(State.INIT, State.STOPPING)
         || state.compareAndSet(State.STARTED, State.STOPPING)) {
       // Initiate stopping.
-      Throwable e = null;
-
       try {
         exec.cancel(cancellationId);
       }
       catch (Throwable e2) {
-        e = e2;
+        if (t == null) {
+          t = e2;
+        } else {
+          t.addSuppressed(e2);
+        }
       }
 
       try {
         frameContext.close();
       }
       catch (Throwable e2) {
-        if (e == null) {
-          e = e2;
+        if (t == null) {
+          t = e2;
         } else {
-          e.addSuppressed(e2);
+          t.addSuppressed(e2);
         }
       }
 
       try {
-        // notifyListener will ignore this cancellation error if work has 
already succeeded.
-        notifyListener(Either.error(new 
MSQException(CanceledFault.instance())));
+        // notifyListener will ignore this error if work has already succeeded.
+        notifyListener(Either.error(t != null ? t : new 
MSQException(CanceledFault.instance())));
       }
       catch (Throwable e2) {
-        if (e == null) {
-          e = e2;
+        if (t == null) {
+          t = e2;
         } else {
-          e.addSuppressed(e2);
+          t.addSuppressed(e2);
         }
       }
 
       stopLatch.countDown();
-
-      if (e != null) {
-        Throwables.throwIfInstanceOf(e, InterruptedException.class);
-        Throwables.throwIfUnchecked(e);
-        throw new RuntimeException(e);
-      }
     }
 
     stopLatch.await();
+
+    if (t != null) {
+      Throwables.throwIfInstanceOf(t, InterruptedException.class);
+      Throwables.throwIfUnchecked(t);
+      throw new RuntimeException(t);
+    }
   }
 
   /**
-   * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link 
InterruptedException}, this method sets
-   * the interrupt flag and throws an unchecked exception.
+   * Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)} 
throws {@link InterruptedException},
+   * this method sets the interrupt flag and throws an unchecked exception.
+   *
+   * @param t error to send to {@link RunWorkOrderListener#onFailure}, if 
success/failure has not already been sent.
+   *          Will also be thrown at the end of this method.
    */
-  public void stopUnchecked()
+  public void stopUnchecked(@Nullable final Throwable t)
   {
     try {
-      stop();
+      stop(t);
     }
     catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 906d9e041b3..7be045542bc 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -405,7 +405,7 @@ public class WorkerImpl implements Worker
     );
 
     // Set up processorCloser (called when processing is done).
-    kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
+    kernelHolder.processorCloser.register(() -> 
runWorkOrder.stopUnchecked(null));
 
     // Start working on this stage immediately.
     kernel.startReading();
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
new file mode 100644
index 00000000000..dbd6857b272
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class RunWorkOrderTest
+{
+  private static final String CANCELLATION_ID = "my-cancellation-id";
+
+  @Test
+  public void test_stopUnchecked() throws InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    runWorkOrder.stopUnchecked(null);
+
+    // Calling a second time doesn't do anything special.
+    runWorkOrder.stopUnchecked(null);
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    
Mockito.verify(listener).onFailure(ArgumentMatchers.any(MSQException.class));
+  }
+
+  @Test
+  public void test_stopUnchecked_error() throws InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    final ISE exception = new ISE("oops");
+
+    Assert.assertThrows(
+        IllegalStateException.class,
+        () -> runWorkOrder.stopUnchecked(exception)
+    );
+
+    // Calling a second time doesn't do anything special. We already tried our 
best.
+    runWorkOrder.stopUnchecked(null);
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+  }
+
+  @Test
+  public void test_stopUnchecked_errorDuringExecCancel() throws 
InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    final ISE exception = new ISE("oops");
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+    Mockito.doThrow(exception).when(exec).cancel(CANCELLATION_ID);
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    Assert.assertThrows(
+        IllegalStateException.class,
+        () -> runWorkOrder.stopUnchecked(null)
+    );
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+  }
+
+  @Test
+  public void test_stopUnchecked_errorDuringFrameContextClose() throws 
InterruptedException
+  {
+    final FrameProcessorExecutor exec = 
Mockito.mock(FrameProcessorExecutor.class);
+    final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+    final FrameContext frameContext = Mockito.mock(FrameContext.class);
+    final WorkerStorageParameters storageParameters = 
Mockito.mock(WorkerStorageParameters.class);
+    final RunWorkOrderListener listener = 
Mockito.mock(RunWorkOrderListener.class);
+
+    final ISE exception = new ISE("oops");
+    
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+    Mockito.doThrow(exception).when(frameContext).close();
+
+    final RunWorkOrder runWorkOrder =
+        new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, 
workerContext, frameContext, listener, false, false);
+
+    Assert.assertThrows(
+        IllegalStateException.class,
+        () -> runWorkOrder.stopUnchecked(null)
+    );
+
+    Mockito.verify(exec).cancel(CANCELLATION_ID);
+    Mockito.verify(frameContext).close();
+    Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to