[ 
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642961#comment-16642961
 ] 

ASF GitHub Bot commented on FLINK-10386:
----------------------------------------

asfgit closed pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, the diff is reproduced
below (GitHub would not otherwise display it once the request is merged):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
deleted file mode 100644
index d729dbbf5ad..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskExecutionStateListener implements 
TaskExecutionStateListener {
-
-       private final ActorGateway actorGateway;
-
-       public ActorGatewayTaskExecutionStateListener(ActorGateway 
actorGateway) {
-               this.actorGateway = Preconditions.checkNotNull(actorGateway);
-       }
-
-       @Override
-       public void notifyTaskExecutionStateChanged(TaskExecutionState 
taskExecutionState) {
-               TaskMessages.UpdateTaskExecutionState actorMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
-               actorGateway.tell(actorMessage);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 92ae1676ed6..66f212f0fee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -86,10 +86,8 @@
 import java.net.URL;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -205,9 +203,6 @@
        /** Checkpoint notifier used to communicate with the 
CheckpointCoordinator. */
        private final CheckpointResponder checkpointResponder;
 
-       /** All listener that want to be notified about changes in the task's 
execution state. */
-       private final List<TaskExecutionStateListener> 
taskExecutionStateListeners;
-
        /** The BLOB cache, from which the task can request BLOB files. */
        private final BlobCacheService blobService;
 
@@ -348,7 +343,6 @@ public Task(
                this.network = Preconditions.checkNotNull(networkEnvironment);
                this.taskManagerConfig = 
Preconditions.checkNotNull(taskManagerConfig);
 
-               this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
                this.metrics = metricGroup;
 
                this.partitionProducerStateChecker = 
Preconditions.checkNotNull(partitionProducerStateChecker);
@@ -701,7 +695,6 @@ else if (current == ExecutionState.CANCELING) {
                        }
 
                        // notify everyone that we switched to running
-                       notifyObservers(ExecutionState.RUNNING, null);
                        taskManagerActions.updateTaskExecutionState(new 
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
                        // make sure the user code classloader is accessible 
thread-locally
@@ -729,10 +722,7 @@ else if (current == ExecutionState.CANCELING) {
 
                        // try to mark the task as finished
                        // if that fails, the task was canceled/failed in the 
meantime
-                       if (transitionState(ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
-                               notifyObservers(ExecutionState.FINISHED, null);
-                       }
-                       else {
+                       if (!transitionState(ExecutionState.RUNNING, 
ExecutionState.FINISHED)) {
                                throw new CancelTaskException();
                        }
                }
@@ -772,26 +762,21 @@ else if (current == ExecutionState.CANCELING) {
                                                if (t instanceof 
CancelTaskException) {
                                                        if 
(transitionState(current, ExecutionState.CANCELED)) {
                                                                
cancelInvokable(invokable);
-
-                                                               
notifyObservers(ExecutionState.CANCELED, null);
                                                                break;
                                                        }
                                                }
                                                else {
                                                        if 
(transitionState(current, ExecutionState.FAILED, t)) {
                                                                // proper 
failure of the task. record the exception as the root cause
-                                                               String 
errorMessage = String.format("Execution of %s (%s) failed.", 
taskNameWithSubtask, executionId);
                                                                failureCause = 
t;
                                                                
cancelInvokable(invokable);
 
-                                                               
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                                                                break;
                                                        }
                                                }
                                        }
                                        else if (current == 
ExecutionState.CANCELING) {
                                                if (transitionState(current, 
ExecutionState.CANCELED)) {
-                                                       
notifyObservers(ExecutionState.CANCELED, null);
                                                        break;
                                                }
                                        }
@@ -1010,14 +995,6 @@ private void 
cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwabl
                                        // if we manage this state transition, 
then the invokable gets never called
                                        // we need not call cancel on it
                                        this.failureCause = cause;
-                                       notifyObservers(
-                                               targetState,
-                                               new Exception(
-                                                       String.format(
-                                                               "Cancel or fail 
execution of %s (%s).",
-                                                               
taskNameWithSubtask,
-                                                               executionId),
-                                                       cause));
                                        return;
                                }
                        }
@@ -1031,14 +1008,6 @@ else if (current == ExecutionState.RUNNING) {
 
                                        if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                                                this.failureCause = cause;
-                                               notifyObservers(
-                                                       targetState,
-                                                       new Exception(
-                                                               String.format(
-                                                                       "Cancel 
or fail execution of %s (%s).",
-                                                                       
taskNameWithSubtask,
-                                                                       
executionId),
-                                                               cause));
 
                                                LOG.info("Triggering 
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
@@ -1111,22 +1080,6 @@ else if (current == ExecutionState.RUNNING) {
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  State Listeners
-       // 
------------------------------------------------------------------------
-
-       public void registerExecutionListener(TaskExecutionStateListener 
listener) {
-               taskExecutionStateListeners.add(listener);
-       }
-
-       private void notifyObservers(ExecutionState newState, Throwable error) {
-               TaskExecutionState stateUpdate = new TaskExecutionState(jobId, 
executionId, newState, error);
-
-               for (TaskExecutionStateListener listener : 
taskExecutionStateListeners) {
-                       listener.notifyTaskExecutionStateChanged(stateUpdate);
-               }
-       }
-
        // 
------------------------------------------------------------------------
        //  Partition State Listeners
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
similarity index 61%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
index 9fa9c9025e5..b39a7441df0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
@@ -18,12 +18,22 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-public interface TaskExecutionStateListener {
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
-       /**
-        * Called whenever the task's execution state changes
-        *
-        * @param taskExecutionState describing the task execution state change
-        */
-       void notifyTaskExecutionStateChanged(TaskExecutionState 
taskExecutionState);
+/**
+ * Dummy implementation of {@link TaskManagerActions}.
+ */
+public class NoOpTaskManagerActions implements TaskManagerActions {
+
+       @Override
+       public void notifyFinalState(ExecutionAttemptID executionAttemptID) {}
+
+       @Override
+       public void notifyFatalError(String message, Throwable cause) {}
+
+       @Override
+       public void failTask(ExecutionAttemptID executionAttemptID, Throwable 
cause) {}
+
+       @Override
+       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {}
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 3dfcfb3b059..403d1e29c1d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -113,14 +113,12 @@
 
        private ActorGateway taskManagerGateway;
        private ActorGateway jobManagerGateway;
-       private ActorGateway listenerGateway;
 
-       private ActorGatewayTaskExecutionStateListener listener;
        private ActorGatewayTaskManagerActions taskManagerConnection;
 
        private BlockingQueue<Object> taskManagerMessages;
        private BlockingQueue<Object> jobManagerMessages;
-       private BlockingQueue<Object> listenerMessages;
+       private BlockingQueue<TaskExecutionState> listenerMessages;
 
        @Before
        public void createQueuesAndActors() {
@@ -129,10 +127,14 @@ public void createQueuesAndActors() {
                listenerMessages = new LinkedBlockingQueue<>();
                taskManagerGateway = new 
ForwardingActorGateway(taskManagerMessages);
                jobManagerGateway = new 
ForwardingActorGateway(jobManagerMessages);
-               listenerGateway = new ForwardingActorGateway(listenerMessages);
 
-               listener = new 
ActorGatewayTaskExecutionStateListener(listenerGateway);
-               taskManagerConnection = new 
ActorGatewayTaskManagerActions(taskManagerGateway);
+               taskManagerConnection = new 
ActorGatewayTaskManagerActions(taskManagerGateway) {
+                       @Override
+                       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+                               
super.updateTaskExecutionState(taskExecutionState);
+                               listenerMessages.add(taskExecutionState);
+                       }
+               };
 
                awaitLatch = new OneShotLatch();
                triggerLatch = new OneShotLatch();
@@ -147,7 +149,6 @@ public void clearActorsAndMessages() {
 
                taskManagerGateway = null;
                jobManagerGateway = null;
-               listenerGateway = null;
        }
 
        // 
------------------------------------------------------------------------
@@ -164,8 +165,6 @@ public void testRegularExecution() {
                        assertFalse(task.isCanceledOrFailed());
                        assertNull(task.getFailureCause());
 
-                       task.registerExecutionListener(listener);
-
                        // go into the run method. we should switch to 
DEPLOYING, RUNNING, then
                        // FINISHED, and all should be good
                        task.run();
@@ -178,7 +177,6 @@ public void testRegularExecution() {
 
                        // verify listener messages
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateListenerMessage(ExecutionState.FINISHED, task, 
false);
 
                        // make sure that the TaskManager received an message 
to unregister the task
                        validateTaskManagerStateChange(ExecutionState.RUNNING, 
task, false);
@@ -244,8 +242,6 @@ public void testLibraryCacheRegistrationFailed() {
                        assertFalse(task.isCanceledOrFailed());
                        assertNull(task.getFailureCause());
 
-                       task.registerExecutionListener(listener);
-
                        // should fail
                        task.run();
 
@@ -256,9 +252,6 @@ public void testLibraryCacheRegistrationFailed() {
                        assertNotNull(task.getFailureCause().getMessage());
                        
assertTrue(task.getFailureCause().getMessage().contains("classloader"));
 
-                       // verify listener messages
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
-
                        // make sure that the TaskManager received an message 
to unregister the task
                        validateUnregisterTask(task.getExecutionId());
 
@@ -292,8 +285,6 @@ public void testExecutionFailsInNetworkRegistration() {
 
                        Task task = createTask(TestInvokableCorrect.class, 
blobService, libCache, network, consumableNotifier, 
partitionProducerStateChecker, executor);
 
-                       task.registerExecutionListener(listener);
-
                        task.run();
 
                        assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
@@ -301,7 +292,6 @@ public void testExecutionFailsInNetworkRegistration() {
                        
assertTrue(task.getFailureCause().getMessage().contains("buffers"));
 
                        validateUnregisterTask(task.getExecutionId());
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -313,7 +303,6 @@ public void testExecutionFailsInNetworkRegistration() {
        public void testInvokableInstantiationFailed() {
                try {
                        Task task = createTask(InvokableNonInstantiable.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -322,7 +311,6 @@ public void testInvokableInstantiationFailed() {
                        
assertTrue(task.getFailureCause().getMessage().contains("instantiate"));
 
                        validateUnregisterTask(task.getExecutionId());
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -334,7 +322,6 @@ public void testInvokableInstantiationFailed() {
        public void testExecutionFailsInInvoke() {
                try {
                        Task task = 
createTask(InvokableWithExceptionInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -348,7 +335,6 @@ public void testExecutionFailsInInvoke() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -360,7 +346,6 @@ public void testExecutionFailsInInvoke() {
        public void testFailWithWrappedException() {
                try {
                        Task task = 
createTask(FailingInvokableWithChainedException.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -374,7 +359,6 @@ public void testFailWithWrappedException() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -386,7 +370,6 @@ public void testFailWithWrappedException() {
        public void testCancelDuringInvoke() {
                try {
                        Task task = createTask(InvokableBlockingInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -408,7 +391,6 @@ public void testCancelDuringInvoke() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateCancelingAndCanceledListenerMessage(task);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -420,7 +402,6 @@ public void testCancelDuringInvoke() {
        public void testFailExternallyDuringInvoke() {
                try {
                        Task task = createTask(InvokableBlockingInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -429,7 +410,7 @@ public void testFailExternallyDuringInvoke() {
                        awaitLatch.await();
 
                        task.failExternally(new Exception("test"));
-                       assertTrue(task.getExecutionState() == 
ExecutionState.FAILED);
+                       assertEquals(ExecutionState.FAILED, 
task.getExecutionState());
 
                        task.getExecutingThread().join();
 
@@ -441,7 +422,6 @@ public void testFailExternallyDuringInvoke() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -453,7 +433,6 @@ public void testFailExternallyDuringInvoke() {
        public void testCanceledAfterExecutionFailedInInvoke() {
                try {
                        Task task = 
createTask(InvokableWithExceptionInInvoke.class);
-                       task.registerExecutionListener(listener);
 
                        task.run();
 
@@ -468,7 +447,6 @@ public void testCanceledAfterExecutionFailedInInvoke() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -477,10 +455,9 @@ public void testCanceledAfterExecutionFailedInInvoke() {
        }
 
        @Test
-       public void testExecutionFailesAfterCanceling() {
+       public void testExecutionFailsAfterCanceling() {
                try {
                        Task task = 
createTask(InvokableWithExceptionOnTrigger.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -505,7 +482,6 @@ public void testExecutionFailesAfterCanceling() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateCancelingAndCanceledListenerMessage(task);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -517,7 +493,6 @@ public void testExecutionFailesAfterCanceling() {
        public void testExecutionFailsAfterTaskMarkedFailed() {
                try {
                        Task task = 
createTask(InvokableWithExceptionOnTrigger.class);
-                       task.registerExecutionListener(listener);
 
                        // run the task asynchronous
                        task.startTaskThread();
@@ -541,7 +516,6 @@ public void testExecutionFailsAfterTaskMarkedFailed() {
                        validateUnregisterTask(task.getExecutionId());
 
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
-                       validateListenerMessage(ExecutionState.FAILED, task, 
true);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1099,11 +1073,8 @@ private void validateListenerMessage(ExecutionState 
state, Task task, boolean ha
                try {
                        // we may have to wait for a bit to give the actors 
time to receive the message
                        // and put it into the queue
-                       TaskMessages.UpdateTaskExecutionState message =
-                                       (TaskMessages.UpdateTaskExecutionState) 
listenerMessages.take();
-                       assertNotNull("There is no additional listener 
message", message);
-
-                       TaskExecutionState taskState =  
message.taskExecutionState();
+                       final TaskExecutionState taskState = 
listenerMessages.take();
+                       assertNotNull("There is no additional listener 
message", state);
 
                        assertEquals(task.getJobID(), taskState.getJobID());
                        assertEquals(task.getExecutionId(), taskState.getID());
@@ -1120,45 +1091,6 @@ private void validateListenerMessage(ExecutionState 
state, Task task, boolean ha
                }
        }
 
-       private void validateCancelingAndCanceledListenerMessage(Task task) {
-               try {
-                       // we may have to wait for a bit to give the actors 
time to receive the message
-                       // and put it into the queue
-                       TaskMessages.UpdateTaskExecutionState message1 =
-                                       (TaskMessages.UpdateTaskExecutionState) 
listenerMessages.take();
-                       TaskMessages.UpdateTaskExecutionState message2 =
-                                       (TaskMessages.UpdateTaskExecutionState) 
listenerMessages.take();
-
-                       assertNotNull("There is no additional listener 
message", message1);
-                       assertNotNull("There is no additional listener 
message", message2);
-
-                       TaskExecutionState taskState1 =  
message1.taskExecutionState();
-                       TaskExecutionState taskState2 =  
message2.taskExecutionState();
-
-                       assertEquals(task.getJobID(), taskState1.getJobID());
-                       assertEquals(task.getJobID(), taskState2.getJobID());
-                       assertEquals(task.getExecutionId(), taskState1.getID());
-                       assertEquals(task.getExecutionId(), taskState2.getID());
-
-                       ExecutionState state1 = taskState1.getExecutionState();
-                       ExecutionState state2 = taskState2.getExecutionState();
-
-                       // it may be (very rarely) that the following race 
happens:
-                       //  - OUTSIDE THREAD: call to cancel()
-                       //  - OUTSIDE THREAD: atomic state change from running 
to canceling
-                       //  - TASK THREAD: finishes, atomic change from 
canceling to canceled
-                       //  - TASK THREAD: send notification that state is 
canceled
-                       //  - OUTSIDE THREAD: send notification that state is 
canceling
-
-                       // for that reason, we allow the notification messages 
in any order.
-                       assertTrue((state1 == ExecutionState.CANCELING && 
state2 == ExecutionState.CANCELED) ||
-                                               (state2 == 
ExecutionState.CANCELING && state1 == ExecutionState.CANCELED));
-               }
-               catch (InterruptedException e) {
-                       fail("interrupted");
-               }
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Mock invokable code
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index ea4bf955c58..c826e774ec2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -62,6 +62,7 @@
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -251,21 +252,6 @@ public void invoke() throws Exception {
                        }
                }
 
-               private static final class NoOpTaskManagerActions implements 
TaskManagerActions {
-
-                       @Override
-                       public void notifyFinalState(ExecutionAttemptID 
executionAttemptID) {}
-
-                       @Override
-                       public void notifyFatalError(String message, Throwable 
cause) {}
-
-                       @Override
-                       public void failTask(ExecutionAttemptID 
executionAttemptID, Throwable cause) {}
-
-                       @Override
-                       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {}
-               }
-
                private static final class NoOpInputSplitProvider implements 
InputSplitProvider {
 
                        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a9dda4694e..23625e86906 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -86,9 +85,9 @@
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.DirectExecutorService;
@@ -108,7 +107,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.CloseableIterable;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -126,19 +124,15 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -160,6 +154,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
@@ -181,43 +176,26 @@
         */
        @Test
        public void testEarlyCanceling() throws Exception {
-               Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
-               StreamConfig cfg = new StreamConfig(new Configuration());
+               final StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setOperatorID(new OperatorID(4711L, 42L));
                cfg.setStreamOperator(new SlowlyDeserializingOperator());
                cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-               Task task = createTask(SourceStreamTask.class, cfg, new 
Configuration());
+               final TaskManagerActions taskManagerActions = spy(new 
NoOpTaskManagerActions());
+               final Task task = createTask(SourceStreamTask.class, cfg, new 
Configuration(), taskManagerActions);
 
-               TestingExecutionStateListener testingExecutionStateListener = 
new TestingExecutionStateListener();
+               final TaskExecutionState state = new TaskExecutionState(
+                       task.getJobID(), task.getExecutionId(), 
ExecutionState.RUNNING);
 
-               task.registerExecutionListener(testingExecutionStateListener);
                task.startTaskThread();
 
-               Future<ExecutionState> running = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
-
-               // wait until the task thread reached state RUNNING
-               ExecutionState executionState = 
running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-               // make sure the task is really running
-               if (executionState != ExecutionState.RUNNING) {
-                       fail("Task entered state " + task.getExecutionState() + 
" with error "
-                                       + 
ExceptionUtils.stringifyException(task.getFailureCause()));
-               }
+               verify(taskManagerActions, 
timeout(2000L)).updateTaskExecutionState(eq(state));
 
                // send a cancel. because the operator takes a long time to 
deserialize, this should
                // hit the task before the operator is deserialized
                task.cancelExecution();
 
-               Future<ExecutionState> canceling = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
-
-               executionState = canceling.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-
-               // the task should reach state canceled eventually
-               assertTrue(executionState == ExecutionState.CANCELING ||
-                               executionState == ExecutionState.CANCELED);
-
-               task.getExecutingThread().join(deadline.timeLeft().toMillis());
+               task.getExecutingThread().join();
 
                assertFalse("Task did not cancel", 
task.getExecutingThread().isAlive());
                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
@@ -846,6 +824,23 @@ public void 
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
        //  Test Utilities
        // 
------------------------------------------------------------------------
 
+       private static void waitUntilExecutionState(Task task, ExecutionState 
exceptedState, Deadline deadline) {
+               while (deadline.hasTimeLeft()) {
+                       if (exceptedState == task.getExecutionState()) {
+                               return;
+                       }
+
+                       try {
+                               
Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200));
+                       } catch (InterruptedException e) {
+                               // do nothing
+                       }
+               }
+
+               throw new IllegalStateException("Task " + task + " does not 
arrive state " + exceptedState + "in time. "
+                       + "Current state is " + task.getExecutionState());
+       }
+
        /**
         * Operator that does nothing.
         *
@@ -885,50 +880,27 @@ public void close() throws Exception {
                }
        }
 
-       private static class TestingExecutionStateListener implements 
TaskExecutionStateListener {
-
-               private ExecutionState executionState = null;
-
-               private final PriorityQueue<Tuple2<ExecutionState, 
CompletableFuture<ExecutionState>>> priorityQueue =
-                       new PriorityQueue<>(1, Comparator.comparingInt(o -> 
o.f0.ordinal()));
-
-               Future<ExecutionState> notifyWhenExecutionState(ExecutionState 
executionState) {
-                       synchronized (priorityQueue) {
-                               if (this.executionState != null && 
this.executionState.ordinal() >= executionState.ordinal()) {
-                                       return 
CompletableFuture.completedFuture(executionState);
-                               } else {
-                                       CompletableFuture<ExecutionState> 
promise = new CompletableFuture<>();
-                                       
priorityQueue.offer(Tuple2.of(executionState, promise));
-                                       return promise;
-                               }
-                       }
-               }
-
-               @Override
-               public void notifyTaskExecutionStateChanged(TaskExecutionState 
taskExecutionState) {
-                       synchronized (priorityQueue) {
-                               this.executionState = 
taskExecutionState.getExecutionState();
-
-                               while (!priorityQueue.isEmpty() && 
priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
-                                       CompletableFuture<ExecutionState> 
promise = priorityQueue.poll().f1;
-                                       promise.complete(executionState);
-                               }
-                       }
-               }
+       public static Task createTask(
+               Class<? extends AbstractInvokable> invokable,
+               StreamConfig taskConfig,
+               Configuration taskManagerConfig) throws Exception {
+               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager(), mock(TaskManagerActions.class));
        }
 
        public static Task createTask(
                Class<? extends AbstractInvokable> invokable,
                StreamConfig taskConfig,
-               Configuration taskManagerConfig) throws Exception {
-               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager());
+               Configuration taskManagerConfig,
+               TaskManagerActions taskManagerActions) throws Exception {
+               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager(), taskManagerActions);
        }
 
        public static Task createTask(
                        Class<? extends AbstractInvokable> invokable,
                        StreamConfig taskConfig,
                        Configuration taskManagerConfig,
-                       TestTaskStateManager taskStateManager) throws Exception 
{
+                       TestTaskStateManager taskStateManager,
+                       TaskManagerActions taskManagerActions) throws Exception 
{
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
@@ -980,7 +952,7 @@ public static Task createTask(
                        network,
                        mock(BroadcastVariableManager.class),
                        taskStateManager,
-                       mock(TaskManagerActions.class),
+                       taskManagerActions,
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
                        blobService,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy class TaskExecutionStateListener
> ----------------------------------------------
>
>                 Key: FLINK-10386
>                 URL: https://issues.apache.org/jira/browse/FLINK-10386
>             Project: Flink
>          Issue Type: Sub-task
>          Components: TaskManager
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> After a discussion 
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
>  with [~trohrm...@apache.org], I started to analyze the usage of 
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, we have abandoned the {{TaskExecutionStateListener}} strategy and 
> no component relies on it any more. Instead, we introduced {{TaskManagerActions}} 
> to take over the communication of {{Task}} with {{TaskManager}}. No one 
> except {{TaskManager}} should directly communicate with {{Task}}. So the 
> legacy class {{TaskExecutionStateListener}} can be safely removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to