[
https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568269#comment-16568269
]
ASF GitHub Bot commented on FLINK-10033:
----------------------------------------
asfgit closed pull request #6480: [FLINK-10033] [runtime] Task releases
reference to AbstractInvokable
URL: https://github.com/apache/flink/pull/6480
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 60b2ed8fee7..92ae1676ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -78,6 +78,7 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -243,7 +244,9 @@
/** atomic flag that makes sure the invokable is canceled exactly once
upon error. */
private final AtomicBoolean invokableHasBeenCanceled;
- /** The invokable of this task, if initialized. */
+ /** The invokable of this task, if initialized. All accesses must copy
the reference and
+ * check for null, as this field is cleared as part of the disposal
logic. */
+ @Nullable
private volatile AbstractInvokable invokable;
/** The current execution state of the task. */
@@ -473,6 +476,12 @@ long getTaskCancellationTimeout() {
return taskCancellationTimeout;
}
+ @Nullable
+ @VisibleForTesting
+ AbstractInvokable getInvokable() {
+ return invokable;
+ }
+
//
------------------------------------------------------------------------
// Task Execution
//
------------------------------------------------------------------------
@@ -762,7 +771,7 @@ else if (current == ExecutionState.CANCELING) {
if (current == ExecutionState.RUNNING
|| current == ExecutionState.DEPLOYING) {
if (t instanceof
CancelTaskException) {
if
(transitionState(current, ExecutionState.CANCELED)) {
-
cancelInvokable();
+
cancelInvokable(invokable);
notifyObservers(ExecutionState.CANCELED, null);
break;
@@ -773,7 +782,7 @@ else if (current == ExecutionState.CANCELING) {
// proper
failure of the task. record the exception as the root cause
String
errorMessage = String.format("Execution of %s (%s) failed.",
taskNameWithSubtask, executionId);
failureCause =
t;
-
cancelInvokable();
+
cancelInvokable(invokable);
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
break;
@@ -808,6 +817,10 @@ else if (transitionState(current, ExecutionState.FAILED,
t)) {
try {
LOG.info("Freeing task resources for {} ({}).",
taskNameWithSubtask, executionId);
+ // clear the reference to the invokable. this
helps guard against holding references
+ // to the invokable and its structures in cases
where this Task object is still referenced
+ this.invokable = null;
+
// stop the async dispatcher.
// copy dispatcher reference to stack, against
concurrent release
ExecutorService dispatcher =
this.asyncCallDispatcher;
@@ -924,18 +937,20 @@ private boolean transitionState(ExecutionState
currentState, ExecutionState newS
* @throws IllegalStateException if the {@link Task} is not yet running
*/
public void stopExecution() {
+ // copy reference to stack, to guard against concurrent setting
to null
+ final AbstractInvokable invokable = this.invokable;
+
if (invokable != null) {
- LOG.info("Attempting to stop task {} ({}).",
taskNameWithSubtask, executionId);
if (invokable instanceof StoppableTask) {
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- ((StoppableTask)
invokable).stop();
- } catch (RuntimeException e) {
- LOG.error("Stopping
task {} ({}) failed.", taskNameWithSubtask, executionId, e);
-
taskManagerActions.failTask(executionId, e);
- }
+ LOG.info("Attempting to stop task {} ({}).",
taskNameWithSubtask, executionId);
+ final StoppableTask stoppable = (StoppableTask)
invokable;
+
+ Runnable runnable = () -> {
+ try {
+ stoppable.stop();
+ } catch (Throwable t) {
+ LOG.error("Stopping task {}
({}) failed.", taskNameWithSubtask, executionId, t);
+
taskManagerActions.failTask(executionId, t);
}
};
executeAsyncCallRunnable(runnable,
String.format("Stopping source task %s (%s).", taskNameWithSubtask,
executionId));
@@ -945,7 +960,7 @@ public void run() {
} else {
throw new IllegalStateException(
String.format(
- "Cannot stop task %s (%s) because it is
not yet running.",
+ "Cannot stop task %s (%s) because it is
not running.",
taskNameWithSubtask,
executionId));
}
@@ -1010,6 +1025,10 @@ else if (current == ExecutionState.RUNNING) {
if (transitionState(ExecutionState.RUNNING,
targetState, cause)) {
// we are canceling / failing out of
the running state
// we need to cancel the invokable
+
+ // copy reference to guard against
concurrent null-ing out the reference
+ final AbstractInvokable invokable =
this.invokable;
+
if (invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
this.failureCause = cause;
notifyObservers(
@@ -1363,9 +1382,9 @@ private void executeAsyncCallRunnable(Runnable runnable,
String callName) {
// Utilities
//
------------------------------------------------------------------------
- private void cancelInvokable() {
+ private void cancelInvokable(AbstractInvokable invokable) {
// in case of an exception during execution, we still call
"cancel()" on the task
- if (invokable != null && this.invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
+ if (invokable != null &&
invokableHasBeenCanceled.compareAndSet(false, true)) {
try {
invokable.cancel();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1829e977489..3dfcfb3b059 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -174,6 +174,7 @@ public void testRegularExecution() {
assertEquals(ExecutionState.FINISHED,
task.getExecutionState());
assertFalse(task.isCanceledOrFailed());
assertNull(task.getFailureCause());
+ assertNull(task.getInvokable());
// verify listener messages
validateListenerMessage(ExecutionState.RUNNING, task,
false);
@@ -202,6 +203,8 @@ public void testCancelRightAway() {
// verify final state
assertEquals(ExecutionState.CANCELED,
task.getExecutionState());
validateUnregisterTask(task.getExecutionId());
+
+ assertNull(task.getInvokable());
}
catch (Exception e) {
e.printStackTrace();
@@ -258,6 +261,8 @@ public void testLibraryCacheRegistrationFailed() {
// make sure that the TaskManager received an message
to unregister the task
validateUnregisterTask(task.getExecutionId());
+
+ assertNull(task.getInvokable());
}
catch (Exception e) {
e.printStackTrace();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Let Task release reference to Invokable on shutdown
> ---------------------------------------------------
>
> Key: FLINK-10033
> URL: https://issues.apache.org/jira/browse/FLINK-10033
> Project: Flink
> Issue Type: Bug
> Components: TaskManager
> Affects Versions: 1.5.2
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> References to Task objects may, under some conditions, linger beyond the
> lifetime of the task. For example, in the case of local network channels, the
> receiving task may hold a reference to the Task object that produced
> the data.
> To guard against memory leaks, the Task needs to release all references to
> its AbstractInvokable when it shuts down or is canceled.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)