This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 0698d86fb318da566c0744ba5737e416a7536dc1
Author: shangan <[email protected]>
AuthorDate: Mon Aug 19 14:41:39 2019 +0800

    Producer failed to shutdown when exception happened (#388)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index ca6848d..e8f237f 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -181,7 +181,12 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
     @Override
     public void close() throws Exception {
         if (producer != null) {
-            flushSync();
+            try {
+                flushSync();
+            } catch (Exception e) {
+                LOG.error("FlushSync failure!", e);
+            }
+            // make sure producer can be shutdown, thus current producerGroup will be unregistered
             producer.shutdown();
         }
     }
