prclin commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2182808994

##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
     // TODO Synchronous write, asynchronous write can be added in the future
     @Override
     public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());

Review Comment:
   With `synchronized`, the integration test `org.apache.seatunnel.engine.e2e.SplitClusterFaultToleranceIT#testStreamJobRestoreInAllNodeDown` performs concurrent operations on a single IMap, but `synchronized` did not serialize them, which caused an error:

   > this image is from https://github.com/prclin/seatunnel/actions/runs/16017281670/job/45186339029

   At first I thought the cause might be that Disruptor's `handleEventsWithWorkerPool` consumes events concurrently (meaning the handlers passed to `handleEventsWithWorkerPool` would only be prototypes), but I looked into the source code and it does not work that way.

   So my guess is that the FileMapStore of each IMap may be created more than once. In that case an instance-level lock does not work as expected, so I can only use a class-level lock for each IMap.
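
   The reasoning above can be sketched as follows. This is a minimal illustration, not the code in the PR: it assumes that several writer instances can end up targeting the same parent path, which is the guess stated above and not a confirmed fact. `SketchWriter`, `lockFor`, `writesTo`, `PathLockDemo` and the path string are hypothetical names; only `LOCK_MAP`, `parentPath` and the `computeIfAbsent` call mirror the diff. Because the lock registry is static, every instance that shares a path also shares one `ReentrantLock`, whereas an instance-level `synchronized` would give each instance its own monitor.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a writer; this is not the real CloudWriter. */
class SketchWriter {
    // Class-level (static) registry: one lock per parent path, shared by all instances.
    private static final ConcurrentMap<String, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();

    // Stand-in for the shared file: a plain, non-thread-safe counter per path.
    private static final ConcurrentMap<String, int[]> FILES = new ConcurrentHashMap<>();

    private final String parentPath;

    SketchWriter(String parentPath) {
        this.parentPath = parentPath;
    }

    /** Returns the single lock associated with the given path, creating it on first use. */
    static ReentrantLock lockFor(String path) {
        return LOCK_MAP.computeIfAbsent(path, p -> new ReentrantLock());
    }

    /** Simulates one write: a read-modify-write that is only safe while the path lock is held. */
    void write() {
        ReentrantLock lock = lockFor(parentPath);
        lock.lock();
        try {
            FILES.computeIfAbsent(parentPath, p -> new int[1])[0]++;
        } finally {
            lock.unlock();
        }
    }

    /** Reads the number of simulated writes recorded for a path. */
    static int writesTo(String path) {
        ReentrantLock lock = lockFor(path);
        lock.lock();
        try {
            int[] counter = FILES.get(path);
            return counter == null ? 0 : counter[0];
        } finally {
            lock.unlock();
        }
    }
}

class PathLockDemo {
    /** Starts one thread per writer instance, all on the same path, and returns the total writes. */
    static int run(String path, int writers, int writesPerWriter) {
        Thread[] threads = new Thread[writers];
        for (int i = 0; i < writers; i++) {
            // A separate instance per thread, mimicking a store that is created more than once.
            SketchWriter writer = new SketchWriter(path);
            threads[i] = new Thread(() -> {
                for (int j = 0; j < writesPerWriter; j++) {
                    writer.write();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return SketchWriter.writesTo(path);
    }

    public static void main(String[] args) {
        System.out.println(PathLockDemo.run("/tmp/imap-sketch", 4, 10_000));
    }
}
```

   Note that this sketch never removes entries from `LOCK_MAP`, so it assumes the set of paths is small and bounded.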