fredia commented on code in PR #22744:
URL: https://github.com/apache/flink/pull/22744#discussion_r1230394122


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -375,6 +378,49 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
             @Nonnull CheckpointStreamFactory streamFactory,
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
+
+        if (checkpointOptions.getCheckpointType().isSavepoint()) {
+            SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+                    checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+            if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) {
+                // For NO_SHARING native savepoint, trigger delegated one
+                RunnableFuture<SnapshotResult<KeyedStateHandle>> delegatedSnapshotResult =
+                        keyedStateBackend.snapshot(
+                                checkpointId, timestamp, streamFactory, checkpointOptions);
+                return new FutureTask<SnapshotResult<KeyedStateHandle>>(
+                        () -> {
+                            SnapshotResult<KeyedStateHandle> result =
+                                    FutureUtils.runIfNotDoneAndGet(delegatedSnapshotResult);
+                            return castSnapshotResult(
+                                    buildSnapshotResult(
+                                            checkpointId,
+                                            SnapshotResult.empty(),
+                                            new ChangelogSnapshotState(
+                                                    getMaterializedResult(result))));
+                        }) {
+                    @Override
+                    public boolean cancel(boolean mayInterruptIfRunning) {
+                        return delegatedSnapshotResult.cancel(mayInterruptIfRunning)
+                                && super.cancel(mayInterruptIfRunning);
+                    }
+
+                    @Override
+                    public boolean isCancelled() {
+                        return delegatedSnapshotResult.isCancelled() && super.isCancelled();
+                    }
+
+                    @Override
+                    public boolean isDone() {
+                        return delegatedSnapshotResult.isDone() && super.isDone();
+                    }
+                };
+            } else {
+                throw new UnsupportedOperationException(

Review Comment:
   Does the changelog state backend support `SharingFilesStrategy.FORWARD` in
no-claim mode?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -375,6 +378,49 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
             @Nonnull CheckpointStreamFactory streamFactory,
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
+
+        if (checkpointOptions.getCheckpointType().isSavepoint()) {
+            SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+                    checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+            if (sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING) {
+                // For NO_SHARING native savepoint, trigger delegated one
+                RunnableFuture<SnapshotResult<KeyedStateHandle>> delegatedSnapshotResult =
+                        keyedStateBackend.snapshot(
+                                checkpointId, timestamp, streamFactory, checkpointOptions);

Review Comment:
   The `streamFactory` of the changelog state backend is `taskowned`. Will the
native savepoint be stored in a separate path here?



##########
flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java:
##########
@@ -169,18 +160,32 @@ private String tryCheckpointAndStop(JobGraph jobGraph) throws Exception {
         }
     }
 
-    private void restoreAndValidate(String location) {
+    private void restoreAndValidate(String location) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableChangelogStateBackend(testCase.restoreWithChangelog);
         JobGraph jobGraph = addGraph(env);
         jobGraph.setSavepointRestoreSettings(forPath(location));
 
-        if (tryRun(jobGraph) != testCase.allowRestore) {

Review Comment:
   I think `tryRun(jobGraph) != testCase.allowRestore` already covers both cases:
   `restored && !testCase.allowRestore` and `!restored && testCase.allowRestore`.
   Why split it into two checks?
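
   To make the equivalence concrete, here is a minimal standalone sketch (the
   class and method names are hypothetical and not part of the PR; `restored`
   stands in for the result of `tryRun(jobGraph)`). It only shows that the
   single `!=` check and the two spelled-out cases agree for every input; it
   does not cover any extra diagnostics the split version might add.

```java
public class RestoreCheckSketch {
    // Single-check form: true when the outcome and the expectation disagree.
    static boolean mismatchCombined(boolean restored, boolean allowRestore) {
        return restored != allowRestore;
    }

    // Split form: the two disagreement cases written out explicitly.
    static boolean mismatchSplit(boolean restored, boolean allowRestore) {
        return (restored && !allowRestore) || (!restored && allowRestore);
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // Exhaustively compare both forms over all four input combinations.
        for (boolean restored : values) {
            for (boolean allowRestore : values) {
                if (mismatchCombined(restored, allowRestore)
                        != mismatchSplit(restored, allowRestore)) {
                    throw new AssertionError("forms disagree");
                }
            }
        }
        System.out.println("both forms agree on all four inputs");
    }
}
```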



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -984,6 +1030,18 @@ private class ChangelogSnapshotState {
         /** ID of this materialization corresponding to the nested backend checkpoint ID. */
         private final long materializationID;
 
+        /**
+         * Construct a ChangelogSnapshotState with empty non-materialized part, which could be used
+         * when triggering manual materialization.
+         */
+        public ChangelogSnapshotState(List<KeyedStateHandle> materializedSnapshot) {
+            this(
+                    materializedSnapshot,
+                    Collections.emptyList(),
+                    SequenceNumber.of(Long.MAX_VALUE),
+                    -1);

Review Comment:
   nit: replace `-1` with a `static final` constant.
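
   A rough sketch of the idea, assuming a constant name that is purely
   hypothetical (`UNSET_MATERIALIZATION_ID`) and using a simplified standalone
   class with `String` lists in place of the real Flink handle types:

```java
import java.util.Collections;
import java.util.List;

public class SnapshotStateSketch {
    /** Hypothetical name: marks a snapshot with no materialization ID assigned. */
    static final long UNSET_MATERIALIZATION_ID = -1L;

    private final List<String> materialized;
    private final List<String> nonMaterialized;
    private final long materializationId;

    SnapshotStateSketch(
            List<String> materialized, List<String> nonMaterialized, long materializationId) {
        this.materialized = materialized;
        this.nonMaterialized = nonMaterialized;
        this.materializationId = materializationId;
    }

    /** Convenience constructor: empty non-materialized part, named sentinel ID. */
    SnapshotStateSketch(List<String> materialized) {
        this(materialized, Collections.emptyList(), UNSET_MATERIALIZATION_ID);
    }

    long getMaterializationId() {
        return materializationId;
    }

    List<String> getNonMaterialized() {
        return nonMaterialized;
    }
}
```

   A named constant lets the call site say why the value is `-1` instead of
   leaving the reader to guess.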



##########
flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java:
##########
@@ -92,12 +91,18 @@ public static List<TestCase> parameters() {
                         .restoreWithChangelog(true)
                         .from(RestoreSource.CANONICAL_SAVEPOINT)
                         .allowRestore(true),
-                // taking native savepoints is not supported with changelog
                 TestCase.startWithChangelog(true)
                         .restoreWithChangelog(true)
                         .from(RestoreSource.NATIVE_SAVEPOINT)
-                        .allowSave(false)
-                        .allowRestore(false),
+                        .allowRestore(true),
+                TestCase.startWithChangelog(false)

Review Comment:
   nit: add a comment explaining what this test case covers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
