[ 
https://issues.apache.org/jira/browse/HUDI-3661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641823#comment-17641823
 ] 

lei w commented on HUDI-3661:
-----------------------------

Hi [~hd zhou], [~danny0405]. In our cluster, we removed the async executor to 
work around this problem, but that leads to checkpoint timeouts. I have an idea 
that avoids both problems.
Some details are as follows:
1. compactionPlanOperator always sends a CompactionPlanEvent downstream in 
the notifyCheckpointComplete method.
     If there is no compaction plan, it sends a CompactionPlanEvent with a null 
compactionOperation and broadcasts this event downstream by 
     sending more events than the compactTask concurrency, so that every 
compaction subtask receives one.
2. Add a CompactionCommitEventList (a thread-safe list) to compactFunction.
3. When compactionTask receives an event from upstream:
    3.1: If the CompactionPlanEvent has a null compactionOperation, do 3.3.
    3.2: If the CompactionPlanEvent has a non-null compactionOperation, submit 
this task to AsyncCompactionExecutor, then do 3.3.
    3.3: Poll all CompactionCommitEvents from CompactionCommitEventList and 
send them downstream.
4. When a compaction is done, AsyncCompactionExecutor puts the 
CompactionCommitEvent into CompactionCommitEventList. The event waits there 
     until the next CompactionPlanEvent arrives, and only then is it sent 
downstream.
This idea delays when compaction commit events are sent downstream, but it 
does not cause checkpoint timeouts. What do you suggest? Your prompt reply 
would be greatly appreciated.
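
To make steps 2 to 4 concrete, here is a minimal sketch in plain Java with no 
Flink or Hudi dependency. Everything in it is a stand-in for illustration: 
PlanEvent, CommitEvent, CompactFunction and runDemo are hypothetical names, a 
ConcurrentLinkedQueue plays the role of CompactionCommitEventList, and a 
Consumer plays the role of the collector. It only shows the threading idea 
(the worker thread enqueues, the thread that calls processElement is the only 
one that emits), not the real operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class BufferedCompactionSketch {

    /** Hypothetical stand-in for CompactionPlanEvent; a null operation means "flush only". */
    static final class PlanEvent {
        final String operation;
        PlanEvent(String operation) { this.operation = operation; }
    }

    /** Hypothetical stand-in for CompactionCommitEvent. */
    static final class CommitEvent {
        final String operation;
        CommitEvent(String operation) { this.operation = operation; }
    }

    /** Sketch of the compact function: the collector is only used by the caller of processElement. */
    static final class CompactFunction {
        // Step 2: thread-safe buffer shared between the worker thread and the task thread.
        private final ConcurrentLinkedQueue<CommitEvent> commitEvents = new ConcurrentLinkedQueue<>();
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        void processElement(PlanEvent event, Consumer<CommitEvent> collector) {
            if (event.operation != null) {
                // Step 3.2: run the (simulated) compaction asynchronously.
                // Step 4: the worker never touches the collector, it only enqueues the result.
                final String op = event.operation;
                executor.execute(() -> commitEvents.add(new CommitEvent(op)));
            }
            // Steps 3.1 and 3.3: drain finished results on the calling thread.
            CommitEvent done;
            while ((done = commitEvents.poll()) != null) {
                collector.accept(done);
            }
        }

        /** Demo helper: waits for submitted compactions to finish. */
        void awaitIdle() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("compaction did not finish in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Sends one real plan event, then one null-operation event that flushes the buffer. */
    static List<String> runDemo() {
        CompactFunction fn = new CompactFunction();
        List<String> emitted = new ArrayList<>();
        Consumer<CommitEvent> collector = e -> emitted.add(e.operation);
        fn.processElement(new PlanEvent("op-1"), collector);
        fn.awaitIdle();
        fn.processElement(new PlanEvent(null), collector);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [op-1]
    }
}
```

In this sketch a finished compaction is emitted either by the same call that 
submitted it (if the worker is already done) or by the next call, which is the 
delay described above.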

> Flink async compaction is not thread safe when use watermark
> ------------------------------------------------------------
>
>                 Key: HUDI-3661
>                 URL: https://issues.apache.org/jira/browse/HUDI-3661
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: hd zhou
>            Priority: Major
>         Attachments: image-2022-03-18-19-38-39-257.png
>
>
> Async compaction starts an executor that runs the compaction asynchronously 
> and sends the compaction result message to the next Flink operator. But 
> collector.collect() is not a thread-safe function. When watermarks or 
> latency markers are used, they also call collector.collect(), which may 
> cause issues.
> we should not let async compaction = false
>  
> !image-2022-03-18-19-38-39-257.png!
>  
>  
> !https://git.bilibili.co/datacenter/bili-hudi/uploads/79608d01b0301de84d1d9e3cf24f1d21/image.png!
>  
> !https://git.bilibili.co/datacenter/bili-hudi/uploads/e9c2f27d395e708a407bcf40f672c870/image.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
