This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch 31.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/31.0.0 by this push:
new f27a1dc651d [Backport] Dart: Smoother handling of stage early exit (#17228) (#17069) (#17256)
f27a1dc651d is described below
commit f27a1dc651d0ee607337ac6f0483643b32ace66a
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Oct 5 17:29:34 2024 +0530
[Backport] Dart: Smoother handling of stage early exit (#17228) (#17069) (#17256)
* MSQ: Properly report errors that occur when starting up RunWorkOrder. (#17069)
* Dart: Smoother handling of stage early-exit. (#17228)
---------
Co-authored-by: Gian Merlino <[email protected]>
---
.../msq/dart/controller/DartWorkerManager.java | 40 +++---
.../druid/msq/dart/worker/DartWorkerRunner.java | 6 +-
.../org/apache/druid/msq/exec/RunWorkOrder.java | 72 +++++++----
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 4 +-
.../druid/msq/indexing/error/MSQFaultUtils.java | 7 ++
.../druid/msq/kernel/worker/WorkerStageKernel.java | 19 ++-
.../druid/msq/kernel/worker/WorkerStagePhase.java | 2 +-
.../apache/druid/msq/exec/RunWorkOrderTest.java | 139 +++++++++++++++++++++
.../channel/ReadableByteChunksFrameChannel.java | 6 +-
9 files changed, 234 insertions(+), 61 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 54e163862d6..c49e0b98aed 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -172,25 +172,27 @@ public class DartWorkerManager implements WorkerManager
public void stop(boolean interrupt)
{
if (state.compareAndSet(State.STARTED, State.STOPPED)) {
- final List<ListenableFuture<?>> futures = new ArrayList<>();
-
- // Send stop commands to all workers. This ensures they exit promptly,
and do not get left in a zombie state.
- // For this reason, the workerClient uses an unlimited retry policy. If
a stop command is lost, a worker
- // could get stuck in a zombie state without its controller. This state
would persist until the server that
- // ran the controller shuts down or restarts. At that time, the listener
in DartWorkerRunner.BrokerListener calls
- // "controllerFailed()" on the Worker, and the zombie worker would exit.
-
- for (final String workerId : workerIds) {
- futures.add(workerClient.stopWorker(workerId));
- }
-
- // Block until messages are acknowledged, or until the worker we're
communicating with has failed.
-
- try {
- FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
- }
- catch (Throwable ignored) {
- // Suppress errors.
+ if (interrupt) {
+ final List<ListenableFuture<?>> futures = new ArrayList<>();
+
+ // Send stop commands to all workers. This ensures they exit promptly,
and do not get left in a zombie state.
+ // For this reason, the workerClient uses an unlimited retry policy.
If a stop command is lost, a worker
+ // could get stuck in a zombie state without its controller. This
state would persist until the server that
+ // ran the controller shuts down or restarts. At that time, the
listener in DartWorkerRunner.BrokerListener
+ // calls "controllerFailed()" on the Worker, and the zombie worker
would exit.
+
+ for (final String workerId : workerIds) {
+ futures.add(workerClient.stopWorker(workerId));
+ }
+
+ // Block until messages are acknowledged, or until the worker we're
communicating with has failed.
+
+ try {
+ FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
+ }
+ catch (Throwable ignored) {
+ // Suppress errors.
+ }
}
CloseableUtils.closeAndSuppressExceptions(workerClient, e -> log.warn(e,
"Failed to close workerClient"));
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
index ae136196a0f..d51a410fbb3 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
@@ -36,8 +36,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.worker.http.DartWorkerInfo;
import org.apache.druid.msq.dart.worker.http.GetWorkersResponse;
import org.apache.druid.msq.exec.Worker;
-import org.apache.druid.msq.indexing.error.CanceledFault;
-import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.msq.rpc.WorkerResource;
import org.apache.druid.query.QueryContext;
@@ -142,8 +141,7 @@ public class DartWorkerRunner
holder.worker.run();
}
catch (Throwable t) {
- if (Thread.interrupted()
- || t instanceof MSQException && ((MSQException)
t).getFault().getErrorCode().equals(CanceledFault.CODE)) {
+ if (Thread.interrupted() || MSQFaultUtils.isCanceledException(t)) {
log.debug(t, "Canceled, exiting thread.");
} else {
log.warn(t, "Worker for query[%s] failed and stopped.", queryId);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index e92d310cdde..8b6216861a1 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -129,12 +129,17 @@ public class RunWorkOrder
STARTED,
/**
- * State entered upon calling {@link #stop()}.
+ * State entered upon failure of some work.
+ */
+ FAILED,
+
+ /**
+ * State entered upon calling {@link #stop(Throwable)}.
*/
STOPPING,
/**
- * State entered when a call to {@link #stop()} concludes.
+ * State entered when a call to {@link #stop(Throwable)} concludes.
*/
STOPPED
}
@@ -232,7 +237,7 @@ public class RunWorkOrder
setUpCompletionCallbacks();
}
catch (Throwable t) {
- stopUnchecked();
+ stopUnchecked(t);
}
}
@@ -242,64 +247,73 @@ public class RunWorkOrder
* are all properly cleaned up.
*
* Blocks until execution is fully stopped.
+ *
+ * @param t error to send to {@link RunWorkOrderListener#onFailure}, if
success/failure has not already been sent.
+ * Will also be thrown at the end of this method.
*/
- public void stop() throws InterruptedException
+ public void stop(@Nullable Throwable t) throws InterruptedException
{
if (state.compareAndSet(State.INIT, State.STOPPING)
- || state.compareAndSet(State.STARTED, State.STOPPING)) {
+ || state.compareAndSet(State.STARTED, State.STOPPING)
+ || state.compareAndSet(State.FAILED, State.STOPPING)) {
// Initiate stopping.
- Throwable e = null;
-
try {
exec.cancel(cancellationId);
}
catch (Throwable e2) {
- e = e2;
+ if (t == null) {
+ t = e2;
+ } else {
+ t.addSuppressed(e2);
+ }
}
try {
frameContext.close();
}
catch (Throwable e2) {
- if (e == null) {
- e = e2;
+ if (t == null) {
+ t = e2;
} else {
- e.addSuppressed(e2);
+ t.addSuppressed(e2);
}
}
try {
- // notifyListener will ignore this cancellation error if work has
already succeeded.
- notifyListener(Either.error(new
MSQException(CanceledFault.instance())));
+ // notifyListener will ignore this error if work has already succeeded.
+ notifyListener(Either.error(t != null ? t : new
MSQException(CanceledFault.instance())));
}
catch (Throwable e2) {
- if (e == null) {
- e = e2;
+ if (t == null) {
+ t = e2;
} else {
- e.addSuppressed(e2);
+ t.addSuppressed(e2);
}
}
stopLatch.countDown();
-
- if (e != null) {
- Throwables.throwIfInstanceOf(e, InterruptedException.class);
- Throwables.throwIfUnchecked(e);
- throw new RuntimeException(e);
- }
}
stopLatch.await();
+
+ if (t != null) {
+ Throwables.throwIfInstanceOf(t, InterruptedException.class);
+ Throwables.throwIfUnchecked(t);
+ throw new RuntimeException(t);
+ }
}
/**
- * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link
InterruptedException}, this method sets
- * the interrupt flag and throws an unchecked exception.
+ * Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)}
throws {@link InterruptedException},
+ * this method sets the interrupt flag and throws an unchecked exception.
+ *
+ * @param t error to send to {@link RunWorkOrderListener#onFailure}, if
success/failure has not already been sent.
+ * Will also be thrown at the end of this method.
*/
- public void stopUnchecked()
+ public void stopUnchecked(@Nullable final Throwable t)
{
try {
- stop();
+ stop(t);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -554,7 +568,11 @@ public class RunWorkOrder
@Override
public void onFailure(final Throwable t)
{
- notifyListener(Either.error(t));
+ if (state.compareAndSet(State.STARTED, State.FAILED)) {
+ // Call notifyListener only if we were STARTED. In particular,
if we were STOPPING, skip this and allow
+ // the stop() method to set its own Canceled error.
+ notifyListener(Either.error(t));
+ }
}
},
Execs.directExecutor()
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index c2bd4ec450a..46e7e4a1449 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -406,7 +406,7 @@ public class WorkerImpl implements Worker
);
// Set up processorCloser (called when processing is done).
- kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked);
+ kernelHolder.processorCloser.register(() ->
runWorkOrder.stopUnchecked(null));
// Start working on this stage immediately.
kernel.startReading();
@@ -625,6 +625,8 @@ public class WorkerImpl implements Worker
holder.finishProcessing(stageId);
final WorkerStageKernel kernel = holder.getKernelFor(stageId);
if (kernel != null) {
+ // Calling setStageFinished places the kernel into FINISHED state,
which also means we'll ignore any
+ // "Canceled" errors generated by "holder.finishProcessing(stageId)".
(See WorkerStageKernel.fail)
kernel.setStageFinished();
}
});
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
index 781639b17ad..5385645d5eb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
@@ -50,4 +50,11 @@ public class MSQFaultUtils
return message.split(ERROR_CODE_DELIMITER, 2)[0];
}
+ /**
+ * Returns whether the provided throwable is a {@link MSQException} with
{@link CanceledFault}.
+ */
+ public static boolean isCanceledException(final Throwable t)
+ {
+ return t instanceof MSQException && ((MSQException)
t).getFault().getErrorCode().equals(CanceledFault.CODE);
+ }
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
index b838092ca71..5745e9b75af 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
@@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.kernel.ShuffleKind;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
@@ -195,12 +196,18 @@ public class WorkerStageKernel
{
Preconditions.checkNotNull(t, "t");
- transitionTo(WorkerStagePhase.FAILED);
- resultKeyStatisticsSnapshot = null;
- resultPartitionBoundaries = null;
-
- if (exceptionFromFail == null) {
- exceptionFromFail = t;
+ if (WorkerStagePhase.FAILED.canTransitionFrom(phase)) {
+ transitionTo(WorkerStagePhase.FAILED);
+ resultKeyStatisticsSnapshot = null;
+ resultPartitionBoundaries = null;
+
+ if (exceptionFromFail == null) {
+ exceptionFromFail = t;
+ }
+ } else if (!MSQFaultUtils.isCanceledException(t)) {
+ // Current phase is already terminal. Log and suppress this error. It
likely happened during cleanup.
+ // (Don't log CanceledFault though. Ignore those if they come after the
kernel is in a terminal phase.)
+ log.warn(t, "Stage[%s] failed while in phase[%s]",
getStageDefinition().getId(), phase);
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
index 10543beeb06..797dd8d9f8a 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
@@ -84,7 +84,7 @@ public enum WorkerStagePhase
@Override
public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
{
- return true;
+ return !priorPhase.isTerminal();
}
};
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
new file mode 100644
index 00000000000..dbd6857b272
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+public class RunWorkOrderTest
+{
+ private static final String CANCELLATION_ID = "my-cancellation-id";
+
+ @Test
+ public void test_stopUnchecked() throws InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ runWorkOrder.stopUnchecked(null);
+
+ // Calling a second time doesn't do anything special.
+ runWorkOrder.stopUnchecked(null);
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+
Mockito.verify(listener).onFailure(ArgumentMatchers.any(MSQException.class));
+ }
+
+ @Test
+ public void test_stopUnchecked_error() throws InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ final ISE exception = new ISE("oops");
+
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> runWorkOrder.stopUnchecked(exception)
+ );
+
+ // Calling a second time doesn't do anything special. We already tried our
best.
+ runWorkOrder.stopUnchecked(null);
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+ Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+ }
+
+ @Test
+ public void test_stopUnchecked_errorDuringExecCancel() throws
InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+ final ISE exception = new ISE("oops");
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+ Mockito.doThrow(exception).when(exec).cancel(CANCELLATION_ID);
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> runWorkOrder.stopUnchecked(null)
+ );
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+ Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+ }
+
+ @Test
+ public void test_stopUnchecked_errorDuringFrameContextClose() throws
InterruptedException
+ {
+ final FrameProcessorExecutor exec =
Mockito.mock(FrameProcessorExecutor.class);
+ final WorkerContext workerContext = Mockito.mock(WorkerContext.class);
+ final FrameContext frameContext = Mockito.mock(FrameContext.class);
+ final WorkerStorageParameters storageParameters =
Mockito.mock(WorkerStorageParameters.class);
+ final RunWorkOrderListener listener =
Mockito.mock(RunWorkOrderListener.class);
+
+ final ISE exception = new ISE("oops");
+
Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters);
+ Mockito.doThrow(exception).when(frameContext).close();
+
+ final RunWorkOrder runWorkOrder =
+ new RunWorkOrder(null, null, null, exec, CANCELLATION_ID,
workerContext, frameContext, listener, false, false);
+
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> runWorkOrder.stopUnchecked(null)
+ );
+
+ Mockito.verify(exec).cancel(CANCELLATION_ID);
+ Mockito.verify(frameContext).close();
+ Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception));
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
index a4a40d70a38..db20c609422 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
@@ -180,9 +180,9 @@ public class ReadableByteChunksFrameChannel implements ReadableFrameChannel
public void setError(final Throwable t)
{
synchronized (lock) {
- if (noMoreWrites) {
- log.noStackTrace().warn(t, "Channel is no longer accepting writes,
cannot propagate exception");
- } else {
+ // Write error to the channel, unless "noMoreWrites" is set. If that's
set, suppress errors, so regular channel
+ // shutdown doesn't trigger warnings in the log.
+ if (!noMoreWrites) {
chunks.clear();
chunks.add(Either.error(t));
nextCompressedFrameLength = UNKNOWN_LENGTH;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]