Zakelly commented on code in PR #25924:
URL: https://github.com/apache/flink/pull/25924#discussion_r1920000218
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,45 @@ public ForStResourceContainer(
this.metricGroup = metricGroup;
}
+ private static boolean isDbPathUnderCheckpointPath(
+ @Nullable CheckpointStorageAccess checkpointStorageAccess,
+ @Nullable Path dbRemotePath) {
+ if (dbRemotePath == null) {
+ return false;
+ }
+
+ // For checkpoint other that FsCheckpointStorageAccess, we treat it as
'DB path not under
+ // checkpoint path', since we cannot reuse checkpoint files in such
case.
+ // todo: Support enabling 'cp file reuse' with
FsMergingCheckpointStorageAccess
+ if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+ || checkpointStorageAccess instanceof
FsMergingCheckpointStorageAccess) {
+ return false;
+ }
+
+ FsCheckpointStorageAccess fsCheckpointStorageAccess =
+ (FsCheckpointStorageAccess) checkpointStorageAccess;
+ FileSystem checkpointFS = fsCheckpointStorageAccess.getFileSystem();
+ FileSystem dbRemoteFS;
+ try {
+ dbRemoteFS = dbRemotePath.getFileSystem();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to get FileSystem from dbRemotePath: " +
dbRemotePath, e);
+ }
+
+ if (!checkpointFS.getUri().equals(dbRemoteFS.getUri())) {
Review Comment:
```suggestion
if (!checkpointFS.equals(dbRemoteFS))) {
```
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,44 @@ public ForStResourceContainer(
this.metricGroup = metricGroup;
}
+ private static boolean isDbPathUnderCheckpointPath(
+ @Nullable CheckpointStorageAccess checkpointStorageAccess,
+ @Nullable Path dbRemotePath) {
+ if (dbRemotePath == null) {
+ return false;
+ }
+
+ // For checkpoint other that FsCheckpointStorageAccess, we treat it as
'DB path not under
+ // checkpoint path', since we cannot reuse checkpoint files in such
case.
+ if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+ || checkpointStorageAccess instanceof
FsMergingCheckpointStorageAccess) {
Review Comment:
```suggestion
if (checkpointStorageAccess == null
|| checkpointStorageAccess.getClass() !=
FsCheckpointStorageAccess.class) {
```
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java:
##########
@@ -167,6 +213,44 @@ public ForStResourceContainer(
this.metricGroup = metricGroup;
}
+ private static boolean isDbPathUnderCheckpointPath(
+ @Nullable CheckpointStorageAccess checkpointStorageAccess,
+ @Nullable Path dbRemotePath) {
+ if (dbRemotePath == null) {
+ return false;
+ }
+
+ // For checkpoint other that FsCheckpointStorageAccess, we treat it as
'DB path not under
+ // checkpoint path', since we cannot reuse checkpoint files in such
case.
+ if (!(checkpointStorageAccess instanceof FsCheckpointStorageAccess)
+ || checkpointStorageAccess instanceof
FsMergingCheckpointStorageAccess) {
Review Comment:
And I suggest a TODO for file merging support
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]