Repository: incubator-rocketmq Updated Branches: refs/heads/develop 15af63e23 -> 01a0eb008
Fix possible NullPointerException when retrying an asynchronous send Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/01a0eb00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/01a0eb00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/01a0eb00 Branch: refs/heads/develop Commit: 01a0eb0088860b10ae0cff257e4f25a0c59cd44c Parents: 15af63e Author: Jaskey <[email protected]> Authored: Wed Feb 15 22:17:47 2017 +0800 Committer: Zhanhui Li <[email protected]> Committed: Mon Mar 27 22:07:37 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/client/impl/MQClientAPIImpl.java | 22 +++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/01a0eb00/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index bdce883..6119e24 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -438,34 +438,40 @@ public class MQClientAPIImpl { ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { - MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName); - String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); + String retryBrokerName = brokerName;//by default, it will send to the same broker + if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send + MessageQueue mqChosen = 
producer.selectOneMessageQueue(topicPublishInfo, brokerName); + retryBrokerName = mqChosen.getBrokerName(); + } + String addr = instance.findBrokerAddressInPublish(retryBrokerName); log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - tmpmq.getBrokerName()); + retryBrokerName); try { request.setOpaque(RemotingCommand.createNewRequestId()); - sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, + sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingConnectException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } catch (RemotingTooMuchRequestException e1) { - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, + onExceptionImpl(retryBrokerName, 
msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer); } } else { + if (context != null) { context.setException(e); context.getProducer().executeSendMessageHookAfter(context); } + try { sendCallback.onException(e); } catch (Exception ignored) {
