prclin commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2182808994


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
     // TODO Synchronous write, asynchronous write can be added in the future
     @Override
     public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());

Review Comment:
   When using `synchronized`, the integration test 
`org.apache.seatunnel.engine.e2e.SplitClusterFaultToleranceIT#testStreamJobRestoreInAllNodeDown`
 ran concurrent operations on a single IMap, but `synchronized` did not 
serialize them, which caused this error:
   
![image](https://github.com/user-attachments/assets/83792f4e-cf62-4560-8562-47fa4886220a)
   > This image is from 
https://github.com/prclin/seatunnel/actions/runs/16017281670/job/45186339029
   
   At first I thought this might be because Disruptor's 
`handleEventsWithWorkerPool` consumes events concurrently (meaning the handlers 
passed to `handleEventsWithWorkerPool` would serve only as prototypes), but 
after reading the source code I found that it does not work that way.
   
   So my guess is that the FileMapStore of each IMap may be created more than 
once; in that case an instance-level lock does not work as expected, so I can 
only use a class-level lock for each IMap.
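
To illustrate the guess above: `synchronized` on an instance method only excludes callers of that one object, so two writer instances pointing at the same path would not block each other. Below is a minimal sketch of a class-level lock keyed by path, assuming that is what `LOCK_MAP` in the diff is meant to do. The names `PathLockSketch`, `lockFor`, `demo`, and the path `/tmp/imap-a` are hypothetical and not taken from the PR; the shared counter only stands in for the file being written.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PathLockSketch {
    // Class-level map: one lock per parent path, shared by every instance in the JVM.
    private static final Map<String, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();

    private final String parentPath;
    // Hypothetical stand-in for the non-thread-safe resource behind the path.
    private final int[] sharedCounter;

    public PathLockSketch(String parentPath, int[] sharedCounter) {
        this.parentPath = parentPath;
        this.sharedCounter = sharedCounter;
    }

    // Returns the same lock object for the same path, whichever instance asks.
    static ReentrantLock lockFor(String parentPath) {
        return LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());
    }

    public void write() {
        ReentrantLock lock = lockFor(parentPath);
        lock.lock();
        try {
            sharedCounter[0]++; // unsynchronized read-modify-write, guarded only by the lock
        } finally {
            lock.unlock();
        }
    }

    // Two separate instances on the same path, driven from two threads.
    static int demo(int writesPerThread) {
        int[] counter = new int[1];
        PathLockSketch first = new PathLockSketch("/tmp/imap-a", counter);
        PathLockSketch second = new PathLockSketch("/tmp/imap-a", counter);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < writesPerThread; i++) first.write();
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < writesPerThread; i++) second.write();
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo(100_000));
    }
}
```

Because both instances resolve to the same `ReentrantLock`, no increment is lost even though the writers are distinct objects. One trade-off of this approach is that entries in the static map are never removed, so it grows with the number of distinct paths.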



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
