Cczzzz opened a new pull request #1800: Fix asynchronous send retry
URL: https://github.com/apache/rocketmq/pull/1800
 
 
   retryTimesWhenSendAsyncFailed在DefaultMQProducer中此字段无效。
   
        producer.send(msg, new SendCallback() {
                       @Override
                       public void onSuccess(SendResult sendResult) {
                           countDownLatch.countDown();
                           System.out.printf("%-10d OK %s %n", index, 
sendResult.getMsgId());
                       }
   
                       @Override
                       public void onException(Throwable e) {
                           countDownLatch.countDown();
                           System.out.printf("%-10d Exception %s %n", index, e);
                           e.printStackTrace();
                       }
                   });
   最后输入MQClientAPIImpl#sendMessageAsync,如果发生异常。
   将调用onExceptionImpl(),但timeoutMillis始终为0。
   
   
onExceptionImpl(brokerName,msg,0L,request,sendCallback,topicPublishInfo,instance,
   retryTimesWhenSendFailed,times,ex,context,true,producer);
   
       private void onExceptionImpl(final String brokerName,
                                    final Message msg,
                                    final long timeoutMillis,
                                    final RemotingCommand request,
                                    final SendCallback sendCallback,
                                    final TopicPublishInfo topicPublishInfo,
                                    final MQClientInstance instance,
                                    final int timesTotal,
                                    final AtomicInteger curTimes,
                                    final Exception e,
                                    final SendMessageContext context,
                                    final boolean needRetry,
                                    final DefaultMQProducerImpl producer
       )
   在此将导致发送失败,invokeAsync timeoutMillis为0。
   
   NettyRemotingClient#invokeAsync 
    @Override
       public void invokeAsync(String addr, RemotingCommand request, long 
timeoutMillis, InvokeCallback invokeCallback)
           throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException, RemotingTimeoutException,
           RemotingSendRequestException {
           long beginStartTime = System.currentTimeMillis();
           final Channel channel = this.getAndCreateChannel(addr);
           if (channel != null && channel.isActive()) {
               try {
                   doBeforeRpcHooks(addr, request);
                   long costTime = System.currentTimeMillis() - beginStartTime;
                   if (timeoutMillis < costTime) {
                       throw new RemotingTooMuchRequestException("invokeAsync 
call timeout");
                   }
                   this.invokeAsyncImpl(channel, request, timeoutMillis - 
costTime, invokeCallback);
               } catch (RemotingSendRequestException e) {
                   log.warn("invokeAsync: send request exception, so close the 
channel[{}]", addr);
                   this.closeChannel(addr, channel);
                   throw e;
               }
           } else {
               this.closeChannel(addr, channel);
               throw new RemotingConnectException(addr);
           }
       }
   重试时,requestId将被重置,但是会有重复的requestId,并且不会执行回调函数
   
    private volatile int opaque = requestId.getAndIncrement();
      public static int createNewRequestId() {
           return requestId.incrementAndGet();
       }
   crementAndGet和getAndIncrement将导致重复的id。
   我尝试修复它,如果这是一个错误。
   ····································
   关于异步发送试重的问题
   
异步发送的重试次数并没有生效,因为在方法中MQClientAPIImpl#sendMessageAsync发送失败后调用时timeoutMillis是0,导致后面超时判断直接判定超时,第一次重试就失败。
   onExceptionImpl(brokerName, msg,0L,request,sendCallback,topicPublishInfo,实例,
   retryTimesWhenSendFailed,时间,ex,上下文,true,生产者);
   
然后,在重试时会重置请求id,但是重置的方法和创建request时分配请求id方法会导致出现反复id.incrementAndGet和getAndIncrement一起使用了,导致其他请求的响应影响的异步发送的响应,导致回调函数没有执行,重试中断。
   不知道这是不是一个错误,如果是,我修复了它,希望可以提交rp

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to