akalash commented on a change in pull request #18092:
URL: https://github.com/apache/flink/pull/18092#discussion_r770415935



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1200,48 +1200,63 @@ public boolean receiveAcknowledgeMessage(
      */
     private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
             throws CheckpointException {
-        final long checkpointId = pendingCheckpoint.getCheckpointId();
+        final long checkpointId = pendingCheckpoint.getCheckpointID();
         final CompletedCheckpoint completedCheckpoint;
         final CompletedCheckpoint lastSubsumed;
+        final CheckpointProperties props = pendingCheckpoint.getProps();
 
         // As a first step to complete the checkpoint, we register its state 
with the registry
-        Map<OperatorID, OperatorState> operatorStates = 
pendingCheckpoint.getOperatorStates();
-        SharedStateRegistry sharedStateRegistry = 
completedCheckpointStore.getSharedStateRegistry();
-        sharedStateRegistry.registerAll(operatorStates.values());
+        // we do not
+        if (!props.isSavepoint()) {

Review comment:
       Do I understand correctly that technically we should check 
`props.isSynchronous()` here as well but we don't do it because we know that 
job will be stopped after this checkpoint? is it correct?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1200,92 +1200,69 @@ public boolean receiveAcknowledgeMessage(
      */
     private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
             throws CheckpointException {
-        final long checkpointId = pendingCheckpoint.getCheckpointId();
+        final long checkpointId = pendingCheckpoint.getCheckpointID();
         final CompletedCheckpoint completedCheckpoint;
+        final CompletedCheckpoint lastSubsumed;
+        final CheckpointProperties props = pendingCheckpoint.getProps();
 
         // As a first step to complete the checkpoint, we register its state 
with the registry
-        Map<OperatorID, OperatorState> operatorStates = 
pendingCheckpoint.getOperatorStates();
-        SharedStateRegistry sharedStateRegistry = 
completedCheckpointStore.getSharedStateRegistry();
-        sharedStateRegistry.registerAll(operatorStates.values());
-
-        long lastSubsumedCheckpointId = 
CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+        // we do not

Review comment:
       I don't get the meaning of this comment. Is it unfinished yet?

##########
File path: docs/content/docs/ops/state/savepoints.md
##########
@@ -130,9 +130,23 @@ Unlike savepoints, checkpoints cannot generally be moved 
to a different location
 
 If you use `JobManagerCheckpointStorage`, metadata *and* savepoint state will 
be stored in the `_metadata` file, so don't be confused by the absence of 
additional data files.
 
-{{< hint warning  >}}
-It is discouraged to move or delete the last savepoint of a running job, 
because this might interfere with failure-recovery. Savepoints have 
side-effects on exactly-once sinks, therefore 
-to ensure exactly-once semantics, if there is no checkpoint after the last 
savepoint, the savepoint will be used for recovery. 
+{{< hint warning  >}} 
+Starting from Flink 1.15 intermediate savepoints (savepoints other than
+created with [stop-with-savepoint](#stopping-a-job-with-savepoint)) are not 
used for recovery and do
+not commit any side effects.
+
+This has to be taken into consideration, especially when running multiple jobs 
in the same
+checkpointing timeline. It is possible in that solution that if the original 
job (after taking a
+savepoint) fails, then it will fall back to a checkpoint prior to the 
savepoint. However, if we now
+resume a job from the savepoint, then we might commit transactions that 
might’ve never happened
+because of falling back to a checkpoint before the savepoint (assuming 
non-determinism).

Review comment:
       Do I understand correctly that right now recovering from manual 
savepoint doesn't guarantee the correctness of restored data/transactions? If 
so, what is the main value to have such a manual savepoint for users?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to