Hisoka-X commented on code in PR #9524: URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2182689749
########## seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java: ########## @@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) { // TODO Synchronous write, asynchronous write can be added in the future @Override public void write(IMapFileData data) throws IOException { - byte[] bytes = serializer.serialize(data); - this.write(bytes); + ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock()); Review Comment: why not use `synchronized`? ########## seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java: ########## @@ -55,60 +60,47 @@ public void initialize(FileSystem fs, Serializer serializer) throws IOException @Override public List<IMapFileData> readAllData(Path parentPath) throws IOException { - List<String> fileNames = getFileNames(parentPath); + List<String> fileNames = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME); if (CollectionUtils.isEmpty(fileNames)) { return new ArrayList<>(); } - List<IMapFileData> result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE); - for (String fileName : fileNames) { - result.addAll(readData(new Path(parentPath, fileName))); - } - return result; - } - private List<String> getFileNames(Path parentPath) { - try { - if (!fs.exists(parentPath)) { - return new ArrayList<>(); - } - RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = - fs.listFiles(parentPath, true); - List<String> fileNames = new ArrayList<>(); - while (fileStatusRemoteIterator.hasNext()) { - LocatedFileStatus fileStatus = fileStatusRemoteIterator.next(); - if (fileStatus.getPath().getName().endsWith("wal.txt")) { - fileNames.add(fileStatus.getPath().toString()); - } - } - return fileNames; - } catch (IOException e) { - throw new IMapStorageException(e, "get file names error,path is s%", parentPath); - } + List<Path> paths = + fileNames.stream() + .map(filename -> new Path(parentPath, filename)) + .collect(Collectors.toList()); + return readData(paths); } - private List<IMapFileData> readData(Path path) throws IOException { + public List<IMapFileData> readData(List<Path> paths) throws IOException { Review Comment: ```suggestion private List<IMapFileData> readData(List<Path> paths) throws IOException { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org