pnowojski commented on a change in pull request #16885:
URL: https://github.com/apache/flink/pull/16885#discussion_r693983236



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -760,33 +763,26 @@ private void doRun() {
             // make sure the user code classloader is accessible thread-locally
             executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
 
-            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+            TaskInvokable finalInvokable = invokable;
             try {
-                // Restore invokable data to the last valid state
-                invokable.restore();
-            } finally {
-                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
-            }
+                runWithSystemExitMonitoring(finalInvokable::restore);
 
-            if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
-                throw new CancelTaskException();
-            }
+                if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
+                    throw new CancelTaskException();
+                }
 
-            // notify everyone that we switched to running
-            taskManagerActions.updateTaskExecutionState(
-                    new TaskExecutionState(executionId, ExecutionState.RUNNING));
+                // notify everyone that we switched to running
+                taskManagerActions.updateTaskExecutionState(
+                        new TaskExecutionState(executionId, ExecutionState.RUNNING));
 
-            // Monitor user codes from exiting JVM covering user function invocation. This can be
-            // done in a finer-grained way like enclosing user callback functions individually,
-            // but as exit triggered by framework is not performed and expected in this invoke
-            // function anyhow, we can monitor exiting JVM for entire scope.
-            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
-            try {
-                // run the invokable
-                invokable.invoke();
-            } finally {
-                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
+                runWithSystemExitMonitoring(finalInvokable::invoke);
+            } catch (Throwable throwable) {
+                if (!(throwable instanceof CancelTaskException)) {

Review comment:
       It looks strange to me as well.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -762,17 +762,25 @@ private void doRun() {
             executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
 
             AbstractInvokable finalInvokable = invokable;
-            runWithSystemExitMonitoring(finalInvokable::restore);
+            try {

Review comment:
       Can you extract at least some of this code into smaller methods? `doRun()` is already too large, and you are making it even larger. You don't have to fix all of it (though it would be great if you want to ;) ), but please extract at least some of it.
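
To illustrate the kind of extraction meant here, below is a rough, self-contained sketch, not a proposal for the actual code. `DoRunSketch`, `restoreAndInvoke()`, `switchToRunning()`, the nested `Invokable` and `ThrowingAction` interfaces, and the `events` list are all hypothetical stand-ins. The real `Task` would keep using `transitionState(...)`, `taskManagerActions`, `CancelTaskException` and `FlinkSecurityManager`, and the body of `runWithSystemExitMonitoring` here only imitates the monitor/unmonitor pairing visible in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the part of Task#doRun() touched by this PR.
class DoRunSketch {

    // Stand-in for the invokable: only the three lifecycle calls visible in the diff.
    interface Invokable {
        void restore() throws Exception;

        void invoke() throws Exception;

        void cleanUp() throws Exception;
    }

    // Action that may throw, so method references like invokable::restore fit.
    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }

    // Records what happened, in place of real state transitions and notifications.
    final List<String> events = new ArrayList<>();

    private final Invokable invokable;
    private final boolean canSwitchToRunning;

    DoRunSketch(Invokable invokable, boolean canSwitchToRunning) {
        this.invokable = invokable;
        this.canSwitchToRunning = canSwitchToRunning;
    }

    // Extracted from doRun(): the restore -> RUNNING -> invoke sequence, with cleanUp
    // in a finally block so it also runs when an earlier step throws.
    void restoreAndInvoke() throws Exception {
        try {
            runWithSystemExitMonitoring(invokable::restore);
            switchToRunning();
            runWithSystemExitMonitoring(invokable::invoke);
        } finally {
            runWithSystemExitMonitoring(invokable::cleanUp);
        }
    }

    // Extracted from doRun(): the state transition plus the notification.
    private void switchToRunning() {
        if (!canSwitchToRunning) {
            // Assumption: the real code throws CancelTaskException here.
            throw new IllegalStateException("task was cancelled before RUNNING");
        }
        events.add("notified RUNNING");
    }

    // Imitates the monitor/unmonitor pairing around user code; no real security manager.
    private void runWithSystemExitMonitoring(ThrowingAction action) throws Exception {
        events.add("monitor on");
        try {
            action.run();
        } finally {
            events.add("monitor off");
        }
    }

    public static void main(String[] args) throws Exception {
        Invokable printing = new Invokable() {
            @Override public void restore() { System.out.println("restore"); }
            @Override public void invoke() { System.out.println("invoke"); }
            @Override public void cleanUp() { System.out.println("cleanUp"); }
        };
        DoRunSketch sketch = new DoRunSketch(printing, true);
        sketch.restoreAndInvoke();
        System.out.println(sketch.events);
    }
}
```

With something along these lines, `doRun()` would only need a single call in place of the whole `try`/`finally` block, and the cancellation path stays in one small method.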

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -762,17 +762,21 @@ private void doRun() {
             executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
 
             AbstractInvokable finalInvokable = invokable;
-            runWithSystemExitMonitoring(finalInvokable::restore);
+            try {
+                runWithSystemExitMonitoring(finalInvokable::restore);
 
-            if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
-                throw new CancelTaskException();
-            }
+                if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
+                    throw new CancelTaskException();
+                }
 
-            // notify everyone that we switched to running
-            taskManagerActions.updateTaskExecutionState(
-                    new TaskExecutionState(executionId, ExecutionState.RUNNING));
+                // notify everyone that we switched to running
+                taskManagerActions.updateTaskExecutionState(
+                        new TaskExecutionState(executionId, ExecutionState.RUNNING));
 
-            runWithSystemExitMonitoring(finalInvokable::invoke);
+                runWithSystemExitMonitoring(finalInvokable::invoke);
+            } finally {
+                runWithSystemExitMonitoring(finalInvokable::cleanUp);
+            }

Review comment:
       I would merge this to both branches. It doesn't seem like a very risky refactor. But that's a question for the release managers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
