Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 15af63e23 -> 01a0eb008


Fix possible NullPointerException when retrying an asynchronous send


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/01a0eb00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/01a0eb00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/01a0eb00

Branch: refs/heads/develop
Commit: 01a0eb0088860b10ae0cff257e4f25a0c59cd44c
Parents: 15af63e
Author: Jaskey <[email protected]>
Authored: Wed Feb 15 22:17:47 2017 +0800
Committer: Zhanhui Li <[email protected]>
Committed: Mon Mar 27 22:07:37 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/client/impl/MQClientAPIImpl.java   | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/01a0eb00/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index bdce883..6119e24 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -438,34 +438,40 @@ public class MQClientAPIImpl {
     ) {
         int tmp = curTimes.incrementAndGet();
         if (needRetry && tmp <= timesTotal) {
-            MessageQueue tmpmq = 
producer.selectOneMessageQueue(topicPublishInfo, brokerName);
-            String addr = 
instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
+            String retryBrokerName = brokerName;//by default, it will send to 
the same broker
+            if (topicPublishInfo != null) { //select one message queue 
accordingly, in order to determine which broker to send
+                MessageQueue mqChosen = 
producer.selectOneMessageQueue(topicPublishInfo, brokerName);
+                retryBrokerName = mqChosen.getBrokerName();
+            }
+            String addr = instance.findBrokerAddressInPublish(retryBrokerName);
             log.info("async send msg by retry {} times. topic={}, 
brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                tmpmq.getBrokerName());
+                retryBrokerName);
             try {
                 request.setOpaque(RemotingCommand.createNewRequestId());
-                sendMessageAsync(addr, tmpmq.getBrokerName(), msg, 
timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+                sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance,
                     timesTotal, curTimes, context, producer);
             } catch (InterruptedException e1) {
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, 
sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, false, producer);
             } catch (RemotingConnectException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, 
sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, true, producer);
             } catch (RemotingTooMuchRequestException e1) {
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, 
sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, false, producer);
             } catch (RemotingException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, 
sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                     context, true, producer);
             }
         } else {
+
             if (context != null) {
                 context.setException(e);
                 context.getProducer().executeSendMessageHookAfter(context);
             }
+
             try {
                 sendCallback.onException(e);
             } catch (Exception ignored) {

Reply via email to