This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 452b57fcd480dd707ecd8b73d898e5e87c8f6679
Author: shangan <[email protected]>
AuthorDate: Fri Feb 1 14:37:05 2019 +0800

    Add batch size param for flink sink (#198)
    
    * Add batch size param for flink sink
    
    * Improvement. code style correction
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index e79d1b4..65274af 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -59,6 +59,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
     private KeyValueSerializationSchema<IN> serializationSchema;
 
     private boolean batchFlushOnCheckpoint; // false by default
+    private int batchSize = 1000;
     private List<Message> batchList;
 
     public RocketMQSink(KeyValueSerializationSchema<IN> schema, 
TopicSelector<IN> topicSelector, Properties props) {
@@ -97,6 +98,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
 
         if (batchFlushOnCheckpoint) {
             batchList.add(msg);
+            if (batchList.size() >= batchSize) {
+                flushSync();
+            }
             return;
         }
 
@@ -156,6 +160,11 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
         return this;
     }
 
+    public RocketMQSink<IN> withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
     @Override
     public void close() throws Exception {
         if (producer != null) {

Reply via email to