Zakelly commented on code in PR #25310:
URL: https://github.com/apache/flink/pull/25310#discussion_r1753845878

##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -42,10 +44,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     // TODO: make it configurable
     private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
+    private static final String SST_SUFFIX = ".sst";
+    private final FileSystem delegateFS;
+    private final FileSystem localFS;

Review Comment:
   I'd suggest keeping the local base path as a class member, to avoid frequent map lookups.

##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -42,10 +44,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     // TODO: make it configurable
     private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
+    private static final String SST_SUFFIX = ".sst";
+    private final FileSystem delegateFS;
+    private final FileSystem localFS;
+
+    private static final Map<String, String> remoteLocalMapping = new HashMap<>();

Review Comment:
   How about moving this to the top? Also, I think this should be a `ConcurrentHashMap`.

##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -42,10 +44,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     // TODO: make it configurable
     private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
+    private static final String SST_SUFFIX = ".sst";

Review Comment:
   Could this be a `Function<String, Boolean> filter`? I mean a file filter for the local path, held as a member of each instance, plus a default static filter for SST files.
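
A minimal sketch of how the three suggestions above could fit together, in case it helps the discussion. Everything here is hypothetical: the class `LocalPathMappingSketch`, the field names, and the accessors are illustrative stand-ins, not the actual `ForStFlinkFileSystem` code. The comment does not say whether files matching the filter or files not matching it should go to the local path, so the sketch only exposes the match result.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for the mapping logic; not the real ForStFlinkFileSystem. */
final class LocalPathMappingSketch {

    // Static and shared by all instances, so a ConcurrentHashMap is used to
    // tolerate concurrent registration and lookup.
    private static final Map<String, String> REMOTE_LOCAL_MAPPING = new ConcurrentHashMap<>();

    // Default static filter that matches SST files by suffix.
    static final Function<String, Boolean> SST_FILTER = name -> name.endsWith(".sst");

    // Resolved once in the constructor, so per-file calls never touch the map.
    // Null when no local base path was registered for the remote base path.
    private final String localBasePath;

    // Per-instance file filter (assumption: callers may pass SST_FILTER or their own).
    private final Function<String, Boolean> fileFilter;

    LocalPathMappingSketch(String remoteBasePath, Function<String, Boolean> fileFilter) {
        this.localBasePath = REMOTE_LOCAL_MAPPING.get(remoteBasePath);
        this.fileFilter = fileFilter;
    }

    /** Registers the local base path for a remote base path. */
    static void setupLocalBasePath(String remoteBasePath, String localBasePath) {
        REMOTE_LOCAL_MAPPING.put(remoteBasePath, localBasePath);
    }

    String getLocalBasePath() {
        return localBasePath;
    }

    /** Returns whether the file name matches this instance's filter. */
    boolean matchesFilter(String fileName) {
        return fileFilter.apply(fileName);
    }
}
```

With this shape, the map is only read at construction time, and each instance can carry its own filter while still defaulting to the SST one.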

##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java:
##########
@@ -104,4 +109,8 @@ public void sync() throws IOException {
     public void close() throws IOException {
         originalOutputStream.close();
     }
+
+    public Path getRealPath() {

Review Comment:
   Well, I think we need this interface in `ForStFlinkFileSystem`. It is useful during checkpointing.

##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -61,6 +69,16 @@ public static FileSystem get(URI uri) throws IOException {
         return new ForStFlinkFileSystem(FileSystem.get(uri));
     }
+    /**
+     * Setup local base path for corresponding remote base path.
+     *
+     * @param remoteBasePath the remote base path.
+     * @param localBasePath the local base path.
+     */
+    public static void setupLocalBasePath(String remoteBasePath, String localBasePath) {
+        remoteLocalMapping.put(remoteBasePath, localBasePath);

Review Comment:
   What if the `remoteBasePath` is on the local file system?
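
On the question about `setupLocalBasePath` receiving a base path that is already local, one possible guard is to inspect the URI scheme before registering a mapping. This is only a sketch under assumptions: the helper class `RemoteBasePathCheckSketch` and the rule "no scheme or `file` scheme means local" are illustrative, and the PR may handle this case differently.

```java
import java.net.URI;

/** Hypothetical helper; not part of the PR. */
final class RemoteBasePathCheckSketch {

    /** Treats a URI with no scheme, or with the "file" scheme, as local. */
    static boolean isLocalUri(URI uri) {
        String scheme = uri.getScheme();
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }
}
```

A caller could skip the remote-to-local mapping when `isLocalUri` returns true, since a separate local copy would add nothing in that case.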