This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 452b57fcd480dd707ecd8b73d898e5e87c8f6679
Author: shangan <[email protected]>
AuthorDate: Fri Feb 1 14:37:05 2019 +0800

    Add batch size param for flink sink (#198)

    * Add batch size param for flink sink

    * Improvement. code style correction
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index e79d1b4..65274af 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -59,6 +59,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
     private KeyValueSerializationSchema<IN> serializationSchema;
 
     private boolean batchFlushOnCheckpoint; // false by default
+    private int batchSize = 1000;
     private List<Message> batchList;
 
     public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
@@ -97,6 +98,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
 
         if (batchFlushOnCheckpoint) {
             batchList.add(msg);
+            if (batchList.size() >= batchSize) {
+                flushSync();
+            }
             return;
         }
 
@@ -156,6 +160,11 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         return this;
     }
 
+    public RocketMQSink<IN> withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
     @Override
     public void close() throws Exception {
         if (producer != null) {
