[ 
https://issues.apache.org/jira/browse/FLINK-10513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642960#comment-16642960
 ] 

ASF GitHub Bot commented on FLINK-10513:
----------------------------------------

asfgit closed pull request #6804: [FLINK-10513] Replace 
TaskManagerActions#notifyFinalState with #updateTaskExecutionState
URL: https://github.com/apache/flink/pull/6804
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ae69e561bc7..e2ec4822576 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1552,11 +1552,6 @@ private TaskManagerActionsImpl(JobMasterGateway 
jobMasterGateway) {
                        this.jobMasterGateway = checkNotNull(jobMasterGateway);
                }
 
-               @Override
-               public void notifyFinalState(final ExecutionAttemptID 
executionAttemptID) {
-                       runAsync(() -> 
unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID));
-               }
-
                @Override
                public void notifyFatalError(String message, Throwable cause) {
                        try {
@@ -1574,7 +1569,11 @@ public void failTask(final ExecutionAttemptID 
executionAttemptID, final Throwabl
 
                @Override
                public void updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
-                       
TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, 
taskExecutionState);
+                       if 
(taskExecutionState.getExecutionState().isTerminal()) {
+                               runAsync(() -> 
unregisterTaskAndNotifyFinalState(jobMasterGateway, 
taskExecutionState.getID()));
+                       } else {
+                               
TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, 
taskExecutionState);
+                       }
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
deleted file mode 100644
index d729dbbf5ad..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskExecutionStateListener implements 
TaskExecutionStateListener {
-
-       private final ActorGateway actorGateway;
-
-       public ActorGatewayTaskExecutionStateListener(ActorGateway 
actorGateway) {
-               this.actorGateway = Preconditions.checkNotNull(actorGateway);
-       }
-
-       @Override
-       public void notifyTaskExecutionStateChanged(TaskExecutionState 
taskExecutionState) {
-               TaskMessages.UpdateTaskExecutionState actorMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
-               actorGateway.tell(actorMessage);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
index b3a0cbbac7a..1731fcc4ef2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
@@ -35,11 +35,6 @@ public ActorGatewayTaskManagerActions(ActorGateway 
actorGateway) {
                this.actorGateway = Preconditions.checkNotNull(actorGateway);
        }
 
-       @Override
-       public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
-               actorGateway.tell(new 
TaskMessages.TaskInFinalState(executionAttemptID));
-       }
-
        @Override
        public void notifyFatalError(String message, Throwable cause) {
                actorGateway.tell(new TaskManagerMessages.FatalError(message, 
cause));
@@ -52,8 +47,13 @@ public void failTask(ExecutionAttemptID executionAttemptID, 
Throwable cause) {
 
        @Override
        public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
-               TaskMessages.UpdateTaskExecutionState actorMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
-               actorGateway.tell(actorMessage);
+               final TaskMessages.TaskMessage taskMessage;
+               if (taskExecutionState.getExecutionState().isTerminal()) {
+                       taskMessage = new 
TaskMessages.TaskInFinalState(taskExecutionState.getID());
+               } else {
+                       taskMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
+               }
+
+               actorGateway.tell(taskMessage);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 92ae1676ed6..9abf92f76fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -86,10 +86,8 @@
 import java.net.URL;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -205,9 +203,6 @@
        /** Checkpoint notifier used to communicate with the 
CheckpointCoordinator. */
        private final CheckpointResponder checkpointResponder;
 
-       /** All listener that want to be notified about changes in the task's 
execution state. */
-       private final List<TaskExecutionStateListener> 
taskExecutionStateListeners;
-
        /** The BLOB cache, from which the task can request BLOB files. */
        private final BlobCacheService blobService;
 
@@ -348,7 +343,6 @@ public Task(
                this.network = Preconditions.checkNotNull(networkEnvironment);
                this.taskManagerConfig = 
Preconditions.checkNotNull(taskManagerConfig);
 
-               this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
                this.metrics = metricGroup;
 
                this.partitionProducerStateChecker = 
Preconditions.checkNotNull(partitionProducerStateChecker);
@@ -701,7 +695,6 @@ else if (current == ExecutionState.CANCELING) {
                        }
 
                        // notify everyone that we switched to running
-                       notifyObservers(ExecutionState.RUNNING, null);
                        taskManagerActions.updateTaskExecutionState(new 
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
                        // make sure the user code classloader is accessible 
thread-locally
@@ -729,10 +722,7 @@ else if (current == ExecutionState.CANCELING) {
 
                        // try to mark the task as finished
                        // if that fails, the task was canceled/failed in the 
meantime
-                       if (transitionState(ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
-                               notifyObservers(ExecutionState.FINISHED, null);
-                       }
-                       else {
+                       if (!transitionState(ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
                                throw new CancelTaskException();
                        }
                }
@@ -772,26 +762,21 @@ else if (current == ExecutionState.CANCELING) {
                                                if (t instanceof 
CancelTaskException) {
                                                        if 
(transitionState(current, ExecutionState.CANCELED)) {
                                                                
cancelInvokable(invokable);
-
-                                                               
notifyObservers(ExecutionState.CANCELED, null);
                                                                break;
                                                        }
                                                }
                                                else {
                                                        if 
(transitionState(current, ExecutionState.FAILED, t)) {
                                                                // proper 
failure of the task. record the exception as the root cause
-                                                               String 
errorMessage = String.format("Execution of %s (%s) failed.", 
taskNameWithSubtask, executionId);
                                                                failureCause = 
t;
                                                                
cancelInvokable(invokable);
 
-                                                               
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                                                                break;
                                                        }
                                                }
                                        }
                                        else if (current == 
ExecutionState.CANCELING) {
                                                if (transitionState(current, 
ExecutionState.CANCELED)) {
-                                                       
notifyObservers(ExecutionState.CANCELED, null);
                                                        break;
                                                }
                                        }
@@ -883,7 +868,8 @@ private ClassLoader createUserCodeClassloader() throws 
Exception {
        }
 
        private void notifyFinalState() {
-               taskManagerActions.notifyFinalState(executionId);
+               checkState(executionState.isTerminal());
+               taskManagerActions.updateTaskExecutionState(new 
TaskExecutionState(jobId, executionId, executionState, failureCause));
        }
 
        private void notifyFatalError(String message, Throwable cause) {
@@ -1010,14 +996,6 @@ private void 
cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwabl
                                        // if we manage this state transition, 
then the invokable gets never called
                                        // we need not call cancel on it
                                        this.failureCause = cause;
-                                       notifyObservers(
-                                               targetState,
-                                               new Exception(
-                                                       String.format(
-                                                               "Cancel or fail 
execution of %s (%s).",
-                                                               
taskNameWithSubtask,
-                                                               executionId),
-                                                       cause));
                                        return;
                                }
                        }
@@ -1031,14 +1009,6 @@ else if (current == ExecutionState.RUNNING) {
 
                                        if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                                                this.failureCause = cause;
-                                               notifyObservers(
-                                                       targetState,
-                                                       new Exception(
-                                                               String.format(
-                                                                       "Cancel 
or fail execution of %s (%s).",
-                                                                       
taskNameWithSubtask,
-                                                                       
executionId),
-                                                               cause));
 
                                                LOG.info("Triggering 
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
@@ -1111,22 +1081,6 @@ else if (current == ExecutionState.RUNNING) {
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  State Listeners
-       // 
------------------------------------------------------------------------
-
-       public void registerExecutionListener(TaskExecutionStateListener 
listener) {
-               taskExecutionStateListeners.add(listener);
-       }
-
-       private void notifyObservers(ExecutionState newState, Throwable error) {
-               TaskExecutionState stateUpdate = new TaskExecutionState(jobId, 
executionId, newState, error);
-
-               for (TaskExecutionStateListener listener : 
taskExecutionStateListeners) {
-                       listener.notifyTaskExecutionStateChanged(stateUpdate);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Partition State Listeners
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
index 2f3a0cb558d..09242c2bd2f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
@@ -25,13 +25,6 @@
  */
 public interface TaskManagerActions {
 
-       /**
-        * Notifies the task manager that the given task is in a final state.
-        *
-        * @param executionAttemptID Execution attempt ID of the task
-        */
-       void notifyFinalState(ExecutionAttemptID executionAttemptID);
-
        /**
         * Notifies the task manager about a fatal error occurred in the task.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
similarity index 65%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
index 9fa9c9025e5..ea302c06f5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
@@ -18,12 +18,19 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-public interface TaskExecutionStateListener {
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
-       /**
-        * Called whenever the task's execution state changes
-        *
-        * @param taskExecutionState describing the task execution state change
-        */
-       void notifyTaskExecutionStateChanged(TaskExecutionState 
taskExecutionState);
+/**
+ * Dummy implementation of {@link TaskManagerActions}.
+ */
+public class NoOpTaskManagerActions implements TaskManagerActions {
+
+       @Override
+       public void notifyFatalError(String message, Throwable cause) {}
+
+       @Override
+       public void failTask(ExecutionAttemptID executionAttemptID, Throwable 
cause) {}
+
+       @Override
+       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 2da2ba49265..a397b6ca915 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -121,6 +121,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the legacy {@link TaskManager}.
+ */
 @SuppressWarnings("serial")
 public class TaskManagerTest extends TestLogger {
 
@@ -133,7 +136,7 @@
 
        private static ActorSystem system;
 
-       final static UUID leaderSessionID = UUID.randomUUID();
+       static final UUID LEADER_SESSION_ID = UUID.randomUUID();
 
        private TestingHighAvailabilityServices highAvailabilityServices;
 
@@ -215,7 +218,6 @@ protected void run() {
                                        
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                        new ArrayList<PermanentBlobKey>(), 
Collections.emptyList(), 0);
 
-
                                new Within(d) {
 
                                        @Override
@@ -261,8 +263,6 @@ else if (!(message instanceof 
TaskManagerMessages.Heartbeat)) {
                                                                
fail("Unexpected message: " + message);
                                                        }
                                                } while 
(System.currentTimeMillis() < deadline);
-
-
                                        }
                                };
                        }
@@ -283,11 +283,11 @@ public void testJobSubmissionAndCanceling() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -404,7 +404,7 @@ protected void run() {
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -424,11 +424,11 @@ public void testJobSubmissionAndStop() throws Exception {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -554,11 +554,11 @@ public void testGateChannelEdgeMismatch() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
-                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -651,7 +651,7 @@ public void testRunJobWithForwardChannel() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
                        try {
                                final JobID jid = new JobID();
 
@@ -661,8 +661,8 @@ public void testRunJobWithForwardChannel() {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(leaderSessionID)));
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               ActorRef jm = system.actorOf(Props.create(new 
SimpleLookupJobManagerCreator(LEADER_SESSION_ID)));
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -793,7 +793,7 @@ public void testCancellingDependentAndStateUpdateFails() {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
                        try {
                                final JobID jid = new JobID();
 
@@ -806,12 +806,12 @@ public void testCancellingDependentAndStateUpdateFails() {
                                ActorRef jm = system.actorOf(
                                                Props.create(
                                                                new 
SimpleLookupFailingUpdateJobManagerCreator(
-                                                                               
leaderSessionID,
+                                                                       
LEADER_SESSION_ID,
                                                                                
eid2)
                                                )
                                );
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -911,14 +911,14 @@ protected void run() {
 
                                                        assertEquals(0, 
tasks.size());
                                                }
-                                               catch(Exception e) {
+                                               catch (Exception e) {
                                                        e.printStackTrace();
                                                        fail(e.getMessage());
                                                }
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -943,16 +943,16 @@ public void testRemotePartitionNotFound() throws 
Exception {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
                                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
                                // Create the JM
                                ActorRef jm = system.actorOf(Props.create(
-                                               new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+                                               new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1017,12 +1017,12 @@ protected void run() {
                                                // The task should fail after 
repeated requests
                                                
assertEquals(ExecutionState.FAILED, msg.getExecutionState());
                                                Throwable t = 
msg.getError(ClassLoader.getSystemClassLoader());
-                                               assertEquals("Thrown exception 
was not a PartitionNotFoundException: " + t.getMessage(), 
+                                               assertEquals("Thrown exception 
was not a PartitionNotFoundException: " + t.getMessage(),
                                                        
PartitionNotFoundException.class, t.getClass());
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -1065,16 +1065,16 @@ public void testLocalPartitionNotFound() throws 
Exception {
 
                        final ActorGateway testActorGateway = new 
AkkaActorGateway(
                                        getTestActor(),
-                                       leaderSessionID);
+                               LEADER_SESSION_ID);
 
                        try {
                                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
                                // Create the JM
                                ActorRef jm = system.actorOf(Props.create(
-                                               new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+                                               new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                
highAvailabilityServices.setJobMasterLeaderRetriever(
                                        HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1143,7 +1143,7 @@ protected void run() {
                                        }
                                };
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
                        }
@@ -1167,9 +1167,9 @@ public void testLogNotFoundHandling() throws Exception {
 
                                // Create the JM
                                ActorRef jm = system.actorOf(Props.create(
-                                       new 
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+                                       new 
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID, 
getTestActor())));
 
-                               jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
+                               jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                                final int dataPort = 
NetUtils.getAvailablePort();
                                Configuration config = new Configuration();
@@ -1209,7 +1209,8 @@ protected void run() {
                                TestingUtils.stopActor(taskManager);
                                TestingUtils.stopActor(jobManager);
                        }
-               }};}
+               }};
+       }
 
        // 
------------------------------------------------------------------------
        // Stack trace sample
@@ -1555,7 +1556,7 @@ public void testTerminationOnFatalError() {
         * Test that a failing schedule or update consumers call leads to the 
failing of the respective
         * task.
         *
-        * IMPORTANT: We have to make sure that the invokable's cancel method 
is called, because only
+        * <p>IMPORTANT: We have to make sure that the invokable's cancel 
method is called, because only
         * then the future is completed. We do this by not eagerly deploy 
consumer tasks and requiring
         * the invokable to fill one memory segment. The completed memory 
segment will trigger the
         * scheduling of the downstream operator since it is in pipeline mode. 
After we've filled the
@@ -1592,9 +1593,8 @@ public void testFailingScheduleOrUpdateConsumersMessage() 
throws Exception {
                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                new ArrayList<>(), Collections.emptyList(), 0);
 
-
-                       ActorRef jmActorRef = 
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, 
leaderSessionID), "jobmanager");
-                       ActorGateway jobManager = new 
AkkaActorGateway(jmActorRef, leaderSessionID);
+                       ActorRef jmActorRef = 
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class, 
LEADER_SESSION_ID), "jobmanager");
+                       ActorGateway jobManager = new 
AkkaActorGateway(jmActorRef, LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1635,8 +1635,8 @@ public void testSubmitTaskFailure() throws Exception {
 
                try {
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1696,8 +1696,8 @@ public void testStopTaskFailure() throws Exception {
                try {
                        final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1764,8 +1764,8 @@ public void testStackTraceSampleFailure() throws 
Exception {
 
                try {
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1813,8 +1813,8 @@ public void testUpdateTaskInputPartitionsFailure() throws 
Exception {
 
                        final ExecutionAttemptID executionAttemptId = new 
ExecutionAttemptID();
 
-                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
-                       jobManager = new AkkaActorGateway(jm, leaderSessionID);
+                       ActorRef jm = 
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+                       jobManager = new AkkaActorGateway(jm, 
LEADER_SESSION_ID);
 
                        highAvailabilityServices.setJobMasterLeaderRetriever(
                                HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1894,7 +1894,7 @@ public void handleMessage(Object message) throws 
Exception {
                                                ),
                                                self);
                        }
-                       else if(message instanceof 
TaskMessages.UpdateTaskExecutionState){
+                       else if (message instanceof 
TaskMessages.UpdateTaskExecutionState){
                                getSender().tell(true, getSelf());
                        }
                }
@@ -1958,7 +1958,7 @@ public void handleMessage(Object message) throws 
Exception{
                                TaskMessages.UpdateTaskExecutionState updateMsg 
=
                                                
(TaskMessages.UpdateTaskExecutionState) message;
 
-                               
if(validIDs.contains(updateMsg.taskExecutionState().getID())) {
+                               if 
(validIDs.contains(updateMsg.taskExecutionState().getID())) {
                                        getSender().tell(true, getSelf());
                                } else {
                                        getSender().tell(false, getSelf());
@@ -2021,7 +2021,7 @@ public SimpleLookupFailingUpdateJobManagerCreator(UUID 
leaderSessionID, Executio
 
                        validIDs = new HashSet<ExecutionAttemptID>();
 
-                       for(ExecutionAttemptID id : ids) {
+                       for (ExecutionAttemptID id : ids) {
                                this.validIDs.add(id);
                        }
                }
@@ -2049,9 +2049,9 @@ public SimplePartitionStateLookupJobManager create() 
throws Exception {
                        return new 
SimplePartitionStateLookupJobManager(leaderSessionID, testActor);
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static final class TestInvokableCorrect extends 
AbstractInvokable {
 
                public TestInvokableCorrect(Environment environment) {
@@ -2061,7 +2061,7 @@ public TestInvokableCorrect(Environment environment) {
                @Override
                public void invoke() {}
        }
-       
+
        public static class TestInvokableBlockingCancelable extends 
AbstractInvokable {
 
                public TestInvokableBlockingCancelable(Environment environment) 
{
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 3dfcfb3b059..bb5cd17193a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -113,14 +113,12 @@
 
        private ActorGateway taskManagerGateway;
        private ActorGateway jobManagerGateway;
-       private ActorGateway listenerGateway;
 
-       private ActorGatewayTaskExecutionStateListener listener;
        private ActorGatewayTaskManagerActions taskManagerConnection;
 
        private BlockingQueue<Object> taskManagerMessages;
        private BlockingQueue<Object> jobManagerMessages;
-       private BlockingQueue<Object> listenerMessages;
+       private BlockingQueue<TaskExecutionState> listenerMessages;
 
        @Before
        public void createQueuesAndActors() {
@@ -129,10 +127,14 @@ public void createQueuesAndActors() {
                listenerMessages = new LinkedBlockingQueue<>();
                taskManagerGateway = new 
ForwardingActorGateway(taskManagerMessages);
                jobManagerGateway = new 
ForwardingActorGateway(jobManagerMessages);
-               listenerGateway = new ForwardingActorGateway(listenerMessages);
 
-               listener = new 
ActorGatewayTaskExecutionStateListener(listenerGateway);
-               taskManagerConnection = new 
ActorGatewayTaskManagerActions(taskManagerGateway);
+               taskManagerConnection = new 
ActorGatewayTaskManagerActions(taskManagerGateway) {
+                       @Override
+                       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+                               
super.updateTaskExecutionState(taskExecutionState);
+                               listenerMessages.add(taskExecutionState);
+                       }
+               };
 
                awaitLatch = new OneShotLatch();
                triggerLatch = new OneShotLatch();
@@ -147,7 +149,6 @@ public void clearActorsAndMessages() {
 
                taskManagerGateway = null;
                jobManagerGateway = null;
-               listenerGateway = null;
        }
 
        // 
------------------------------------------------------------------------
@@ -164,8 +165,6 @@ public void testRegularExecution() {
                        assertFalse(task.isCanceledOrFailed());
                        assertNull(task.getFailureCause());
 
-                       task.registerExecutionListener(listener);
-
                        // go into the run method. we should switch to 
DEPLOYING, RUNNING, then
                        // FINISHED, and all should be good
                        task.run();
@@ -244,8 +243,6 @@ public void testLibraryCacheRegistrationFailed() {
                        assertFalse(task.isCanceledOrFailed());
                        assertNull(task.getFailureCause());
 
-                       task.registerExecutionListener(listener);
-
                        // should fail
                        task.run();
 
@@ -292,8 +289,6 @@ public void testExecutionFailsInNetworkRegistration() {
 
                        Task task = createTask(TestInvokableCorrect.class, 
blobService, libCache, network, consumableNotifier, 
partitionProducerStateChecker, executor);
 
-                       task.registerExecutionListener(listener);
-
                        task.run();
 
                        assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
@@ -313,7 +308,6 @@ public void testExecutionFailsInNetworkRegistration() {
        public void testInvokableInstantiationFailed() {
                try {
                        Task task = createTask(InvokableNonInstantiable.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -334,7 +328,6 @@ public void testInvokableInstantiationFailed() {
        public void testExecutionFailsInInvoke() {
                try {
                        Task task = 
createTask(InvokableWithExceptionInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -360,7 +353,6 @@ public void testExecutionFailsInInvoke() {
        public void testFailWithWrappedException() {
                try {
                        Task task = 
createTask(FailingInvokableWithChainedException.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -386,7 +378,6 @@ public void testFailWithWrappedException() {
        public void testCancelDuringInvoke() {
                try {
                        Task task = createTask(InvokableBlockingInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -408,7 +399,7 @@ public void testCancelDuringInvoke() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateCancelingAndCanceledListenerMessage(task);
+                       validateListenerMessage(ExecutionState.CANCELED, task, 
false);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -420,7 +411,6 @@ public void testCancelDuringInvoke() {
        public void testFailExternallyDuringInvoke() {
                try {
                        Task task = createTask(InvokableBlockingInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -429,7 +419,7 @@ public void testFailExternallyDuringInvoke() {
                        awaitLatch.await();
 
                        task.failExternally(new Exception("test"));
-                       assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
+                       assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
 
                        task.getExecutingThread().join();
 
@@ -453,7 +443,6 @@ public void testFailExternallyDuringInvoke() {
        public void testCanceledAfterExecutionFailedInInvoke() {
                try {
                        Task task = 
createTask(InvokableWithExceptionInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -477,10 +466,9 @@ public void testCanceledAfterExecutionFailedInInvoke() {
        }
 
        @Test
-       public void testExecutionFailesAfterCanceling() {
+       public void testExecutionFailsAfterCanceling() {
                try {
                        Task task = 
createTask(InvokableWithExceptionOnTrigger.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -505,7 +493,7 @@ public void testExecutionFailesAfterCanceling() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateCancelingAndCanceledListenerMessage(task);
+                       validateListenerMessage(ExecutionState.CANCELED, task, 
false);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -517,7 +505,6 @@ public void testExecutionFailesAfterCanceling() {
        public void testExecutionFailsAfterTaskMarkedFailed() {
                try {
                        Task task = 
createTask(InvokableWithExceptionOnTrigger.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -1099,11 +1086,8 @@ private void validateListenerMessage(ExecutionState 
state, Task task, boolean ha
                try {
                        // we may have to wait for a bit to give the actors 
time to receive the message
                        // and put it into the queue
-                       TaskMessages.UpdateTaskExecutionState message =
-                                       (TaskMessages.UpdateTaskExecutionState) 
listenerMessages.take();
-                       assertNotNull("There is no additional listener 
message", message);
-
-                       TaskExecutionState taskState =  
message.taskExecutionState();
+                       final TaskExecutionState taskState = 
listenerMessages.take();
+                       assertNotNull("There is no additional listener 
message", state);
 
                        assertEquals(task.getJobID(), taskState.getJobID());
                        assertEquals(task.getExecutionId(), taskState.getID());
@@ -1120,45 +1104,6 @@ private void validateListenerMessage(ExecutionState 
state, Task task, boolean ha
                }
        }
 
-       private void validateCancelingAndCanceledListenerMessage(Task task) {
-               try {
-                       // we may have to wait for a bit to give the actors 
time to receive the message
-                       // and put it into the queue
-                       TaskMessages.UpdateTaskExecutionState message1 =
-                                       (TaskMessages.UpdateTaskExecutionState) 
listenerMessages.take();
-                       TaskMessages.UpdateTaskExecutionState message2 =
-                                       (TaskMessages.UpdateTaskExecutionState) 
listenerMessages.take();
-
-                       assertNotNull("There is no additional listener 
message", message1);
-                       assertNotNull("There is no additional listener 
message", message2);
-
-                       TaskExecutionState taskState1 =  
message1.taskExecutionState();
-                       TaskExecutionState taskState2 =  
message2.taskExecutionState();
-
-                       assertEquals(task.getJobID(), taskState1.getJobID());
-                       assertEquals(task.getJobID(), taskState2.getJobID());
-                       assertEquals(task.getExecutionId(), taskState1.getID());
-                       assertEquals(task.getExecutionId(), taskState2.getID());
-
-                       ExecutionState state1 = taskState1.getExecutionState();
-                       ExecutionState state2 = taskState2.getExecutionState();
-
-                       // it may be (very rarely) that the following race 
happens:
-                       //  - OUTSIDE THREAD: call to cancel()
-                       //  - OUTSIDE THREAD: atomic state change from running 
to canceling
-                       //  - TASK THREAD: finishes, atomic change from 
canceling to canceled
-                       //  - TASK THREAD: send notification that state is 
canceled
-                       //  - OUTSIDE THREAD: send notification that state is 
canceling
-
-                       // for that reason, we allow the notification messages 
in any order.
-                       assertTrue((state1 == ExecutionState.CANCELING && 
state2 == ExecutionState.CANCELED) ||
-                                               (state2 == 
ExecutionState.CANCELING && state1 == ExecutionState.CANCELED));
-               }
-               catch (InterruptedException e) {
-                       fail("interrupted");
-               }
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Mock invokable code
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index ea4bf955c58..c826e774ec2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -62,6 +62,7 @@
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -251,21 +252,6 @@ public void invoke() throws Exception {
                        }
                }
 
-               private static final class NoOpTaskManagerActions implements 
TaskManagerActions {
-
-                       @Override
-                       public void notifyFinalState(ExecutionAttemptID 
executionAttemptID) {}
-
-                       @Override
-                       public void notifyFatalError(String message, Throwable 
cause) {}
-
-                       @Override
-                       public void failTask(ExecutionAttemptID 
executionAttemptID, Throwable cause) {}
-
-                       @Override
-                       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {}
-               }
-
                private static final class NoOpInputSplitProvider implements 
InputSplitProvider {
 
                        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a9dda4694e..72a8c415f27 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -21,9 +21,7 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -86,9 +84,9 @@
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.DirectExecutorService;
@@ -108,7 +106,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.CloseableIterable;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -126,19 +123,15 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -160,6 +153,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
@@ -181,43 +175,26 @@
         */
        @Test
        public void testEarlyCanceling() throws Exception {
-               Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
-               StreamConfig cfg = new StreamConfig(new Configuration());
+               final StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setOperatorID(new OperatorID(4711L, 42L));
                cfg.setStreamOperator(new SlowlyDeserializingOperator());
                cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               Task task = createTask(SourceStreamTask.class, cfg, new 
Configuration());
+               final TaskManagerActions taskManagerActions = spy(new 
NoOpTaskManagerActions());
+               final Task task = createTask(SourceStreamTask.class, cfg, new 
Configuration(), taskManagerActions);
 
-               TestingExecutionStateListener testingExecutionStateListener = 
new TestingExecutionStateListener();
+               final TaskExecutionState state = new TaskExecutionState(
+                       task.getJobID(), task.getExecutionId(), 
ExecutionState.RUNNING);
 
-               task.registerExecutionListener(testingExecutionStateListener);
                task.startTaskThread();
 
-               Future<ExecutionState> running = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
-
-               // wait until the task thread reached state RUNNING
-               ExecutionState executionState = 
running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-               // make sure the task is really running
-               if (executionState != ExecutionState.RUNNING) {
-                       fail("Task entered state " + task.getExecutionState() + 
" with error "
-                                       + 
ExceptionUtils.stringifyException(task.getFailureCause()));
-               }
+               verify(taskManagerActions, 
timeout(2000L)).updateTaskExecutionState(eq(state));
 
                // send a cancel. because the operator takes a long time to 
deserialize, this should
                // hit the task before the operator is deserialized
                task.cancelExecution();
 
-               Future<ExecutionState> canceling = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
-
-               executionState = canceling.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-
-               // the task should reach state canceled eventually
-               assertTrue(executionState == ExecutionState.CANCELING ||
-                               executionState == ExecutionState.CANCELED);
-
-               task.getExecutingThread().join(deadline.timeLeft().toMillis());
+               task.getExecutingThread().join();
 
                assertFalse("Task did not cancel", 
task.getExecutingThread().isAlive());
                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
@@ -885,50 +862,27 @@ public void close() throws Exception {
                }
        }
 
-       private static class TestingExecutionStateListener implements 
TaskExecutionStateListener {
-
-               private ExecutionState executionState = null;
-
-               private final PriorityQueue<Tuple2<ExecutionState, 
CompletableFuture<ExecutionState>>> priorityQueue =
-                       new PriorityQueue<>(1, Comparator.comparingInt(o -> 
o.f0.ordinal()));
-
-               Future<ExecutionState> notifyWhenExecutionState(ExecutionState 
executionState) {
-                       synchronized (priorityQueue) {
-                               if (this.executionState != null && 
this.executionState.ordinal() >= executionState.ordinal()) {
-                                       return 
CompletableFuture.completedFuture(executionState);
-                               } else {
-                                       CompletableFuture<ExecutionState> 
promise = new CompletableFuture<>();
-                                       
priorityQueue.offer(Tuple2.of(executionState, promise));
-                                       return promise;
-                               }
-                       }
-               }
-
-               @Override
-               public void notifyTaskExecutionStateChanged(TaskExecutionState 
taskExecutionState) {
-                       synchronized (priorityQueue) {
-                               this.executionState = 
taskExecutionState.getExecutionState();
-
-                               while (!priorityQueue.isEmpty() && 
priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
-                                       CompletableFuture<ExecutionState> 
promise = priorityQueue.poll().f1;
-                                       promise.complete(executionState);
-                               }
-                       }
-               }
+       public static Task createTask(
+               Class<? extends AbstractInvokable> invokable,
+               StreamConfig taskConfig,
+               Configuration taskManagerConfig) throws Exception {
+               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager(), mock(TaskManagerActions.class));
        }
 
        public static Task createTask(
                Class<? extends AbstractInvokable> invokable,
                StreamConfig taskConfig,
-               Configuration taskManagerConfig) throws Exception {
-               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager());
+               Configuration taskManagerConfig,
+               TaskManagerActions taskManagerActions) throws Exception {
+               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager(), taskManagerActions);
        }
 
        public static Task createTask(
                        Class<? extends AbstractInvokable> invokable,
                        StreamConfig taskConfig,
                        Configuration taskManagerConfig,
-                       TestTaskStateManager taskStateManager) throws Exception 
{
+                       TestTaskStateManager taskStateManager,
+                       TaskManagerActions taskManagerActions) throws Exception 
{
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
@@ -980,7 +934,7 @@ public static Task createTask(
                        network,
                        mock(BroadcastVariableManager.class),
                        taskStateManager,
-                       mock(TaskManagerActions.class),
+                       taskManagerActions,
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
                        blobService,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Replace TaskManagerActions#notifyFinalState with #updateTaskExecutionState
> --------------------------------------------------------------------------
>
>                 Key: FLINK-10513
>                 URL: https://issues.apache.org/jira/browse/FLINK-10513
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> We can simplify the {{TaskManagerActions}} interface by replacing calls to 
> the {{notifyFinalState}} method by {{updateTaskExecutionState}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to