AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r697712620



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
##########
@@ -254,11 +254,9 @@ private void assignNonFinishedStateToTask(
                     operatorID.getGeneratedOperatorID(), operatorSubtaskState);
         }
 
-        if (!statelessTask) {
-            JobManagerTaskRestore taskRestore =
-                    new JobManagerTaskRestore(restoreCheckpointId, taskState);
-            currentExecutionAttempt.setInitialState(taskRestore);
-        }
+        JobManagerTaskRestore taskRestore =
+                new JobManagerTaskRestore(restoreCheckpointId, taskState);
+        currentExecutionAttempt.setInitialState(taskRestore);

Review comment:
       I adjusted `ExecutionVertex#getPreferredLocationBasedOnState` to also 
check whether there is state. That should restore the old behavior (`empty` on 
fresh starts and for stateless tasks). So good catch! 
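
A minimal sketch of that check, using simplified stand-in types rather than the actual Flink classes. `TaskRestore`, its `hasState` flag, and the `String` location are assumptions made only for illustration:

```java
import java.util.Optional;

public class PreferredLocationSketch {

    /** Hypothetical stand-in for the restore information attached to an execution. */
    public static final class TaskRestore {
        public final long restoreCheckpointId;
        public final boolean hasState;

        public TaskRestore(long restoreCheckpointId, boolean hasState) {
            this.restoreCheckpointId = restoreCheckpointId;
            this.hasState = hasState;
        }
    }

    /**
     * Returns the previous location only if the task actually restores state.
     * A null restore (fresh start) and a restore without state (stateless task)
     * both yield an empty result.
     */
    public static Optional<String> preferredLocationBasedOnState(
            TaskRestore restore, String lastLocation) {
        if (restore != null && restore.hasState) {
            return Optional.ofNullable(lastLocation);
        }
        return Optional.empty();
    }
}
```

With this shape, always setting a restore object on recovery does not change the scheduling preference of stateless tasks, because the extra state check filters them out.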
   
   For `TaskStateManagerImpl`, the `SequentialChannelStateReaderImpl` already 
creates empty state when initialized, so that is effectively unchanged. For 
`getInputRescalingDescriptor`, `getOutputRescalingDescriptor`, and 
`isFinishedOnRestore` the return value is also the same (NO_RESCALE / false). 
`prioritizedOperatorState` returns a different 
`PrioritizedOperatorSubtaskState` where all fields are the same as before 
except the newly added `restoreCheckpointId`. I have not seen anything that 
directly checks for equality with 
`PrioritizedOperatorSubtaskState.EMPTY_NON_RESTORED_INSTANCE`.
   TL;DR: there is no effective change in `TaskStateManagerImpl`.
   
   Apart from that, almost all `@Nullable` annotations remain valid, as the 
initial state is still null if the job is started anew. My change only affects 
stateless tasks on recovery: `Execution#setInitialState` will now be called for 
every task, but only on recovery. The field must stay `@Nullable`, and with it 
all dependent paths.
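
To illustrate why the field has to stay nullable, here is a small sketch under the same caveat: `Attempt`, `Restore`, and `restoredCheckpointId` are hypothetical names, not Flink's API. The initial state is only set on recovery, so readers still need to handle the fresh-start case:

```java
import java.util.OptionalLong;

public class InitialStateSketch {

    /** Hypothetical stand-in for the restore information (not Flink's JobManagerTaskRestore). */
    public static final class Restore {
        public final long restoreCheckpointId;
        public final boolean hasState;

        public Restore(long restoreCheckpointId, boolean hasState) {
            this.restoreCheckpointId = restoreCheckpointId;
            this.hasState = hasState;
        }
    }

    /** Hypothetical stand-in for an execution attempt. */
    public static final class Attempt {
        // Stays null when the job is started anew; set for every task on recovery.
        private Restore initialState;

        public void setInitialState(Restore restore) {
            this.initialState = restore;
        }

        /** Callers must still handle the fresh-start case, hence the null check. */
        public OptionalLong restoredCheckpointId() {
            return initialState == null
                    ? OptionalLong.empty()
                    : OptionalLong.of(initialState.restoreCheckpointId);
        }
    }
}
```

In this sketch a stateless task on recovery carries a checkpoint id but no state, while a freshly started task carries neither.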




