[FLINK-3896] Allow a StreamTask to be Externally Cancelled

It adds a method failExternally() to the StreamTask, so that custom Operators
can make their containing task fail when needed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc19486c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc19486c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc19486c

Branch: refs/heads/master
Commit: bc19486ccfc4164d6abd9c712db8e92a350c5a85
Parents: fdf4360
Author: kl0u <kklou...@gmail.com>
Authored: Tue May 10 18:56:58 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Jun 14 18:11:22 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/execution/Environment.java  | 11 +++++++++++
 .../runtime/taskmanager/RuntimeEnvironment.java      |  9 +++++++++
 .../org/apache/flink/runtime/taskmanager/Task.java   |  4 ++--
 .../operators/testutils/DummyEnvironment.java        |  5 +++++
 .../runtime/operators/testutils/MockEnvironment.java |  5 +++++
 .../flink/streaming/runtime/tasks/StreamTask.java    | 15 ++++++++++++++-
 .../runtime/tasks/StreamMockEnvironment.java         |  5 +++++
 7 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 121936c..9f779ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -166,6 +166,17 @@ public interface Environment {
         */
        void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
 
+       /**
+        * Marks task execution failed for an external reason (a reason other 
than the task code itself
+        * throwing an exception). If the task is already in a terminal state
+        * (such as FINISHED, CANCELED, FAILED), or if the task is already 
canceling this does nothing.
+        * Otherwise it sets the state to FAILED, and, if the invokable code is 
running,
+        * starts an asynchronous thread that aborts that code.
+        *
+        * <p>This method never blocks.</p>
+        */
+       void failExternally(Throwable cause);
+
        // 
--------------------------------------------------------------------------------------------
        //  Fields relevant to the I/O system. Should go into Task
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 1f93a0d..80c5fbc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -78,6 +78,8 @@ public class RuntimeEnvironment implements Environment {
        private final TaskManagerRuntimeInfo taskManagerInfo;
        private final TaskMetricGroup metrics;
 
+       private final Task containingTask;
+
        // 
------------------------------------------------------------------------
 
        public RuntimeEnvironment(
@@ -99,6 +101,7 @@ public class RuntimeEnvironment implements Environment {
                        InputGate[] inputGates,
                        ActorGateway jobManager,
                        TaskManagerRuntimeInfo taskManagerInfo,
+                       Task containingTask,
                        TaskMetricGroup metrics) {
 
                this.jobId = checkNotNull(jobId);
@@ -119,6 +122,7 @@ public class RuntimeEnvironment implements Environment {
                this.inputGates = checkNotNull(inputGates);
                this.jobManager = checkNotNull(jobManager);
                this.taskManagerInfo = checkNotNull(taskManagerInfo);
+               this.containingTask = containingTask;
                this.metrics = metrics;
        }
 
@@ -262,4 +266,9 @@ public class RuntimeEnvironment implements Environment {
 
                jobManager.tell(message);
        }
+
+       @Override
+       public void failExternally(Throwable cause) {
+               this.containingTask.failExternally(cause);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1f766e1..c1cbaa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,7 +526,7 @@ public class Task implements Runnable {
                                        userCodeClassLoader, memoryManager, 
ioManager,
                                        broadcastVariableManager, 
accumulatorRegistry,
                                        splitProvider, distributedCacheEntries,
-                                       writers, inputGates, jobManager, 
taskManagerConfig, metrics);
+                                       writers, inputGates, jobManager, 
taskManagerConfig, this, metrics);
 
                        // let the task code create its readers and writers
                        invokable.setEnvironment(env);
@@ -703,7 +703,7 @@ public class Task implements Runnable {
                                LOG.error(message, t);
                                notifyFatalError(message, t);
                        }
-                       
+
                        // un-register the metrics at the end so that the task 
may already be
                        // counted as finished when this happens
                        // errors here will only be logged

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 78fb422..063e295 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
        public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {}
 
        @Override
+       public void failExternally(Throwable cause) {
+               throw new UnsupportedOperationException("DummyEnvironment does 
not support external task failure.");
+       }
+
+       @Override
        public ResultPartitionWriter getWriter(int index) {
                return null;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0220149..78e4cce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -289,4 +289,9 @@ public class MockEnvironment implements Environment {
        public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {
                throw new UnsupportedOperationException();
        }
+
+       @Override
+       public void failExternally(Throwable cause) {
+               throw new UnsupportedOperationException("MockEnvironment does 
not support external task failure.");
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 51904b3..a771c85 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -295,7 +295,20 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        }
                }
        }
-       
+
+       /**
+        * Marks task execution failed for an external reason (a reason other 
than the task code itself
+        * throwing an exception). If the task is already in a terminal state
+        * (such as FINISHED, CANCELED, FAILED), or if the task is already 
canceling this does nothing.
+        * Otherwise it sets the state to FAILED, and, if the invokable code is 
running,
+        * starts an asynchronous thread that aborts that code.
+        *
+        * <p>This method never blocks.</p>
+        */
+       public void failExternally(Throwable cause) {
+               getEnvironment().failExternally(cause);
+       }
+
        @Override
        public final void cancel() throws Exception {
                isRunning = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index f8c36de..a8dd49b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,11 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
+       public void failExternally(Throwable cause) {
+               throw new UnsupportedOperationException("StreamMockEnvironment 
does not support external task failure.");
+       }
+
+       @Override
        public TaskManagerRuntimeInfo getTaskManagerInfo() {
                return new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir"));
        }

Reply via email to