[
https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642961#comment-16642961
]
ASF GitHub Bot commented on FLINK-10386:
----------------------------------------
asfgit closed pull request #6729: [FLINK-10386] [taskmanager] Remove legacy
class TaskExecutionStateListener
URL: https://github.com/apache/flink/pull/6729
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
deleted file mode 100644
index d729dbbf5ad..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskExecutionStateListener implements
TaskExecutionStateListener {
-
- private final ActorGateway actorGateway;
-
- public ActorGatewayTaskExecutionStateListener(ActorGateway
actorGateway) {
- this.actorGateway = Preconditions.checkNotNull(actorGateway);
- }
-
- @Override
- public void notifyTaskExecutionStateChanged(TaskExecutionState
taskExecutionState) {
- TaskMessages.UpdateTaskExecutionState actorMessage = new
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
- actorGateway.tell(actorMessage);
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 92ae1676ed6..66f212f0fee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -86,10 +86,8 @@
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -205,9 +203,6 @@
/** Checkpoint notifier used to communicate with the
CheckpointCoordinator. */
private final CheckpointResponder checkpointResponder;
- /** All listener that want to be notified about changes in the task's
execution state. */
- private final List<TaskExecutionStateListener>
taskExecutionStateListeners;
-
/** The BLOB cache, from which the task can request BLOB files. */
private final BlobCacheService blobService;
@@ -348,7 +343,6 @@ public Task(
this.network = Preconditions.checkNotNull(networkEnvironment);
this.taskManagerConfig =
Preconditions.checkNotNull(taskManagerConfig);
- this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
this.metrics = metricGroup;
this.partitionProducerStateChecker =
Preconditions.checkNotNull(partitionProducerStateChecker);
@@ -701,7 +695,6 @@ else if (current == ExecutionState.CANCELING) {
}
// notify everyone that we switched to running
- notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible
thread-locally
@@ -729,10 +722,7 @@ else if (current == ExecutionState.CANCELING) {
// try to mark the task as finished
// if that fails, the task was canceled/failed in the
meantime
- if (transitionState(ExecutionState.RUNNING,
ExecutionState.FINISHED)) {
- notifyObservers(ExecutionState.FINISHED, null);
- }
- else {
+ if (!transitionState(ExecutionState.RUNNING,
ExecutionState.FINISHED)) {
throw new CancelTaskException();
}
}
@@ -772,26 +762,21 @@ else if (current == ExecutionState.CANCELING) {
if (t instanceof
CancelTaskException) {
if
(transitionState(current, ExecutionState.CANCELED)) {
cancelInvokable(invokable);
-
-
notifyObservers(ExecutionState.CANCELED, null);
break;
}
}
else {
if
(transitionState(current, ExecutionState.FAILED, t)) {
// proper
failure of the task. record the exception as the root cause
- String
errorMessage = String.format("Execution of %s (%s) failed.",
taskNameWithSubtask, executionId);
failureCause =
t;
cancelInvokable(invokable);
-
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
break;
}
}
}
else if (current ==
ExecutionState.CANCELING) {
if (transitionState(current,
ExecutionState.CANCELED)) {
-
notifyObservers(ExecutionState.CANCELED, null);
break;
}
}
@@ -1010,14 +995,6 @@ private void
cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwabl
// if we manage this state transition,
then the invokable gets never called
// we need not call cancel on it
this.failureCause = cause;
- notifyObservers(
- targetState,
- new Exception(
- String.format(
- "Cancel or fail
execution of %s (%s).",
-
taskNameWithSubtask,
- executionId),
- cause));
return;
}
}
@@ -1031,14 +1008,6 @@ else if (current == ExecutionState.RUNNING) {
if (invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
- notifyObservers(
- targetState,
- new Exception(
- String.format(
- "Cancel
or fail execution of %s (%s).",
-
taskNameWithSubtask,
-
executionId),
- cause));
LOG.info("Triggering
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
@@ -1111,22 +1080,6 @@ else if (current == ExecutionState.RUNNING) {
}
}
- //
------------------------------------------------------------------------
- // State Listeners
- //
------------------------------------------------------------------------
-
- public void registerExecutionListener(TaskExecutionStateListener
listener) {
- taskExecutionStateListeners.add(listener);
- }
-
- private void notifyObservers(ExecutionState newState, Throwable error) {
- TaskExecutionState stateUpdate = new TaskExecutionState(jobId,
executionId, newState, error);
-
- for (TaskExecutionStateListener listener :
taskExecutionStateListeners) {
- listener.notifyTaskExecutionStateChanged(stateUpdate);
- }
- }
-
//
------------------------------------------------------------------------
// Partition State Listeners
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
similarity index 61%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
index 9fa9c9025e5..b39a7441df0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
@@ -18,12 +18,22 @@
package org.apache.flink.runtime.taskmanager;
-public interface TaskExecutionStateListener {
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
- /**
- * Called whenever the task's execution state changes
- *
- * @param taskExecutionState describing the task execution state change
- */
- void notifyTaskExecutionStateChanged(TaskExecutionState
taskExecutionState);
+/**
+ * Dummy implementation of {@link TaskManagerActions}.
+ */
+public class NoOpTaskManagerActions implements TaskManagerActions {
+
+ @Override
+ public void notifyFinalState(ExecutionAttemptID executionAttemptID) {}
+
+ @Override
+ public void notifyFatalError(String message, Throwable cause) {}
+
+ @Override
+ public void failTask(ExecutionAttemptID executionAttemptID, Throwable
cause) {}
+
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 3dfcfb3b059..403d1e29c1d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -113,14 +113,12 @@
private ActorGateway taskManagerGateway;
private ActorGateway jobManagerGateway;
- private ActorGateway listenerGateway;
- private ActorGatewayTaskExecutionStateListener listener;
private ActorGatewayTaskManagerActions taskManagerConnection;
private BlockingQueue<Object> taskManagerMessages;
private BlockingQueue<Object> jobManagerMessages;
- private BlockingQueue<Object> listenerMessages;
+ private BlockingQueue<TaskExecutionState> listenerMessages;
@Before
public void createQueuesAndActors() {
@@ -129,10 +127,14 @@ public void createQueuesAndActors() {
listenerMessages = new LinkedBlockingQueue<>();
taskManagerGateway = new
ForwardingActorGateway(taskManagerMessages);
jobManagerGateway = new
ForwardingActorGateway(jobManagerMessages);
- listenerGateway = new ForwardingActorGateway(listenerMessages);
- listener = new
ActorGatewayTaskExecutionStateListener(listenerGateway);
- taskManagerConnection = new
ActorGatewayTaskManagerActions(taskManagerGateway);
+ taskManagerConnection = new
ActorGatewayTaskManagerActions(taskManagerGateway) {
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+
super.updateTaskExecutionState(taskExecutionState);
+ listenerMessages.add(taskExecutionState);
+ }
+ };
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
@@ -147,7 +149,6 @@ public void clearActorsAndMessages() {
taskManagerGateway = null;
jobManagerGateway = null;
- listenerGateway = null;
}
//
------------------------------------------------------------------------
@@ -164,8 +165,6 @@ public void testRegularExecution() {
assertFalse(task.isCanceledOrFailed());
assertNull(task.getFailureCause());
- task.registerExecutionListener(listener);
-
// go into the run method. we should switch to
DEPLOYING, RUNNING, then
// FINISHED, and all should be good
task.run();
@@ -178,7 +177,6 @@ public void testRegularExecution() {
// verify listener messages
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FINISHED, task,
false);
// make sure that the TaskManager received an message
to unregister the task
validateTaskManagerStateChange(ExecutionState.RUNNING,
task, false);
@@ -244,8 +242,6 @@ public void testLibraryCacheRegistrationFailed() {
assertFalse(task.isCanceledOrFailed());
assertNull(task.getFailureCause());
- task.registerExecutionListener(listener);
-
// should fail
task.run();
@@ -256,9 +252,6 @@ public void testLibraryCacheRegistrationFailed() {
assertNotNull(task.getFailureCause().getMessage());
assertTrue(task.getFailureCause().getMessage().contains("classloader"));
- // verify listener messages
- validateListenerMessage(ExecutionState.FAILED, task,
true);
-
// make sure that the TaskManager received an message
to unregister the task
validateUnregisterTask(task.getExecutionId());
@@ -292,8 +285,6 @@ public void testExecutionFailsInNetworkRegistration() {
Task task = createTask(TestInvokableCorrect.class,
blobService, libCache, network, consumableNotifier,
partitionProducerStateChecker, executor);
- task.registerExecutionListener(listener);
-
task.run();
assertEquals(ExecutionState.FAILED,
task.getExecutionState());
@@ -301,7 +292,6 @@ public void testExecutionFailsInNetworkRegistration() {
assertTrue(task.getFailureCause().getMessage().contains("buffers"));
validateUnregisterTask(task.getExecutionId());
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -313,7 +303,6 @@ public void testExecutionFailsInNetworkRegistration() {
public void testInvokableInstantiationFailed() {
try {
Task task = createTask(InvokableNonInstantiable.class);
- task.registerExecutionListener(listener);
task.run();
@@ -322,7 +311,6 @@ public void testInvokableInstantiationFailed() {
assertTrue(task.getFailureCause().getMessage().contains("instantiate"));
validateUnregisterTask(task.getExecutionId());
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -334,7 +322,6 @@ public void testInvokableInstantiationFailed() {
public void testExecutionFailsInInvoke() {
try {
Task task =
createTask(InvokableWithExceptionInInvoke.class);
- task.registerExecutionListener(listener);
task.run();
@@ -348,7 +335,6 @@ public void testExecutionFailsInInvoke() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -360,7 +346,6 @@ public void testExecutionFailsInInvoke() {
public void testFailWithWrappedException() {
try {
Task task =
createTask(FailingInvokableWithChainedException.class);
- task.registerExecutionListener(listener);
task.run();
@@ -374,7 +359,6 @@ public void testFailWithWrappedException() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -386,7 +370,6 @@ public void testFailWithWrappedException() {
public void testCancelDuringInvoke() {
try {
Task task = createTask(InvokableBlockingInInvoke.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -408,7 +391,6 @@ public void testCancelDuringInvoke() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateCancelingAndCanceledListenerMessage(task);
}
catch (Exception e) {
e.printStackTrace();
@@ -420,7 +402,6 @@ public void testCancelDuringInvoke() {
public void testFailExternallyDuringInvoke() {
try {
Task task = createTask(InvokableBlockingInInvoke.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -429,7 +410,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
task.failExternally(new Exception("test"));
- assertTrue(task.getExecutionState() ==
ExecutionState.FAILED);
+ assertEquals(ExecutionState.FAILED,
task.getExecutionState());
task.getExecutingThread().join();
@@ -441,7 +422,6 @@ public void testFailExternallyDuringInvoke() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -453,7 +433,6 @@ public void testFailExternallyDuringInvoke() {
public void testCanceledAfterExecutionFailedInInvoke() {
try {
Task task =
createTask(InvokableWithExceptionInInvoke.class);
- task.registerExecutionListener(listener);
task.run();
@@ -468,7 +447,6 @@ public void testCanceledAfterExecutionFailedInInvoke() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -477,10 +455,9 @@ public void testCanceledAfterExecutionFailedInInvoke() {
}
@Test
- public void testExecutionFailesAfterCanceling() {
+ public void testExecutionFailsAfterCanceling() {
try {
Task task =
createTask(InvokableWithExceptionOnTrigger.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -505,7 +482,6 @@ public void testExecutionFailesAfterCanceling() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateCancelingAndCanceledListenerMessage(task);
}
catch (Exception e) {
e.printStackTrace();
@@ -517,7 +493,6 @@ public void testExecutionFailesAfterCanceling() {
public void testExecutionFailsAfterTaskMarkedFailed() {
try {
Task task =
createTask(InvokableWithExceptionOnTrigger.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -541,7 +516,6 @@ public void testExecutionFailsAfterTaskMarkedFailed() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateListenerMessage(ExecutionState.FAILED, task,
true);
}
catch (Exception e) {
e.printStackTrace();
@@ -1099,11 +1073,8 @@ private void validateListenerMessage(ExecutionState
state, Task task, boolean ha
try {
// we may have to wait for a bit to give the actors
time to receive the message
// and put it into the queue
- TaskMessages.UpdateTaskExecutionState message =
- (TaskMessages.UpdateTaskExecutionState)
listenerMessages.take();
- assertNotNull("There is no additional listener
message", message);
-
- TaskExecutionState taskState =
message.taskExecutionState();
+ final TaskExecutionState taskState =
listenerMessages.take();
+ assertNotNull("There is no additional listener
message", state);
assertEquals(task.getJobID(), taskState.getJobID());
assertEquals(task.getExecutionId(), taskState.getID());
@@ -1120,45 +1091,6 @@ private void validateListenerMessage(ExecutionState
state, Task task, boolean ha
}
}
- private void validateCancelingAndCanceledListenerMessage(Task task) {
- try {
- // we may have to wait for a bit to give the actors
time to receive the message
- // and put it into the queue
- TaskMessages.UpdateTaskExecutionState message1 =
- (TaskMessages.UpdateTaskExecutionState)
listenerMessages.take();
- TaskMessages.UpdateTaskExecutionState message2 =
- (TaskMessages.UpdateTaskExecutionState)
listenerMessages.take();
-
- assertNotNull("There is no additional listener
message", message1);
- assertNotNull("There is no additional listener
message", message2);
-
- TaskExecutionState taskState1 =
message1.taskExecutionState();
- TaskExecutionState taskState2 =
message2.taskExecutionState();
-
- assertEquals(task.getJobID(), taskState1.getJobID());
- assertEquals(task.getJobID(), taskState2.getJobID());
- assertEquals(task.getExecutionId(), taskState1.getID());
- assertEquals(task.getExecutionId(), taskState2.getID());
-
- ExecutionState state1 = taskState1.getExecutionState();
- ExecutionState state2 = taskState2.getExecutionState();
-
- // it may be (very rarely) that the following race
happens:
- // - OUTSIDE THREAD: call to cancel()
- // - OUTSIDE THREAD: atomic state change from running
to canceling
- // - TASK THREAD: finishes, atomic change from
canceling to canceled
- // - TASK THREAD: send notification that state is
canceled
- // - OUTSIDE THREAD: send notification that state is
canceling
-
- // for that reason, we allow the notification messages
in any order.
- assertTrue((state1 == ExecutionState.CANCELING &&
state2 == ExecutionState.CANCELED) ||
- (state2 ==
ExecutionState.CANCELING && state1 == ExecutionState.CANCELED));
- }
- catch (InterruptedException e) {
- fail("interrupted");
- }
- }
-
//
--------------------------------------------------------------------------------------------
// Mock invokable code
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index ea4bf955c58..c826e774ec2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -62,6 +62,7 @@
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -251,21 +252,6 @@ public void invoke() throws Exception {
}
}
- private static final class NoOpTaskManagerActions implements
TaskManagerActions {
-
- @Override
- public void notifyFinalState(ExecutionAttemptID
executionAttemptID) {}
-
- @Override
- public void notifyFatalError(String message, Throwable
cause) {}
-
- @Override
- public void failTask(ExecutionAttemptID
executionAttemptID, Throwable cause) {}
-
- @Override
- public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {}
- }
-
private static final class NoOpInputSplitProvider implements
InputSplitProvider {
@Override
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a9dda4694e..23625e86906 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
@@ -86,9 +85,9 @@
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.DirectExecutorService;
@@ -108,7 +107,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.CloseableIterable;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -126,19 +124,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
-import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -160,6 +154,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
@@ -181,43 +176,26 @@
*/
@Test
public void testEarlyCanceling() throws Exception {
- Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
- StreamConfig cfg = new StreamConfig(new Configuration());
+ final StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setOperatorID(new OperatorID(4711L, 42L));
cfg.setStreamOperator(new SlowlyDeserializingOperator());
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- Task task = createTask(SourceStreamTask.class, cfg, new
Configuration());
+ final TaskManagerActions taskManagerActions = spy(new
NoOpTaskManagerActions());
+ final Task task = createTask(SourceStreamTask.class, cfg, new
Configuration(), taskManagerActions);
- TestingExecutionStateListener testingExecutionStateListener =
new TestingExecutionStateListener();
+ final TaskExecutionState state = new TaskExecutionState(
+ task.getJobID(), task.getExecutionId(),
ExecutionState.RUNNING);
- task.registerExecutionListener(testingExecutionStateListener);
task.startTaskThread();
- Future<ExecutionState> running =
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
-
- // wait until the task thread reached state RUNNING
- ExecutionState executionState =
running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- // make sure the task is really running
- if (executionState != ExecutionState.RUNNING) {
- fail("Task entered state " + task.getExecutionState() +
" with error "
- +
ExceptionUtils.stringifyException(task.getFailureCause()));
- }
+ verify(taskManagerActions,
timeout(2000L)).updateTaskExecutionState(eq(state));
// send a cancel. because the operator takes a long time to
deserialize, this should
// hit the task before the operator is deserialized
task.cancelExecution();
- Future<ExecutionState> canceling =
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
-
- executionState = canceling.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
-
- // the task should reach state canceled eventually
- assertTrue(executionState == ExecutionState.CANCELING ||
- executionState == ExecutionState.CANCELED);
-
- task.getExecutingThread().join(deadline.timeLeft().toMillis());
+ task.getExecutingThread().join();
assertFalse("Task did not cancel",
task.getExecutingThread().isAlive());
assertEquals(ExecutionState.CANCELED, task.getExecutionState());
@@ -846,6 +824,23 @@ public void
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
// Test Utilities
//
------------------------------------------------------------------------
+ private static void waitUntilExecutionState(Task task, ExecutionState
exceptedState, Deadline deadline) {
+ while (deadline.hasTimeLeft()) {
+ if (exceptedState == task.getExecutionState()) {
+ return;
+ }
+
+ try {
+
Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200));
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ }
+
+ throw new IllegalStateException("Task " + task + " does not
arrive state " + exceptedState + "in time. "
+ + "Current state is " + task.getExecutionState());
+ }
+
/**
* Operator that does nothing.
*
@@ -885,50 +880,27 @@ public void close() throws Exception {
}
}
- private static class TestingExecutionStateListener implements
TaskExecutionStateListener {
-
- private ExecutionState executionState = null;
-
- private final PriorityQueue<Tuple2<ExecutionState,
CompletableFuture<ExecutionState>>> priorityQueue =
- new PriorityQueue<>(1, Comparator.comparingInt(o ->
o.f0.ordinal()));
-
- Future<ExecutionState> notifyWhenExecutionState(ExecutionState
executionState) {
- synchronized (priorityQueue) {
- if (this.executionState != null &&
this.executionState.ordinal() >= executionState.ordinal()) {
- return
CompletableFuture.completedFuture(executionState);
- } else {
- CompletableFuture<ExecutionState>
promise = new CompletableFuture<>();
-
priorityQueue.offer(Tuple2.of(executionState, promise));
- return promise;
- }
- }
- }
-
- @Override
- public void notifyTaskExecutionStateChanged(TaskExecutionState
taskExecutionState) {
- synchronized (priorityQueue) {
- this.executionState =
taskExecutionState.getExecutionState();
-
- while (!priorityQueue.isEmpty() &&
priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
- CompletableFuture<ExecutionState>
promise = priorityQueue.poll().f1;
- promise.complete(executionState);
- }
- }
- }
+ public static Task createTask(
+ Class<? extends AbstractInvokable> invokable,
+ StreamConfig taskConfig,
+ Configuration taskManagerConfig) throws Exception {
+ return createTask(invokable, taskConfig, taskManagerConfig, new
TestTaskStateManager(), mock(TaskManagerActions.class));
}
public static Task createTask(
Class<? extends AbstractInvokable> invokable,
StreamConfig taskConfig,
- Configuration taskManagerConfig) throws Exception {
- return createTask(invokable, taskConfig, taskManagerConfig, new
TestTaskStateManager());
+ Configuration taskManagerConfig,
+ TaskManagerActions taskManagerActions) throws Exception {
+ return createTask(invokable, taskConfig, taskManagerConfig, new
TestTaskStateManager(), taskManagerActions);
}
public static Task createTask(
Class<? extends AbstractInvokable> invokable,
StreamConfig taskConfig,
Configuration taskManagerConfig,
- TestTaskStateManager taskStateManager) throws Exception
{
+ TestTaskStateManager taskStateManager,
+ TaskManagerActions taskManagerActions) throws Exception
{
BlobCacheService blobService =
new BlobCacheService(mock(PermanentBlobCache.class),
mock(TransientBlobCache.class));
@@ -980,7 +952,7 @@ public static Task createTask(
network,
mock(BroadcastVariableManager.class),
taskStateManager,
- mock(TaskManagerActions.class),
+ taskManagerActions,
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
blobService,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Remove legacy class TaskExecutionStateListener
> ----------------------------------------------
>
> Key: FLINK-10386
> URL: https://issues.apache.org/jira/browse/FLINK-10386
> Project: Flink
> Issue Type: Sub-task
> Components: TaskManager
> Affects Versions: 1.7.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After a discussion
> [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257]
> with [[email protected]], I started to analyze the usage of
> {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}.
> In conclusion, the {{TaskExecutionStateListener}} strategy has been abandoned and
> no component relies on it any more. Instead, {{TaskManagerActions}} was introduced
> to handle the communication between {{Task}} and {{TaskManager}}. No one
> except {{TaskManager}} should communicate directly with {{Task}}. So the
> legacy class {{TaskExecutionStateListener}} can be safely removed.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)