lindzh closed pull request #219: [ROCKETMQ-355] Client asyncSend is not fully async
URL: https://github.com/apache/rocketmq/pull/219
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7c1697967..6729669da 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -74,6 +75,7 @@
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.remoting.DoAsyncCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -249,7 +251,7 @@ public TransactionCheckListener checkListener() {
 
     @Override
     public void checkTransactionState(final String addr, final MessageExt msg,
-        final CheckTransactionStateRequestHeader header) {
+                                      final CheckTransactionStateRequestHeader 
header) {
         Runnable request = new Runnable() {
             private final String brokerAddr = addr;
             private final MessageExt message = msg;
@@ -409,17 +411,43 @@ public MessageExt queryMessageByUniqKey(String topic, 
String uniqKey)
     /**
      * DEFAULT ASYNC -------------------------------------------------------
      */
-    public void send(Message msg,
-        SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
+    public void send(Message msg, SendCallback sendCallback) {
         send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        try {
-            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, 
timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknownn exception", e);
+    public void send(final Message msg, final SendCallback sendCallback, final 
long timeout) {
+        
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().doAsyncSend(new 
DoAsyncCallback() {
+            @Override
+            public long getTimeout() {
+                return timeout;
+            }
+
+            @Override
+            public void onSuccess() throws RemotingException {
+                try {
+                    sendDefaultImpl(msg, CommunicationMode.ASYNC, 
sendCallback, timeout);
+                } catch (Exception e) {
+                    handleCallbackException(e, sendCallback);
+                    throw new RemotingException("client send check 
exception",e);
+                }
+            }
+
+            @Override
+            public void onFailed(Throwable e) {
+                handleCallbackException(e, sendCallback);
+            }
+        });
+    }
+
+    private void handleCallbackException(Throwable e, SendCallback 
sendCallback) {
+        if (sendCallback != null) {
+            if (e instanceof MQBrokerException) {
+                sendCallback.onException(new MQClientException("unknown 
exception", e));
+            } else {
+                sendCallback.onException(e);
+            }
+        } else {
+            log.warn("asyncSend message callback null real exception is " + 
e.getMessage(), e);
         }
     }
 
@@ -583,11 +611,11 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final 
String topic) {
     }
 
     private SendResult sendKernelImpl(final Message msg,
-        final MessageQueue mq,
-        final CommunicationMode communicationMode,
-        final SendCallback sendCallback,
-        final TopicPublishInfo topicPublishInfo,
-        final long timeout) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
+                                      final MessageQueue mq,
+                                      final CommunicationMode 
communicationMode,
+                                      final SendCallback sendCallback,
+                                      final TopicPublishInfo topicPublishInfo,
+                                      final long timeout) throws 
MQClientException, RemotingException, MQBrokerException, InterruptedException {
         String brokerAddr = 
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
@@ -842,32 +870,45 @@ public SendResult send(Message msg, MessageQueue mq, long 
timeout)
     /**
      * KERNEL ASYNC -------------------------------------------------------
      */
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback) {
         send(msg, mq, sendCallback, 
this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, 
long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
+    public void send(final Message msg, final MessageQueue mq, final 
SendCallback sendCallback, final long timeout) {
+        
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().doAsyncSend(new 
DoAsyncCallback() {
+            @Override
+            public long getTimeout() {
+                return timeout;
+            }
 
-        if (!msg.getTopic().equals(mq.getTopic())) {
-            throw new MQClientException("message's topic not equal mq's 
topic", null);
-        }
+            @Override
+            public void onSuccess() throws RemotingException {
+                try {
+                    makeSureStateOK();
+                    Validators.checkMessage(msg, defaultMQProducer);
 
-        try {
-            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
sendCallback, null, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknown exception", e);
-        }
+                    if (!msg.getTopic().equals(mq.getTopic())) {
+                        throw new MQClientException("message's topic not equal 
mq's topic", null);
+                    }
+                    sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
sendCallback, null, timeout);
+                } catch (Exception e) {
+                    handleCallbackException(e, sendCallback);
+                    throw new RemotingException("client send check 
exception",e);
+                }
+            }
+
+            @Override
+            public void onFailed(Throwable e) {
+                handleCallbackException(e, sendCallback);
+            }
+        });
     }
 
     /**
      * KERNEL ONEWAY -------------------------------------------------------
      */
     public void sendOneway(Message msg,
-        MessageQueue mq) throws MQClientException, RemotingException, 
InterruptedException {
+                           MessageQueue mq) throws MQClientException, 
RemotingException, InterruptedException {
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
 
@@ -923,18 +964,32 @@ private SendResult sendSelectImpl(
     /**
      * SELECT ASYNC -------------------------------------------------------
      */
-    public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback) {
         send(msg, selector, arg, sendCallback, 
this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        try {
-            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, 
sendCallback, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknownn exception", e);
-        }
+    public void send(final Message msg, final MessageQueueSelector selector, 
final Object arg, final SendCallback sendCallback, final long timeout) {
+        
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().doAsyncSend(new 
DoAsyncCallback() {
+            @Override
+            public long getTimeout() {
+                return timeout;
+            }
+
+            @Override
+            public void onSuccess() throws RemotingException {
+                try {
+                    sendSelectImpl(msg, selector, arg, 
CommunicationMode.ASYNC, sendCallback, timeout);
+                } catch (Exception e) {
+                    handleCallbackException(e, sendCallback);
+                    throw new RemotingException("client send check 
exception",e);
+                }
+            }
+
+            @Override
+            public void onFailed(Throwable e) {
+                handleCallbackException(e, sendCallback);
+            }
+        });
     }
 
     /**
@@ -950,7 +1005,7 @@ public void sendOneway(Message msg, MessageQueueSelector 
selector, Object arg)
     }
 
     public TransactionSendResult sendMessageInTransaction(final Message msg,
-        final LocalTransactionExecuter tranExecuter, final Object arg)
+                                                          final 
LocalTransactionExecuter tranExecuter, final Object arg)
         throws MQClientException {
         if (null == tranExecuter) {
             throw new MQClientException("tranExecutor is null", null);
@@ -1064,8 +1119,12 @@ public void setCallbackExecutor(final ExecutorService 
callbackExecutor) {
         
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
     }
 
+    private ExecutorService getCallbackExecutor() {
+        return 
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
+    }
+
     public SendResult send(Message msg,
-        long timeout) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
+                           long timeout) throws MQClientException, 
RemotingException, MQBrokerException, InterruptedException {
         return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, 
timeout);
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index a2f25dd0f..07ac9f625 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -250,8 +250,8 @@ public SendResult send(Message msg,
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
-    public void send(Message msg,
-        SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
+    public void send(Message msg, SendCallback sendCallback) throws 
MQClientException, RemotingException,
+        InterruptedException {
         this.defaultMQProducerImpl.send(msg, sendCallback);
     }
 
@@ -266,8 +266,8 @@ public void send(Message msg,
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
-    public void send(Message msg, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, SendCallback sendCallback, long timeout) 
throws MQClientException,
+        RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
     }
 
@@ -333,8 +333,8 @@ public SendResult send(Message msg, MessageQueue mq, long 
timeout)
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback) 
throws MQClientException,
+        RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, mq, sendCallback);
     }
 
@@ -350,8 +350,8 @@ public void send(Message msg, MessageQueue mq, SendCallback 
sendCallback)
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, 
long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, 
long timeout) throws MQClientException,
+        RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java 
b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 14caf6ffa..dee19b27a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -70,12 +70,10 @@ SendResult send(final Message msg, final 
MessageQueueSelector selector, final Ob
         InterruptedException;
 
     void send(final Message msg, final MessageQueueSelector selector, final 
Object arg,
-        final SendCallback sendCallback) throws MQClientException, 
RemotingException,
-        InterruptedException;
+        final SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException;
 
     void send(final Message msg, final MessageQueueSelector selector, final 
Object arg,
-        final SendCallback sendCallback, final long timeout) throws 
MQClientException, RemotingException,
-        InterruptedException;
+        final SendCallback sendCallback, final long timeout) throws 
MQClientException, RemotingException, InterruptedException;
 
     void sendOneway(final Message msg, final MessageQueueSelector selector, 
final Object arg)
         throws MQClientException, RemotingException, InterruptedException;
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index ded22ada9..a3d11a9de 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -24,6 +24,9 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -36,6 +39,7 @@
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -214,6 +218,51 @@ public void testSetCallbackExecutor() throws 
MQClientException {
         assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
     }
 
+    @Test
+    public void testAsyncSend() throws MQClientException, RemotingException, 
InterruptedException {
+        String producerGroupTemp = producerGroupPrefix + 
System.currentTimeMillis();
+        producer = new DefaultMQProducer(producerGroupTemp);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.start();
+
+        final AtomicInteger cc = new AtomicInteger(0);
+        final CountDownLatch countDownLatch = new CountDownLatch(6);
+
+        SendCallback sendCallback = new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                e.printStackTrace();
+                countDownLatch.countDown();
+                cc.incrementAndGet();
+            }
+        };
+        MessageQueueSelector messageQueueSelector = new MessageQueueSelector() 
{
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, 
Object arg) {
+                return null;
+            }
+        };
+
+        Message message = new Message();
+        message.setTopic("test");
+        message.setBody("hello world".getBytes());
+        producer.send(new Message(),sendCallback);
+        producer.send(message,sendCallback,1000);
+        producer.send(message,new MessageQueue(),sendCallback);
+        producer.send(new Message(),new MessageQueue(),sendCallback,1000);
+        producer.send(new Message(),messageQueueSelector,null,sendCallback);
+        producer.send(message,messageQueueSelector,null,sendCallback,1000);
+
+        countDownLatch.await(1000L, TimeUnit.MILLISECONDS);
+
+        assertThat(cc.get()).isEqualTo(6);
+    }
+
     public static TopicRouteData createTopicRoute() {
         TopicRouteData topicRouteData = new TopicRouteData();
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/DoAsyncCallback.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/DoAsyncCallback.java
new file mode 100644
index 000000000..6de9e9dbb
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/DoAsyncCallback.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public interface DoAsyncCallback {
+
+    long getTimeout();
+
+    void onSuccess() throws RemotingException;
+
+    void onFailed(Throwable e);
+
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 2aea14cb9..e0763b434 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -39,6 +39,8 @@ void invokeAsync(final String addr, final RemotingCommand 
request, final long ti
         final InvokeCallback invokeCallback) throws InterruptedException, 
RemotingConnectException,
         RemotingTooMuchRequestException, RemotingTimeoutException, 
RemotingSendRequestException;
 
+    void doAsyncSend(DoAsyncCallback callback);
+
     void invokeOneway(final String addr, final RemotingCommand request, final 
long timeoutMillis)
         throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException,
         RemotingTimeoutException, RemotingSendRequestException;
@@ -48,5 +50,7 @@ void registerProcessor(final int requestCode, final 
NettyRequestProcessor proces
 
     void setCallbackExecutor(final ExecutorService callbackExecutor);
 
+    ExecutorService getCallbackExecutor();
+
     boolean isChannelWritable(final String addr);
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 557ad5602..9f00f3606 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -95,6 +95,13 @@
      */
     protected volatile SslContext sslContext;
 
+    protected ThreadLocal<Boolean> asynclockAcquired = new 
ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+            return Boolean.FALSE;
+        }
+    };
+
     /**
      * Constructor, specifying capacity of one-way and asynchronous semaphores.
      *
@@ -271,6 +278,29 @@ public void processResponseCommand(ChannelHandlerContext 
ctx, RemotingCommand cm
         }
     }
 
+    protected void executeAsyncCallback(Runnable runnable) {
+        boolean runInThisThread = false;
+        ExecutorService executor = this.getCallbackExecutor();
+        if (executor != null) {
+            try {
+                executor.submit(runnable);
+            } catch (Exception e) {
+                runInThisThread = true;
+                log.warn("execute async callback in executor exception, maybe 
executor busy", e);
+            }
+        } else {
+            runInThisThread = true;
+        }
+
+        if (runInThisThread) {
+            try {
+                runnable.run();
+            } catch (Throwable e) {
+                log.warn("executeAsyncCallback Exception", e);
+            }
+        }
+    }
+
     /**
      * Execute callback in callback executor. If callback executor is null, 
run directly in current thread
      */
@@ -400,7 +430,10 @@ public void invokeAsyncImpl(final Channel channel, final 
RemotingCommand request
         final InvokeCallback invokeCallback)
         throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
         final int opaque = request.getOpaque();
-        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, 
TimeUnit.MILLISECONDS);
+        boolean acquired = asynclockAcquired.get();
+        if (!acquired) {
+            acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, 
TimeUnit.MILLISECONDS);
+        }
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new 
SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index dcc80cba0..1cf73dec5 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -53,6 +53,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rocketmq.remoting.DoAsyncCallback;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -61,6 +63,7 @@
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
@@ -507,6 +510,54 @@ private Channel createChannel(final String addr) throws 
InterruptedException {
         return null;
     }
 
+    @Override
+    public void doAsyncSend(final DoAsyncCallback callback) {
+        try {
+            boolean acquired = this.semaphoreAsync.tryAcquire(1, 
TimeUnit.MILLISECONDS);
+            if (acquired) {
+                asynclockAcquired.set(Boolean.TRUE);
+                executeAsyncCallback(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            callback.onSuccess();
+                        } catch (Throwable e) {
+                            semaphoreAsync.release();
+                            asynclockAcquired.set(Boolean.FALSE);
+                            if (e instanceof RemotingException) {
+                                // ignore
+                            } else {
+                                log.error("doAsyncSendBeforeNetwork call 
onSuccess failed", e);
+                            }
+                        }
+                    }
+                });
+            } else {
+                executeAsyncCallback(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            callback.onFailed(null);
+                        } catch (Throwable e) {
+                            log.error("doAsyncSendBeforeNetwork call onFailed 
failed", e);
+                        }
+                    }
+                });
+            }
+        } catch (final Throwable e) {
+            executeAsyncCallback(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        callback.onFailed(e);
+                    } catch (Throwable e) {
+                        log.error("doAsyncSendBeforeNetwork call onFailed 
failed", e);
+                    }
+                }
+            });
+        }
+    }
+
     @Override
     public void invokeAsync(String addr, RemotingCommand request, long 
timeoutMillis, InvokeCallback invokeCallback)
         throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException, RemotingTimeoutException,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to