rkhachatryan commented on a change in pull request #18086:
URL: https://github.com/apache/flink/pull/18086#discussion_r768430372
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1271,6 +1273,18 @@ private boolean performCheckpoint(
}
}
+ private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
+ if (checkpointOptions.isForcedFullSnapshot()
+ && !stateBackend.supportsForcedFullSnapshot()) {
+ throw new IllegalStateException(
Review comment:
I think it makes sense to check backend support ASAP, maybe in
`StreamingJobGraphGenerator`. All the info should already be available there.
However, it can also be added later as an optimization.
WDYT?
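
A rough sketch of the fail-fast idea, in case it helps the discussion. The types below are simplified stand-ins and not the real Flink classes; `EarlyBackendCheck.validate` is a hypothetical helper, and where exactly it would be called from (`StreamingJobGraphGenerator` or elsewhere) is left open:

```java
// Stand-in types: NOT the Flink classes, only simplified placeholders.
enum RestoreMode { CLAIM, NO_CLAIM, LEGACY }

interface Backend {
    // Mirrors the default in this PR: backends have to opt in explicitly.
    default boolean supportsForcedFullSnapshot() {
        return false;
    }
}

final class EarlyBackendCheck {
    private EarlyBackendCheck() {}

    // Hypothetical helper: reject the job while it is being built,
    // instead of failing when the first checkpoint is performed.
    static void validate(Backend backend, RestoreMode mode) {
        if (mode == RestoreMode.NO_CLAIM && !backend.supportsForcedFullSnapshot()) {
            throw new IllegalStateException(
                    "Configured state backend does not support forced full snapshots, "
                            + "which are required for NO_CLAIM restore mode.");
        }
    }
}
```

The check in `StreamTask` could stay as a safety net; the early check would only move the error closer to job submission.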
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1664,10 +1675,13 @@ public boolean restoreSavepoint(
allowNonRestored,
checkpointProperties);
- // register shared state - even before adding the checkpoint to the store
- // because the latter might trigger subsumption so the ref counts must be up-to-date
- savepoint.registerSharedStatesAfterRestored(
- completedCheckpointStore.getSharedStateRegistry());
+ // in claim mode we should not register any shared handles
+ if (restoreSettings.getRestoreMode() != RestoreMode.NO_CLAIM) {
+ // register shared state - even before adding the checkpoint to the store
+ // because the latter might trigger subsumption so the ref counts must be up-to-date
+ savepoint.registerSharedStatesAfterRestored(
Review comment:
Shouldn't we add the same check to `SharedStateRegistryFactory.create`
to prevent registering shared states as well? Otherwise, they'll be discarded
on subsumption.
(I'm not sure whether we should)
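
To make the concern concrete, here is a toy reference-counting model. It is NOT Flink's `SharedStateRegistry`; the class `ToySharedStateRegistry` and its methods are made up for illustration, and it assumes a handle is deleted once its last registered reference is subsumed:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: this is NOT Flink's SharedStateRegistry.
final class ToySharedStateRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Set<String> discarded = new HashSet<>();

    // Called when a checkpoint that references the handle is registered.
    void register(String handle) {
        refCounts.merge(handle, 1, Integer::sum);
    }

    // Called when a checkpoint that references the handle is subsumed.
    void unregister(String handle) {
        Integer count = refCounts.get(handle);
        if (count == null) {
            return; // never registered here, so not ours to delete
        }
        if (count == 1) {
            refCounts.remove(handle);
            discarded.add(handle); // last reference gone: the file would be deleted
        } else {
            refCounts.put(handle, count - 1);
        }
    }

    boolean isDiscarded(String handle) {
        return discarded.contains(handle);
    }
}
```

In this model, a handle that was registered once is discarded as soon as the restored snapshot is subsumed, while a handle that was never registered survives. The second case is the behaviour one would want for files still owned by the original snapshot.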
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
##########
@@ -172,4 +174,19 @@ OperatorStateBackend createOperatorStateBackend(
default boolean useManagedMemory() {
return false;
}
+
+ /**
+ * Tells if a state backend supports taking forced full snapshots. Forced full snapshots are
+ * snapshots that do not share files with previous snapshots. The feature is necessary to
+ * properly support {@link RestoreMode#NO_CLAIM} mode.
+ *
+ * <p>If a state backend supports forced full snapshots, it should create an independent
+ * snapshot when it receives {@link CheckpointOptions#isForcedFullSnapshot()} request in {@link
+ * Snapshotable#snapshot(long, long, CheckpointStreamFactory, CheckpointOptions)}.
+ *
+ * @return If the state backend supports taking forced full snapshots.
+ */
+ default boolean supportsForcedFullSnapshot() {
Review comment:
I wonder whether "full snapshot" actually reflects what the backend is
required to do.
It implies that a backend has incremental checkpoints, and now it must
perform a full checkpoint.
But in fact, it must re-upload (probably still incremental) state - that's
what you did in `RocksIncrementalSnapshotStrategy`.
So maybe `supportsForceReupload` would be better, WDYT?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -875,6 +890,21 @@ public JobExecutionResult executeJobBlocking(JobGraph job)
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
+ // HACK: temporary hack to make the randomized changelog state backend tests work with forced
+ // full snapshots. This option should be removed once changelog state backend supports forced
+ // full snapshots
+ private void checkRestoreModeForRandomizedChangelogStateBackend(JobGraph jobGraph) {
+ final SavepointRestoreSettings savepointRestoreSettings =
+ jobGraph.getSavepointRestoreSettings();
+ if (overrideRestoreModeForRandomizedChangelogStateBackend
+ && savepointRestoreSettings.getRestoreMode() == RestoreMode.NO_CLAIM) {
+ final Configuration conf = new Configuration();
+ SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, conf);
+ conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.LEGACY);
Review comment:
If changelog was enabled with `STATE_CHANGE_LOG_CONFIG_RAND` and `NO_CLAIM` was set
deliberately in some ITCase, then shouldn't it be the opposite: disable changelog if
`RestoreMode == NO_CLAIM`?
Do I understand correctly that:
1. for NO_CLAIM mode, there is/will be no randomization
1. the failing test is `SavepointITCase#testTriggerSavepointAndResumeWithNoClaim`
1. you don't want to disable changelog explicitly in the test there because there will be more such tests?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -135,7 +135,9 @@ public void triggerCheckpoint(long checkpointId) throws FlinkException {
CheckpointStorageLocationReference.getDefault(),
configuration.isExactlyOnceCheckpointMode(),
configuration.isUnalignedCheckpointsEnabled(),
- configuration.getAlignedCheckpointTimeout().toMillis());
+ configuration.getAlignedCheckpointTimeout().toMillis(),
+ false /* TODO: we do not support no-claim mode with
+ externally induced sources for now */);
Review comment:
I couldn't quickly find any information on why it's not supported. Could
you maybe add it here or to the FLIP?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1650,6 +1657,10 @@ public boolean restoreSavepoint(
case LEGACY:
checkpointProperties = CheckpointProperties.forSavepoint(false);
break;
+ case NO_CLAIM:
+ this.forceFullSnapshot = true;
Review comment:
Wouldn't the JM lose this info (`forceFullSnapshot = true`) in case of a
failover before completing the first checkpoint?
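
A toy illustration of the scenario I have in mind. `ToyCoordinator` is a made-up stand-in, not `CheckpointCoordinator`, and the sketch assumes that the new JM instance after failover does not go through the savepoint restore path again (which is exactly the part I'm unsure about):

```java
// Toy stand-in: NOT Flink's CheckpointCoordinator.
final class ToyCoordinator {
    // In-memory only, like the field in the diff; nothing persists it.
    private boolean forceFullSnapshot;

    // Stands in for the NO_CLAIM branch of restoreSavepoint.
    void restoreSavepointNoClaim() {
        this.forceFullSnapshot = true;
    }

    // What the next triggered checkpoint would be told.
    boolean nextCheckpointIsForcedFull() {
        return forceFullSnapshot;
    }
}
```

Under that assumption, a coordinator that restored the savepoint reports a forced full snapshot, while a fresh instance created after failover does not, so the first completed checkpoint could still share files with the original snapshot.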
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.