This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82b2c874edf27e5d12ec01fc673c32364e5cfa5b Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Tue Nov 17 23:12:36 2020 +0800 [FLINK-20193][runtime] Catch all uncaught Throwables from the SplitEnumerator in the SourceCoordinator. This also makes the error message in CoordinatorExecutorThreadFactory clearer. This closes #14104 --- .../source/mocks/MockSplitEnumeratorContext.java | 257 +++++++++++++++++++++ .../source/coordinator/SourceCoordinator.java | 111 +++++---- .../coordinator/SourceCoordinatorContext.java | 22 +- .../coordinator/SourceCoordinatorProvider.java | 32 ++- .../MockOperatorCoordinatorContext.java | 7 + .../coordinator/SourceCoordinatorContextTest.java | 1 - .../coordinator/SourceCoordinatorProviderTest.java | 1 + .../source/coordinator/SourceCoordinatorTest.java | 51 +++- .../coordinator/SourceCoordinatorTestBase.java | 1 - 9 files changed, 417 insertions(+), 66 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java new file mode 100644 index 0000000..19f9ccc --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java @@ -0,0 +1,257 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.api.connector.source.mocks; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.ThrowableCatchingRunnable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +/** + * A mock class for {@link SplitEnumeratorContext}. 
+ */ +public class MockSplitEnumeratorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> { + private final Map<Integer, List<SourceEvent>> sentSourceEvent; + private final ConcurrentMap<Integer, ReaderInfo> registeredReaders; + private final List<SplitsAssignment<SplitT>> splitsAssignmentSequence; + private final ExecutorService workerExecutor; + private final ExecutorService mainExecutor; + private final TestingExecutorThreadFactory mainThreadFactory; + private final AtomicReference<Throwable> errorInWorkerThread; + private final AtomicReference<Throwable> errorInMainThread; + private final BlockingQueue<Callable<Future<?>>> oneTimeCallables; + private final List<Callable<Future<?>>> periodicCallables; + private final AtomicBoolean stoppedAcceptAsyncCalls; + + private final int parallelism; + + public MockSplitEnumeratorContext(int parallelism) { + this.sentSourceEvent = new HashMap<>(); + this.registeredReaders = new ConcurrentHashMap<>(); + this.splitsAssignmentSequence = new ArrayList<>(); + this.parallelism = parallelism; + this.errorInWorkerThread = new AtomicReference<>(); + this.errorInMainThread = new AtomicReference<>(); + this.oneTimeCallables = new ArrayBlockingQueue<>(100); + this.periodicCallables = Collections.synchronizedList(new ArrayList<>()); + this.mainThreadFactory = getThreadFactory("SplitEnumerator-main", errorInMainThread); + this.workerExecutor = getExecutor(getThreadFactory("SplitEnumerator-worker", errorInWorkerThread)); + this.mainExecutor = getExecutor(mainThreadFactory); + this.stoppedAcceptAsyncCalls = new AtomicBoolean(false); + } + + @Override + public MetricGroup metricGroup() { + return new UnregisteredMetricsGroup(); + } + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) { + try { + if (!mainThreadFactory.isCurrentThreadMainExecutorThread()) { + mainExecutor.submit(() -> + sentSourceEvent.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event)).get(); + } else { + sentSourceEvent.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(event); + } + } catch (Exception e) { + throw new RuntimeException("Failed to assign splits", e); + } + } + + @Override + public int currentParallelism() { + return parallelism; + } + + @Override + public Map<Integer, ReaderInfo> registeredReaders() { + return registeredReaders; + } + + @Override + public void assignSplits(SplitsAssignment<SplitT> newSplitAssignments) { + splitsAssignmentSequence.add(newSplitAssignments); + } + + @Override + public void signalNoMoreSplits(int subtask) {} + + @Override + public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) { + if (stoppedAcceptAsyncCalls.get()) { + return; + } + oneTimeCallables.add(() -> + workerExecutor.submit(wrap(errorInWorkerThread, () -> { + try { + T result = callable.call(); + mainExecutor.submit(wrap(errorInMainThread, () -> handler.accept(result, null))).get(); + } catch (Throwable t) { + handler.accept(null, t); + } + }))); + } + + @Override + public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) { + if (stoppedAcceptAsyncCalls.get()) { + return; + } + periodicCallables.add(() -> + workerExecutor.submit(wrap(errorInWorkerThread, () -> { + try { + T result = callable.call(); + mainExecutor.submit(wrap(errorInMainThread, () -> handler.accept(result, null))).get(); + } catch (Throwable t) { + handler.accept(null, t); + } + }))); + } + + @Override + public void runInCoordinatorThread(Runnable runnable) { + 
mainExecutor.execute(runnable); + } + + public void close() { + stoppedAcceptAsyncCalls.set(true); + } + + // ------------ helper method to manipulate the context ------------- + + public void runNextOneTimeCallable() throws Throwable { + oneTimeCallables.take().call().get(); + checkError(); + } + + public void runPeriodicCallable(int index) throws Throwable { + periodicCallables.get(index).call().get(); + checkError(); + } + + public Map<Integer, List<SourceEvent>> getSentSourceEvent() throws Exception { + return workerExecutor.submit(() -> new HashMap<>(sentSourceEvent)).get(); + } + + public void registerReader(ReaderInfo readerInfo) { + registeredReaders.put(readerInfo.getSubtaskId(), readerInfo); + } + + public void unregisterReader(int readerId) { + registeredReaders.remove(readerId); + } + + public List<Callable<Future<?>>> getPeriodicCallables() { + return periodicCallables; + } + + public BlockingQueue<Callable<Future<?>>> getOneTimeCallables() { + return oneTimeCallables; + } + + public List<SplitsAssignment<SplitT>> getSplitsAssignmentSequence() { + return splitsAssignmentSequence; + } + + // ------------- private helpers ------------- + + private void checkError() throws Throwable { + if (errorInMainThread.get() != null) { + throw errorInMainThread.get(); + } + if (errorInWorkerThread.get() != null) { + throw errorInWorkerThread.get(); + } + } + + private static TestingExecutorThreadFactory getThreadFactory(String threadName, AtomicReference<Throwable> error) { + return new TestingExecutorThreadFactory(threadName, error); + } + + private static ExecutorService getExecutor(TestingExecutorThreadFactory threadFactory) { + return Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + private static ThrowableCatchingRunnable wrap(AtomicReference<Throwable> error, Runnable r) { + return new ThrowableCatchingRunnable(t -> { + if (!error.compareAndSet(null, t)) { + error.get().addSuppressed(t); + } + }, r); + } + + // -------- private class ----------- + + /** + * A thread factory class that provides some helper methods. + */ + public static class TestingExecutorThreadFactory implements ThreadFactory { + private final String coordinatorThreadName; + private final AtomicReference<Throwable> error; + private Thread t; + + TestingExecutorThreadFactory(String coordinatorThreadName, AtomicReference<Throwable> error) { + this.coordinatorThreadName = coordinatorThreadName; + this.error = error; + this.t = null; + } + + @Override + public Thread newThread(Runnable r) { + if (t != null) { + throw new IllegalStateException("Should never happen. 
This factory should only be used by a " + + "SingleThreadExecutor."); + } + t = new Thread(r, coordinatorThreadName); + t.setUncaughtExceptionHandler((t1, e) -> { + if (!error.compareAndSet(null, e)) { + error.get().addSuppressed(e); + } + }); + return t; + } + + boolean isCurrentThreadMainExecutorThread() { + return Thread.currentThread() == t; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 2685ca6..07ebbdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -32,8 +32,10 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.apache.flink.runtime.source.event.RequestSplitEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TemporaryClassLoaderContext; +import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,8 +123,11 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements // The start sequence is the first task in the coordinator executor. // We rely on the single-threaded coordinator executor to guarantee // the other methods are invoked after the enumerator has started. - coordinatorExecutor.execute(() -> enumerator.start()); started = true; + runInEventLoop( + () -> enumerator.start(), + "starting the SplitEnumerator." 
+ ); } @Override @@ -145,10 +150,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } @Override - public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception { - ensureStarted(); - coordinatorExecutor.execute(() -> { - try { + public void handleEventFromOperator(int subtask, OperatorEvent event) { + runInEventLoop( + () -> { LOG.debug("Handling event from subtask {} of source {}: {}", subtask, operatorName, event); if (event instanceof RequestSplitEvent) { enumerator.handleSplitRequest(subtask, ((RequestSplitEvent) event).hostName()); @@ -159,76 +163,63 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } else { throw new FlinkException("Unrecognized Operator Event: " + event); } - } catch (Exception e) { - LOG.error("Failing the job due to exception when handling operator event {} from subtask {} " + - "of source {}.", event, subtask, operatorName, e); - context.failJob(e); - } - }); + }, + "handling operator event %s from subtask %d", event, subtask + ); } @Override public void subtaskFailed(int subtaskId, @Nullable Throwable reason) { - ensureStarted(); - coordinatorExecutor.execute(() -> { - try { + runInEventLoop( + () -> { LOG.info("Handling subtask {} failure of source {}.", subtaskId, operatorName); List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId); context.unregisterSourceReader(subtaskId); LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, operatorName); enumerator.addSplitsBack(splitsToAddBack, subtaskId); - } catch (Exception e) { - LOG.error("Failing the job due to exception when handling subtask {} failure in source {}.", - subtaskId, operatorName, e); - context.failJob(e); - } - }); + }, + "handling subtask %d failure", subtaskId + ); } @Override - public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception { - ensureStarted(); - - coordinatorExecutor.execute(() -> { - try { + public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) { + runInEventLoop( + () -> { LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", operatorName, checkpointId); - result.complete(toBytes(checkpointId)); - } catch (Exception e) { - result.completeExceptionally(new CompletionException( - String.format("Failed to checkpoint coordinator for source %s due to ", operatorName), e)); - } - }); + try { + result.complete(toBytes(checkpointId)); + } catch (Throwable e) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(e); + result.completeExceptionally(new CompletionException( + String.format("Failed to checkpoint SplitEnumerator for source %s", operatorName), e)); + } + }, + "taking checkpoint %d", checkpointId + ); } @Override public void notifyCheckpointComplete(long checkpointId) { - ensureStarted(); - coordinatorExecutor.execute(() -> { - try { + runInEventLoop( + () -> { LOG.info("Marking checkpoint {} as completed for source {}.", checkpointId, operatorName); context.onCheckpointComplete(checkpointId); enumerator.notifyCheckpointComplete(checkpointId); - } catch (Exception e) { - LOG.error("Failing the job due to exception when notifying the completion of the " - + "checkpoint {} for source {}.", checkpointId, operatorName, e); - context.failJob(e); - } - }); + }, + "notifying the enumerator of completion of checkpoint %d", checkpointId + ); } @Override public void notifyCheckpointAborted(long checkpointId) { - ensureStarted(); - 
coordinatorExecutor.execute(() -> { - try { + runInEventLoop( + () -> { LOG.info("Marking checkpoint {} as aborted for source {}.", checkpointId, operatorName); enumerator.notifyCheckpointAborted(checkpointId); - } catch (Exception e) { - LOG.error("Failing the job due to exception when notifying abortion of the " - + "checkpoint {} for source {}.", checkpointId, operatorName, e); - context.failJob(e); - } - }); + }, + "calling notifyCheckpointAborted()" + ); } @Override @@ -245,6 +236,28 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements } } + private void runInEventLoop( + final ThrowingRunnable<Throwable> action, + final String actionName, + final Object... actionNameFormatParameters) { + + ensureStarted(); + coordinatorExecutor.execute(() -> { + try { + action.run(); + } catch (Throwable t) { + // if we have a JVM critical error, promote it immediately, there is a good chance the + // logging or job failing will not succeed any more + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + + final String actionString = String.format(actionName, actionNameFormatParameters); + LOG.error("Uncaught exception in the SplitEnumerator for Source {} while {}. Triggering job failover.", + operatorName, actionString, t); + context.failJob(t); + } + }); + } + // --------------------------------------------------- @VisibleForTesting SplitEnumerator<SplitT, EnumChkT> getEnumerator() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 9199eeb..6d7d860 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -34,7 +34,12 @@ import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ThrowableCatchingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -46,6 +51,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -81,6 +87,9 @@ import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerde @Internal public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT>, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class); + private final ExecutorService coordinatorExecutor; private final ExecutorNotifier notifier; private final OperatorCoordinator.Context operatorCoordinatorContext; @@ -115,9 +124,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> this.registeredReaders = new ConcurrentHashMap<>(); this.assignmentTracker = splitAssignmentTracker; this.coordinatorThreadName = coordinatorThreadFactory.getCoordinatorThreadName(); + 
+ final Executor errorHandlingCoordinatorExecutor = (runnable) -> + coordinatorExecutor.execute(new ThrowableCatchingRunnable(this::handleUncaughtExceptionFromAsyncCall, runnable)); + this.notifier = new ExecutorNotifier( Executors.newScheduledThreadPool(numWorkerThreads, new ExecutorThreadFactory(coordinatorThreadName + "-worker")), - coordinatorExecutor); + errorHandlingCoordinatorExecutor); } @Override @@ -227,6 +240,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> operatorCoordinatorContext.failJob(cause); } + void handleUncaughtExceptionFromAsyncCall(Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + LOG.error("Exception while handling result from async call in {}. Triggering job failover.", + coordinatorThreadName, t); + failJob(t); + } + /** * Take a snapshot of this SourceCoordinatorContext. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index f13aa4e..e019359 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -18,12 +18,14 @@ package org.apache.flink.runtime.source.coordinator; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator; +import org.apache.flink.runtime.util.FatalExitExceptionHandler; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -63,10 +65,10 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre } @Override - public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception { + public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) { final String coordinatorThreadName = "SourceCoordinator-" + operatorName; CoordinatorExecutorThreadFactory coordinatorThreadFactory = - new CoordinatorExecutorThreadFactory(coordinatorThreadName, context, context.getUserCodeClassloader()); + new CoordinatorExecutorThreadFactory(coordinatorThreadName, context.getUserCodeClassloader()); ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer(); @@ -80,30 +82,40 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre * A thread factory class that provides some helper methods. 
*/ public static class CoordinatorExecutorThreadFactory implements ThreadFactory { + private final String coordinatorThreadName; - private final OperatorCoordinator.Context context; private final ClassLoader cl; + private final Thread.UncaughtExceptionHandler errorHandler; + private Thread t; CoordinatorExecutorThreadFactory( final String coordinatorThreadName, - final OperatorCoordinator.Context context, final ClassLoader contextClassLoader) { + this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE); + } + + @VisibleForTesting + CoordinatorExecutorThreadFactory( + final String coordinatorThreadName, + final ClassLoader contextClassLoader, + final Thread.UncaughtExceptionHandler errorHandler) { this.coordinatorThreadName = coordinatorThreadName; - this.context = context; - this.t = null; this.cl = contextClassLoader; + this.errorHandler = errorHandler; } @Override - public Thread newThread(Runnable r) { + public synchronized Thread newThread(Runnable r) { if (t != null) { - throw new IllegalStateException("Should never happen. This factory should only be used by a " + - "SingleThreadExecutor."); + throw new Error( + "This indicates that a fatal error has happened and caused the " + + "coordinator executor thread to exit. Check the earlier logs" + + "to see the root cause of the problem."); } t = new Thread(r, coordinatorThreadName); t.setContextClassLoader(cl); - t.setUncaughtExceptionHandler((thread, throwable) -> context.failJob(throwable)); + t.setUncaughtExceptionHandler(errorHandler); return t; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java index e7184c9..c7f769e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java @@ -36,6 +36,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte private final Map<Integer, List<OperatorEvent>> eventsToOperator; private boolean jobFailed; + private Throwable jobFailureReason; public MockOperatorCoordinatorContext(OperatorID operatorID, int numSubtasks) { this(operatorID, numSubtasks, true); @@ -61,6 +62,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte this.numSubtasks = numSubtasks; this.eventsToOperator = new HashMap<>(); this.jobFailed = false; + this.jobFailureReason = null; this.failEventSending = failEventSending; this.userCodeClassLoader = userCodeClassLoader; } @@ -87,6 +89,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte @Override public void failJob(Throwable cause) { jobFailed = true; + jobFailureReason = cause; } @Override @@ -112,4 +115,8 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte public boolean isJobFailed() { return jobFailed; } + + public Throwable getJobFailureReason() { + return jobFailureReason; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java index 1228d85..c7a7c9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java @@ -151,7 +151,6 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase { SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory = new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory( TEST_OPERATOR_ID.toHexString(), - operatorCoordinatorContext, getClass().getClassLoader()); try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java index cb8bfd8..dbb6a41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java @@ -45,6 +45,7 @@ import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class SourceCoordinatorProviderTest { private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L); + private static final String OPERATOR_NAME = "SourceCoordinatorProviderTest"; private static final int NUM_SPLITS = 10; private SourceCoordinatorProvider<MockSourceSplit> provider; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index d8182d4..bdb8c23 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorCheckpointSerializer; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -252,6 +253,50 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { } @Test + public void testFailJobWhenExceptionThrownFromStart() throws Exception { + final RuntimeException failureReason = new RuntimeException("Artificial Exception"); + + final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> splitEnumerator = + new MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) { + @Override + public void start() { + throw failureReason; + } + }; + + final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>( + OPERATOR_NAME, coordinatorExecutor, new EnumeratorCreatingSource<>(() -> splitEnumerator), context); + + coordinator.start(); + waitUtil(() -> operatorCoordinatorContext.isJobFailed(), Duration.ofSeconds(10), + "The job should have failed due to the artificial exception."); + assertEquals(failureReason, operatorCoordinatorContext.getJobFailureReason()); + } + + @Test + public void testErrorThrownFromSplitEnumerator() throws Exception { + final Error error = new Error("Test Error"); + + final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> splitEnumerator = + new 
MockSplitEnumerator(1, new MockSplitEnumeratorContext<>(1)) { + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + throw error; + } + }; + + final SourceCoordinator<?, ?> coordinator = new SourceCoordinator<>( + OPERATOR_NAME, coordinatorExecutor, new EnumeratorCreatingSource<>(() -> splitEnumerator), context); + + coordinator.start(); + coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {})); + + waitUtil(() -> operatorCoordinatorContext.isJobFailed(), Duration.ofSeconds(10), + "The job should have failed due to the artificial exception."); + assertEquals(error, operatorCoordinatorContext.getJobFailureReason()); + } + + @Test public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception { final ClassLoader testClassLoader = new URLClassLoader(new URL[0]); final OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), testClassLoader); @@ -308,13 +353,11 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { } private static byte[] createEmptyCheckpoint(long checkpointId) throws Exception { - final OperatorCoordinator.Context opContext = new MockOperatorCoordinatorContext(new OperatorID(), 0); - try (SourceCoordinatorContext<MockSourceSplit> emptyContext = new SourceCoordinatorContext<>( Executors.newDirectExecutorService(), - new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory("test", opContext, SourceCoordinatorProviderTest.class.getClassLoader()), + new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory("test", SourceCoordinatorProviderTest.class.getClassLoader()), 1, - opContext, + new MockOperatorCoordinatorContext(new OperatorID(), 0), new MockSourceSplitSerializer())) { return SourceCoordinator.writeCheckpointBytes( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index 87ae8c9..55e680c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -61,7 +61,6 @@ public abstract class SourceCoordinatorTestBase { SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory = new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory( coordinatorThreadName, - operatorCoordinatorContext, getClass().getClassLoader()); coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
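
For illustration only (not part of this commit): a minimal, self-contained Java sketch of the error-handling pattern the SourceCoordinator adopts above, namely running every enumerator action on the single-threaded coordinator executor and converting any leaked Throwable into a job failure. All names in the sketch (EventLoopErrorHandlingSketch, the failJob callback, the local ThrowingRunnable interface, and the inline fatal-error check standing in for ExceptionUtils.rethrowIfFatalErrorOrOOM) are hypothetical stand-ins, not Flink API.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class EventLoopErrorHandlingSketch {

    /** Local stand-in for org.apache.flink.util.function.ThrowingRunnable, to stay self-contained. */
    @FunctionalInterface
    public interface ThrowingRunnable {
        void run() throws Throwable;
    }

    private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
    private final Consumer<Throwable> failJob; // hypothetical stand-in for context.failJob(...)

    public EventLoopErrorHandlingSketch(Consumer<Throwable> failJob) {
        this.failJob = failJob;
    }

    /**
     * Mirrors the runInEventLoop() helper introduced above: run the action on the
     * single-threaded coordinator executor and report any leaked Throwable through
     * the failJob callback instead of letting it silently kill the coordinator thread.
     */
    public void runInEventLoop(ThrowingRunnable action, String actionName) {
        coordinatorExecutor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                // Promote JVM-fatal errors immediately; logging or failing the job may
                // no longer succeed. Stand-in for ExceptionUtils.rethrowIfFatalErrorOrOOM(t).
                if (t instanceof VirtualMachineError) {
                    throw (VirtualMachineError) t;
                }
                System.err.println("Uncaught exception while " + actionName + ". Triggering job failover: " + t);
                failJob.accept(t);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        EventLoopErrorHandlingSketch sketch = new EventLoopErrorHandlingSketch(
                cause -> System.err.println("failJob() called with: " + cause));

        // With the catch-all handler, an Error thrown by the enumerator action is reported
        // through the failJob callback rather than killing the coordinator thread.
        sketch.runInEventLoop(() -> { throw new Error("Test Error"); }, "handling operator event");

        sketch.coordinatorExecutor.shutdown();
        sketch.coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
}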
