This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e13440  [ISSUE #118] fix: concurrency problem caused by 
batchFlushOnCheckpoint (#119)
5e13440 is described below

commit 5e13440321e1844adba3a75e6415d3ec08d60e9a
Author: Humkum <[email protected]>
AuthorDate: Fri May 17 11:02:21 2024 +0800

    [ISSUE #118] fix: concurrency problem caused by batchFlushOnCheckpoint 
(#119)
---
 .../java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java 
b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java
index 9f87486..9ff2812 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java
@@ -111,7 +111,9 @@ public class RocketMQSink extends RichSinkFunction<Message> 
implements Checkpoin
         sinkInTps.markEvent();
 
         if (batchFlushOnCheckpoint) {
-            batchList.add(input);
+            synchronized (batchList) {
+                batchList.add(input);
+            }
             if (batchList.size() >= batchSize) {
                 flushSync();
             }

Reply via email to