This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 50503fe0ef0 MSQ: Properly report errors that occur when starting up
RunWorkOrder. (#17069)
50503fe0ef0 is described below
commit 50503fe0ef0bb422e957ad0507b826dc80cd8b9f
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Sep 17 08:02:02 2024 -0700
MSQ: Properly report errors that occur when starting up RunWorkOrder.
(#17069)
* MSQ: Properly report errors that occur when starting up RunWorkOrder.
In #17046, an exception thrown by RunWorkOrder#startAsync would be ignored
and replaced with a generic CanceledFault. This patch fixes it by retaining
the original error.
---
.../org/apache/druid/msq/exec/RunWorkOrder.java | 58 +++++----
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 2 +-
.../apache/druid/msq/exec/RunWorkOrderTest.java | 139 +++++++++++++++++++++
3 files changed, 173 insertions(+), 26 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 3d31d7e2c3e..3ad8bf1f29a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -129,12 +129,12 @@ public class RunWorkOrder
STARTED,
/**
- * State entered upon calling {@link #stop()}.
+ * State entered upon calling {@link #stop(Throwable)}.
*/
STOPPING,
/**
- * State entered when a call to {@link #stop()} concludes.
+ * State entered when a call to {@link #stop(Throwable)} concludes.
*/
STOPPED
}
@@ -232,7 +232,7 @@ public class RunWorkOrder
setUpCompletionCallbacks();
}
catch (Throwable t) {
- stopUnchecked();
+ stopUnchecked(t);
}
}
@@ -242,64 +242,72 @@ public class RunWorkOrder
* are all properly cleaned up.
*
* Blocks until execution is fully stopped.
+ *
+ * @param t error to send to {@link RunWorkOrderListener#onFailure}, if
success/failure has not already been sent.
+ * Will also be thrown at the end of this method.
*/
- public void stop() throws InterruptedException
+ public void stop(@Nullable Throwable t) throws InterruptedException
{
if (state.compareAndSet(State.INIT, State.STOPPING)
|| state.compareAndSet(State.STARTED, State.STOPPING)) {
// Initiate stopping.
- Throwable e = null;
-
try {
exec.cancel(cancellationId);
}
catch (Throwable e2) {
- e = e2;
+ if (t == null) {
+ t = e2;
+ } else {
+ t.addSuppressed(e2);
+ }
}
try {
frameContext.close();
}
catch (Throwable e2) {
- if (e == null) {
- e = e2;
+ if (t == null) {
+ t = e2;
} else {
- e.addSuppressed(e2);
+ t.addSuppressed(e2);
}
}
try {
- // notifyListener will ignore this cancellation error if work has
already succeeded.
- notifyListener(Either.error(new
MSQException(CanceledFault.instance())));
+ // notifyListener will ignore this error if work has already succeeded.
+ notifyListener(Either.error(t != null ? t : new
MSQException(CanceledFault.instance())));
}
catch (Throwable e2) {
- if (e == null) {
- e = e2;
+ if (t == null) {
+ t = e2;
} else {
- e.addSuppressed(e2);
+ t.addSuppressed(e2);
}
}
stopLatch.countDown();
-
- if (e != null) {
- Throwables.throwIfInstanceOf(e, InterruptedException.class);
- Throwables.throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
}
stopLatch.await();
+
+ if (t != null) {
+ Throwables.throwIfInstanceOf(t, InterruptedException.class);
+ Throwables.throwIfUnchecked(t);
+ throw new RuntimeException(t);
+ }
}
/**
- * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link
InterruptedException}, this method sets
- * the interrupt flag and throws an unchecked exception.
+ * Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)}
throws {@link InterruptedException},
+ * this method sets the interrupt flag and throws an unchecked exception.
+ *
+ * @param t error to send to {@link RunWorkOrderListener#onFailure}, if
success/failure has not already been sent.
+ * Will also be thrown at the end of this method.
*/
- public void stopUnchecked()
+ public void stopUnchecked(@Nullable final Throwable t)
{
try {
- stop();
+ stop(t);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 906d9e041b3..7be045542bc 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -405,7 +405,7 @@ public class WorkerImpl implements Worker
);
// Set up processorCloser (called when processing is done).
- kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
+ kernelHolder.processorCloser.register(() ->
runWorkOrder.stopUnchecked(null));
// Start working on this stage immediately.
kernel.startReading();
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
new file mode 100644
index 00000000000..dbd6857b272
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class RunWorkOrderTest
+{
+ private static final String CANCELLATION_ID = "my-cancellation-id";
+
+ @Test
+ public void test_stopUnchecked() throws InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ runWorkOrder.stopUnchecked(null);
+
+ // Calling a second time doesn't do anything special.
+ runWorkOrder.stopUnchecked(null);
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+
Mockito.verify(listener).onFailure(ArgumentMatchers.any(MSQException.class));
+ }
+
+ @Test
+ public void test_stopUnchecked_error() throws InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ final ISE exception = new ISE("oops");
+
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> runWorkOrder.stopUnchecked(exception)
+ );
+
+ // Calling a second time doesn't do anything special. We already tried our
best.
+ runWorkOrder.stopUnchecked(null);
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+ Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+ }
+
+ @Test
+ public void test_stopUnchecked_errorDuringExecCancel() throws
InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+ final ISE exception = new ISE("oops");
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+ Mockito.doThrow(exception).when(exec).cancel(CANCELLATION_ID);
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> runWorkOrder.stopUnchecked(null)
+ );
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+ Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+ }
+
+ @Test
+ public void test_stopUnchecked_errorDuringFrameContextClose() throws
InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+ final ISE exception = new ISE("oops");
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+ Mockito.doThrow(exception).when(frameContext).close();
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> runWorkOrder.stopUnchecked(null)
+ );
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+ Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]