akalash commented on a change in pull request #18092:
URL: https://github.com/apache/flink/pull/18092#discussion_r770415935
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1200,48 +1200,63 @@ public boolean receiveAcknowledgeMessage(
*/
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
throws CheckpointException {
- final long checkpointId = pendingCheckpoint.getCheckpointId();
+ final long checkpointId = pendingCheckpoint.getCheckpointID();
final CompletedCheckpoint completedCheckpoint;
final CompletedCheckpoint lastSubsumed;
+ final CheckpointProperties props = pendingCheckpoint.getProps();
// As a first step to complete the checkpoint, we register its state
with the registry
- Map<OperatorID, OperatorState> operatorStates =
pendingCheckpoint.getOperatorStates();
- SharedStateRegistry sharedStateRegistry =
completedCheckpointStore.getSharedStateRegistry();
- sharedStateRegistry.registerAll(operatorStates.values());
+ // we do not
+ if (!props.isSavepoint()) {
Review comment:
Do I understand correctly that, technically, we should check
`props.isSynchronous()` here as well, but we don't because we know that the
job will be stopped after this checkpoint?
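
To make the question concrete, here is a rough sketch of the two conditions being compared. It is only an illustration, not code from this PR: `Props` is a hypothetical stand-in for `CheckpointProperties` carrying just the two flags discussed, both helper method names are made up, and the second condition is one possible reading of "check `props.isSynchronous()` as well".

```java
public class RegistrationCheckSketch {

    /** Hypothetical stand-in for CheckpointProperties; holds only the two flags discussed. */
    static final class Props {
        private final boolean savepoint;
        private final boolean synchronous;

        Props(boolean savepoint, boolean synchronous) {
            this.savepoint = savepoint;
            this.synchronous = synchronous;
        }

        boolean isSavepoint() {
            return savepoint;
        }

        boolean isSynchronous() {
            return synchronous;
        }
    }

    /** Mirrors the check in the diff: only the savepoint flag is consulted. */
    static boolean registersAsInDiff(Props props) {
        return !props.isSavepoint();
    }

    /**
     * Assumed variant: skip registration only for non-synchronous savepoints,
     * so a synchronous (stop-with-savepoint) snapshot would still be registered.
     */
    static boolean registersWithSyncCheck(Props props) {
        return !props.isSavepoint() || props.isSynchronous();
    }

    public static void main(String[] args) {
        Props stopWithSavepoint = new Props(true, true);
        // The two conditions differ only for a synchronous savepoint.
        System.out.println("as in diff:      " + registersAsInDiff(stopWithSavepoint));
        System.out.println("with sync check: " + registersWithSyncCheck(stopWithSavepoint));
    }
}
```

The two variants disagree only when both flags are set, which is the case where the job stops afterwards anyway.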
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1200,92 +1200,69 @@ public boolean receiveAcknowledgeMessage(
*/
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
throws CheckpointException {
- final long checkpointId = pendingCheckpoint.getCheckpointId();
+ final long checkpointId = pendingCheckpoint.getCheckpointID();
final CompletedCheckpoint completedCheckpoint;
+ final CompletedCheckpoint lastSubsumed;
+ final CheckpointProperties props = pendingCheckpoint.getProps();
// As a first step to complete the checkpoint, we register its state
with the registry
- Map<OperatorID, OperatorState> operatorStates =
pendingCheckpoint.getOperatorStates();
- SharedStateRegistry sharedStateRegistry =
completedCheckpointStore.getSharedStateRegistry();
- sharedStateRegistry.registerAll(operatorStates.values());
-
- long lastSubsumedCheckpointId =
CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+ // we do not
Review comment:
I don't understand this comment. Is it unfinished?
##########
File path: docs/content/docs/ops/state/savepoints.md
##########
@@ -130,9 +130,23 @@ Unlike savepoints, checkpoints cannot generally be moved
to a different location
If you use `JobManagerCheckpointStorage`, metadata *and* savepoint state will
be stored in the `_metadata` file, so don't be confused by the absence of
additional data files.
-{{< hint warning >}}
-It is discouraged to move or delete the last savepoint of a running job,
because this might interfere with failure-recovery. Savepoints have
side-effects on exactly-once sinks, therefore
-to ensure exactly-once semantics, if there is no checkpoint after the last
savepoint, the savepoint will be used for recovery.
+{{< hint warning >}}
+Starting from Flink 1.15 intermediate savepoints (savepoints other than
+created with [stop-with-savepoint](#stopping-a-job-with-savepoint)) are not
used for recovery and do
+not commit any side effects.
+
+This has to be taken into consideration, especially when running multiple jobs
in the same
+checkpointing timeline. It is possible in that solution that if the original
job (after taking a
+savepoint) fails, then it will fall back to a checkpoint prior to the
savepoint. However, if we now
+resume a job from the savepoint, then we might commit transactions that
might’ve never happened
+because of falling back to a checkpoint before the savepoint (assuming
non-determinism).
Review comment:
Do I understand correctly that, right now, recovering from a manual
savepoint doesn't guarantee the correctness of the restored data/transactions? If
so, what is the main value of such a manual savepoint for users?
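
For context, the fallback described in the new docs text could be sketched roughly as below. This is a simplified model under stated assumptions, not Flink's recovery code: `Snapshot` and `latestRecoverableId` are hypothetical names, and the only rule modelled is that intermediate savepoints are skipped during recovery.

```java
import java.util.Arrays;
import java.util.List;

public class RecoveryFallbackSketch {

    /** Hypothetical snapshot descriptor; not a Flink class. */
    static final class Snapshot {
        final long id;
        final boolean intermediateSavepoint;

        Snapshot(long id, boolean intermediateSavepoint) {
            this.id = id;
            this.intermediateSavepoint = intermediateSavepoint;
        }
    }

    /**
     * Returns the id of the newest snapshot that may be used for recovery,
     * skipping intermediate savepoints, or -1 if there is none.
     */
    static long latestRecoverableId(List<Snapshot> history) {
        long best = -1L;
        for (Snapshot s : history) {
            if (!s.intermediateSavepoint && s.id > best) {
                best = s.id;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Checkpoint 1, then an intermediate savepoint 2, then the job fails.
        List<Snapshot> history =
                Arrays.asList(new Snapshot(1, false), new Snapshot(2, true));
        // The original job falls back to checkpoint 1, not to savepoint 2.
        System.out.println("recover from: " + latestRecoverableId(history));
    }
}
```

In this model, a second job resumed from savepoint 2 starts from a later point than the one the original job recovers to, which is the divergence the docs paragraph warns about.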