rkhachatryan commented on a change in pull request #16885:
URL: https://github.com/apache/flink/pull/16885#discussion_r694061759



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -760,33 +763,26 @@ private void doRun() {
             // make sure the user code classloader is accessible thread-locally
             
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
 
-            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+            TaskInvokable finalInvokable = invokable;
             try {
-                // Restore invokable data to the last valid state
-                invokable.restore();
-            } finally {
-                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
-            }
+                runWithSystemExitMonitoring(finalInvokable::restore);
 
-            if (!transitionState(ExecutionState.INITIALIZING, 
ExecutionState.RUNNING)) {
-                throw new CancelTaskException();
-            }
+                if (!transitionState(ExecutionState.INITIALIZING, 
ExecutionState.RUNNING)) {
+                    throw new CancelTaskException();
+                }
 
-            // notify everyone that we switched to running
-            taskManagerActions.updateTaskExecutionState(
-                    new TaskExecutionState(executionId, 
ExecutionState.RUNNING));
+                // notify everyone that we switched to running
+                taskManagerActions.updateTaskExecutionState(
+                        new TaskExecutionState(executionId, 
ExecutionState.RUNNING));
 
-            // Monitor user codes from exiting JVM covering user function 
invocation. This can be
-            // done in a finer-grained way like enclosing user callback 
functions individually,
-            // but as exit triggered by framework is not performed and 
expected in this invoke
-            // function anyhow, we can monitor exiting JVM for entire scope.
-            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
-            try {
-                // run the invokable
-                invokable.invoke();
-            } finally {
-                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
+                runWithSystemExitMonitoring(finalInvokable::invoke);
+            } catch (Throwable throwable) {
+                if (!(throwable instanceof CancelTaskException)) {

Review comment:
       Good points, I addressed this in 
6843a959b16a601a3db3247eaa77e46e05b0f900 and 
4d64111967e37e0333e0b7310eb1108d6c04679a.
   
   So `cancelTask` is called unconditionally now from `cleanUp`.
   (I still left a separate `invokable.cancel()` call in Task when 
transitioning state or cancelling externally because even if we unify it for 
`StreamTask`, other Invokables can have diffferent logic in `cancel`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to