Zakelly commented on code in PR #25310:
URL: https://github.com/apache/flink/pull/25310#discussion_r1753845878


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -42,10 +44,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     // TODO: make it configurable
     private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
 
+    private static final String SST_SUFFIX = ".sst";
+
     private final FileSystem delegateFS;
+    private final FileSystem localFS;

Review Comment:
   I'd suggest a class member of local base path, to prevent frequent map 
accessing.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -42,10 +44,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     // TODO: make it configurable
     private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
 
+    private static final String SST_SUFFIX = ".sst";
+
     private final FileSystem delegateFS;
+    private final FileSystem localFS;
+
+    private static final Map<String, String> remoteLocalMapping = new 
HashMap<>();

Review Comment:
   How about moving this to top? And I think this should be a 
`ConcurrentHashMap`



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -42,10 +44,16 @@ public class ForStFlinkFileSystem extends FileSystem {
     // TODO: make it configurable
     private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32;
 
+    private static final String SST_SUFFIX = ".sst";

Review Comment:
   Could this be a `Function<String, Boolean> filter`? I mean a file filter for 
local path, a member for each instance, and a default static filter for SST.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java:
##########
@@ -104,4 +109,8 @@ public void sync() throws IOException {
     public void close() throws IOException {
         originalOutputStream.close();
     }
+
+    public Path getRealPath() {

Review Comment:
   Well I think we need this interface in `ForStFlinkFileSystem`. It is useful 
during checkpoint.



##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java:
##########
@@ -61,6 +69,16 @@ public static FileSystem get(URI uri) throws IOException {
         return new ForStFlinkFileSystem(FileSystem.get(uri));
     }
 
+    /**
+     * Setup local base path for corresponding remote base path.
+     *
+     * @param remoteBasePath the remote base path.
+     * @param localBasePath the local base path.
+     */
+    public static void setupLocalBasePath(String remoteBasePath, String 
localBasePath) {
+        remoteLocalMapping.put(remoteBasePath, localBasePath);

Review Comment:
   What if the `remoteBasePath` is local fs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to