vongosling closed pull request #222: [ROCKETMQ-355] Client asyncSend is not 
fully async
URL: https://github.com/apache/rocketmq/pull/222
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java 
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index a9eabfe63..66ec1e944 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -25,6 +25,7 @@
  * Client Common configuration
  */
 public class ClientConfig {
+    public static final String SEND_ASYNC_SEMAPHORE = 
"com.rocketmq.sendAsyncSemaphore";
     public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = 
"com.rocketmq.sendMessageWithVIPChannel";
     private String namesrvAddr = 
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, 
System.getenv(MixAll.NAMESRV_ADDR_ENV));
     private String clientIP = RemotingUtil.getLocalAddress();
@@ -46,6 +47,8 @@
     private String unitName;
     private boolean vipChannelEnabled = 
Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, 
"true"));
 
+    private int asyncSendSemaphore = 
Integer.parseInt(System.getProperty(SEND_ASYNC_SEMAPHORE, "65536"));
+
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
     public String buildMQClientId() {
@@ -186,6 +189,14 @@ public void setUseTLS(boolean useTLS) {
         this.useTLS = useTLS;
     }
 
+    public int getAsyncSendSemaphore() {
+        return asyncSendSemaphore;
+    }
+
+    public void setAsyncSendSemaphore(int asyncSendSemaphore) {
+        this.asyncSendSemaphore = asyncSendSemaphore;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + 
clientIP + ", instanceName=" + instanceName
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7c1697967..5d4c46c58 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -29,8 +29,11 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -98,6 +101,8 @@
 
     private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
 
+    private Semaphore asyncSendSemphore = null;
+
     public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
         this(defaultMQProducer, null);
     }
@@ -145,6 +150,10 @@ public void start(final boolean startFactory) throws 
MQClientException {
 
                 this.checkConfig();
 
+                if (asyncSendSemphore == null) {
+                    asyncSendSemphore = new 
Semaphore(defaultMQProducer.getAsyncSendSemaphore());
+                }
+
                 if 
(!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP))
 {
                     this.defaultMQProducer.changeInstanceNameToPID();
                 }
@@ -406,20 +415,99 @@ public MessageExt queryMessageByUniqKey(String topic, 
String uniqKey)
         return 
this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
     }
 
+    private static class AsyncSendCallback implements SendCallback {
+        private SendCallback realSendCallback;
+        private Semaphore semaphore;
+
+        public AsyncSendCallback(SendCallback sendCallback, Semaphore 
semaphore) {
+            this.realSendCallback = sendCallback;
+            this.semaphore = semaphore;
+        }
+
+        @Override
+        public void onSuccess(SendResult sendResult) {
+            try {
+                realSendCallback.onSuccess(sendResult);
+            } finally {
+                semaphore.release();
+            }
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            try {
+                realSendCallback.onException(e);
+            } finally {
+                semaphore.release();
+            }
+        }
+    }
+
+    // FIXME: 2018/1/25
+    private ExecutorService getAsyncSendExecutor() {
+        return this.getCallbackExecutor();
+    }
+
+    private void doAsyncSend(Runnable runnable, final SendCallback 
sendCallback) {
+        try {
+            getAsyncSendExecutor().submit(runnable);
+        } catch (RejectedExecutionException e) {
+            sendCallback.onException(e);
+        }
+    }
+
+    private void asyncHandleException(final SendCallback sendCallback, final 
Throwable e) {
+        try {
+            getAsyncSendExecutor().submit(new Runnable() {
+                @Override
+                public void run() {
+                    sendCallback.onException(e);
+                }
+            });
+        } catch (RejectedExecutionException e1) {
+            sendCallback.onException(e);
+        }
+    }
+
     /**
      * DEFAULT ASYNC -------------------------------------------------------
      */
