[
https://issues.apache.org/jira/browse/FLINK-10513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642960#comment-16642960
]
ASF GitHub Bot commented on FLINK-10513:
----------------------------------------
asfgit closed pull request #6804: [FLINK-10513] Replace
TaskManagerActions#notifyFinalState with #updateTaskExecutionState
URL: https://github.com/apache/flink/pull/6804
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ae69e561bc7..e2ec4822576 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1552,11 +1552,6 @@ private TaskManagerActionsImpl(JobMasterGateway
jobMasterGateway) {
this.jobMasterGateway = checkNotNull(jobMasterGateway);
}
- @Override
- public void notifyFinalState(final ExecutionAttemptID
executionAttemptID) {
- runAsync(() ->
unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID));
- }
-
@Override
public void notifyFatalError(String message, Throwable cause) {
try {
@@ -1574,7 +1569,11 @@ public void failTask(final ExecutionAttemptID
executionAttemptID, final Throwabl
@Override
public void updateTaskExecutionState(final TaskExecutionState
taskExecutionState) {
-
TaskExecutor.this.updateTaskExecutionState(jobMasterGateway,
taskExecutionState);
+ if
(taskExecutionState.getExecutionState().isTerminal()) {
+ runAsync(() ->
unregisterTaskAndNotifyFinalState(jobMasterGateway,
taskExecutionState.getID()));
+ } else {
+
TaskExecutor.this.updateTaskExecutionState(jobMasterGateway,
taskExecutionState);
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
deleted file mode 100644
index d729dbbf5ad..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskExecutionStateListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskExecutionStateListener implements
TaskExecutionStateListener {
-
- private final ActorGateway actorGateway;
-
- public ActorGatewayTaskExecutionStateListener(ActorGateway
actorGateway) {
- this.actorGateway = Preconditions.checkNotNull(actorGateway);
- }
-
- @Override
- public void notifyTaskExecutionStateChanged(TaskExecutionState
taskExecutionState) {
- TaskMessages.UpdateTaskExecutionState actorMessage = new
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
- actorGateway.tell(actorMessage);
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
index b3a0cbbac7a..1731fcc4ef2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
@@ -35,11 +35,6 @@ public ActorGatewayTaskManagerActions(ActorGateway
actorGateway) {
this.actorGateway = Preconditions.checkNotNull(actorGateway);
}
- @Override
- public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
- actorGateway.tell(new
TaskMessages.TaskInFinalState(executionAttemptID));
- }
-
@Override
public void notifyFatalError(String message, Throwable cause) {
actorGateway.tell(new TaskManagerMessages.FatalError(message,
cause));
@@ -52,8 +47,13 @@ public void failTask(ExecutionAttemptID executionAttemptID,
Throwable cause) {
@Override
public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
- TaskMessages.UpdateTaskExecutionState actorMessage = new
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
- actorGateway.tell(actorMessage);
+ final TaskMessages.TaskMessage taskMessage;
+ if (taskExecutionState.getExecutionState().isTerminal()) {
+ taskMessage = new
TaskMessages.TaskInFinalState(taskExecutionState.getID());
+ } else {
+ taskMessage = new
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
+ }
+
+ actorGateway.tell(taskMessage);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 92ae1676ed6..9abf92f76fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -86,10 +86,8 @@
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -205,9 +203,6 @@
/** Checkpoint notifier used to communicate with the
CheckpointCoordinator. */
private final CheckpointResponder checkpointResponder;
- /** All listener that want to be notified about changes in the task's
execution state. */
- private final List<TaskExecutionStateListener>
taskExecutionStateListeners;
-
/** The BLOB cache, from which the task can request BLOB files. */
private final BlobCacheService blobService;
@@ -348,7 +343,6 @@ public Task(
this.network = Preconditions.checkNotNull(networkEnvironment);
this.taskManagerConfig =
Preconditions.checkNotNull(taskManagerConfig);
- this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
this.metrics = metricGroup;
this.partitionProducerStateChecker =
Preconditions.checkNotNull(partitionProducerStateChecker);
@@ -701,7 +695,6 @@ else if (current == ExecutionState.CANCELING) {
}
// notify everyone that we switched to running
- notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new
TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible
thread-locally
@@ -729,10 +722,7 @@ else if (current == ExecutionState.CANCELING) {
// try to mark the task as finished
// if that fails, the task was canceled/failed in the
meantime
- if (transitionState(ExecutionState.RUNNING,
ExecutionState.FINISHED)) {
- notifyObservers(ExecutionState.FINISHED, null);
- }
- else {
+ if (!transitionState(ExecutionState.RUNNING,
ExecutionState.FINISHED)) {
throw new CancelTaskException();
}
}
@@ -772,26 +762,21 @@ else if (current == ExecutionState.CANCELING) {
if (t instanceof
CancelTaskException) {
if
(transitionState(current, ExecutionState.CANCELED)) {
cancelInvokable(invokable);
-
-
notifyObservers(ExecutionState.CANCELED, null);
break;
}
}
else {
if
(transitionState(current, ExecutionState.FAILED, t)) {
// proper
failure of the task. record the exception as the root cause
- String
errorMessage = String.format("Execution of %s (%s) failed.",
taskNameWithSubtask, executionId);
failureCause =
t;
cancelInvokable(invokable);
-
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
break;
}
}
}
else if (current ==
ExecutionState.CANCELING) {
if (transitionState(current,
ExecutionState.CANCELED)) {
-
notifyObservers(ExecutionState.CANCELED, null);
break;
}
}
@@ -883,7 +868,8 @@ private ClassLoader createUserCodeClassloader() throws
Exception {
}
private void notifyFinalState() {
- taskManagerActions.notifyFinalState(executionId);
+ checkState(executionState.isTerminal());
+ taskManagerActions.updateTaskExecutionState(new
TaskExecutionState(jobId, executionId, executionState, failureCause));
}
private void notifyFatalError(String message, Throwable cause) {
@@ -1010,14 +996,6 @@ private void
cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwabl
// if we manage this state transition,
then the invokable gets never called
// we need not call cancel on it
this.failureCause = cause;
- notifyObservers(
- targetState,
- new Exception(
- String.format(
- "Cancel or fail
execution of %s (%s).",
-
taskNameWithSubtask,
- executionId),
- cause));
return;
}
}
@@ -1031,14 +1009,6 @@ else if (current == ExecutionState.RUNNING) {
if (invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
- notifyObservers(
- targetState,
- new Exception(
- String.format(
- "Cancel
or fail execution of %s (%s).",
-
taskNameWithSubtask,
-
executionId),
- cause));
LOG.info("Triggering
cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
@@ -1111,22 +1081,6 @@ else if (current == ExecutionState.RUNNING) {
}
}
- //
------------------------------------------------------------------------
- // State Listeners
- //
------------------------------------------------------------------------
-
- public void registerExecutionListener(TaskExecutionStateListener
listener) {
- taskExecutionStateListeners.add(listener);
- }
-
- private void notifyObservers(ExecutionState newState, Throwable error) {
- TaskExecutionState stateUpdate = new TaskExecutionState(jobId,
executionId, newState, error);
-
- for (TaskExecutionStateListener listener :
taskExecutionStateListeners) {
- listener.notifyTaskExecutionStateChanged(stateUpdate);
- }
- }
-
//
------------------------------------------------------------------------
// Partition State Listeners
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
index 2f3a0cb558d..09242c2bd2f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
@@ -25,13 +25,6 @@
*/
public interface TaskManagerActions {
- /**
- * Notifies the task manager that the given task is in a final state.
- *
- * @param executionAttemptID Execution attempt ID of the task
- */
- void notifyFinalState(ExecutionAttemptID executionAttemptID);
-
/**
* Notifies the task manager about a fatal error occurred in the task.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
similarity index 65%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
index 9fa9c9025e5..ea302c06f5c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/NoOpTaskManagerActions.java
@@ -18,12 +18,19 @@
package org.apache.flink.runtime.taskmanager;
-public interface TaskExecutionStateListener {
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
- /**
- * Called whenever the task's execution state changes
- *
- * @param taskExecutionState describing the task execution state change
- */
- void notifyTaskExecutionStateChanged(TaskExecutionState
taskExecutionState);
+/**
+ * Dummy implementation of {@link TaskManagerActions}.
+ */
+public class NoOpTaskManagerActions implements TaskManagerActions {
+
+ @Override
+ public void notifyFatalError(String message, Throwable cause) {}
+
+ @Override
+ public void failTask(ExecutionAttemptID executionAttemptID, Throwable
cause) {}
+
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 2da2ba49265..a397b6ca915 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -121,6 +121,9 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for the legacy {@link TaskManager}.
+ */
@SuppressWarnings("serial")
public class TaskManagerTest extends TestLogger {
@@ -133,7 +136,7 @@
private static ActorSystem system;
- final static UUID leaderSessionID = UUID.randomUUID();
+ static final UUID LEADER_SESSION_ID = UUID.randomUUID();
private TestingHighAvailabilityServices highAvailabilityServices;
@@ -215,7 +218,6 @@ protected void run() {
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<PermanentBlobKey>(),
Collections.emptyList(), 0);
-
new Within(d) {
@Override
@@ -261,8 +263,6 @@ else if (!(message instanceof
TaskManagerMessages.Heartbeat)) {
fail("Unexpected message: " + message);
}
} while
(System.currentTimeMillis() < deadline);
-
-
}
};
}
@@ -283,11 +283,11 @@ public void testJobSubmissionAndCanceling() {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -404,7 +404,7 @@ protected void run() {
}
};
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -424,11 +424,11 @@ public void testJobSubmissionAndStop() throws Exception {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -554,11 +554,11 @@ public void testGateChannelEdgeMismatch() {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -651,7 +651,7 @@ public void testRunJobWithForwardChannel() {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
final JobID jid = new JobID();
@@ -661,8 +661,8 @@ public void testRunJobWithForwardChannel() {
final ExecutionAttemptID eid1 = new
ExecutionAttemptID();
final ExecutionAttemptID eid2 = new
ExecutionAttemptID();
- ActorRef jm = system.actorOf(Props.create(new
SimpleLookupJobManagerCreator(leaderSessionID)));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ ActorRef jm = system.actorOf(Props.create(new
SimpleLookupJobManagerCreator(LEADER_SESSION_ID)));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -793,7 +793,7 @@ public void testCancellingDependentAndStateUpdateFails() {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
final JobID jid = new JobID();
@@ -806,12 +806,12 @@ public void testCancellingDependentAndStateUpdateFails() {
ActorRef jm = system.actorOf(
Props.create(
new
SimpleLookupFailingUpdateJobManagerCreator(
-
leaderSessionID,
+
LEADER_SESSION_ID,
eid2)
)
);
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -911,14 +911,14 @@ protected void run() {
assertEquals(0,
tasks.size());
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
};
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -943,16 +943,16 @@ public void testRemotePartitionNotFound() throws
Exception {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
final IntermediateDataSetID resultId = new
IntermediateDataSetID();
// Create the JM
ActorRef jm = system.actorOf(Props.create(
- new
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+ new
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID,
getTestActor())));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1017,12 +1017,12 @@ protected void run() {
// The task should fail after
repeated requests
assertEquals(ExecutionState.FAILED, msg.getExecutionState());
Throwable t =
msg.getError(ClassLoader.getSystemClassLoader());
- assertEquals("Thrown exception
was not a PartitionNotFoundException: " + t.getMessage(),
+ assertEquals("Thrown exception
was not a PartitionNotFoundException: " + t.getMessage(),
PartitionNotFoundException.class, t.getClass());
}
};
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -1065,16 +1065,16 @@ public void testLocalPartitionNotFound() throws
Exception {
final ActorGateway testActorGateway = new
AkkaActorGateway(
getTestActor(),
- leaderSessionID);
+ LEADER_SESSION_ID);
try {
final IntermediateDataSetID resultId = new
IntermediateDataSetID();
// Create the JM
ActorRef jm = system.actorOf(Props.create(
- new
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+ new
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID,
getTestActor())));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1143,7 +1143,7 @@ protected void run() {
}
};
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -1167,9 +1167,9 @@ public void testLogNotFoundHandling() throws Exception {
// Create the JM
ActorRef jm = system.actorOf(Props.create(
- new
SimplePartitionStateLookupJobManagerCreator(leaderSessionID, getTestActor())));
+ new
SimplePartitionStateLookupJobManagerCreator(LEADER_SESSION_ID,
getTestActor())));
- jobManager = new AkkaActorGateway(jm,
leaderSessionID);
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
final int dataPort =
NetUtils.getAvailablePort();
Configuration config = new Configuration();
@@ -1209,7 +1209,8 @@ protected void run() {
TestingUtils.stopActor(taskManager);
TestingUtils.stopActor(jobManager);
}
- }};}
+ }};
+ }
//
------------------------------------------------------------------------
// Stack trace sample
@@ -1555,7 +1556,7 @@ public void testTerminationOnFatalError() {
* Test that a failing schedule or update consumers call leads to the
failing of the respective
* task.
*
- * IMPORTANT: We have to make sure that the invokable's cancel method
is called, because only
+ * <p>IMPORTANT: We have to make sure that the invokable's cancel
method is called, because only
* then the future is completed. We do this by not eagerly deploy
consumer tasks and requiring
* the invokable to fill one memory segment. The completed memory
segment will trigger the
* scheduling of the downstream operator since it is in pipeline mode.
After we've filled the
@@ -1592,9 +1593,8 @@ public void testFailingScheduleOrUpdateConsumersMessage()
throws Exception {
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<>(), Collections.emptyList(), 0);
-
- ActorRef jmActorRef =
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class,
leaderSessionID), "jobmanager");
- ActorGateway jobManager = new
AkkaActorGateway(jmActorRef, leaderSessionID);
+ ActorRef jmActorRef =
system.actorOf(Props.create(FailingScheduleOrUpdateConsumersJobManager.class,
LEADER_SESSION_ID), "jobmanager");
+ ActorGateway jobManager = new
AkkaActorGateway(jmActorRef, LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1635,8 +1635,8 @@ public void testSubmitTaskFailure() throws Exception {
try {
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1696,8 +1696,8 @@ public void testStopTaskFailure() throws Exception {
try {
final ExecutionAttemptID executionAttemptId = new
ExecutionAttemptID();
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1764,8 +1764,8 @@ public void testStackTraceSampleFailure() throws
Exception {
try {
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1813,8 +1813,8 @@ public void testUpdateTaskInputPartitionsFailure() throws
Exception {
final ExecutionAttemptID executionAttemptId = new
ExecutionAttemptID();
- ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, leaderSessionID));
- jobManager = new AkkaActorGateway(jm, leaderSessionID);
+ ActorRef jm =
system.actorOf(Props.create(SimpleJobManager.class, LEADER_SESSION_ID));
+ jobManager = new AkkaActorGateway(jm,
LEADER_SESSION_ID);
highAvailabilityServices.setJobMasterLeaderRetriever(
HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -1894,7 +1894,7 @@ public void handleMessage(Object message) throws
Exception {
),
self);
}
- else if(message instanceof
TaskMessages.UpdateTaskExecutionState){
+ else if (message instanceof
TaskMessages.UpdateTaskExecutionState){
getSender().tell(true, getSelf());
}
}
@@ -1958,7 +1958,7 @@ public void handleMessage(Object message) throws
Exception{
TaskMessages.UpdateTaskExecutionState updateMsg
=
(TaskMessages.UpdateTaskExecutionState) message;
-
if(validIDs.contains(updateMsg.taskExecutionState().getID())) {
+ if
(validIDs.contains(updateMsg.taskExecutionState().getID())) {
getSender().tell(true, getSelf());
} else {
getSender().tell(false, getSelf());
@@ -2021,7 +2021,7 @@ public SimpleLookupFailingUpdateJobManagerCreator(UUID
leaderSessionID, Executio
validIDs = new HashSet<ExecutionAttemptID>();
- for(ExecutionAttemptID id : ids) {
+ for (ExecutionAttemptID id : ids) {
this.validIDs.add(id);
}
}
@@ -2049,9 +2049,9 @@ public SimplePartitionStateLookupJobManager create()
throws Exception {
return new
SimplePartitionStateLookupJobManager(leaderSessionID, testActor);
}
}
-
+
//
--------------------------------------------------------------------------------------------
-
+
public static final class TestInvokableCorrect extends
AbstractInvokable {
public TestInvokableCorrect(Environment environment) {
@@ -2061,7 +2061,7 @@ public TestInvokableCorrect(Environment environment) {
@Override
public void invoke() {}
}
-
+
public static class TestInvokableBlockingCancelable extends
AbstractInvokable {
public TestInvokableBlockingCancelable(Environment environment)
{
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 3dfcfb3b059..bb5cd17193a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -113,14 +113,12 @@
private ActorGateway taskManagerGateway;
private ActorGateway jobManagerGateway;
- private ActorGateway listenerGateway;
- private ActorGatewayTaskExecutionStateListener listener;
private ActorGatewayTaskManagerActions taskManagerConnection;
private BlockingQueue<Object> taskManagerMessages;
private BlockingQueue<Object> jobManagerMessages;
- private BlockingQueue<Object> listenerMessages;
+ private BlockingQueue<TaskExecutionState> listenerMessages;
@Before
public void createQueuesAndActors() {
@@ -129,10 +127,14 @@ public void createQueuesAndActors() {
listenerMessages = new LinkedBlockingQueue<>();
taskManagerGateway = new
ForwardingActorGateway(taskManagerMessages);
jobManagerGateway = new
ForwardingActorGateway(jobManagerMessages);
- listenerGateway = new ForwardingActorGateway(listenerMessages);
- listener = new
ActorGatewayTaskExecutionStateListener(listenerGateway);
- taskManagerConnection = new
ActorGatewayTaskManagerActions(taskManagerGateway);
+ taskManagerConnection = new
ActorGatewayTaskManagerActions(taskManagerGateway) {
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+
super.updateTaskExecutionState(taskExecutionState);
+ listenerMessages.add(taskExecutionState);
+ }
+ };
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
@@ -147,7 +149,6 @@ public void clearActorsAndMessages() {
taskManagerGateway = null;
jobManagerGateway = null;
- listenerGateway = null;
}
//
------------------------------------------------------------------------
@@ -164,8 +165,6 @@ public void testRegularExecution() {
assertFalse(task.isCanceledOrFailed());
assertNull(task.getFailureCause());
- task.registerExecutionListener(listener);
-
// go into the run method. we should switch to
DEPLOYING, RUNNING, then
// FINISHED, and all should be good
task.run();
@@ -244,8 +243,6 @@ public void testLibraryCacheRegistrationFailed() {
assertFalse(task.isCanceledOrFailed());
assertNull(task.getFailureCause());
- task.registerExecutionListener(listener);
-
// should fail
task.run();
@@ -292,8 +289,6 @@ public void testExecutionFailsInNetworkRegistration() {
Task task = createTask(TestInvokableCorrect.class,
blobService, libCache, network, consumableNotifier,
partitionProducerStateChecker, executor);
- task.registerExecutionListener(listener);
-
task.run();
assertEquals(ExecutionState.FAILED,
task.getExecutionState());
@@ -313,7 +308,6 @@ public void testExecutionFailsInNetworkRegistration() {
public void testInvokableInstantiationFailed() {
try {
Task task = createTask(InvokableNonInstantiable.class);
- task.registerExecutionListener(listener);
task.run();
@@ -334,7 +328,6 @@ public void testInvokableInstantiationFailed() {
public void testExecutionFailsInInvoke() {
try {
Task task =
createTask(InvokableWithExceptionInInvoke.class);
- task.registerExecutionListener(listener);
task.run();
@@ -360,7 +353,6 @@ public void testExecutionFailsInInvoke() {
public void testFailWithWrappedException() {
try {
Task task =
createTask(FailingInvokableWithChainedException.class);
- task.registerExecutionListener(listener);
task.run();
@@ -386,7 +378,6 @@ public void testFailWithWrappedException() {
public void testCancelDuringInvoke() {
try {
Task task = createTask(InvokableBlockingInInvoke.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -408,7 +399,7 @@ public void testCancelDuringInvoke() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateCancelingAndCanceledListenerMessage(task);
+ validateListenerMessage(ExecutionState.CANCELED, task,
false);
}
catch (Exception e) {
e.printStackTrace();
@@ -420,7 +411,6 @@ public void testCancelDuringInvoke() {
public void testFailExternallyDuringInvoke() {
try {
Task task = createTask(InvokableBlockingInInvoke.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -429,7 +419,7 @@ public void testFailExternallyDuringInvoke() {
awaitLatch.await();
task.failExternally(new Exception("test"));
- assertTrue(task.getExecutionState() ==
ExecutionState.FAILED);
+ assertEquals(ExecutionState.FAILED,
task.getExecutionState());
task.getExecutingThread().join();
@@ -453,7 +443,6 @@ public void testFailExternallyDuringInvoke() {
public void testCanceledAfterExecutionFailedInInvoke() {
try {
Task task =
createTask(InvokableWithExceptionInInvoke.class);
- task.registerExecutionListener(listener);
task.run();
@@ -477,10 +466,9 @@ public void testCanceledAfterExecutionFailedInInvoke() {
}
@Test
- public void testExecutionFailesAfterCanceling() {
+ public void testExecutionFailsAfterCanceling() {
try {
Task task =
createTask(InvokableWithExceptionOnTrigger.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -505,7 +493,7 @@ public void testExecutionFailesAfterCanceling() {
validateUnregisterTask(task.getExecutionId());
validateListenerMessage(ExecutionState.RUNNING, task,
false);
- validateCancelingAndCanceledListenerMessage(task);
+ validateListenerMessage(ExecutionState.CANCELED, task,
false);
}
catch (Exception e) {
e.printStackTrace();
@@ -517,7 +505,6 @@ public void testExecutionFailesAfterCanceling() {
public void testExecutionFailsAfterTaskMarkedFailed() {
try {
Task task =
createTask(InvokableWithExceptionOnTrigger.class);
- task.registerExecutionListener(listener);
// run the task asynchronous
task.startTaskThread();
@@ -1099,11 +1086,8 @@ private void validateListenerMessage(ExecutionState
state, Task task, boolean ha
try {
// we may have to wait for a bit to give the actors
time to receive the message
// and put it into the queue
- TaskMessages.UpdateTaskExecutionState message =
- (TaskMessages.UpdateTaskExecutionState)
listenerMessages.take();
- assertNotNull("There is no additional listener
message", message);
-
- TaskExecutionState taskState =
message.taskExecutionState();
+ final TaskExecutionState taskState =
listenerMessages.take();
+ assertNotNull("There is no additional listener
message", state);
assertEquals(task.getJobID(), taskState.getJobID());
assertEquals(task.getExecutionId(), taskState.getID());
@@ -1120,45 +1104,6 @@ private void validateListenerMessage(ExecutionState
state, Task task, boolean ha
}
}
- private void validateCancelingAndCanceledListenerMessage(Task task) {
- try {
- // we may have to wait for a bit to give the actors
time to receive the message
- // and put it into the queue
- TaskMessages.UpdateTaskExecutionState message1 =
- (TaskMessages.UpdateTaskExecutionState)
listenerMessages.take();
- TaskMessages.UpdateTaskExecutionState message2 =
- (TaskMessages.UpdateTaskExecutionState)
listenerMessages.take();
-
- assertNotNull("There is no additional listener
message", message1);
- assertNotNull("There is no additional listener
message", message2);
-
- TaskExecutionState taskState1 =
message1.taskExecutionState();
- TaskExecutionState taskState2 =
message2.taskExecutionState();
-
- assertEquals(task.getJobID(), taskState1.getJobID());
- assertEquals(task.getJobID(), taskState2.getJobID());
- assertEquals(task.getExecutionId(), taskState1.getID());
- assertEquals(task.getExecutionId(), taskState2.getID());
-
- ExecutionState state1 = taskState1.getExecutionState();
- ExecutionState state2 = taskState2.getExecutionState();
-
- // it may be (very rarely) that the following race
happens:
- // - OUTSIDE THREAD: call to cancel()
- // - OUTSIDE THREAD: atomic state change from running
to canceling
- // - TASK THREAD: finishes, atomic change from
canceling to canceled
- // - TASK THREAD: send notification that state is
canceled
- // - OUTSIDE THREAD: send notification that state is
canceling
-
- // for that reason, we allow the notification messages
in any order.
- assertTrue((state1 == ExecutionState.CANCELING &&
state2 == ExecutionState.CANCELED) ||
- (state2 ==
ExecutionState.CANCELING && state1 == ExecutionState.CANCELED));
- }
- catch (InterruptedException e) {
- fail("interrupted");
- }
- }
-
//
--------------------------------------------------------------------------------------------
// Mock invokable code
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index ea4bf955c58..c826e774ec2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -62,6 +62,7 @@
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -251,21 +252,6 @@ public void invoke() throws Exception {
}
}
- private static final class NoOpTaskManagerActions implements
TaskManagerActions {
-
- @Override
- public void notifyFinalState(ExecutionAttemptID
executionAttemptID) {}
-
- @Override
- public void notifyFatalError(String message, Throwable
cause) {}
-
- @Override
- public void failTask(ExecutionAttemptID
executionAttemptID, Throwable cause) {}
-
- @Override
- public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {}
- }
-
private static final class NoOpInputSplitProvider implements
InputSplitProvider {
@Override
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a9dda4694e..72a8c415f27 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -21,9 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
@@ -86,9 +84,9 @@
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.DirectExecutorService;
@@ -108,7 +106,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.CloseableIterable;
-import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -126,19 +123,15 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
-import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -160,6 +153,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
@@ -181,43 +175,26 @@
*/
@Test
public void testEarlyCanceling() throws Exception {
- Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2));
- StreamConfig cfg = new StreamConfig(new Configuration());
+ final StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setOperatorID(new OperatorID(4711L, 42L));
cfg.setStreamOperator(new SlowlyDeserializingOperator());
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- Task task = createTask(SourceStreamTask.class, cfg, new
Configuration());
+ final TaskManagerActions taskManagerActions = spy(new
NoOpTaskManagerActions());
+ final Task task = createTask(SourceStreamTask.class, cfg, new
Configuration(), taskManagerActions);
- TestingExecutionStateListener testingExecutionStateListener =
new TestingExecutionStateListener();
+ final TaskExecutionState state = new TaskExecutionState(
+ task.getJobID(), task.getExecutionId(),
ExecutionState.RUNNING);
- task.registerExecutionListener(testingExecutionStateListener);
task.startTaskThread();
- Future<ExecutionState> running =
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.RUNNING);
-
- // wait until the task thread reached state RUNNING
- ExecutionState executionState =
running.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
- // make sure the task is really running
- if (executionState != ExecutionState.RUNNING) {
- fail("Task entered state " + task.getExecutionState() +
" with error "
- +
ExceptionUtils.stringifyException(task.getFailureCause()));
- }
+ verify(taskManagerActions,
timeout(2000L)).updateTaskExecutionState(eq(state));
// send a cancel. because the operator takes a long time to
deserialize, this should
// hit the task before the operator is deserialized
task.cancelExecution();
- Future<ExecutionState> canceling =
testingExecutionStateListener.notifyWhenExecutionState(ExecutionState.CANCELING);
-
- executionState = canceling.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS);
-
- // the task should reach state canceled eventually
- assertTrue(executionState == ExecutionState.CANCELING ||
- executionState == ExecutionState.CANCELED);
-
- task.getExecutingThread().join(deadline.timeLeft().toMillis());
+ task.getExecutingThread().join();
assertFalse("Task did not cancel",
task.getExecutingThread().isAlive());
assertEquals(ExecutionState.CANCELED, task.getExecutionState());
@@ -885,50 +862,27 @@ public void close() throws Exception {
}
}
- private static class TestingExecutionStateListener implements
TaskExecutionStateListener {
-
- private ExecutionState executionState = null;
-
- private final PriorityQueue<Tuple2<ExecutionState,
CompletableFuture<ExecutionState>>> priorityQueue =
- new PriorityQueue<>(1, Comparator.comparingInt(o ->
o.f0.ordinal()));
-
- Future<ExecutionState> notifyWhenExecutionState(ExecutionState
executionState) {
- synchronized (priorityQueue) {
- if (this.executionState != null &&
this.executionState.ordinal() >= executionState.ordinal()) {
- return
CompletableFuture.completedFuture(executionState);
- } else {
- CompletableFuture<ExecutionState>
promise = new CompletableFuture<>();
-
priorityQueue.offer(Tuple2.of(executionState, promise));
- return promise;
- }
- }
- }
-
- @Override
- public void notifyTaskExecutionStateChanged(TaskExecutionState
taskExecutionState) {
- synchronized (priorityQueue) {
- this.executionState =
taskExecutionState.getExecutionState();
-
- while (!priorityQueue.isEmpty() &&
priorityQueue.peek().f0.ordinal() <= executionState.ordinal()) {
- CompletableFuture<ExecutionState>
promise = priorityQueue.poll().f1;
- promise.complete(executionState);
- }
- }
- }
+ public static Task createTask(
+ Class<? extends AbstractInvokable> invokable,
+ StreamConfig taskConfig,
+ Configuration taskManagerConfig) throws Exception {
+ return createTask(invokable, taskConfig, taskManagerConfig, new
TestTaskStateManager(), mock(TaskManagerActions.class));
}
public static Task createTask(
Class<? extends AbstractInvokable> invokable,
StreamConfig taskConfig,
- Configuration taskManagerConfig) throws Exception {
- return createTask(invokable, taskConfig, taskManagerConfig, new
TestTaskStateManager());
+ Configuration taskManagerConfig,
+ TaskManagerActions taskManagerActions) throws Exception {
+ return createTask(invokable, taskConfig, taskManagerConfig, new
TestTaskStateManager(), taskManagerActions);
}
public static Task createTask(
Class<? extends AbstractInvokable> invokable,
StreamConfig taskConfig,
Configuration taskManagerConfig,
- TestTaskStateManager taskStateManager) throws Exception
{
+ TestTaskStateManager taskStateManager,
+ TaskManagerActions taskManagerActions) throws Exception
{
BlobCacheService blobService =
new BlobCacheService(mock(PermanentBlobCache.class),
mock(TransientBlobCache.class));
@@ -980,7 +934,7 @@ public static Task createTask(
network,
mock(BroadcastVariableManager.class),
taskStateManager,
- mock(TaskManagerActions.class),
+ taskManagerActions,
mock(InputSplitProvider.class),
mock(CheckpointResponder.class),
blobService,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Replace TaskManagerActions#notifyFinalState with #updateTaskExecutionState
> --------------------------------------------------------------------------
>
> Key: FLINK-10513
> URL: https://issues.apache.org/jira/browse/FLINK-10513
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> We can simplify the {{TaskManagerActions}} interface by replacing calls to
> the {{notifyFinalState}} method by {{updateTaskExecutionState}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)