[ https://issues.apache.org/jira/browse/FLINK-10386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642961#comment-16642961 ]
ASF GitHub Bot commented on FLINK-10386: ---------------------------------------- asfgit closed pull request #6729: [FLINK-10386] [taskmanager] Remove legacy class TaskExecutionStateListener URL: https://github.com/apache/flink/pull/6729 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java deleted file mode 100644 index d729dbbf5ad..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.TaskMessages; -import org.apache.flink.util.Preconditions; - -/** - * Implementation using {@link ActorGateway} to forward the messages. - */ -public class ActorGatewayTaskExecutionStateListener implements TaskExecutionStateListener { - - private final ActorGateway actorGateway; - - public ActorGatewayTaskExecutionStateListener(ActorGateway actorGateway) { - this.actorGateway = Preconditions.checkNotNull(actorGateway); - } - - @Override - public void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState) { - TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState); - - actorGateway.tell(actorMessage); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 92ae1676ed6..66f212f0fee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -86,10 +86,8 @@ import java.net.URL; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -205,9 +203,6 @@ /** Checkpoint notifier used to communicate with the CheckpointCoordinator. */ private final CheckpointResponder checkpointResponder; - /** All listener that want to be notified about changes in the task's execution state. */ - private final List<TaskExecutionStateListener> taskExecutionStateListeners; - /** The BLOB cache, from which the task can request BLOB files. 
*/ private final BlobCacheService blobService; @@ -348,7 +343,6 @@ public Task( this.network = Preconditions.checkNotNull(networkEnvironment); this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig); - this.taskExecutionStateListeners = new CopyOnWriteArrayList<>(); this.metrics = metricGroup; this.partitionProducerStateChecker = Preconditions.checkNotNull(partitionProducerStateChecker); @@ -701,7 +695,6 @@ else if (current == ExecutionState.CANCELING) { } // notify everyone that we switched to running - notifyObservers(ExecutionState.RUNNING, null); taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); // make sure the user code classloader is accessible thread-locally @@ -729,10 +722,7 @@ else if (current == ExecutionState.CANCELING) { // try to mark the task as finished // if that fails, the task was canceled/failed in the meantime - if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { - notifyObservers(ExecutionState.FINISHED, null); - } - else { + if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { throw new CancelTaskException(); } } @@ -772,26 +762,21 @@ else if (current == ExecutionState.CANCELING) { if (t instanceof CancelTaskException) { if (transitionState(current, ExecutionState.CANCELED)) { cancelInvokable(invokable); - - notifyObservers(ExecutionState.CANCELED, null); break; } } else { if (transitionState(current, ExecutionState.FAILED, t)) { // proper failure of the task. 
record the exception as the root cause - String errorMessage = String.format("Execution of %s (%s) failed.", taskNameWithSubtask, executionId); failureCause = t; cancelInvokable(invokable); - notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t)); break; } } } else if (current == ExecutionState.CANCELING) { if (transitionState(current, ExecutionState.CANCELED)) { - notifyObservers(ExecutionState.CANCELED, null); break; } } @@ -1010,14 +995,6 @@ private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwabl // if we manage this state transition, then the invokable gets never called // we need not call cancel on it this.failureCause = cause; - notifyObservers( - targetState, - new Exception( - String.format( - "Cancel or fail execution of %s (%s).", - taskNameWithSubtask, - executionId), - cause)); return; } } @@ -1031,14 +1008,6 @@ else if (current == ExecutionState.RUNNING) { if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) { this.failureCause = cause; - notifyObservers( - targetState, - new Exception( - String.format( - "Cancel or fail execution of %s (%s).", - taskNameWithSubtask, - executionId), - cause)); LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId); @@ -1111,22 +1080,6 @@ else if (current == ExecutionState.RUNNING) { } } - // ------------------------------------------------------------------------ - // State Listeners - // ------------------------------------------------------------------------ - - public void registerExecutionListener(TaskExecutionStateListener listener) { - taskExecutionStateListeners.add(listener); - } - - private void notifyObservers(ExecutionState newState, Throwable error) { - TaskExecutionState stateUpdate = new TaskExecutionState(jobId, executionId, newState, error); - - for (TaskExecutionStateListener listener : taskExecutionStateListeners) { - listener.notifyTaskExecutionStateChanged(stateUpdate); - } - } - // 
------------------------------------------------------------------------ // Partition State Listeners // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java similarity index 61% rename from flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java index 9fa9c9025e5..b39a7441df0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java @@ -18,12 +18,22 @@ package org.apache.flink.runtime.taskmanager; -public interface TaskExecutionStateListener { +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; - /** - * Called whenever the task's execution state changes - * - * @param taskExecutionState describing the task execution state change - */ - void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState); +/** + * Dummy implementation of {@link TaskManagerActions}. 
+ */ +public class NoOpTaskManagerActions implements TaskManagerActions { + + @Override + public void notifyFinalState(ExecutionAttemptID executionAttemptID) {} + + @Override + public void notifyFatalError(String message, Throwable cause) {} + + @Override + public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {} + + @Override + public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 3dfcfb3b059..403d1e29c1d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -113,14 +113,12 @@ private ActorGateway taskManagerGateway; private ActorGateway jobManagerGateway; - private ActorGateway listenerGateway; - private ActorGatewayTaskExecutionStateListener listener; private ActorGatewayTaskManagerActions taskManagerConnection; private BlockingQueue<Object> taskManagerMessages; private BlockingQueue<Object> jobManagerMessages; - private BlockingQueue<Object> listenerMessages; + private BlockingQueue<TaskExecutionState> listenerMessages; @Before public void createQueuesAndActors() { @@ -129,10 +127,14 @@ public void createQueuesAndActors() { listenerMessages = new LinkedBlockingQueue<>(); taskManagerGateway = new ForwardingActorGateway(taskManagerMessages); jobManagerGateway = new ForwardingActorGateway(jobManagerMessages); - listenerGateway = new ForwardingActorGateway(listenerMessages); - listener = new ActorGatewayTaskExecutionStateListener(listenerGateway); - taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway); + taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway) { + @Override + public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { + 
super.updateTaskExecutionState(taskExecutionState); + listenerMessages.add(taskExecutionState); + } + }; awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); @@ -147,7 +149,6 @@ public void clearActorsAndMessages() { taskManagerGateway = null; jobManagerGateway = null; - listenerGateway = null; } // ------------------------------------------------------------------------ @@ -164,8 +165,6 @@ public void testRegularExecution() { assertFalse(task.isCanceledOrFailed()); assertNull(task.getFailureCause()); - task.registerExecutionListener(listener); - // go into the run method. we should switch to DEPLOYING, RUNNING, then // FINISHED, and all should be good task.run(); @@ -178,7 +177,6 @@ public void testRegularExecution() { // verify listener messages validateListenerMessage(ExecutionState.RUNNING, task, false); - validateListenerMessage(ExecutionState.FINISHED, task, false); // make sure that the TaskManager received an message to unregister the task validateTaskManagerStateChange(ExecutionState.RUNNING, task, false); @@ -244,8 +242,6 @@ public void testLibraryCacheRegistrationFailed() { assertFalse(task.isCanceledOrFailed()); assertNull(task.getFailureCause()); - task.registerExecutionListener(listener); - // should fail task.run(); @@ -256,9 +252,6 @@ public void testLibraryCacheRegistrationFailed() { assertNotNull(task.getFailureCause().getMessage()); assertTrue(task.getFailureCause().getMessage().contains("classloader")); - // verify listener messages - validateListenerMessage(ExecutionState.FAILED, task, true); - // make sure that the TaskManager received an message to unregister the task validateUnregisterTask(task.getExecutionId()); @@ -292,8 +285,6 @@ public void testExecutionFailsInNetworkRegistration() { Task task = createTask(TestInvokableCorrect.class, blobService, libCache, network, consumableNotifier, partitionProducerStateChecker, executor); - task.registerExecutionListener(listener); - task.run(); assertEquals(ExecutionState.FAILED, 
task.getExecutionState()); @@ -301,7 +292,6 @@ public void testExecutionFailsInNetworkRegistration() { assertTrue(task.getFailureCause().getMessage().contains("buffers")); validateUnregisterTask(task.getExecutionId()); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -313,7 +303,6 @@ public void testExecutionFailsInNetworkRegistration() { public void testInvokableInstantiationFailed() { try { Task task = createTask(InvokableNonInstantiable.class); - task.registerExecutionListener(listener); task.run(); @@ -322,7 +311,6 @@ public void testInvokableInstantiationFailed() { assertTrue(task.getFailureCause().getMessage().contains("instantiate")); validateUnregisterTask(task.getExecutionId()); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -334,7 +322,6 @@ public void testInvokableInstantiationFailed() { public void testExecutionFailsInInvoke() { try { Task task = createTask(InvokableWithExceptionInInvoke.class); - task.registerExecutionListener(listener); task.run(); @@ -348,7 +335,6 @@ public void testExecutionFailsInInvoke() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -360,7 +346,6 @@ public void testExecutionFailsInInvoke() { public void testFailWithWrappedException() { try { Task task = createTask(FailingInvokableWithChainedException.class); - task.registerExecutionListener(listener); task.run(); @@ -374,7 +359,6 @@ public void testFailWithWrappedException() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -386,7 +370,6 @@ public void testFailWithWrappedException() { public void 
testCancelDuringInvoke() { try { Task task = createTask(InvokableBlockingInInvoke.class); - task.registerExecutionListener(listener); // run the task asynchronous task.startTaskThread(); @@ -408,7 +391,6 @@ public void testCancelDuringInvoke() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateCancelingAndCanceledListenerMessage(task); } catch (Exception e) { e.printStackTrace(); @@ -420,7 +402,6 @@ public void testCancelDuringInvoke() { public void testFailExternallyDuringInvoke() { try { Task task = createTask(InvokableBlockingInInvoke.class); - task.registerExecutionListener(listener); // run the task asynchronous task.startTaskThread(); @@ -429,7 +410,7 @@ public void testFailExternallyDuringInvoke() { awaitLatch.await(); task.failExternally(new Exception("test")); - assertTrue(task.getExecutionState() == ExecutionState.FAILED); + assertEquals(ExecutionState.FAILED, task.getExecutionState()); task.getExecutingThread().join(); @@ -441,7 +422,6 @@ public void testFailExternallyDuringInvoke() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -453,7 +433,6 @@ public void testFailExternallyDuringInvoke() { public void testCanceledAfterExecutionFailedInInvoke() { try { Task task = createTask(InvokableWithExceptionInInvoke.class); - task.registerExecutionListener(listener); task.run(); @@ -468,7 +447,6 @@ public void testCanceledAfterExecutionFailedInInvoke() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -477,10 +455,9 @@ public void testCanceledAfterExecutionFailedInInvoke() { } @Test - public void testExecutionFailesAfterCanceling() { + public void 
testExecutionFailsAfterCanceling() { try { Task task = createTask(InvokableWithExceptionOnTrigger.class); - task.registerExecutionListener(listener); // run the task asynchronous task.startTaskThread(); @@ -505,7 +482,6 @@ public void testExecutionFailesAfterCanceling() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateCancelingAndCanceledListenerMessage(task); } catch (Exception e) { e.printStackTrace(); @@ -517,7 +493,6 @@ public void testExecutionFailesAfterCanceling() { public void testExecutionFailsAfterTaskMarkedFailed() { try { Task task = createTask(InvokableWithExceptionOnTrigger.class); - task.registerExecutionListener(listener); // run the task asynchronous task.startTaskThread(); @@ -541,7 +516,6 @@ public void testExecutionFailsAfterTaskMarkedFailed() { validateUnregisterTask(task.getExecutionId()); validateListenerMessage(ExecutionState.RUNNING, task, false); - validateListenerMessage(ExecutionState.FAILED, task, true); } catch (Exception e) { e.printStackTrace(); @@ -1099,11 +1073,8 @@ private void validateListenerMessage(ExecutionState state, Task task, boolean ha try { // we may have to wait for a bit to give the actors time to receive the message // and put it into the queue - TaskMessages.UpdateTaskExecutionState message = - (TaskMessages.UpdateTaskExecutionState) listenerMessages.take(); - assertNotNull("There is no additional listener message", message); - - TaskExecutionState taskState = message.taskExecutionState(); + final TaskExecutionState taskState = listenerMessages.take(); + assertNotNull("There is no additional listener message", state); assertEquals(task.getJobID(), taskState.getJobID()); assertEquals(task.getExecutionId(), taskState.getID()); @@ -1120,45 +1091,6 @@ private void validateListenerMessage(ExecutionState state, Task task, boolean ha } } - private void validateCancelingAndCanceledListenerMessage(Task task) { - try { - // we may have to wait for a bit to 
give the actors time to receive the message - // and put it into the queue - TaskMessages.UpdateTaskExecutionState message1 = - (TaskMessages.UpdateTaskExecutionState) listenerMessages.take(); - TaskMessages.UpdateTaskExecutionState message2 = - (TaskMessages.UpdateTaskExecutionState) listenerMessages.take(); - - assertNotNull("There is no additional listener message", message1); - assertNotNull("There is no additional listener message", message2); - - TaskExecutionState taskState1 = message1.taskExecutionState(); - TaskExecutionState taskState2 = message2.taskExecutionState(); - - assertEquals(task.getJobID(), taskState1.getJobID()); - assertEquals(task.getJobID(), taskState2.getJobID()); - assertEquals(task.getExecutionId(), taskState1.getID()); - assertEquals(task.getExecutionId(), taskState2.getID()); - - ExecutionState state1 = taskState1.getExecutionState(); - ExecutionState state2 = taskState2.getExecutionState(); - - // it may be (very rarely) that the following race happens: - // - OUTSIDE THREAD: call to cancel() - // - OUTSIDE THREAD: atomic state change from running to canceling - // - TASK THREAD: finishes, atomic change from canceling to canceled - // - TASK THREAD: send notification that state is canceled - // - OUTSIDE THREAD: send notification that state is canceling - - // for that reason, we allow the notification messages in any order. 
- assertTrue((state1 == ExecutionState.CANCELING && state2 == ExecutionState.CANCELED) || - (state2 == ExecutionState.CANCELING && state1 == ExecutionState.CANCELED)); - } - catch (InterruptedException e) { - fail("interrupted"); - } - } - // -------------------------------------------------------------------------------------------- // Mock invokable code // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index ea4bf955c58..c826e774ec2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskmanager.CheckpointResponder; +import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -251,21 +252,6 @@ public void invoke() throws Exception { } } - private static final class NoOpTaskManagerActions implements TaskManagerActions { - - @Override - public void notifyFinalState(ExecutionAttemptID executionAttemptID) {} - - @Override - public void notifyFatalError(String message, Throwable cause) {} - - @Override - public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {} - - @Override - public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {} - } - private static final class NoOpInputSplitProvider implements InputSplitProvider { @Override diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 6a9dda4694e..23625e86906 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.CloseableRegistry; @@ -86,9 +85,9 @@ import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.CheckpointResponder; +import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.DirectExecutorService; @@ -108,7 +107,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.CloseableIterable; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -126,19 +124,15 @@ import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; -import java.time.Duration; import 
java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; -import java.util.PriorityQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -160,6 +154,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.whenNew; @@ -181,43 +176,26 @@ */ @Test public void testEarlyCanceling() throws Exception { - Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2)); - StreamConfig cfg = new StreamConfig(new Configuration()); + final StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setOperatorID(new OperatorID(4711L, 42L)); cfg.setStreamOperator(new SlowlyDeserializingOperator()); cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - Task task = createTask(SourceStreamTask.class, cfg, new Configuration()); + final TaskManagerActions taskManagerActions = spy(new NoOpTaskManagerActions()); + final Task task = createTask(SourceStreamTask.class, cfg, new Configuration(), taskManagerActions); - TestingExecutionStateListener testingExecutionStateListener = new TestingExecutionStateListener(); + final TaskExecutionState state = new TaskExecutionState( + task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING); - task.registerExecutionListener(testingExecutionStateListener); task.startTaskThread(); - Future<ExecutionState> running = 
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING); - - // wait until the task thread reached state RUNNING - ExecutionState executionState = running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - // make sure the task is really running - if (executionState != ExecutionState.RUNNING) { - fail("Task entered state " + task.getExecutionState() + " with error " - + ExceptionUtils.stringifyException(task.getFailureCause())); - } + verify(taskManagerActions, timeout(2000L)).updateTaskExecutionState(eq(state)); // send a cancel. because the operator takes a long time to deserialize, this should // hit the task before the operator is deserialized task.cancelExecution(); - Future<ExecutionState> canceling = testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING); - - executionState = canceling.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - // the task should reach state canceled eventually - assertTrue(executionState == ExecutionState.CANCELING || - executionState == ExecutionState.CANCELED); - - task.getExecutingThread().join(deadline.timeLeft().toMillis()); + task.getExecutingThread().join(); assertFalse("Task did not cancel", task.getExecutingThread().isAlive()); assertEquals(ExecutionState.CANCELED, task.getExecutionState()); @@ -846,6 +824,23 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable // Test Utilities // ------------------------------------------------------------------------ + private static void waitUntilExecutionState(Task task, ExecutionState exceptedState, Deadline deadline) { + while (deadline.hasTimeLeft()) { + if (exceptedState == task.getExecutionState()) { + return; + } + + try { + Thread.sleep(Math.min(deadline.timeLeft().toMillis(), 200)); + } catch (InterruptedException e) { + // do nothing + } + } + + throw new IllegalStateException("Task " + task + " does not arrive state " + exceptedState + "in time. 
" + + "Current state is " + task.getExecutionState()); + } + /** * Operator that does nothing. * @@ -885,50 +880,27 @@ public void close() throws Exception { } } - private static class TestingExecutionStateListener implements TaskExecutionStateListener { - - private ExecutionState executionState = null; - - private final PriorityQueue<Tuple2<ExecutionState, CompletableFuture<ExecutionState>>> priorityQueue = - new PriorityQueue<>(1, Comparator.comparingInt(o -> o.f0.ordinal())); - - Future<ExecutionState> notifyWhenExecutionState(ExecutionState executionState) { - synchronized (priorityQueue) { - if (this.executionState != null && this.executionState.ordinal() >= executionState.ordinal()) { - return CompletableFuture.completedFuture(executionState); - } else { - CompletableFuture<ExecutionState> promise = new CompletableFuture<>(); - priorityQueue.offer(Tuple2.of(executionState, promise)); - return promise; - } - } - } - - @Override - public void notifyTaskExecutionStateChanged(TaskExecutionState taskExecutionState) { - synchronized (priorityQueue) { - this.executionState = taskExecutionState.getExecutionState(); - - while (!priorityQueue.isEmpty() && priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) { - CompletableFuture<ExecutionState> promise = priorityQueue.poll().f1; - promise.complete(executionState); - } - } - } + public static Task createTask( + Class<? extends AbstractInvokable> invokable, + StreamConfig taskConfig, + Configuration taskManagerConfig) throws Exception { + return createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), mock(TaskManagerActions.class)); } public static Task createTask( Class<? 
extends AbstractInvokable> invokable, StreamConfig taskConfig, - Configuration taskManagerConfig) throws Exception { - return createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager()); + Configuration taskManagerConfig, + TaskManagerActions taskManagerActions) throws Exception { + return createTask(invokable, taskConfig, taskManagerConfig, new TestTaskStateManager(), taskManagerActions); } public static Task createTask( Class<? extends AbstractInvokable> invokable, StreamConfig taskConfig, Configuration taskManagerConfig, - TestTaskStateManager taskStateManager) throws Exception { + TestTaskStateManager taskStateManager, + TaskManagerActions taskManagerActions) throws Exception { BlobCacheService blobService = new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class)); @@ -980,7 +952,7 @@ public static Task createTask( network, mock(BroadcastVariableManager.class), taskStateManager, - mock(TaskManagerActions.class), + taskManagerActions, mock(InputSplitProvider.class), mock(CheckpointResponder.class), blobService, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove legacy class TaskExecutionStateListener > ---------------------------------------------- > > Key: FLINK-10386 > URL: https://issues.apache.org/jira/browse/FLINK-10386 > Project: Flink > Issue Type: Sub-task > Components: TaskManager > Affects Versions: 1.7.0 > Reporter: TisonKun > Assignee: TisonKun > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > After a discussion > [here|https://github.com/apache/flink/commit/0735b5b935b0c0757943e2d58047afcfb9949560#commitcomment-30584257] > with [~trohrm...@apache.org]. 
I started to analyze the usage of > {{ActorGatewayTaskExecutionStateListener}} and {{TaskExecutionStateListener}}. > In conclusion, we have abandoned the {{TaskExecutionStateListener}} strategy and no > component relies on it any more. Instead, we introduced {{TaskManagerActions}} to take > over the communication between {{Task}} and {{TaskManager}}. No one > except {{TaskManager}} should communicate directly with {{Task}}. So the > legacy class {{TaskExecutionStateListener}} can be safely removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)