This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new f396e6da3d2 [FLINK-37367][state/forst] Make ForSt inherit uploaded SST 
files after restorations. (#26259)
f396e6da3d2 is described below

commit f396e6da3d23e31d987ae7fdb0f4acbc1661a1b9
Author: AlexYinHan <[email protected]>
AuthorDate: Thu Mar 6 19:46:07 2025 +0800

    [FLINK-37367][state/forst] Make ForSt inherit uploaded SST files after 
restorations. (#26259)
---
 .../apache/flink/state/forst/ForStKeyedStateBackendBuilder.java   | 8 +++++++-
 .../flink/state/forst/datatransfer/CopyDataTransferStrategy.java  | 5 -----
 .../flink/state/forst/fs/filemapping/FileMappingManager.java      | 4 ++++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 67aa91371c2..25649fcef7f 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -263,10 +263,16 @@ public class ForStKeyedStateBackendBuilder<K>
             defaultColumnFamilyHandle = 
restoreResult.getDefaultColumnFamilyHandle();
             nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
 
-            // TODO: init materializedSstFiles and lastCompletedCheckpointId 
when implement restore
             SortedMap<Long, 
Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>
                     materializedSstFiles = new TreeMap<>();
             long lastCompletedCheckpointId = -1L;
+            if (restoreOperation instanceof ForStIncrementalRestoreOperation) {
+                backendUID = restoreResult.getBackendUID();
+                lastCompletedCheckpointId = 
restoreResult.getLastCompletedCheckpointId();
+                if (recoveryClaimMode != RecoveryClaimMode.NO_CLAIM) {
+                    materializedSstFiles = restoreResult.getRestoredSstFiles();
+                }
+            }
 
             snapshotStrategy =
                     initializeSnapshotStrategy(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
index 860cbbc0234..6686ba41878 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/CopyDataTransferStrategy.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
-import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
 import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -115,10 +114,6 @@ public class CopyDataTransferStrategy extends 
DataTransferStrategy {
                     ((ForStFlinkFileSystem) 
dbFileSystem).getMappingEntry(dbFilePath);
             Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + 
dbFilePath);
             sourceStateHandle = mappingEntry.getSource().toStateHandle();
-            if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) {
-                // The file is already owned by JM, simply return the state 
handle
-                return HandleAndLocalPath.of(sourceStateHandle, 
dbFilePath.getName());
-            }
         } else {
             // Construct a FileStateHandle base on the DB file
             FileSystem sourceFileSystem = dbFilePath.getFileSystem();
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
index 7d5544e8f8d..1e8cea5e58f 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/filemapping/FileMappingManager.java
@@ -165,6 +165,10 @@ public class FileMappingManager {
      * @return always return true except for IOException
      */
     public boolean renameFile(String src, String dst) throws IOException {
+        if (src.equals(dst)) {
+            return true;
+        }
+
         MappingEntry srcEntry = mappingTable.get(src);
         if (srcEntry != null) { // rename file
             if (mappingTable.containsKey(dst)) {

Reply via email to