Zakelly commented on code in PR #25310:
URL: https://github.com/apache/flink/pull/25310#discussion_r1758099239


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -125,29 +187,111 @@ public URI getUri() {
         return delegateFS.getUri();
     }
 
+    @Override
+    public boolean exists(final Path f) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
+        if (localPathTuple.f0) {
+            return localFS.exists(localPathTuple.f1);
+        }
+        return delegateFS.exists(f);
+    }
+
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            return localFS.getFileStatus(localPathTuple.f1);
+        }
         return delegateFS.getFileStatus(path);
     }
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
             throws IOException {
+        Path path = file.getPath();
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
+            return localFS.getFileBlockLocations(localFile, start, len);
+        }
         return delegateFS.getFileBlockLocations(file, start, len);
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
-        return delegateFS.listStatus(path);
+        FileStatus[] localFiles = new FileStatus[0];
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            localFiles = localFS.listStatus(localPathTuple.f1);
+        }
+        int localFileNum = localFiles == null ? 0 : localFiles.length;
+        FileStatus[] remoteFiles = delegateFS.listStatus(path);
+        if (localFileNum == 0 && remoteFiles == null) {
+            return null;
+        }
+        FileStatus[] fileStatuses = new FileStatus[localFileNum + 
remoteFiles.length];

Review Comment:
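   What if `remoteFiles == null`? The check above only returns early when there are also no local files, so `remoteFiles.length` can still throw a `NullPointerException` here.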



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -125,29 +187,111 @@ public URI getUri() {
         return delegateFS.getUri();
     }
 
+    @Override
+    public boolean exists(final Path f) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
+        if (localPathTuple.f0) {
+            return localFS.exists(localPathTuple.f1);
+        }
+        return delegateFS.exists(f);
+    }
+
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            return localFS.getFileStatus(localPathTuple.f1);
+        }
         return delegateFS.getFileStatus(path);
     }
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
             throws IOException {
+        Path path = file.getPath();
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
+            return localFS.getFileBlockLocations(localFile, start, len);
+        }
         return delegateFS.getFileBlockLocations(file, start, len);
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
-        return delegateFS.listStatus(path);
+        FileStatus[] localFiles = new FileStatus[0];
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            localFiles = localFS.listStatus(localPathTuple.f1);
+        }
+        int localFileNum = localFiles == null ? 0 : localFiles.length;
+        FileStatus[] remoteFiles = delegateFS.listStatus(path);
+        if (localFileNum == 0 && remoteFiles == null) {
+            return null;
+        }

Review Comment:
   ```suggestion
           if (localFileNum == 0) {
               return remoteFiles;
           }
   ```



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -125,29 +187,111 @@ public URI getUri() {
         return delegateFS.getUri();
     }
 
+    @Override
+    public boolean exists(final Path f) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
+        if (localPathTuple.f0) {
+            return localFS.exists(localPathTuple.f1);
+        }
+        return delegateFS.exists(f);
+    }
+
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            return localFS.getFileStatus(localPathTuple.f1);
+        }
         return delegateFS.getFileStatus(path);
     }
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
             throws IOException {
+        Path path = file.getPath();
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
+            return localFS.getFileBlockLocations(localFile, start, len);
+        }
         return delegateFS.getFileBlockLocations(file, start, len);
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
-        return delegateFS.listStatus(path);
+        FileStatus[] localFiles = new FileStatus[0];
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            localFiles = localFS.listStatus(localPathTuple.f1);
+        }
+        int localFileNum = localFiles == null ? 0 : localFiles.length;
+        FileStatus[] remoteFiles = delegateFS.listStatus(path);
+        if (localFileNum == 0 && remoteFiles == null) {
+            return null;
+        }
+        FileStatus[] fileStatuses = new FileStatus[localFileNum + 
remoteFiles.length];
+        for (int index = 0; index < localFileNum; index++) {
+            final FileStatus localFile = localFiles[index];
+            fileStatuses[index] =
+                    new FileStatus() {
+                        @Override
+                        public long getLen() {
+                            return localFile.getLen();
+                        }
+
+                        @Override
+                        public long getBlockSize() {
+                            return localFile.getBlockSize();
+                        }
+
+                        @Override
+                        public short getReplication() {
+                            return localFile.getReplication();
+                        }
+
+                        @Override
+                        public long getModificationTime() {
+                            return localFile.getModificationTime();
+                        }
+
+                        @Override
+                        public long getAccessTime() {
+                            return localFile.getAccessTime();
+                        }
+
+                        @Override
+                        public boolean isDir() {
+                            return localFile.isDir();
+                        }
+
+                        @Override
+                        public Path getPath() {
+                            return new Path(
+                                    remoteBase, 
path.toString().substring(localBase.length()));
+                        }
+                    };
+        }
+        System.arraycopy(remoteFiles, 0, fileStatuses, localFiles.length, 
remoteFiles.length);
+        return fileStatuses;
     }
 
     @Override
     public boolean delete(Path path, boolean recursive) throws IOException {
-        return delegateFS.delete(path, recursive);
+        boolean success = false;
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            success = localFS.delete(localPathTuple.f1, recursive); // delete 
from local
+        }
+        success |= delegateFS.delete(path, recursive); // and delete from
+        return success;
     }
 
     @Override
     public boolean mkdirs(Path path) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            return localFS.mkdirs(localPathTuple.f1);

Review Comment:
   Should we call `mkdirs` on both the local file system and the DFS?



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -101,6 +156,13 @@ public boolean rename(Path src, Path dst) throws 
IOException {
         // The rename is not atomic for ForSt. Some FileSystems e.g. HDFS, OSS 
does not allow a
         // renaming if the target already exists. So, we delete the target 
before attempting the
         // rename.
+
+        if (localFileFilter.apply(src.getName())) {
+            Path localSrc = tryBuildLocalPath(src).f1;
+            Path localDst = tryBuildLocalPath(dst).f1;
+            return localFS.rename(localSrc, localDst);
+        }

Review Comment:
   The `src` might be a directory, which will fail the `localFileFilter.apply` 
check but still needs to be renamed.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -160,4 +304,16 @@ public boolean isDistributedFS() {
     public FileSystemKind getKind() {
         return delegateFS.getKind();
     }
+
+    private Tuple2<Boolean, Path> tryBuildLocalPath(Path path) {
+        String remotePathStr = path.toString();
+        if (localFileFilter.apply(path.getName()) && 
remotePathStr.startsWith(remoteBase)) {

Review Comment:
   `path` might be a directory, so I'd suggest not considering the 
`localFileFilter` here.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -125,29 +187,111 @@ public URI getUri() {
         return delegateFS.getUri();
     }
 
+    @Override
+    public boolean exists(final Path f) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f);
+        if (localPathTuple.f0) {
+            return localFS.exists(localPathTuple.f1);
+        }
+        return delegateFS.exists(f);
+    }
+
     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            return localFS.getFileStatus(localPathTuple.f1);
+        }
         return delegateFS.getFileStatus(path);
     }
 
     @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file, long start, 
long len)
             throws IOException {
+        Path path = file.getPath();
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            FileStatus localFile = localFS.getFileStatus(localPathTuple.f1);
+            return localFS.getFileBlockLocations(localFile, start, len);
+        }
         return delegateFS.getFileBlockLocations(file, start, len);
     }
 
     @Override
     public FileStatus[] listStatus(Path path) throws IOException {
-        return delegateFS.listStatus(path);
+        FileStatus[] localFiles = new FileStatus[0];
+        Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path);
+        if (localPathTuple.f0) {
+            localFiles = localFS.listStatus(localPathTuple.f1);
+        }
+        int localFileNum = localFiles == null ? 0 : localFiles.length;
+        FileStatus[] remoteFiles = delegateFS.listStatus(path);
+        if (localFileNum == 0 && remoteFiles == null) {
+            return null;
+        }
+        FileStatus[] fileStatuses = new FileStatus[localFileNum + 
remoteFiles.length];
+        for (int index = 0; index < localFileNum; index++) {
+            final FileStatus localFile = localFiles[index];
+            fileStatuses[index] =
+                    new FileStatus() {
+                        @Override
+                        public long getLen() {
+                            return localFile.getLen();
+                        }
+
+                        @Override
+                        public long getBlockSize() {
+                            return localFile.getBlockSize();
+                        }
+
+                        @Override
+                        public short getReplication() {
+                            return localFile.getReplication();
+                        }
+
+                        @Override
+                        public long getModificationTime() {
+                            return localFile.getModificationTime();
+                        }
+
+                        @Override
+                        public long getAccessTime() {
+                            return localFile.getAccessTime();
+                        }
+
+                        @Override
+                        public boolean isDir() {
+                            return localFile.isDir();
+                        }
+
+                        @Override
+                        public Path getPath() {
+                            return new Path(
+                                    remoteBase, 
path.toString().substring(localBase.length()));
+                        }
+                    };
+        }
+        System.arraycopy(remoteFiles, 0, fileStatuses, localFiles.length, 
remoteFiles.length);

Review Comment:
   ```suggestion
           System.arraycopy(remoteFiles, 0, fileStatuses, localFileNum, 
remoteFiles.length);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to