akalash commented on a change in pull request #16885:
URL: https://github.com/apache/flink/pull/16885#discussion_r693877990



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -760,33 +763,26 @@ private void doRun() {
             // make sure the user code classloader is accessible thread-locally
             executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
 
-            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+            TaskInvokable finalInvokable = invokable;
             try {
-                // Restore invokable data to the last valid state
-                invokable.restore();
-            } finally {
-                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
-            }
+                runWithSystemExitMonitoring(finalInvokable::restore);
 
-            if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
-                throw new CancelTaskException();
-            }
+                if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
+                    throw new CancelTaskException();
+                }
 
-            // notify everyone that we switched to running
-            taskManagerActions.updateTaskExecutionState(
-                    new TaskExecutionState(executionId, ExecutionState.RUNNING));
+                // notify everyone that we switched to running
+                taskManagerActions.updateTaskExecutionState(
+                        new TaskExecutionState(executionId, ExecutionState.RUNNING));
 
-            // Monitor user codes from exiting JVM covering user function invocation. This can be
-            // done in a finer-grained way like enclosing user callback functions individually,
-            // but as exit triggered by framework is not performed and expected in this invoke
-            // function anyhow, we can monitor exiting JVM for entire scope.
-            FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
-            try {
-                // run the invokable
-                invokable.invoke();
-            } finally {
-                FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
+                runWithSystemExitMonitoring(finalInvokable::invoke);
+            } catch (Throwable throwable) {
+                if (!(throwable instanceof CancelTaskException)) {

Review comment:
       I'm not sure about this condition. According to this code, if a
`CancelTaskException` happens we just ignore it and continue the execution, but
the old implementation always propagated the exception.
   Right now the logic is pretty confusing: if the problem happens inside
`restore` or `invoke`, we cancel the task immediately in this catch block, but
if it happens between these two calls (during the state transition), we cancel
the task a little later in a different catch block. I think it wouldn't be
wrong to unify this logic and cancel the task here whatever exception
happened. WDYT?
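
   A minimal sketch of what the unified handling could look like. Everything
here is a hypothetical stand-in (`UnifiedCancelSketch`, `ThrowingRunnable`, the
`events` list, and a local `CancelTaskException`); it is not the code in
`Task.java`, only an illustration of a single catch block that cancels and
rethrows regardless of the exception type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these are the real Flink classes.
class UnifiedCancelSketch {
    static class CancelTaskException extends RuntimeException {}

    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Records what happened, in order, so the control flow is observable.
    final List<String> events = new ArrayList<>();

    void doRun(ThrowingRunnable restore, boolean transitionSucceeds, ThrowingRunnable invoke)
            throws Exception {
        try {
            restore.run();
            events.add("restored");
            if (!transitionSucceeds) {
                // Failure between restore and invoke (the state transition).
                throw new CancelTaskException();
            }
            events.add("running");
            invoke.run();
            events.add("invoked");
        } catch (Throwable t) {
            // Single cancellation point: no special case for CancelTaskException.
            events.add("cancelled");
            throw t; // always propagate to the caller
        }
    }

    public static void main(String[] args) {
        UnifiedCancelSketch task = new UnifiedCancelSketch();
        try {
            task.doRun(() -> {}, false, () -> {});
        } catch (Exception e) {
            System.out.println(e.getClass().getSimpleName() + " after " + task.events);
        }
    }
}
```

   With this shape, a failure in `restore`, in the state transition, or in
`invoke` goes through the same path, so there is only one place to reason
about.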

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1378,11 +1389,11 @@ private void declineCheckpoint(
     }
 
     public void notifyCheckpointComplete(final long checkpointID) {
-        final AbstractInvokable invokable = this.invokable;
+        final TaskInvokable invokable = this.invokable;
 
-        if (executionState == ExecutionState.RUNNING && invokable != null) {
+        if (executionState == ExecutionState.RUNNING && invokable instanceof CheckpointableTask) {

Review comment:
       As I understand it, it is impossible/unexpected for `invokable
instanceof CheckpointableTask` to be false. So maybe it is better to replace
the `if` condition with `checkArgument`? (The same question applies to the
other `instanceof` checks.)
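
   To make the difference concrete, here is a small sketch of the two
variants. The interfaces and the local `checkArgument` helper are simplified,
hypothetical stand-ins written for this example (the helper only mimics the
usual precondition-check behaviour of throwing `IllegalArgumentException`);
this is not the actual `Task.java` code.

```java
// Hypothetical stand-ins; simplified for illustration only.
class InstanceofCheckSketch {
    interface TaskInvokable {}

    interface CheckpointableTask {
        void notifyCheckpointComplete(long checkpointId);
    }

    // Local helper mimicking a precondition check.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Variant from the diff: a non-checkpointable invokable is silently skipped.
    static boolean notifyLenient(TaskInvokable invokable, long checkpointId) {
        if (invokable instanceof CheckpointableTask) {
            ((CheckpointableTask) invokable).notifyCheckpointComplete(checkpointId);
            return true;
        }
        return false;
    }

    // Suggested variant: an unexpected invokable type fails loudly.
    static void notifyStrict(TaskInvokable invokable, long checkpointId) {
        checkArgument(
                invokable instanceof CheckpointableTask,
                "invokable is not a CheckpointableTask");
        ((CheckpointableTask) invokable).notifyCheckpointComplete(checkpointId);
    }

    public static void main(String[] args) {
        TaskInvokable plain = new TaskInvokable() {};
        System.out.println("lenient notified: " + notifyLenient(plain, 1L));
        try {
            notifyStrict(plain, 1L);
        } catch (IllegalArgumentException e) {
            System.out.println("strict failed: " + e.getMessage());
        }
    }
}
```

   One caveat: `null instanceof CheckpointableTask` is false in Java, so the
`instanceof` form also covers the old `invokable != null` check. A strict
variant would still need a separate guard for a null invokable if that case
can legitimately occur.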



