This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 635a72d46060abae45faf81eb7b009205d1eac89
Author: zhengwen zhu <[email protected]>
AuthorDate: Wed Jul 29 10:32:27 2020 +0800

    [ISSUE #337] throw exception when send message to broker fail (#339)
    
    * throw an exception when sending message to broker fail
    
    * remove useless import
    
    Co-authored-by: zhu zhengwen <[email protected]>
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index eecb72e..76d6a1f 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -35,9 +35,11 @@ import 
org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.flink.common.selector.TopicSelector;
 import 
org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,8 +141,12 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
             try {
                 SendResult result = producer.send(msg);
                 LOG.debug("Sync send message result: {}", result);
+                if (result.getSendStatus() != SendStatus.SEND_OK) {
+                    throw new RemotingException(result.toString());
+                }
             } catch (Exception e) {
                 LOG.error("Sync send message failure!", e);
+                throw e;
             }
         }
     }

Reply via email to