This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 635a72d46060abae45faf81eb7b009205d1eac89
Author: zhengwen zhu <[email protected]>
AuthorDate: Wed Jul 29 10:32:27 2020 +0800

    [ISSUE #337] throw exception when send message to broker fail (#339)

    * throw an exception when sending message to broker fail

    * remove useless import

    Co-authored-by: zhu zhengwen <[email protected]>
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index eecb72e..76d6a1f 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -35,9 +35,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.flink.common.selector.TopicSelector;
 import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -139,8 +141,12 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
             try {
                 SendResult result = producer.send(msg);
                 LOG.debug("Sync send message result: {}", result);
+                if (result.getSendStatus() != SendStatus.SEND_OK) {
+                    throw new RemotingException(result.toString());
+                }
             } catch (Exception e) {
                 LOG.error("Sync send message failure!", e);
+                throw e;
             }
         }
     }
