Zakelly commented on code in PR #25310: URL: https://github.com/apache/flink/pull/25310#discussion_r1758099239
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ########## @@ -125,29 +187,111 @@ public URI getUri() { return delegateFS.getUri(); } + @Override + public boolean exists(final Path f) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f); + if (localPathTuple.f0) { + return localFS.exists(localPathTuple.f1); + } + return delegateFS.exists(f); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + return localFS.getFileStatus(localPathTuple.f1); + } return delegateFS.getFileStatus(path); } @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + Path path = file.getPath(); + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + FileStatus localFile = localFS.getFileStatus(localPathTuple.f1); + return localFS.getFileBlockLocations(localFile, start, len); + } return delegateFS.getFileBlockLocations(file, start, len); } @Override public FileStatus[] listStatus(Path path) throws IOException { - return delegateFS.listStatus(path); + FileStatus[] localFiles = new FileStatus[0]; + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + localFiles = localFS.listStatus(localPathTuple.f1); + } + int localFileNum = localFiles == null ? 0 : localFiles.length; + FileStatus[] remoteFiles = delegateFS.listStatus(path); + if (localFileNum == 0 && remoteFiles == null) { + return null; + } + FileStatus[] fileStatuses = new FileStatus[localFileNum + remoteFiles.length]; Review Comment: What if `remoteFiles == null`? When there are local files (`localFileNum > 0`) but no remote listing, `remoteFiles.length` here would throw a `NullPointerException`.
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ########## @@ -125,29 +187,111 @@ public URI getUri() { return delegateFS.getUri(); } + @Override + public boolean exists(final Path f) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f); + if (localPathTuple.f0) { + return localFS.exists(localPathTuple.f1); + } + return delegateFS.exists(f); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + return localFS.getFileStatus(localPathTuple.f1); + } return delegateFS.getFileStatus(path); } @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + Path path = file.getPath(); + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + FileStatus localFile = localFS.getFileStatus(localPathTuple.f1); + return localFS.getFileBlockLocations(localFile, start, len); + } return delegateFS.getFileBlockLocations(file, start, len); } @Override public FileStatus[] listStatus(Path path) throws IOException { - return delegateFS.listStatus(path); + FileStatus[] localFiles = new FileStatus[0]; + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + localFiles = localFS.listStatus(localPathTuple.f1); + } + int localFileNum = localFiles == null ? 
0 : localFiles.length; + FileStatus[] remoteFiles = delegateFS.listStatus(path); + if (localFileNum == 0 && remoteFiles == null) { + return null; + } Review Comment: ```suggestion if (localFileNum == 0) { return remoteFiles; } ``` ########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ########## @@ -125,29 +187,111 @@ public URI getUri() { return delegateFS.getUri(); } + @Override + public boolean exists(final Path f) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f); + if (localPathTuple.f0) { + return localFS.exists(localPathTuple.f1); + } + return delegateFS.exists(f); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + return localFS.getFileStatus(localPathTuple.f1); + } return delegateFS.getFileStatus(path); } @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + Path path = file.getPath(); + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + FileStatus localFile = localFS.getFileStatus(localPathTuple.f1); + return localFS.getFileBlockLocations(localFile, start, len); + } return delegateFS.getFileBlockLocations(file, start, len); } @Override public FileStatus[] listStatus(Path path) throws IOException { - return delegateFS.listStatus(path); + FileStatus[] localFiles = new FileStatus[0]; + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + localFiles = localFS.listStatus(localPathTuple.f1); + } + int localFileNum = localFiles == null ? 
0 : localFiles.length; + FileStatus[] remoteFiles = delegateFS.listStatus(path); + if (localFileNum == 0 && remoteFiles == null) { + return null; + } + FileStatus[] fileStatuses = new FileStatus[localFileNum + remoteFiles.length]; + for (int index = 0; index < localFileNum; index++) { + final FileStatus localFile = localFiles[index]; + fileStatuses[index] = + new FileStatus() { + @Override + public long getLen() { + return localFile.getLen(); + } + + @Override + public long getBlockSize() { + return localFile.getBlockSize(); + } + + @Override + public short getReplication() { + return localFile.getReplication(); + } + + @Override + public long getModificationTime() { + return localFile.getModificationTime(); + } + + @Override + public long getAccessTime() { + return localFile.getAccessTime(); + } + + @Override + public boolean isDir() { + return localFile.isDir(); + } + + @Override + public Path getPath() { + return new Path( + remoteBase, path.toString().substring(localBase.length())); + } + }; + } + System.arraycopy(remoteFiles, 0, fileStatuses, localFiles.length, remoteFiles.length); + return fileStatuses; } @Override public boolean delete(Path path, boolean recursive) throws IOException { - return delegateFS.delete(path, recursive); + boolean success = false; + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + success = localFS.delete(localPathTuple.f1, recursive); // delete from local + } + success |= delegateFS.delete(path, recursive); // and delete from + return success; } @Override public boolean mkdirs(Path path) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + return localFS.mkdirs(localPathTuple.f1); Review Comment: Should we `mkdir` for both local and dfs? 
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ########## @@ -101,6 +156,13 @@ public boolean rename(Path src, Path dst) throws IOException { // The rename is not atomic for ForSt. Some FileSystems e.g. HDFS, OSS does not allow a // renaming if the target already exists. So, we delete the target before attempting the // rename. + + if (localFileFilter.apply(src.getName())) { + Path localSrc = tryBuildLocalPath(src).f1; + Path localDst = tryBuildLocalPath(dst).f1; + return localFS.rename(localSrc, localDst); + } Review Comment: `src` might be a directory, which would fail the `localFileFilter.apply` check but still needs to be renamed. ########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ########## @@ -160,4 +304,16 @@ public boolean isDistributedFS() { public FileSystemKind getKind() { return delegateFS.getKind(); } + + private Tuple2<Boolean, Path> tryBuildLocalPath(Path path) { + String remotePathStr = path.toString(); + if (localFileFilter.apply(path.getName()) && remotePathStr.startsWith(remoteBase)) { Review Comment: `path` might be a directory, whose name would not match `localFileFilter`.
I'd suggest not considering the `localFileFilter` here ########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ########## @@ -125,29 +187,111 @@ public URI getUri() { return delegateFS.getUri(); } + @Override + public boolean exists(final Path f) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(f); + if (localPathTuple.f0) { + return localFS.exists(localPathTuple.f1); + } + return delegateFS.exists(f); + } + @Override public FileStatus getFileStatus(Path path) throws IOException { + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + return localFS.getFileStatus(localPathTuple.f1); + } return delegateFS.getFileStatus(path); } @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + Path path = file.getPath(); + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + FileStatus localFile = localFS.getFileStatus(localPathTuple.f1); + return localFS.getFileBlockLocations(localFile, start, len); + } return delegateFS.getFileBlockLocations(file, start, len); } @Override public FileStatus[] listStatus(Path path) throws IOException { - return delegateFS.listStatus(path); + FileStatus[] localFiles = new FileStatus[0]; + Tuple2<Boolean, Path> localPathTuple = tryBuildLocalPath(path); + if (localPathTuple.f0) { + localFiles = localFS.listStatus(localPathTuple.f1); + } + int localFileNum = localFiles == null ? 
0 : localFiles.length; + FileStatus[] remoteFiles = delegateFS.listStatus(path); + if (localFileNum == 0 && remoteFiles == null) { + return null; + } + FileStatus[] fileStatuses = new FileStatus[localFileNum + remoteFiles.length]; + for (int index = 0; index < localFileNum; index++) { + final FileStatus localFile = localFiles[index]; + fileStatuses[index] = + new FileStatus() { + @Override + public long getLen() { + return localFile.getLen(); + } + + @Override + public long getBlockSize() { + return localFile.getBlockSize(); + } + + @Override + public short getReplication() { + return localFile.getReplication(); + } + + @Override + public long getModificationTime() { + return localFile.getModificationTime(); + } + + @Override + public long getAccessTime() { + return localFile.getAccessTime(); + } + + @Override + public boolean isDir() { + return localFile.isDir(); + } + + @Override + public Path getPath() { + return new Path( + remoteBase, path.toString().substring(localBase.length())); + } + }; + } + System.arraycopy(remoteFiles, 0, fileStatuses, localFiles.length, remoteFiles.length); Review Comment: ```suggestion System.arraycopy(remoteFiles, 0, fileStatuses, localFileNum, remoteFiles.length); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org