[FLINK-3896] Allow a StreamTask to be Externally Cancelled It adds a method failExternally() to the StreamTask, so that custom Operators can make their containing task fail when needed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc19486c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc19486c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc19486c Branch: refs/heads/master Commit: bc19486ccfc4164d6abd9c712db8e92a350c5a85 Parents: fdf4360 Author: kl0u <kklou...@gmail.com> Authored: Tue May 10 18:56:58 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Jun 14 18:11:22 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/execution/Environment.java | 11 +++++++++++ .../runtime/taskmanager/RuntimeEnvironment.java | 9 +++++++++ .../org/apache/flink/runtime/taskmanager/Task.java | 4 ++-- .../operators/testutils/DummyEnvironment.java | 5 +++++ .../runtime/operators/testutils/MockEnvironment.java | 5 +++++ .../flink/streaming/runtime/tasks/StreamTask.java | 15 ++++++++++++++- .../runtime/tasks/StreamMockEnvironment.java | 5 +++++ 7 files changed, 51 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 121936c..9f779ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -166,6 +166,17 @@ public interface Environment { */ void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state); + /** + * Marks task execution failed for an external reason (a reason other than the task code itself + * throwing an exception). If the task is already in a terminal state + * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. + * Otherwise it sets the state to FAILED, and, if the invokable code is running, + * starts an asynchronous thread that aborts that code. + * + * <p>This method never blocks.</p> + */ + void failExternally(Throwable cause); + // -------------------------------------------------------------------------------------------- // Fields relevant to the I/O system. Should go into Task // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 1f93a0d..80c5fbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -78,6 +78,8 @@ public class RuntimeEnvironment implements Environment { private final TaskManagerRuntimeInfo taskManagerInfo; private final TaskMetricGroup metrics; + private final Task containingTask; + // ------------------------------------------------------------------------ public RuntimeEnvironment( @@ -99,6 +101,7 @@ public class RuntimeEnvironment implements Environment { InputGate[] inputGates, ActorGateway jobManager, TaskManagerRuntimeInfo taskManagerInfo, + Task containingTask, TaskMetricGroup metrics) { this.jobId = checkNotNull(jobId); @@ -119,6 +122,7 @@ public class RuntimeEnvironment implements Environment { this.inputGates = checkNotNull(inputGates); this.jobManager = checkNotNull(jobManager); this.taskManagerInfo = checkNotNull(taskManagerInfo); + this.containingTask = containingTask; this.metrics = metrics; } @@ -262,4 +266,9 @@ public class RuntimeEnvironment implements Environment { jobManager.tell(message); } + + @Override + public void failExternally(Throwable cause) { + this.containingTask.failExternally(cause); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1f766e1..c1cbaa6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -526,7 +526,7 @@ public class Task implements Runnable { userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, accumulatorRegistry, splitProvider, distributedCacheEntries, - writers, inputGates, jobManager, taskManagerConfig, metrics); + writers, inputGates, jobManager, taskManagerConfig, this, metrics); // let the task code create its readers and writers invokable.setEnvironment(env); @@ -703,7 +703,7 @@ public class Task implements Runnable { LOG.error(message, t); notifyFatalError(message, t); } - + // un-register the metrics at the end so that the task may already be // counted as finished when this happens // errors here will only be logged http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 78fb422..063e295 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment { public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {} @Override + public void failExternally(Throwable cause) { + throw new UnsupportedOperationException("DummyEnvironment does not support external task failure."); + } + + @Override public ResultPartitionWriter getWriter(int index) { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 0220149..78e4cce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -289,4 +289,9 @@ public class MockEnvironment implements Environment { public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) { throw new UnsupportedOperationException(); } + + @Override + public void failExternally(Throwable cause) { + throw new UnsupportedOperationException("MockEnvironment does not support external task failure."); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 51904b3..a771c85 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -295,7 +295,20 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } } } - + + /** + * Marks task execution failed for an external reason (a reason other than the task code itself + * throwing an exception). If the task is already in a terminal state + * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing. + * Otherwise it sets the state to FAILED, and, if the invokable code is running, + * starts an asynchronous thread that aborts that code. + * + * <p>This method never blocks.</p> + */ + public void failExternally(Throwable cause) { + getEnvironment().failExternally(cause); + } + @Override public final void cancel() throws Exception { isRunning = false; http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index f8c36de..a8dd49b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -302,6 +302,11 @@ public class StreamMockEnvironment implements Environment { } @Override + public void failExternally(Throwable cause) { + throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure."); + } + + @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")); }