Zakelly commented on code in PR #25924:
URL: https://github.com/apache/flink/pull/25924#discussion_r1920000218


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,45 @@ public ForStResourceContainer(
         this.metricGroup = metricGroup;
     }
 
+    private static boolean isDbPathUnderCheckpointPath(
+            @Nullable CheckpointStorageAccess checkpointStorageAccess,
+            @Nullable Path dbRemotePath) {
+        if (dbRemotePath == null) {
+            return false;
+        }
+
+        // For checkpoint other that FsCheckpointStorageAccess, we treat it as 
'DB path not under
+        // checkpoint path', since we cannot reuse checkpoint files in such 
case.
+        // todo: Support enabling 'cp file reuse' with 
FsMergingCheckpointStorageAccess
+        if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+                || checkpointStorageAccess instanceof 
FsMergingCheckpointStorageAccess) {
+            return false;
+        }
+
+        FsCheckpointStorageAccess fsCheckpointStorageAccess =
+                (FsCheckpointStorageAccess) checkpointStorageAccess;
+        FileSystem checkpointFS = fsCheckpointStorageAccess.getFileSystem();
+        FileSystem dbRemoteFS;
+        try {
+            dbRemoteFS = dbRemotePath.getFileSystem();
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Failed to get FileSystem from dbRemotePath: " + 
dbRemotePath, e);
+        }
+
+        if (!checkpointFS.getUri().equals(dbRemoteFS.getUri())) {

Review Comment:
   ```suggestion
           if (!checkpointFS.equals(dbRemoteFS))) {
   ```



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,44 @@ public ForStResourceContainer(
         this.metricGroup = metricGroup;
     }
 
+    private static boolean isDbPathUnderCheckpointPath(
+            @Nullable CheckpointStorageAccess checkpointStorageAccess,
+            @Nullable Path dbRemotePath) {
+        if (dbRemotePath == null) {
+            return false;
+        }
+
+        // For checkpoint other that FsCheckpointStorageAccess, we treat it as 
'DB path not under
+        // checkpoint path', since we cannot reuse checkpoint files in such 
case.
+        if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+                || checkpointStorageAccess instanceof 
FsMergingCheckpointStorageAccess) {

Review Comment:
   ```suggestion
           if (checkpointStorageAccess == null 
                   || checkpointStorageAccess.getClass() != 
FsCheckpointStorageAccess.class) {
   ```



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,44 @@ public ForStResourceContainer(
         this.metricGroup = metricGroup;
     }
 
+    private static boolean isDbPathUnderCheckpointPath(
+            @Nullable CheckpointStorageAccess checkpointStorageAccess,
+            @Nullable Path dbRemotePath) {
+        if (dbRemotePath == null) {
+            return false;
+        }
+
+        // For checkpoint other that FsCheckpointStorageAccess, we treat it as 
'DB path not under
+        // checkpoint path', since we cannot reuse checkpoint files in such 
case.
+        if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+                || checkpointStorageAccess instanceof 
FsMergingCheckpointStorageAccess) {

Review Comment:
   And I suggest a TODO for file merging support



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to