Hisoka-X commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2182689749


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
     // TODO Synchronous write, asynchronous write can be added in the future
     @Override
     public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());

Review Comment:
   Why not use `synchronized` here instead of a `ReentrantLock`?
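
   To make the question concrete, here is a minimal sketch (not the PR's code) of the two options, assuming the lock only guards a plain critical section and needs no `tryLock`, timeout, or fairness. `PerPathLockSketch`, `MONITOR_MAP`, `hammer`, and the path string are hypothetical names used only for illustration; `LOCK_MAP` and `parentPath` mirror the diff above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Hypothetical sketch: per-path mutual exclusion with ReentrantLock vs. synchronized.
public class PerPathLockSketch {
    // Mirrors the LOCK_MAP idea from the diff: one lock per parent path.
    private static final Map<String, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();
    // Assumed alternative: one plain monitor object per parent path.
    private static final Map<String, Object> MONITOR_MAP = new ConcurrentHashMap<>();

    // Explicit lock: must be released in finally, but allows tryLock/timeouts later.
    public static void writeWithReentrantLock(String parentPath, Runnable criticalSection) {
        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());
        lock.lock();
        try {
            criticalSection.run();
        } finally {
            lock.unlock();
        }
    }

    // Intrinsic lock: the monitor is released automatically when the block exits.
    public static void writeWithSynchronized(String parentPath, Runnable criticalSection) {
        Object monitor = MONITOR_MAP.computeIfAbsent(parentPath, path -> new Object());
        synchronized (monitor) {
            criticalSection.run();
        }
    }

    // Runs `threads` workers that each increment a shared counter `iterations`
    // times through the given guard, then returns the final counter value.
    public static int hammer(int threads, int iterations, BiConsumer<String, Runnable> guard) {
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    guard.accept("/hypothetical/wal/dir", () -> counter[0]++);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 1000, PerPathLockSketch::writeWithReentrantLock));
        System.out.println(hammer(4, 1000, PerPathLockSketch::writeWithSynchronized));
    }
}
```

   Both variants give the same mutual exclusion per path. If none of the extra `ReentrantLock` features are needed, the `synchronized` form is shorter and cannot leak a lock through a missed `unlock()`.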



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java:
##########
@@ -55,60 +60,47 @@ public void initialize(FileSystem fs, Serializer serializer) throws IOException
 
     @Override
     public List<IMapFileData> readAllData(Path parentPath) throws IOException {
-        List<String> fileNames = getFileNames(parentPath);
+        List<String> fileNames = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
         if (CollectionUtils.isEmpty(fileNames)) {
             return new ArrayList<>();
         }
-        List<IMapFileData> result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE);
-        for (String fileName : fileNames) {
-            result.addAll(readData(new Path(parentPath, fileName)));
-        }
-        return result;
-    }
 
-    private List<String> getFileNames(Path parentPath) {
-        try {
-            if (!fs.exists(parentPath)) {
-                return new ArrayList<>();
-            }
-            RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator =
-                    fs.listFiles(parentPath, true);
-            List<String> fileNames = new ArrayList<>();
-            while (fileStatusRemoteIterator.hasNext()) {
-                LocatedFileStatus fileStatus = fileStatusRemoteIterator.next();
-                if (fileStatus.getPath().getName().endsWith("wal.txt")) {
-                    fileNames.add(fileStatus.getPath().toString());
-                }
-            }
-            return fileNames;
-        } catch (IOException e) {
-            throw new IMapStorageException(e, "get file names error,path is s%", parentPath);
-        }
+        List<Path> paths =
+                fileNames.stream()
+                        .map(filename -> new Path(parentPath, filename))
+                        .collect(Collectors.toList());
+        return readData(paths);
     }
 
-    private List<IMapFileData> readData(Path path) throws IOException {
+    public List<IMapFileData> readData(List<Path> paths) throws IOException {

Review Comment:
   ```suggestion
       private List<IMapFileData> readData(List<Path> paths) throws IOException {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
