This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ddd456712d9e964d3c6759482d061076c8d6075 Author: Stephan Ewen <se...@apache.org> AuthorDate: Sat Apr 10 17:38:36 2021 +0200 [FLINK-21996][refactor] Unify exception handling for Operator Coordinator Events sent to not-running tasks Sending an event to a not-running task sometimes throws an exception directly from the method (if the event is immediately sent) and sometimes completes the resulting future with an exception (for example if the event had to be enqueued until after checkpoint barrier injection to preserve exactly-once semantics). This changes the code to always report those exceptions through the result future and never through direct exception throwing, to simplify and unify the way this can be handled by the calling code. --- .../coordination/OperatorCoordinator.java | 3 +-- .../coordination/OperatorCoordinatorHolder.java | 8 ++++++- .../RecreateOnResetOperatorCoordinator.java | 2 +- .../coordinator/SourceCoordinatorContext.java | 28 ++++------------------ .../CoordinatorEventsExactlyOnceITCase.java | 14 +++++------ .../MockOperatorCoordinatorContext.java | 3 +-- 6 files changed, 20 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index b71fad4..4f50c8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -202,8 +202,7 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * target TaskManager. The future is completed exceptionally if the event cannot be sent. * That includes situations where the target task is not running. 
*/ - CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) - throws TaskNotRunningException; + CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask); /** * Fails the job and trigger a global failover operation. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 83159d2..d09a4a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -464,7 +465,12 @@ public class OperatorCoordinatorHolder throw new FlinkRuntimeException("Cannot serialize operator event", e); } - return eventValve.sendEvent(serializedEvent, targetSubtask); + try { + return eventValve.sendEvent(serializedEvent, targetSubtask); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + return FutureUtils.completedExceptionally(t); + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index 9685083..5b12388 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -212,7 +212,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { @Override public synchronized CompletableFuture<Acknowledge> sendEvent( - OperatorEvent evt, int targetSubtask) throws TaskNotRunningException { + OperatorEvent evt, int targetSubtask) { // Do not enter the sending procedure if the context has been quiesced. if (quiesced) { return CompletableFuture.completedFuture(Acknowledge.get()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 6789832..1c05321 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -30,7 +30,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; @@ -147,16 +146,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> public void sendEventToSourceReader(int subtaskId, SourceEvent event) { callInCoordinatorThread( () -> { - try { - operatorCoordinatorContext.sendEvent( - new SourceEventWrapper(event), subtaskId); - return null; - } catch 
(TaskNotRunningException e) { - throw new FlinkRuntimeException( - String.format( - "Failed to send event %s to subtask %d", event, subtaskId), - e); - } + operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId); + return null; }, String.format("Failed to send event %s to subtask %d", event, subtaskId)); } @@ -195,12 +186,6 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> operatorCoordinatorContext.sendEvent( new AddSplitEvent<>(splits, splitSerializer), id); - } catch (TaskNotRunningException e) { - throw new FlinkRuntimeException( - String.format( - "Failed to assign splits %s to reader %d.", - splits, id), - e); } catch (IOException e) { throw new FlinkRuntimeException( "Failed to serialize splits.", e); @@ -216,13 +201,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> // Ensure the split assignment is done by the the coordinator executor. callInCoordinatorThread( () -> { - try { - operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), subtask); - return null; // void return value - } catch (TaskNotRunningException e) { - throw new FlinkRuntimeException( - "Failed to send 'NoMoreSplits' to reader " + subtask, e); - } + operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), subtask); + return null; // void return value }, "Failed to send 'NoMoreSplits' to reader " + subtask); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index 968537d..e227278 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -349,15 +349,13 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { if 
(nextNumber > maxNumber) { return; } - try { - if (nextNumber == maxNumber) { - context.sendEvent(new EndEvent(), 0); - } else { - context.sendEvent(new IntegerEvent(nextNumber), 0); - } - nextNumber++; - } catch (TaskNotRunningException ignored) { + + if (nextNumber == maxNumber) { + context.sendEvent(new EndEvent(), 0); + } else { + context.sendEvent(new IntegerEvent(nextNumber), 0); } + nextNumber++; } private void checkWhetherToTriggerFailure() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java index 4dc71dd..c20915c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java @@ -75,8 +75,7 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte } @Override - public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) - throws TaskNotRunningException { + public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) { eventsToOperator.computeIfAbsent(targetSubtask, subtaskId -> new ArrayList<>()).add(evt); if (failEventSending) { CompletableFuture<Acknowledge> future = new CompletableFuture<>();