-    public void send(Message msg,
-        SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
+    public void send(Message msg, SendCallback sendCallback) {
         send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(final Message msg, final SendCallback sendCallback, final 
long timeout) {
         try {
-            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, 
timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknownn exception", e);
+            boolean acquire = asyncSendSemphore.tryAcquire(timeout, 
TimeUnit.MILLISECONDS);
+            if (acquire) {
+                final AsyncSendCallback asyncSendCallback = new 
AsyncSendCallback(sendCallback, asyncSendSemphore);
+                doAsyncSend(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            sendDefaultImpl(msg, CommunicationMode.ASYNC, 
asyncSendCallback, timeout);
+                        } catch (Exception e) {
+                            handleCallbackException(e, asyncSendCallback);
+                        }
+                    }
+                }, asyncSendCallback);
+            } else {
+                asyncHandleException(sendCallback, new 
RejectedExecutionException());
+            }
+        } catch (InterruptedException e) {
+            asyncHandleException(sendCallback, e);
+        }
+    }
+
+    private void handleCallbackException(Exception e, SendCallback 
sendCallback) {
+        if (sendCallback != null) {
+            if (e instanceof MQBrokerException) {
+                sendCallback.onException(new MQClientException("unknown 
exception", e));
+            } else {
+                sendCallback.onException(e);
+            }
+        } else {
+            log.warn("asyncSend message callback null real exception is " + 
e.getMessage(), e);
         }
     }
 
@@ -583,11 +671,11 @@ private TopicPublishInfo tryToFindTopicPublishInfo(final 
String topic) {
     }
 
     private SendResult sendKernelImpl(final Message msg,
-        final MessageQueue mq,
-        final CommunicationMode communicationMode,
-        final SendCallback sendCallback,
-        final TopicPublishInfo topicPublishInfo,
-        final long timeout) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
+                                      final MessageQueue mq,
+                                      final CommunicationMode 
communicationMode,
+                                      final SendCallback sendCallback,
+                                      final TopicPublishInfo topicPublishInfo,
+                                      final long timeout) throws 
MQClientException, RemotingException, MQBrokerException, InterruptedException {
         String brokerAddr = 
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
@@ -842,24 +930,38 @@ public SendResult send(Message msg, MessageQueue mq, long 
timeout)
     /**
      * KERNEL ASYNC -------------------------------------------------------
      */
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback) {
         send(msg, mq, sendCallback, 
this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, 
long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
-
-        if (!msg.getTopic().equals(mq.getTopic())) {
-            throw new MQClientException("message's topic not equal mq's 
topic", null);
-        }
-
+    public void send(final Message msg, final MessageQueue mq, final 
SendCallback sendCallback, final long timeout) {
         try {
-            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
sendCallback, null, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknown exception", e);
+            boolean acquire = asyncSendSemphore.tryAcquire(timeout, 
TimeUnit.MILLISECONDS);
+            if (acquire) {
+                final AsyncSendCallback asyncSendCallback = new 
AsyncSendCallback(sendCallback, asyncSendSemphore);
+                doAsyncSend(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+
+                            makeSureStateOK();
+                            Validators.checkMessage(msg, defaultMQProducer);
+
+                            if (!msg.getTopic().equals(mq.getTopic())) {
+                                throw new MQClientException("message's topic 
not equal mq's topic", null);
+                            }
+                            sendKernelImpl(msg, mq, CommunicationMode.ASYNC, 
asyncSendCallback, null, timeout);
+                        } catch (Exception e) {
+                            handleCallbackException(e, asyncSendCallback);
+                        }
+                    }
+                }, asyncSendCallback);
+
+            } else {
+                asyncHandleException(sendCallback, new 
RejectedExecutionException());
+            }
+        } catch (InterruptedException e) {
+            asyncHandleException(sendCallback, e);
         }
     }
 
@@ -867,7 +969,7 @@ public void send(Message msg, MessageQueue mq, SendCallback 
sendCallback, long t
      * KERNEL ONEWAY -------------------------------------------------------
      */
     public void sendOneway(Message msg,
-        MessageQueue mq) throws MQClientException, RemotingException, 
InterruptedException {
+                           MessageQueue mq) throws MQClientException, 
RemotingException, InterruptedException {
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
 
@@ -923,17 +1025,31 @@ private SendResult sendSelectImpl(
     /**
      * SELECT ASYNC -------------------------------------------------------
      */
-    public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback) {
         send(msg, selector, arg, sendCallback, 
this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
+    public void send(final Message msg, final MessageQueueSelector selector, 
final Object arg, final SendCallback sendCallback, final long timeout) {
         try {
-            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, 
sendCallback, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknownn exception", e);
+            boolean acquire = asyncSendSemphore.tryAcquire(timeout, 
TimeUnit.MILLISECONDS);
+            if (acquire) {
+                final AsyncSendCallback asyncSendCallback = new 
AsyncSendCallback(sendCallback, asyncSendSemphore);
+                doAsyncSend(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            sendSelectImpl(msg, selector, arg, 
CommunicationMode.ASYNC, asyncSendCallback, timeout);
+                        } catch (Exception e) {
+                            handleCallbackException(e, asyncSendCallback);
+                        }
+                    }
+                }, asyncSendCallback);
+
+            } else {
+                asyncHandleException(sendCallback, new 
RejectedExecutionException());
+            }
+        } catch (InterruptedException e) {
+            asyncHandleException(sendCallback, e);
         }
     }
 
@@ -950,7 +1066,7 @@ public void sendOneway(Message msg, MessageQueueSelector 
selector, Object arg)
     }
 
     public TransactionSendResult sendMessageInTransaction(final Message msg,
-        final LocalTransactionExecuter tranExecuter, final Object arg)
+                                                          final 
LocalTransactionExecuter tranExecuter, final Object arg)
         throws MQClientException {
         if (null == tranExecuter) {
             throw new MQClientException("tranExecutor is null", null);
@@ -1064,8 +1180,12 @@ public void setCallbackExecutor(final ExecutorService 
callbackExecutor) {
         
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
     }
 
+    private ExecutorService getCallbackExecutor() {
+        return 
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
+    }
+
     public SendResult send(Message msg,
-        long timeout) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
+                           long timeout) throws MQClientException, 
RemotingException, MQBrokerException, InterruptedException {
         return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, 
timeout);
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index a2f25dd0f..d0cae87a1 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -250,8 +250,8 @@ public SendResult send(Message msg,
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
-    public void send(Message msg,
-        SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
+    public void send(Message msg, SendCallback sendCallback)
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQProducerImpl.send(msg, sendCallback);
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java 
b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 14caf6ffa..a7367e919 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -70,11 +70,11 @@ SendResult send(final Message msg, final 
MessageQueueSelector selector, final Ob
         InterruptedException;
 
     void send(final Message msg, final MessageQueueSelector selector, final 
Object arg,
-        final SendCallback sendCallback) throws MQClientException, 
RemotingException,
+              final SendCallback sendCallback) throws MQClientException, 
RemotingException,
         InterruptedException;
 
     void send(final Message msg, final MessageQueueSelector selector, final 
Object arg,
-        final SendCallback sendCallback, final long timeout) throws 
MQClientException, RemotingException,
+              final SendCallback sendCallback, final long timeout) throws 
MQClientException, RemotingException,
         InterruptedException;
 
     void sendOneway(final Message msg, final MessageQueueSelector selector, 
final Object arg)
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index ded22ada9..a3d11a9de 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -24,6 +24,9 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -36,6 +39,7 @@
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -214,6 +218,51 @@ public void testSetCallbackExecutor() throws 
MQClientException {
         assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
     }
 
+    @Test
+    public void testAsyncSend() throws MQClientException, RemotingException, 
InterruptedException {
+        String producerGroupTemp = producerGroupPrefix + 
System.currentTimeMillis();
+        producer = new DefaultMQProducer(producerGroupTemp);
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.start();
+
+        final AtomicInteger cc = new AtomicInteger(0);
+        final CountDownLatch countDownLatch = new CountDownLatch(6);
+
+        SendCallback sendCallback = new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                e.printStackTrace();
+                countDownLatch.countDown();
+                cc.incrementAndGet();
+            }
+        };
+        MessageQueueSelector messageQueueSelector = new MessageQueueSelector() 
{
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, 
Object arg) {
+                return null;
+            }
+        };
+
+        Message message = new Message();
+        message.setTopic("test");
+        message.setBody("hello world".getBytes());
+        producer.send(new Message(),sendCallback);
+        producer.send(message,sendCallback,1000);
+        producer.send(message,new MessageQueue(),sendCallback);
+        producer.send(new Message(),new MessageQueue(),sendCallback,1000);
+        producer.send(new Message(),messageQueueSelector,null,sendCallback);
+        producer.send(message,messageQueueSelector,null,sendCallback,1000);
+
+        countDownLatch.await(1000L, TimeUnit.MILLISECONDS);
+
+        assertThat(cc.get()).isEqualTo(6);
+    }
+
     public static TopicRouteData createTopicRoute() {
         TopicRouteData topicRouteData = new TopicRouteData();
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 2aea14cb9..c0754db63 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -48,5 +48,7 @@ void registerProcessor(final int requestCode, final 
NettyRequestProcessor proces
 
     void setCallbackExecutor(final ExecutorService callbackExecutor);
 
+    ExecutorService getCallbackExecutor();
+
     boolean isChannelWritable(final String addr);
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to