Fix a possible NullPointerException when retrying an asynchronous send
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/02acf1a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/02acf1a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/02acf1a0 Branch: refs/heads/master Commit: 02acf1a074289cb46909f00e88c86c52d356523b Parents: 47fad3c Author: Jaskey <[email protected]> Authored: Wed Feb 15 22:17:47 2017 +0800 Committer: dongeforever <[email protected]> Committed: Tue Jun 6 11:37:29 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/client/impl/MQClientAPIImpl.java | 22 +++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/02acf1a0/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bdce883..6119e24 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -438,34 +438,40 @@ public class MQClientAPIImpl { ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { - MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName); - String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); + String retryBrokerName = brokerName;//by default, it will send to the same broker + if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send + MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName); + retryBrokerName = 
mqChosen.getBrokerName(); + } + String addr = instance.findBrokerAddressInPublish(retryBrokerName); log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - tmpmq.getBrokerName()); + retryBrokerName); try { request.setOpaque(RemotingCommand.createNewRequestId()); - sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, + sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingConnectException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } catch (RemotingTooMuchRequestException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, 
curTimes, e1, context, true, producer); } } else { + if (context != null) { context.setException(e); context.getProducer().executeSendMessageHookAfter(context); } + try { sendCallback.onException(e); } catch (Exception ignored) {
