[ 
https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568269#comment-16568269
 ] 

ASF GitHub Bot commented on FLINK-10033:
----------------------------------------

asfgit closed pull request #6480: [FLINK-10033] [runtime] Task releases 
reference to AbstractInvokable
URL: https://github.com/apache/flink/pull/6480
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 60b2ed8fee7..92ae1676ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -78,6 +78,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -243,7 +244,9 @@
        /** atomic flag that makes sure the invokable is canceled exactly once 
upon error. */
        private final AtomicBoolean invokableHasBeenCanceled;
 
-       /** The invokable of this task, if initialized. */
+       /** The invokable of this task, if initialized. All accesses must copy 
the reference and
+        * check for null, as this field is cleared as part of the disposal 
logic. */
+       @Nullable
        private volatile AbstractInvokable invokable;
 
        /** The current execution state of the task. */
@@ -473,6 +476,12 @@ long getTaskCancellationTimeout() {
                return taskCancellationTimeout;
        }
 
+       @Nullable
+       @VisibleForTesting
+       AbstractInvokable getInvokable() {
+               return invokable;
+       }
+
        // 
------------------------------------------------------------------------
        //  Task Execution
        // 
------------------------------------------------------------------------
@@ -762,7 +771,7 @@ else if (current == ExecutionState.CANCELING) {
                                        if (current == ExecutionState.RUNNING 
|| current == ExecutionState.DEPLOYING) {
                                                if (t instanceof 
CancelTaskException) {
                                                        if 
(transitionState(current, ExecutionState.CANCELED)) {
-                                                               
cancelInvokable();
+                                                               
cancelInvokable(invokable);
 
                                                                
notifyObservers(ExecutionState.CANCELED, null);
                                                                break;
@@ -773,7 +782,7 @@ else if (current == ExecutionState.CANCELING) {
                                                                // proper 
failure of the task. record the exception as the root cause
                                                                String 
errorMessage = String.format("Execution of %s (%s) failed.", 
taskNameWithSubtask, executionId);
                                                                failureCause = 
t;
-                                                               
cancelInvokable();
+                                                               
cancelInvokable(invokable);
 
                                                                
notifyObservers(ExecutionState.FAILED, new Exception(errorMessage, t));
                                                                break;
@@ -808,6 +817,10 @@ else if (transitionState(current, ExecutionState.FAILED, 
t)) {
                        try {
                                LOG.info("Freeing task resources for {} ({}).", 
taskNameWithSubtask, executionId);
 
+                               // clear the reference to the invokable. this 
helps guard against holding references
+                               // to the invokable and its structures in cases 
where this Task object is still referenced
+                               this.invokable = null;
+
                                // stop the async dispatcher.
                                // copy dispatcher reference to stack, against 
concurrent release
                                ExecutorService dispatcher = 
this.asyncCallDispatcher;
@@ -924,18 +937,20 @@ private boolean transitionState(ExecutionState 
currentState, ExecutionState newS
         * @throws IllegalStateException if the {@link Task} is not yet running
         */
        public void stopExecution() {
+               // copy reference to stack, to guard against concurrent setting 
to null
+               final AbstractInvokable invokable = this.invokable;
+
                if (invokable != null) {
-                       LOG.info("Attempting to stop task {} ({}).", 
taskNameWithSubtask, executionId);
                        if (invokable instanceof StoppableTask) {
-                               Runnable runnable = new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       ((StoppableTask) 
invokable).stop();
-                                               } catch (RuntimeException e) {
-                                                       LOG.error("Stopping 
task {} ({}) failed.", taskNameWithSubtask, executionId, e);
-                                                       
taskManagerActions.failTask(executionId, e);
-                                               }
+                               LOG.info("Attempting to stop task {} ({}).", 
taskNameWithSubtask, executionId);
+                               final StoppableTask stoppable = (StoppableTask) 
invokable;
+
+                               Runnable runnable = () -> {
+                                       try {
+                                               stoppable.stop();
+                                       } catch (Throwable t) {
+                                               LOG.error("Stopping task {} 
({}) failed.", taskNameWithSubtask, executionId, t);
+                                               
taskManagerActions.failTask(executionId, t);
                                        }
                                };
                                executeAsyncCallRunnable(runnable, 
String.format("Stopping source task %s (%s).", taskNameWithSubtask, 
executionId));
@@ -945,7 +960,7 @@ public void run() {
                } else {
                        throw new IllegalStateException(
                                String.format(
-                                       "Cannot stop task %s (%s) because it is 
not yet running.",
+                                       "Cannot stop task %s (%s) because it is 
not running.",
                                        taskNameWithSubtask,
                                        executionId));
                }
@@ -1010,6 +1025,10 @@ else if (current == ExecutionState.RUNNING) {
                                if (transitionState(ExecutionState.RUNNING, 
targetState, cause)) {
                                        // we are canceling / failing out of 
the running state
                                        // we need to cancel the invokable
+
+                                       // copy reference to guard against 
concurrent null-ing out the reference
+                                       final AbstractInvokable invokable = 
this.invokable;
+
                                        if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                                                this.failureCause = cause;
                                                notifyObservers(
@@ -1363,9 +1382,9 @@ private void executeAsyncCallRunnable(Runnable runnable, 
String callName) {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private void cancelInvokable() {
+       private void cancelInvokable(AbstractInvokable invokable) {
                // in case of an exception during execution, we still call 
"cancel()" on the task
-               if (invokable != null && this.invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
+               if (invokable != null && 
invokableHasBeenCanceled.compareAndSet(false, true)) {
                        try {
                                invokable.cancel();
                        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1829e977489..3dfcfb3b059 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -174,6 +174,7 @@ public void testRegularExecution() {
                        assertEquals(ExecutionState.FINISHED, 
task.getExecutionState());
                        assertFalse(task.isCanceledOrFailed());
                        assertNull(task.getFailureCause());
+                       assertNull(task.getInvokable());
 
                        // verify listener messages
                        validateListenerMessage(ExecutionState.RUNNING, task, 
false);
@@ -202,6 +203,8 @@ public void testCancelRightAway() {
                        // verify final state
                        assertEquals(ExecutionState.CANCELED, 
task.getExecutionState());
                        validateUnregisterTask(task.getExecutionId());
+
+                       assertNull(task.getInvokable());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -258,6 +261,8 @@ public void testLibraryCacheRegistrationFailed() {
 
                        // make sure that the TaskManager received an message 
to unregister the task
                        validateUnregisterTask(task.getExecutionId());
+
+                       assertNull(task.getInvokable());
                }
                catch (Exception e) {
                        e.printStackTrace();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Let Task release reference to Invokable on shutdown
> ---------------------------------------------------
>
>                 Key: FLINK-10033
>                 URL: https://issues.apache.org/jira/browse/FLINK-10033
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.5.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.3, 1.6.0
>
>
> References to Task objects may under some conditions linger longer than for 
> the lifetime of the task. For example, in case of local network channels, the 
> receiving task may have a reference to the object of the task that produced 
> the data.
> To prevent against memory leaks, the Task needs to release all references to 
> its AbstractInvokable when it shuts down or cancels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to