This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 0698d86fb318da566c0744ba5737e416a7536dc1
Author: shangan <[email protected]>
AuthorDate: Mon Aug 19 14:41:39 2019 +0800

    Producer failed to shutdown when exception happened (#388)
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index ca6848d..e8f237f 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -181,7 +181,12 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
     @Override
     public void close() throws Exception {
         if (producer != null) {
-            flushSync();
+            try {
+                flushSync();
+            } catch (Exception e) {
+                LOG.error("FlushSync failure!", e);
+            }
+            // make sure producer can be shutdown, thus current producerGroup 
will be unregistered
             producer.shutdown();
         }
     }

Reply via email to