This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new af993d28e2 [ISSUE #3717][RIP-27] Auto batching in producer
af993d28e2 is described below

commit af993d28e20922d91862f0911e59f748dcb64e6a
Author: guyinyou <[email protected]>
AuthorDate: Fri Jul 21 09:31:56 2023 +0800

    [ISSUE #3717][RIP-27] Auto batching in producer
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../rocketmq/client/impl/MQClientManager.java      |  21 +-
 .../impl/producer/DefaultMQProducerImpl.java       |  36 ++
 .../client/producer/DefaultMQProducer.java         | 501 +++++++++++++-------
 .../rocketmq/client/producer/MQProducer.java       |  24 +-
 .../client/producer/ProduceAccumulator.java        | 510 +++++++++++++++++++++
 .../client/producer/DefaultMQProducerTest.java     |  38 +-
 .../client/producer/ProduceAccumulatorTest.java    | 176 +++++++
 .../rocketmq/common/message/MessageBatch.java      |   2 +-
 8 files changed, 1133 insertions(+), 175 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 49186633fa..02eaa66e99 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.ProduceAccumulator;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -31,6 +32,9 @@ public class MQClientManager {
     private AtomicInteger factoryIndexGenerator = new AtomicInteger();
     private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable 
=
         new ConcurrentHashMap<>();
+    private ConcurrentMap<String/* clientId */, ProduceAccumulator> 
accumulatorTable =
+        new ConcurrentHashMap<String, ProduceAccumulator>();
+
 
     private MQClientManager() {
 
@@ -43,7 +47,6 @@ public class MQClientManager {
     public MQClientInstance getOrCreateMQClientInstance(final ClientConfig 
clientConfig) {
         return getOrCreateMQClientInstance(clientConfig, null);
     }
-
     public MQClientInstance getOrCreateMQClientInstance(final ClientConfig 
clientConfig, RPCHook rpcHook) {
         String clientId = clientConfig.buildMQClientId();
         MQClientInstance instance = this.factoryTable.get(clientId);
@@ -62,6 +65,22 @@ public class MQClientManager {
 
         return instance;
     }
+    public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig 
clientConfig) {
+        String clientId = clientConfig.buildMQClientId();
+        ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
+        if (null == accumulator) {
+            accumulator = new ProduceAccumulator(clientId);
+            ProduceAccumulator prev = 
this.accumulatorTable.putIfAbsent(clientId, accumulator);
+            if (prev != null) {
+                accumulator = prev;
+                log.warn("Returned Previous ProduceAccumulator for 
clientId:[{}]", clientId);
+            } else {
+                log.info("Created new ProduceAccumulator for clientId:[{}]", 
clientId);
+            }
+        }
+
+        return accumulator;
+    }
 
     public void removeClientFactory(final String clientId) {
         this.factoryTable.remove(clientId);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4eb0e69247..3f4c6e5f7a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -573,6 +573,42 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
 
     }
 
+    public MessageQueue invokeMessageQueueSelector(Message msg, 
MessageQueueSelector selector, Object arg,
+        final long timeout) throws MQClientException, 
RemotingTooMuchRequestException {
+        long beginStartTime = System.currentTimeMillis();
+        this.makeSureStateOK();
+        Validators.checkMessage(msg, this.defaultMQProducer);
+
+        TopicPublishInfo topicPublishInfo = 
this.tryToFindTopicPublishInfo(msg.getTopic());
+        if (topicPublishInfo != null && topicPublishInfo.ok()) {
+            MessageQueue mq = null;
+            try {
+                List<MessageQueue> messageQueueList =
+                    
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
+                Message userMessage = MessageAccessor.cloneMessage(msg);
+                String userTopic = 
NamespaceUtil.withoutNamespace(userMessage.getTopic(), 
mQClientFactory.getClientConfig().getNamespace());
+                userMessage.setTopic(userTopic);
+
+                mq = 
mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList,
 userMessage, arg));
+            } catch (Throwable e) {
+                throw new MQClientException("select message queue threw 
exception.", e);
+            }
+
+            long costTime = System.currentTimeMillis() - beginStartTime;
+            if (timeout < costTime) {
+                throw new RemotingTooMuchRequestException("sendSelectImpl call 
timeout");
+            }
+            if (mq != null) {
+                return mq;
+            } else {
+                throw new MQClientException("select message queue return 
null.", null);
+            }
+        }
+
+        validateNameServerSetting();
+        throw new MQClientException("No route info for this topic, " + 
msg.getTopic(), null);
+    }
+
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, 
final String lastBrokerName) {
         return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, 
lastBrokerName);
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 6e9ffed8c0..c5b1b52230 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.topic.TopicValidator;
@@ -49,10 +51,10 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 /**
  * This class is the entry point for applications intending to send messages. 
</p>
- *
+ * <p>
  * It's fine to tune fields which exposes getter/setter methods, but keep in 
mind, all of them should work well out of
  * box for most scenarios. </p>
- *
+ * <p>
  * This class aggregates various <code>send</code> methods to deliver messages 
to broker(s). Each of them has pros and
  * cons; you'd better understand strengths and weakness of them before 
actually coding. </p>
  *
@@ -78,9 +80,9 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Producer group conceptually aggregates all producer instances of 
exactly same role, which is particularly
      * important when transactional messages are involved. </p>
-     *
+     * <p>
      * For non-transactional messages, it does not matter as long as it's 
unique per process. </p>
-     *
+     * <p>
      * See <a href="https://rocketmq.apache.org/docs/introduction/02concepts">core concepts</a> for more discussion.
      */
     private String producerGroup;
@@ -107,14 +109,14 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Maximum number of retry to perform internally before claiming sending 
failure in synchronous mode. </p>
-     *
+     * <p>
      * This may potentially cause message duplication which is up to 
application developers to resolve.
      */
     private int retryTimesWhenSendFailed = 2;
 
     /**
      * Maximum number of retry to perform internally before claiming sending 
failure in asynchronous mode. </p>
-     *
+     * <p>
      * This may potentially cause message duplication which is up to 
application developers to resolve.
      */
     private int retryTimesWhenSendAsyncFailed = 2;
@@ -134,6 +136,15 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      */
     private TraceDispatcher traceDispatcher = null;
 
+    /**
+     * Switch flag instance for automatic batch message
+     */
+    private boolean autoBatch = false;
+    /**
+     * Instance for batching message automatically
+     */
+    private ProduceAccumulator produceAccumulator = null;
+
     /**
      * Indicate whether to block message when asynchronous sending traffic is 
too heavy.
      */
@@ -179,11 +190,11 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Constructor specifying producer group.
      *
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param rpcHook              RPC hook to execute per each remoting 
command execution.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, 
boolean enableMsgTrace,
         final String customizedTraceTopic) {
@@ -193,7 +204,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Constructor specifying producer group.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
      */
     public DefaultMQProducer(final String namespace, final String 
producerGroup) {
@@ -204,7 +215,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * Constructor specifying both producer group and RPC hook.
      *
      * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param rpcHook       RPC hook to execute per each remoting command 
execution.
      */
     public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
         this(null, producerGroup, rpcHook);
@@ -213,20 +224,21 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Constructor specifying namespace, producer group and RPC hook.
      *
-     * @param namespace Namespace for this MQ Producer instance.
+     * @param namespace     Namespace for this MQ Producer instance.
      * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
+     * @param rpcHook       RPC hook to execute per each remoting command 
execution.
      */
     public DefaultMQProducer(final String namespace, final String 
producerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
     }
 
     /**
      * Constructor specifying producer group and enabled msg trace flag.
      *
-     * @param producerGroup Producer group, see the name-sake field.
+     * @param producerGroup  Producer group, see the name-sake field.
      * @param enableMsgTrace Switch flag instance for message trace.
      */
     public DefaultMQProducer(final String producerGroup, boolean 
enableMsgTrace) {
@@ -236,10 +248,10 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Constructor specifying producer group, enabled msgTrace flag and 
customized trace topic name.
      *
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String producerGroup, boolean 
enableMsgTrace, final String customizedTraceTopic) {
         this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
@@ -249,18 +261,19 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * Constructor specifying namespace, producer group, RPC hook, enabled 
msgTrace flag and customized trace topic
      * name.
      *
-     * @param namespace Namespace for this MQ Producer instance.
-     * @param producerGroup Producer group, see the name-sake field.
-     * @param rpcHook RPC hook to execute per each remoting command execution.
-     * @param enableMsgTrace Switch flag instance for message trace.
+     * @param namespace            Namespace for this MQ Producer instance.
+     * @param producerGroup        Producer group, see the name-sake field.
+     * @param rpcHook              RPC hook to execute per each remoting 
command execution.
+     * @param enableMsgTrace       Switch flag instance for message trace.
      * @param customizedTraceTopic The name value of message trace topic.If 
you don't config,you can use the default
-     * trace topic name.
+     *                             trace topic name.
      */
     public DefaultMQProducer(final String namespace, final String 
producerGroup, RPCHook rpcHook,
         boolean enableMsgTrace, final String customizedTraceTopic) {
         this.namespace = namespace;
         this.producerGroup = producerGroup;
         defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+        produceAccumulator = 
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
         //if client open the message trace feature
         if (enableMsgTrace) {
             try {
@@ -297,6 +310,9 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     public void start() throws MQClientException {
         this.setProducerGroup(withNamespace(this.producerGroup));
         this.defaultMQProducerImpl.start();
+        if (this.produceAccumulator != null) {
+            this.produceAccumulator.start();
+        }
         if (null != traceDispatcher) {
             try {
                 traceDispatcher.start(this.getNamesrvAddr(), 
this.getAccessChannel());
@@ -312,6 +328,9 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     @Override
     public void shutdown() {
         this.defaultMQProducerImpl.shutdown();
+        if (this.produceAccumulator != null) {
+            this.produceAccumulator.shutdown();
+        }
         if (null != traceDispatcher) {
             traceDispatcher.shutdown();
         }
@@ -329,6 +348,26 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         return 
this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic));
     }
 
+    private boolean canBatch(Message msg) {
+        // produceAccumulator is full
+        if (!produceAccumulator.tryAddMessage(msg)) {
+            return false;
+        }
+        // delay message do not support batch processing
+        if (msg.getDelayTimeLevel() > 0) {
+            return false;
+        }
+        // retry message do not support batch processing
+        if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            return false;
+        }
+        // message which have been assigned to producer group do not support 
batch processing
+        if 
(msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Send message in synchronous mode. This method returns only when the 
sending procedure totally completes. </p>
      *
@@ -339,28 +378,32 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * @param msg Message to send.
      * @return {@link SendResult} instance to inform senders details of the 
deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, 
message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public SendResult send(
         Message msg) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg);
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, null, null);
+        } else {
+            return sendDirect(msg, null, null);
+        }
     }
 
     /**
      * Same to {@link #send(Message)} with send timeout specified in addition.
      *
-     * @param msg Message to send.
+     * @param msg     Message to send.
      * @param timeout send timeout.
      * @return {@link SendResult} instance to inform senders details of the 
deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, 
message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -372,34 +415,42 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Send message to broker asynchronously. </p>
-     *
+     * <p>
      * This method returns immediately. On sending completion, 
<code>sendCallback</code> will be executed. </p>
-     *
+     * <p>
      * Similar to {@link #send(Message)}, internal implementation would 
potentially retry up to {@link
      * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, 
which may yield message duplication and
      * application developers are the one to resolve this potential issue.
      *
-     * @param msg Message to send.
+     * @param msg          Message to send.
      * @param sendCallback Callback to execute on sending completed, either 
successful or unsuccessful.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public void send(Message msg,
         SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQProducerImpl.send(msg, sendCallback);
+        try {
+            if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+                sendByAccumulator(msg, null, sendCallback);
+            } else {
+                sendDirect(msg, null, sendCallback);
+            }
+        } catch (Throwable e) {
+            sendCallback.onException(e);
+        }
     }
 
     /**
      * Same to {@link #send(Message, SendCallback)} with send timeout 
specified in addition.
      *
-     * @param msg message to send.
+     * @param msg          message to send.
      * @param sendCallback Callback to execute.
-     * @param timeout send timeout.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout      send timeout.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -414,8 +465,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * acknowledgement from broker before return. Obviously, it has maximums 
throughput yet potentials of message loss.
      *
      * @param msg Message to send.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -428,32 +479,37 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * Same to {@link #send(Message)} with target message queue specified in 
addition.
      *
      * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param mq  Target message queue.
      * @return {@link SendResult} instance to inform senders details of the 
deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, 
message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public SendResult send(Message msg, MessageQueue mq)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
+        mq = queueWithNamespace(mq);
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, mq, null);
+        } else {
+            return sendDirect(msg, mq, null);
+        }
     }
 
     /**
      * Same to {@link #send(Message)} with target message queue and send 
timeout specified.
      *
-     * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param msg     Message to send.
+     * @param mq      Target message queue.
      * @param timeout send timeout.
      * @return {@link SendResult} instance to inform senders details of the 
deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, 
message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -466,29 +522,38 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #send(Message, SendCallback)} with target message queue 
specified.
      *
-     * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param msg          Message to send.
+     * @param mq           Target message queue.
      * @param sendCallback Callback to execute on sending completed, either 
successful or unsuccessful.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
         throws MQClientException, RemotingException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq), 
sendCallback);
+        mq = queueWithNamespace(mq);
+        try {
+            if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+                sendByAccumulator(msg, mq, sendCallback);
+            } else {
+                sendDirect(msg, mq, sendCallback);
+            }
+        } catch (MQBrokerException e) {
+            // ignore
+        }
     }
 
     /**
      * Same to {@link #send(Message, SendCallback)} with target message queue 
and send timeout specified.
      *
-     * @param msg Message to send.
-     * @param mq Target message queue.
+     * @param msg          Message to send.
+     * @param mq           Target message queue.
      * @param sendCallback Callback to execute on sending completed, either 
successful or unsuccessful.
-     * @param timeout Send timeout.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout      Send timeout.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -502,9 +567,9 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * Same to {@link #sendOneway(Message)} with target message queue 
specified.
      *
      * @param msg Message to send.
-     * @param mq Target message queue.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param mq  Target message queue.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -517,35 +582,41 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #send(Message)} with message queue selector specified.
      *
-     * @param msg Message to send.
+     * @param msg      Message to send.
      * @param selector Message queue selector, through which we get target 
message queue to deliver message to.
-     * @param arg Argument to work along with message queue selector.
+     * @param arg      Argument to work along with message queue selector.
      * @return {@link SendResult} instance to inform senders details of the 
deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, 
message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public SendResult send(Message msg, MessageQueueSelector selector, Object 
arg)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        return this.defaultMQProducerImpl.send(msg, selector, arg);
+        MessageQueue mq = 
this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, 
this.getSendMsgTimeout());
+        mq = queueWithNamespace(mq);
+        if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+            return sendByAccumulator(msg, mq, null);
+        } else {
+            return sendDirect(msg, mq, null);
+        }
     }
 
     /**
      * Same to {@link #send(Message, MessageQueueSelector, Object)} with send 
timeout specified.
      *
-     * @param msg Message to send.
+     * @param msg      Message to send.
      * @param selector Message queue selector, through which we get target 
message queue to deliver message to.
-     * @param arg Argument to work along with message queue selector.
-     * @param timeout Send timeout.
+     * @param arg      Argument to work along with message queue selector.
+     * @param timeout  Send timeout.
      * @return {@link SendResult} instance to inform senders details of the 
deliverable, say Message ID of the message,
      * {@link SendStatus} indicating broker storage/replication status, 
message queue sent to, etc.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any error with broker.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
+     * @throws MQBrokerException    if there is any error with broker.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -558,31 +629,41 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #send(Message, SendCallback)} with message queue 
selector specified.
      *
-     * @param msg Message to send.
-     * @param selector Message selector through which to get target message 
queue.
-     * @param arg Argument used along with message queue selector.
+     * @param msg          Message to send.
+     * @param selector     Message selector through which to get target 
message queue.
+     * @param arg          Argument used along with message queue selector.
      * @param sendCallback callback to execute on sending completion.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
     public void send(Message msg, MessageQueueSelector selector, Object arg, 
SendCallback sendCallback)
         throws MQClientException, RemotingException, InterruptedException {
         msg.setTopic(withNamespace(msg.getTopic()));
-        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+        try {
+            // The target queue is obtained from invokeMessageQueueSelector before choosing
+            // between the accumulator path and the direct path.
+            MessageQueue mq = 
this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, 
this.getSendMsgTimeout());
+            mq = queueWithNamespace(mq);
+            // An explicit MessageBatch never goes through the accumulator.
+            if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+                sendByAccumulator(msg, mq, sendCallback);
+            } else {
+                sendDirect(msg, mq, sendCallback);
+            }
+        } catch (Throwable e) {
+            // NOTE(review): every failure, InterruptedException included, is handed to the
+            // callback instead of being rethrown, and the interrupt flag is not restored.
+            // A null sendCallback would raise NullPointerException here. Confirm intent.
+            sendCallback.onException(e);
+        }
     }
 
     /**
      * Same to {@link #send(Message, MessageQueueSelector, Object, 
SendCallback)} with timeout specified.
      *
-     * @param msg Message to send.
-     * @param selector Message selector through which to get target message 
queue.
-     * @param arg Argument used along with message queue selector.
+     * @param msg          Message to send.
+     * @param selector     Message selector through which to get target 
message queue.
+     * @param arg          Argument used along with message queue selector.
      * @param sendCallback callback to execute on sending completion.
-     * @param timeout Send timeout.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout      Send timeout.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -592,6 +673,42 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, 
timeout);
     }
 
+    /**
+     * Sends {@code msg} through {@code defaultMQProducerImpl} without using the accumulator.
+     * <p>
+     * A {@code null} {@code sendCallback} selects the synchronous overloads and returns their
+     * {@link SendResult}; otherwise the callback overloads are used and {@code null} is returned.
+     * A {@code null} {@code mq} selects the overloads that take no target queue.
+     *
+     * @param msg          message to send.
+     * @param mq           target message queue, may be {@code null}.
+     * @param sendCallback callback for the callback-based send, or {@code null} for sync mode.
+     * @return the send result in sync mode, {@code null} when a callback was supplied.
+     */
+    public SendResult sendDirect(Message msg, MessageQueue mq,
+        SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException, MQBrokerException {
+        // send in sync mode
+        if (sendCallback == null) {
+            if (mq == null) {
+                return this.defaultMQProducerImpl.send(msg);
+            } else {
+                return this.defaultMQProducerImpl.send(msg, mq);
+            }
+        } else {
+            // callback mode: the result is delivered through sendCallback, nothing to return
+            if (mq == null) {
+                this.defaultMQProducerImpl.send(msg, sendCallback);
+            } else {
+                this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Hands {@code msg} to {@code produceAccumulator}, falling back to
+     * {@link #sendDirect(Message, MessageQueue, SendCallback)} when {@code canBatch(msg)} is false.
+     * <p>
+     * On the accumulator path the message is first checked with {@code Validators.checkMessage}
+     * and given a unique id via {@code MessageClientIDSetter.setUniqID}.
+     *
+     * @param msg          message to send.
+     * @param mq           target message queue passed on to the accumulator or to sendDirect.
+     * @param sendCallback callback for the callback-based send, or {@code null} to get a result back.
+     * @return the result of the send when {@code sendCallback} is {@code null}, otherwise {@code null}.
+     */
+    public SendResult sendByAccumulator(Message msg, MessageQueue mq,
+        SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException, MQBrokerException {
+        // check whether it can batch
+        if (!canBatch(msg)) {
+            return sendDirect(msg, mq, sendCallback);
+        } else {
+            Validators.checkMessage(msg, this);
+            MessageClientIDSetter.setUniqID(msg);
+            if (sendCallback == null) {
+                return this.produceAccumulator.send(msg, mq, this);
+            } else {
+                this.produceAccumulator.send(msg, mq, sendCallback, this);
+                return null;
+            }
+        }
+    }
+
     /**
      * Send request message in synchronous mode. This method returns only when 
the consumer consume the request message and reply a message. </p>
      *
@@ -599,13 +716,13 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * {@link #retryTimesWhenSendFailed} times before claiming failure. As a 
result, multiple messages may be potentially
      * delivered to broker(s). It's up to the application developers to 
resolve potential duplication issue.
      *
-     * @param msg request message to send
+     * @param msg     request message to send
      * @param timeout request timeout
      * @return reply message
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException       if there is any client error.
+     * @throws RemotingException       if there is any network-tier error.
+     * @throws MQBrokerException       if there is any broker error.
+     * @throws InterruptedException    if the thread is interrupted.
      * @throws RequestTimeoutException if request timeout.
      */
     @Override
@@ -618,18 +735,18 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Request asynchronously. </p>
      * This method returns immediately. On receiving reply message, 
<code>requestCallback</code> will be executed. </p>
-     *
+     * <p>
      * Similar to {@link #request(Message, long)}, internal implementation 
would potentially retry up to {@link
      * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, 
which may yield message duplication and
      * application developers are the one to resolve this potential issue.
      *
-     * @param msg request message to send
+     * @param msg             request message to send
      * @param requestCallback callback to execute on request completion.
-     * @param timeout request timeout
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout         request timeout
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws MQBrokerException    if there is any broker error.
      */
     @Override
     public void request(final Message msg, final RequestCallback 
requestCallback, final long timeout)
@@ -641,15 +758,15 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #request(Message, long)}  with message queue selector 
specified.
      *
-     * @param msg request message to send
+     * @param msg      request message to send
      * @param selector message queue selector, through which we get target 
message queue to deliver message to.
-     * @param arg argument to work along with message queue selector.
-     * @param timeout timeout of request.
+     * @param arg      argument to work along with message queue selector.
+     * @param timeout  timeout of request.
      * @return reply message
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException       if there is any client error.
+     * @throws RemotingException       if there is any network-tier error.
+     * @throws MQBrokerException       if there is any broker error.
+     * @throws InterruptedException    if the thread is interrupted.
      * @throws RequestTimeoutException if request timeout.
      */
     @Override
@@ -663,15 +780,15 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #request(Message, RequestCallback, long)} with target 
message selector specified.
      *
-     * @param msg requst message to send
-     * @param selector message queue selector, through which we get target 
message queue to deliver message to.
-     * @param arg argument to work along with message queue selector.
+     * @param msg             request message to send
+     * @param selector        message queue selector, through which we get 
target message queue to deliver message to.
+     * @param arg             argument to work along with message queue 
selector.
      * @param requestCallback callback to execute on request completion.
-     * @param timeout timeout of request.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout         timeout of request.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws MQBrokerException    if there is any broker error.
      */
     @Override
     public void request(final Message msg, final MessageQueueSelector 
selector, final Object arg,
@@ -684,13 +801,13 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #request(Message, long)}  with target message queue 
specified in addition.
      *
-     * @param msg request message to send
-     * @param mq target message queue.
+     * @param msg     request message to send
+     * @param mq      target message queue.
      * @param timeout request timeout
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQClientException       if there is any client error.
+     * @throws RemotingException       if there is any network-tier error.
+     * @throws MQBrokerException       if there is any broker error.
+     * @throws InterruptedException    if the thread is interrupted.
      * @throws RequestTimeoutException if request timeout.
      */
     @Override
@@ -703,14 +820,14 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #request(Message, RequestCallback, long)} with target 
message queue specified.
      *
-     * @param msg request message to send
-     * @param mq target message queue.
+     * @param msg             request message to send
+     * @param mq              target message queue.
      * @param requestCallback callback to execute on request completion.
-     * @param timeout timeout of request.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param timeout         timeout of request.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the thread is interrupted.
-     * @throws MQBrokerException if there is any broker error.
+     * @throws MQBrokerException    if there is any broker error.
      */
     @Override
     public void request(final Message msg, final MessageQueue mq, final 
RequestCallback requestCallback, long timeout)
@@ -722,11 +839,11 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * Same to {@link #sendOneway(Message)} with message queue selector 
specified.
      *
-     * @param msg Message to send.
+     * @param msg      Message to send.
      * @param selector Message queue selector, through which to determine 
target message queue to deliver message
-     * @param arg Argument used along with message queue selector.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @param arg      Argument used along with message queue selector.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Override
@@ -739,9 +856,9 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * This method is to send transactional messages.
      *
-     * @param msg Transactional message to send.
+     * @param msg          Transactional message to send.
      * @param tranExecuter local transaction executor.
-     * @param arg Argument used along with local transaction executor.
+     * @param arg          Argument used along with local transaction executor.
      * @return Transaction result.
      * @throws MQClientException if there is any client error.
      */
@@ -769,15 +886,16 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     /**
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
-     * @param key accessKey
-     * @param newTopic topic name
-     * @param queueNum topic's queue number
+     * @param key        accessKey
+     * @param newTopic   topic name
+     * @param queueNum   topic's queue number
      * @param attributes
      * @throws MQClientException if there is any client error.
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, 
Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum,
+        Map<String, String> attributes) throws MQClientException {
+        // NOTE(review): attributes is not forwarded; null is passed instead and
+        // topicSysFlag is fixed to 0. Confirm this is intended.
         createTopic(key, withNamespace(newTopic), queueNum, 0, null);
     }
 
@@ -785,23 +903,24 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
      * Create a topic on broker. This method will be removed in a certain 
version after April 5, 2020, so please do not
      * use this method.
      *
-     * @param key accessKey
-     * @param newTopic topic name
-     * @param queueNum topic's queue number
+     * @param key          accessKey
+     * @param newTopic     topic name
+     * @param queueNum     topic's queue number
      * @param topicSysFlag topic system flag
      * @param attributes
      * @throws MQClientException if there is any client error.
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag, Map<String, String> attributes) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag,
+        Map<String, String> attributes) throws MQClientException {
+        // NOTE(review): attributes is accepted but not passed to the implementation call below.
         this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), 
queueNum, topicSysFlag);
     }
 
     /**
      * Search consume queue offset of the given time stamp.
      *
-     * @param mq Instance of MessageQueue
+     * @param mq        Instance of MessageQueue
      * @param timestamp from when in milliseconds.
      * @return Consume queue offset.
      * @throws MQClientException if there is any client error.
@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Query maximum offset of the given message queue.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
      * @param mq Instance of MessageQueue
@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Query minimum offset of the given message queue.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
      * @param mq Instance of MessageQueue
@@ -843,7 +962,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Query the earliest message store time.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
      * @param mq Instance of MessageQueue
@@ -858,14 +977,14 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Query message of the given offset message ID.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
      * @param offsetMsgId message id
      * @return Message specified.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException    if there is any broker error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Deprecated
@@ -877,16 +996,16 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Query message by key.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
-     * @param topic message topic
-     * @param key message key index word
+     * @param topic  message topic
+     * @param key    message key index word
      * @param maxNum max message number
-     * @param begin from when
-     * @param end to when
+     * @param begin  from when
+     * @param end    to when
      * @return QueryResult instance contains matched messages.
-     * @throws MQClientException if there is any client error.
+     * @throws MQClientException    if there is any client error.
      * @throws InterruptedException if the thread is interrupted.
      */
     @Deprecated
@@ -898,15 +1017,15 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     /**
      * Query message of the given message ID.
-     *
+     * <p>
      * This method will be removed in a certain version after April 5, 2020, 
so please do not use this method.
      *
      * @param topic Topic
      * @param msgId Message ID
      * @return Message specified.
-     * @throws MQBrokerException if there is any broker error.
-     * @throws MQClientException if there is any client error.
-     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException    if there is any broker error.
+     * @throws MQClientException    if there is any client error.
+     * @throws RemotingException    if there is any network-tier error.
      * @throws InterruptedException if the sending thread is interrupted.
      */
     @Deprecated
@@ -945,7 +1064,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     }
 
     @Override
-    public void send(Collection<Message> msgs, SendCallback sendCallback) 
throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
+    public void send(Collection<Message> msgs,
+        SendCallback sendCallback) throws MQClientException, 
RemotingException, MQBrokerException, InterruptedException {
+        // The collection is converted with batch(msgs) and sent with the given callback.
         this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
     }
 
@@ -963,7 +1083,8 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
 
     @Override
     public void send(Collection<Message> msgs, MessageQueue mq,
-        SendCallback sendCallback, long timeout) throws MQClientException, 
RemotingException, MQBrokerException, InterruptedException {
+        SendCallback sendCallback,
+        long timeout) throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException {
+        // The collection is converted with batch(msgs); mq goes through queueWithNamespace first.
         this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), 
sendCallback, timeout);
     }
 
@@ -1012,6 +1133,62 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
         return msgBatch;
     }
 
+    /**
+     * Returns the accumulator's {@code batchMaxDelayMs} setting.
+     *
+     * @return the value reported by {@code produceAccumulator}, or {@code 0} when this producer
+     * has no accumulator.
+     */
+    public int getBatchMaxDelayMs() {
+        if (this.produceAccumulator == null) {
+            return 0;
+        }
+        return produceAccumulator.getBatchMaxDelayMs();
+    }
+
+    /**
+     * Forwards {@code holdMs} to {@code produceAccumulator.batchMaxDelayMs}.
+     *
+     * @param holdMs new value; presumably a delay in milliseconds, judging by the name.
+     * @throws UnsupportedOperationException if this producer has no accumulator.
+     */
+    public void batchMaxDelayMs(int holdMs) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        }
+        this.produceAccumulator.batchMaxDelayMs(holdMs);
+    }
+
+    /**
+     * Returns the accumulator's {@code batchMaxBytes} setting.
+     *
+     * @return the value reported by {@code produceAccumulator}, or {@code 0} when this producer
+     * has no accumulator.
+     */
+    public long getBatchMaxBytes() {
+        if (this.produceAccumulator == null) {
+            return 0;
+        }
+        return produceAccumulator.getBatchMaxBytes();
+    }
+
+    /**
+     * Forwards {@code holdSize} to {@code produceAccumulator.batchMaxBytes}.
+     *
+     * @param holdSize new value; presumably a size in bytes, judging by the name.
+     * @throws UnsupportedOperationException if this producer has no accumulator.
+     */
+    public void batchMaxBytes(long holdSize) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        }
+        this.produceAccumulator.batchMaxBytes(holdSize);
+    }
+
+    /**
+     * Returns the accumulator's {@code totalBatchMaxBytes} setting.
+     *
+     * @return the value reported by {@code produceAccumulator}, or {@code 0} when this producer
+     * has no accumulator.
+     */
+    public long getTotalBatchMaxBytes() {
+        if (this.produceAccumulator == null) {
+            return 0;
+        }
+        return produceAccumulator.getTotalBatchMaxBytes();
+    }
+
+    /**
+     * Forwards {@code totalHoldSize} to {@code produceAccumulator.totalBatchMaxBytes}.
+     *
+     * @param totalHoldSize new value; presumably a size in bytes, judging by the name.
+     * @throws UnsupportedOperationException if this producer has no accumulator.
+     */
+    public void totalBatchMaxBytes(long totalHoldSize) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        }
+        this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
+    }
+
+    /**
+     * Tells whether auto batching is switched on for this producer.
+     *
+     * @return the {@code autoBatch} flag, or {@code false} whenever this producer has no
+     * accumulator, regardless of the flag.
+     */
+    public boolean getAutoBatch() {
+        if (this.produceAccumulator == null) {
+            return false;
+        }
+        return this.autoBatch;
+    }
+
+    /**
+     * Sets the {@code autoBatch} flag.
+     *
+     * @param autoBatch new flag value.
+     * @throws UnsupportedOperationException if this producer has no accumulator; the check is
+     * made for either flag value.
+     */
+    public void setAutoBatch(boolean autoBatch) {
+        if (this.produceAccumulator == null) {
+            throw new UnsupportedOperationException("The currently constructed 
producer does not support autoBatch");
+        }
+        this.autoBatch = autoBatch;
+    }
+
     public String getProducerGroup() {
         return producerGroup;
     }
@@ -1130,7 +1307,7 @@ public class DefaultMQProducer extends ClientConfig 
implements MQProducer {
     }
 
     public boolean isEnableBackpressureForAsyncMode() {
-        return  enableBackpressureForAsyncMode;
+        return enableBackpressureForAsyncMode;
     }
 
     public void setEnableBackpressureForAsyncMode(boolean 
enableBackpressureForAsyncMode) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java 
b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb283d..78657e623e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin {
         RemotingException, MQBrokerException, InterruptedException;
 
     void send(final Message msg, final SendCallback sendCallback) throws 
MQClientException,
-        RemotingException, InterruptedException;
+        RemotingException, InterruptedException, MQBrokerException;
 
     void send(final Message msg, final SendCallback sendCallback, final long 
timeout)
         throws MQClientException, RemotingException, InterruptedException;
@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin {
 
     SendResult send(final Collection<Message> msgs, final MessageQueue mq, 
final long timeout)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException;
-    
-    void send(final Collection<Message> msgs, final SendCallback sendCallback) 
throws MQClientException, RemotingException, MQBrokerException,
+
+    void send(final Collection<Message> msgs,
+        final SendCallback sendCallback) throws MQClientException, 
RemotingException, MQBrokerException,
         InterruptedException;
-    
-    void send(final Collection<Message> msgs, final SendCallback sendCallback, 
final long timeout) throws MQClientException, RemotingException,
+
+    void send(final Collection<Message> msgs, final SendCallback sendCallback,
+        final long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException;
-    
-    void send(final Collection<Message> msgs, final MessageQueue mq, final 
SendCallback sendCallback) throws MQClientException, RemotingException,
+
+    void send(final Collection<Message> msgs, final MessageQueue mq,
+        final SendCallback sendCallback) throws MQClientException, 
RemotingException,
         MQBrokerException, InterruptedException;
-    
-    void send(final Collection<Message> msgs, final MessageQueue mq, final 
SendCallback sendCallback, final long timeout) throws MQClientException,
+
+    void send(final Collection<Message> msgs, final MessageQueue mq, final 
SendCallback sendCallback,
+        final long timeout) throws MQClientException,
         RemotingException, MQBrokerException, InterruptedException;
-    
+
     //for rpc
     Message request(final Message msg, final long timeout) throws 
RequestTimeoutException, MQClientException,
         RemotingException, MQBrokerException, InterruptedException;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
 
b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
new file mode 100644
index 0000000000..46dfcf71d2
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+    // Default cap (32 MB) on the bytes buffered across all pending batches; checked in tryAddMessage.
+    private long totalHoldSize = 32 * 1024 * 1024;
+    // Default per-batch size threshold (32 KB): a batch holding more body bytes than this is ready to send.
+    private long holdSize = 32 * 1024;
+    // Default per-batch age threshold (10 ms): a batch at least this old is ready to send.
+    private int holdMs = 10;
+    private final Logger log = 
LoggerFactory.getLogger(DefaultMQProducer.class);
+    private final GuardForSyncSendService guardThreadForSyncSend;
+    private final GuardForAsyncSendService guardThreadForAsyncSend;
+    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new 
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new 
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final String instanceName;
+
+    /**
+     * Creates an accumulator and constructs its two guard services; they are started
+     * separately through {@code start()}.
+     *
+     * @param instanceName client instance name, embedded in both guard service names
+     */
+    public ProduceAccumulator(String instanceName) {
+        this.instanceName = instanceName;
+        final GuardForSyncSendService syncGuard = new GuardForSyncSendService(instanceName);
+        final GuardForAsyncSendService asyncGuard = new GuardForAsyncSendService(instanceName);
+        this.guardThreadForSyncSend = syncGuard;
+        this.guardThreadForAsyncSend = asyncGuard;
+    }
+
+    /**
+     * Background service for the synchronous path. On every pass it wakes callers blocked
+     * in MessageAccumulation.add(Message) so they re-check readyToSend(), and it retires
+     * batches that hold no message bytes.
+     */
+    private class GuardForSyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForSyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForSyncSend", 
clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        /**
+         * Repeats doWork() until the service is stopped. Any exception, including the
+         * InterruptedException doWork() may throw from its sleep, is logged and the loop
+         * continues.
+         */
+        @Override public void run() {
+            log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    log.warn(this.getServiceName() + " service has exception. 
", e);
+                }
+            }
+
+            log.info(this.getServiceName() + " service end");
+        }
+
+        /**
+         * One polling pass over the sync batches, followed by a sleep.
+         *
+         * @throws InterruptedException if the thread is interrupted while sleeping
+         */
+        private void doWork() throws InterruptedException {
+            Collection<MessageAccumulation> values = syncSendBatchs.values();
+            // Poll at half the hold time, but never sleep less than 1 ms.
+            final int sleepTime = Math.max(1, holdMs / 2);
+            for (MessageAccumulation v : values) {
+                v.wakeup();
+                // Lock order: the batch monitor first, then its closed flag.
+                synchronized (v) {
+                    synchronized (v.closed) {
+                        if (v.messagesSize.get() == 0) {
+                            // No bytes accumulated: close the batch so add() rejects it, then unregister it.
+                            v.closed.set(true);
+                            syncSendBatchs.remove(v.aggregateKey, v);
+                        } else {
+                            v.notify();
+                        }
+                    }
+                }
+            }
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    /**
+     * Background service for the asynchronous path. On every pass it sends batches that
+     * have become ready and retires batches that hold no message bytes.
+     */
+    private class GuardForAsyncSendService extends ServiceThread {
+        private final String serviceName;
+
+        public GuardForAsyncSendService(String clientInstanceName) {
+            serviceName = String.format("Client_%s_GuardForAsyncSend", 
clientInstanceName);
+        }
+
+        @Override public String getServiceName() {
+            return serviceName;
+        }
+
+        /**
+         * Repeats doWork() until the service is stopped; any exception is logged and the
+         * loop continues.
+         */
+        @Override public void run() {
+            log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.doWork();
+                } catch (Exception e) {
+                    log.warn(this.getServiceName() + " service has exception. 
", e);
+                }
+            }
+
+            log.info(this.getServiceName() + " service end");
+        }
+
+        /**
+         * One polling pass over the async batches, followed by a sleep.
+         *
+         * @throws Exception declared broadly; the visible body can throw InterruptedException from the sleep
+         */
+        private void doWork() throws Exception {
+            Collection<MessageAccumulation> values = asyncSendBatchs.values();
+            // Poll at half the hold time, but never sleep less than 1 ms.
+            final int sleepTime = Math.max(1, holdMs / 2);
+            for (MessageAccumulation v : values) {
+                if (v.readyToSend()) {
+                    // The argument is not read by send(SendCallback); the batch's stored callbacks are notified.
+                    v.send(null);
+                }
+                synchronized (v.closed) {
+                    if (v.messagesSize.get() == 0) {
+                        // No bytes accumulated: close the batch so add() rejects it, then unregister it.
+                        v.closed.set(true);
+                        asyncSendBatchs.remove(v.aggregateKey, v);
+                    }
+                }
+            }
+            Thread.sleep(sleepTime);
+        }
+    }
+
+    /** Starts both guard services: the sync one first, then the async one. */
+    void start() {
+        guardThreadForSyncSend.start();
+        guardThreadForAsyncSend.start();
+    }
+
+    /** Shuts down both guard services: the sync one first, then the async one. */
+    void shutdown() {
+        guardThreadForSyncSend.shutdown();
+        guardThreadForAsyncSend.shutdown();
+    }
+
+    /** Returns the current per-batch hold time in milliseconds ({@code holdMs}). */
+    int getBatchMaxDelayMs() {
+        return holdMs;
+    }
+
+    /**
+     * Sets the per-batch hold time.
+     *
+     * @param holdMs hold time in milliseconds, accepted in the range 1..30000
+     * @throws IllegalArgumentException if {@code holdMs} is outside that range
+     */
+    void batchMaxDelayMs(int holdMs) {
+        final int upperBoundMs = 30 * 1000;
+        final boolean accepted = holdMs > 0 && holdMs <= upperBoundMs;
+        if (!accepted) {
+            throw new IllegalArgumentException(String.format("batchMaxDelayMs expect between 1ms and 30s, but get %d!", holdMs));
+        }
+        this.holdMs = holdMs;
+    }
+
+    /** Returns the current per-batch size threshold in bytes ({@code holdSize}). */
+    long getBatchMaxBytes() {
+        return holdSize;
+    }
+
+    /**
+     * Sets the per-batch size threshold.
+     *
+     * @param holdSize threshold in bytes, accepted in the range 1..2097152 (2 MB)
+     * @throws IllegalArgumentException if {@code holdSize} is outside that range
+     */
+    void batchMaxBytes(long holdSize) {
+        final long upperBoundBytes = 2 * 1024 * 1024;
+        final boolean accepted = holdSize > 0 && holdSize <= upperBoundBytes;
+        if (!accepted) {
+            throw new IllegalArgumentException(String.format("batchMaxBytes expect between 1B and 2MB, but get %d!", holdSize));
+        }
+        this.holdSize = holdSize;
+    }
+
+    /**
+     * Returns the cap on the total bytes held across all pending batches, i.e. the value
+     * set through {@code totalBatchMaxBytes(long)}.
+     *
+     * <p>Previously this returned the per-batch threshold {@code holdSize}, so the getter
+     * never reflected the value stored by its own setter.
+     */
+    long getTotalBatchMaxBytes() {
+        return totalHoldSize;
+    }
+
+    /**
+     * Sets the cap on the total bytes held across all pending batches.
+     *
+     * @param totalHoldSize cap in bytes; must be positive
+     * @throws IllegalArgumentException if {@code totalHoldSize} is zero or negative
+     */
+    void totalBatchMaxBytes(long totalHoldSize) {
+        final boolean positive = totalHoldSize > 0;
+        if (!positive) {
+            final String reason = String.format("totalBatchMaxBytes must bigger then 0, but get %d!", totalHoldSize);
+            throw new IllegalArgumentException(reason);
+        }
+        this.totalHoldSize = totalHoldSize;
+    }
+
+    /**
+     * Looks up the sync batch registered for {@code aggregateKey}, registering a new one
+     * when none exists. If another thread registers a batch first, that batch is returned.
+     */
+    private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        final MessageAccumulation existing = syncSendBatchs.get(aggregateKey);
+        if (existing != null) {
+            return existing;
+        }
+        final MessageAccumulation created = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        final MessageAccumulation raced = syncSendBatchs.putIfAbsent(aggregateKey, created);
+        if (raced != null) {
+            return raced;
+        }
+        return created;
+    }
+
+    /**
+     * Looks up the async batch registered for {@code aggregateKey}, registering a new one
+     * when none exists. If another thread registers a batch first, that batch is returned.
+     */
+    private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey aggregateKey,
+        DefaultMQProducer defaultMQProducer) {
+        final MessageAccumulation existing = asyncSendBatchs.get(aggregateKey);
+        if (existing != null) {
+            return existing;
+        }
+        final MessageAccumulation created = new MessageAccumulation(aggregateKey, defaultMQProducer);
+        final MessageAccumulation raced = asyncSendBatchs.putIfAbsent(aggregateKey, created);
+        if (raced != null) {
+            return raced;
+        }
+        return created;
+    }
+
+    /**
+     * Synchronous batched send without a target queue. Adds {@code msg} to the sync batch
+     * for its aggregate key and blocks in MessageAccumulation.add(Message) until that
+     * batch has been closed; a batch that was already closed is unregistered and the add
+     * is retried on a fresh one.
+     *
+     * @return the entry of the batch's {@code sendResults} at the index assigned to {@code msg}
+     */
+    // NOTE(review): sendResults is only assigned by splitSendResults; it looks unset here when
+    // another caller's send of the same batch failed - confirm how waiters should be failed.
+    SendResult send(Message msg,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        final AggregateKey key = new AggregateKey(msg);
+        for (;;) {
+            final MessageAccumulation accumulation = getOrCreateSyncSendBatch(key, defaultMQProducer);
+            final int slot = accumulation.add(msg);
+            if (slot != -1) {
+                return accumulation.sendResults[slot];
+            }
+            // The batch was already closed: drop it from the map and try again.
+            syncSendBatchs.remove(key, accumulation);
+        }
+    }
+
+    /**
+     * Synchronous batched send to a specific queue. Same flow as the queue-less overload,
+     * except that {@code mq} is part of the aggregate key.
+     *
+     * @return the entry of the batch's {@code sendResults} at the index assigned to {@code msg}
+     */
+    SendResult send(Message msg, MessageQueue mq,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+        final AggregateKey key = new AggregateKey(msg, mq);
+        for (;;) {
+            final MessageAccumulation accumulation = getOrCreateSyncSendBatch(key, defaultMQProducer);
+            final int slot = accumulation.add(msg);
+            if (slot != -1) {
+                return accumulation.sendResults[slot];
+            }
+            // The batch was already closed: drop it from the map and try again.
+            syncSendBatchs.remove(key, accumulation);
+        }
+    }
+
+    /**
+     * Asynchronous batched send without a target queue. Adds {@code msg} and its callback
+     * to the async batch for the message's aggregate key; a batch that was already closed
+     * is unregistered and the add is retried on a fresh one.
+     */
+    void send(Message msg, SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        final AggregateKey key = new AggregateKey(msg);
+        for (;;) {
+            final MessageAccumulation accumulation = getOrCreateAsyncSendBatch(key, defaultMQProducer);
+            final boolean accepted = accumulation.add(msg, sendCallback);
+            if (accepted) {
+                return;
+            }
+            // The batch was already closed: drop it from the map and try again.
+            asyncSendBatchs.remove(key, accumulation);
+        }
+    }
+
+    /**
+     * Asynchronous batched send to a specific queue. Same flow as the queue-less
+     * asynchronous overload, except that {@code mq} is part of the aggregate key.
+     */
+    void send(Message msg, MessageQueue mq,
+        SendCallback sendCallback,
+        DefaultMQProducer defaultMQProducer) throws InterruptedException, RemotingException, MQClientException {
+        final AggregateKey key = new AggregateKey(msg, mq);
+        for (;;) {
+            final MessageAccumulation accumulation = getOrCreateAsyncSendBatch(key, defaultMQProducer);
+            final boolean accepted = accumulation.add(msg, sendCallback);
+            if (accepted) {
+                return;
+            }
+            // The batch was already closed: drop it from the map and try again.
+            asyncSendBatchs.remove(key, accumulation);
+        }
+    }
+
+    /**
+     * Reserves room for {@code message} in the shared hold budget.
+     *
+     * <p>The check is made before the body length is added, so the counter may end above
+     * {@code totalHoldSize} after a successful reservation.
+     *
+     * @return {@code true} if the counter was below {@code totalHoldSize} and the body
+     *         length was added to it; {@code false} otherwise (counter left unchanged)
+     */
+    boolean tryAddMessage(Message message) {
+        synchronized (currentlyHoldSize) {
+            final boolean hasRoom = currentlyHoldSize.get() < totalHoldSize;
+            if (hasRoom) {
+                currentlyHoldSize.addAndGet(message.getBody().length);
+            }
+            return hasRoom;
+        }
+    }
+
+    /**
+     * Map key that decides which messages may share a batch: messages are aggregated only
+     * when topic, target queue, waitStoreMsgOK flag and tag are all equal.
+     *
+     * <p>The fields are final because instances are used as keys of the batch maps; a key
+     * whose hash changes after insertion could no longer be found or removed.
+     */
+    private class AggregateKey {
+        public final String topic;
+        // Target queue; null when the caller did not choose one.
+        public final MessageQueue mq;
+        public final boolean waitStoreMsgOK;
+        public final String tag;
+
+        /** Builds a key for a message sent without an explicit target queue. */
+        public AggregateKey(Message message) {
+            this(message.getTopic(), null, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        /** Builds a key for a message sent to the given queue. */
+        public AggregateKey(Message message, MessageQueue mq) {
+            this(message.getTopic(), mq, message.isWaitStoreMsgOK(), message.getTags());
+        }
+
+        public AggregateKey(String topic, MessageQueue mq, boolean waitStoreMsgOK, String tag) {
+            this.topic = topic;
+            this.mq = mq;
+            this.waitStoreMsgOK = waitStoreMsgOK;
+            this.tag = tag;
+        }
+
+        /**
+         * Field-wise equality. Every reference field is compared null-safely so that
+         * equals() accepts the same null values hashCode() already tolerates; the topic
+         * was previously dereferenced directly.
+         */
+        @Override public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AggregateKey key = (AggregateKey) o;
+            return waitStoreMsgOK == key.waitStoreMsgOK
+                && Objects.equals(topic, key.topic)
+                && Objects.equals(mq, key.mq)
+                && Objects.equals(tag, key.tag);
+        }
+
+        @Override public int hashCode() {
+            return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+        }
+    }
+
+    private class MessageAccumulation {
+        private final DefaultMQProducer defaultMQProducer;
+        private LinkedList<Message> messages;
+        private LinkedList<SendCallback> sendCallbacks;
+        private Set<String> keys;
+        private AtomicBoolean closed;
+        private SendResult[] sendResults;
+        private AggregateKey aggregateKey;
+        private AtomicInteger messagesSize;
+        private int count;
+        private long createTime;
+
+        /**
+         * Creates an empty, open batch for {@code aggregateKey} and records its creation
+         * time, which readyToSend() compares against the hold time.
+         */
+        public MessageAccumulation(AggregateKey aggregateKey, DefaultMQProducer defaultMQProducer) {
+            this.aggregateKey = aggregateKey;
+            this.defaultMQProducer = defaultMQProducer;
+            this.count = 0;
+            this.messagesSize = new AtomicInteger(0);
+            this.closed = new AtomicBoolean(false);
+            this.keys = new HashSet<String>();
+            this.sendCallbacks = new LinkedList<SendCallback>();
+            this.messages = new LinkedList<Message>();
+            this.createTime = System.currentTimeMillis();
+        }
+
+        /**
+         * Tells whether this batch should be sent now: either its accumulated body bytes
+         * exceed {@code holdSize}, or at least {@code holdMs} milliseconds have passed
+         * since it was created.
+         */
+        private boolean readyToSend() {
+            final boolean sizeReached = this.messagesSize.get() > holdSize;
+            if (sizeReached) {
+                return true;
+            }
+            final long deadline = this.createTime + holdMs;
+            return System.currentTimeMillis() >= deadline;
+        }
+
+        /**
+         * Sync path: appends {@code msg} to this batch and blocks until the batch is closed.
+         *
+         * <p>The first caller that finds the batch ready sends it; the others wait on this
+         * object's monitor and are woken by notifications (from the guard service or from
+         * send()) to re-check.
+         *
+         * @return the index assigned to {@code msg} within the batch, or -1 if the batch
+         *         was already closed and the message was not added
+         */
+        // NOTE(review): a waiter that wakes after another thread's send() threw still returns
+        // its index normally; sendResults appears to be unset in that case - confirm.
+        public int add(
+            Message msg) throws InterruptedException, MQBrokerException, 
RemotingException, MQClientException {
+            int ret = -1;
+            // Phase 1: register the message while holding the closed flag, so a concurrent close cannot race with it.
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return ret;
+                }
+                ret = this.count++;
+                this.messages.add(msg);
+                messagesSize.addAndGet(msg.getBody().length);
+                String msgKeys = msg.getKeys();
+                if (msgKeys != null) {
+                    
this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+                }
+            }
+            // Phase 2: wait on the batch monitor until some thread has closed the batch.
+            synchronized (this) {
+                while (!this.closed.get()) {
+                    if (readyToSend()) {
+                        this.send();
+                        break;
+                    } else {
+                        this.wait();
+                    }
+                }
+                return ret;
+            }
+        }
+
+        /**
+         * Async path: appends {@code msg} and its callback to this batch and, if the batch
+         * is now ready, sends it from the calling thread.
+         *
+         * <p>Unlike the sync overload, this one does not collect the message keys.
+         *
+         * @return {@code false} if the batch was already closed and nothing was added,
+         *         {@code true} otherwise
+         */
+        public boolean add(Message msg,
+            SendCallback sendCallback) throws InterruptedException, 
RemotingException, MQClientException {
+            // Messages and callbacks are appended under the same lock, so their positions correspond.
+            synchronized (this.closed) {
+                if (this.closed.get()) {
+                    return false;
+                }
+                this.count++;
+                this.messages.add(msg);
+                this.sendCallbacks.add(sendCallback);
+                messagesSize.getAndAdd(msg.getBody().length);
+            }
+            if (readyToSend()) {
+                this.send(sendCallback);
+            }
+            return true;
+
+        }
+
+        /** Wakes one thread waiting on this batch's monitor, unless the batch is already closed. */
+        public synchronized void wakeup() {
+            if (!this.closed.get()) {
+                this.notify();
+            }
+        }
+
+        /**
+         * Wraps the accumulated messages in one MessageBatch: topic, waitStoreMsgOK and tag
+         * come from the aggregate key, keys from the collected key set, and the body is the
+         * encoded form of all messages.
+         */
+        private MessageBatch batch() {
+            final AggregateKey key = this.aggregateKey;
+            final MessageBatch merged = new MessageBatch(this.messages);
+            merged.setTopic(key.topic);
+            merged.setWaitStoreMsgOK(key.waitStoreMsgOK);
+            merged.setKeys(this.keys);
+            merged.setTags(key.tag);
+            MessageClientIDSetter.setUniqID(merged);
+            final byte[] encodedBody = MessageDecoder.encodeMessages(this.messages);
+            merged.setBody(encodedBody);
+            return merged;
+        }
+
+        /**
+         * Expands the single batch result into one result per message, stored in
+         * {@code sendResults}.
+         *
+         * <p>When the message id contains commas, the message ids and offset message ids are
+         * split and each message gets its own SendResult with a consecutive queue offset.
+         * Otherwise every slot shares the batch result itself.
+         *
+         * @throws IllegalArgumentException if {@code sendResult} is null, or the number of
+         *         ids does not match the number of messages in the batch
+         */
+        private void splitSendResults(SendResult sendResult) {
+            if (sendResult == null) {
+                throw new IllegalArgumentException("sendResult is null");
+            }
+            final boolean hasPerMessageIds = sendResult.getMsgId().contains(",");
+            final SendResult[] perMessage = new SendResult[this.count];
+            this.sendResults = perMessage;
+            if (!hasPerMessageIds) {
+                // One id for the whole batch: every message shares the same result object.
+                Arrays.fill(perMessage, sendResult);
+                return;
+            }
+            final String[] uniqIds = sendResult.getMsgId().split(",");
+            final String[] offsetIds = sendResult.getOffsetMsgId().split(",");
+            final boolean countsMatch = offsetIds.length == this.count && uniqIds.length == this.count;
+            if (!countsMatch) {
+                throw new IllegalArgumentException("sendResult is illegal");
+            }
+            for (int slot = 0; slot < this.count; slot++) {
+                perMessage[slot] = new SendResult(sendResult.getSendStatus(), uniqIds[slot],
+                    sendResult.getMessageQueue(), sendResult.getQueueOffset() + slot,
+                    sendResult.getTransactionId(), offsetIds[slot], sendResult.getRegionId());
+            }
+        }
+
+        /**
+         * Sync path: closes this batch, sends it through the producer and splits the result
+         * for the waiting callers. Does nothing if the batch was already closed.
+         *
+         * <p>Must be called while holding this object's monitor, because the finally block
+         * calls notifyAll(); the visible caller, add(Message), does so.
+         *
+         * @throws IllegalArgumentException if no producer was supplied
+         */
+        private void send() throws InterruptedException, MQClientException, 
MQBrokerException, RemotingException {
+            // Atomically claim the right to send; only the first caller proceeds.
+            synchronized (this.closed) {
+                if (this.closed.getAndSet(true)) {
+                    return;
+                }
+            }
+            MessageBatch messageBatch = this.batch();
+            SendResult sendResult = null;
+            try {
+                if (defaultMQProducer != null) {
+                    sendResult = defaultMQProducer.sendDirect(messageBatch, 
aggregateKey.mq, null);
+                    this.splitSendResults(sendResult);
+                } else {
+                    throw new IllegalArgumentException("defaultMQProducer is 
null, can not send message");
+                }
+            } finally {
+                // Whether or not the send succeeded, release the held bytes and wake every waiter.
+                currentlyHoldSize.addAndGet(-messagesSize.get());
+                this.notifyAll();
+            }
+        }
+
+        /**
+         * Async path: closes this batch and hands it to the producer, fanning the single
+         * batch outcome out to every callback registered through add(Message, SendCallback).
+         * Does nothing if the batch was already closed.
+         *
+         * <p>Changes from the previous version:
+         * <ul>
+         *   <li>The bytes reserved in {@code currentlyHoldSize} are now also released when
+         *       the hand-off itself fails (the outer catch). Previously that path only
+         *       notified the callbacks, so each such failure shrank the hold budget for
+         *       good.</li>
+         *   <li>Building the batch happens inside the try, so a failure there is reported
+         *       to the callbacks instead of escaping after the batch was already closed.</li>
+         *   <li>The release is guarded so it happens at most once even if more than one
+         *       path reports an outcome.</li>
+         * </ul>
+         *
+         * @param sendCallback not read; the callbacks stored in this batch are notified
+         */
+        private void send(SendCallback sendCallback) {
+            // Atomically claim the right to send; only the first caller proceeds.
+            synchronized (this.closed) {
+                if (this.closed.getAndSet(true)) {
+                    return;
+                }
+            }
+            final int size = messagesSize.get();
+            final AtomicBoolean released = new AtomicBoolean(false);
+            try {
+                if (defaultMQProducer == null) {
+                    throw new IllegalArgumentException("defaultMQProducer is null, can not send message");
+                }
+                MessageBatch messageBatch = this.batch();
+                defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() {
+                    @Override public void onSuccess(SendResult sendResult) {
+                        try {
+                            splitSendResults(sendResult);
+                            int i = 0;
+                            for (SendCallback v : sendCallbacks) {
+                                v.onSuccess(sendResults[i++]);
+                            }
+                            if (i != count) {
+                                throw new IllegalArgumentException("sendResult is illegal");
+                            }
+                            releaseHoldSize(released, size);
+                        } catch (Exception e) {
+                            onException(e);
+                        }
+                    }
+
+                    @Override public void onException(Throwable e) {
+                        try {
+                            for (SendCallback v : sendCallbacks) {
+                                v.onException(e);
+                            }
+                        } finally {
+                            releaseHoldSize(released, size);
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                // The hand-off itself failed: report to every callback and free the budget.
+                try {
+                    for (SendCallback v : sendCallbacks) {
+                        v.onException(e);
+                    }
+                } finally {
+                    releaseHoldSize(released, size);
+                }
+            }
+        }
+
+        /** Subtracts {@code size} bytes from the shared hold counter, at most once per {@code released} flag. */
+        private void releaseHoldSize(AtomicBoolean released, int size) {
+            if (released.compareAndSet(false, true)) {
+                currentlyHoldSize.addAndGet(-size);
+            }
+        }
+    }
+}
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 658f22ab0d..d4153c7cd9 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -250,7 +250,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testBatchSendMessageAsync()
-            throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
+        throws RemotingException, MQClientException, InterruptedException, 
MQBrokerException {
         final AtomicInteger cc = new AtomicInteger(0);
         final CountDownLatch countDownLatch = new CountDownLatch(4);
 
@@ -504,6 +504,42 @@ public class DefaultMQProducerTest {
         assertThat(cc.get()).isEqualTo(1);
     }
 
+    /**
+     * Sends one message asynchronously with auto batching switched on and checks the
+     * result handed to the callback.
+     */
+    @Test
+    public void testBatchSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        final CountDownLatch done = new CountDownLatch(1);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        producer.setAutoBatch(true);
+        final SendCallback callback = new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+                done.countDown();
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                done.countDown();
+            }
+        };
+        producer.send(message, callback);
+
+        // Waits up to three seconds for either callback; the outcome of the wait is not asserted.
+        done.await(3000L, TimeUnit.MILLISECONDS);
+        producer.setAutoBatch(false);
+    }
+
+    /**
+     * Sends one message synchronously with auto batching switched on and checks the
+     * returned status, offset message id and queue offset.
+     */
+    @Test
+    public void testBatchSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
+        producer.setAutoBatch(true);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        final SendResult result = producer.send(message);
+
+        assertThat(result.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(result.getOffsetMsgId()).isEqualTo("123");
+        assertThat(result.getQueueOffset()).isEqualTo(456L);
+        producer.setAutoBatch(false);
+    }
+
     public static TopicRouteData createTopicRoute() {
         TopicRouteData topicRouteData = new TopicRouteData();
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
new file mode 100644
index 0000000000..7074fae243
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ProduceAccumulatorTest {
+    /** Returns true when both batches have the same topic and byte-identical bodies. */
+    private boolean compareMessageBatch(MessageBatch a, MessageBatch b) {
+        final boolean sameTopic = a.getTopic().equals(b.getTopic());
+        return sameTopic && Arrays.equals(a.getBody(), b.getBody());
+    }
+
+    /**
+     * Producer stub that records the last message and queue passed to sendDirect and
+     * answers with a fixed result (message id "123"), invoking the callback's onSuccess
+     * when one is given.
+     */
+    private class MockMQProducer extends DefaultMQProducer {
+        private Message beSendMessage = null;
+        private MessageQueue beSendMessageQueue = null;
+
+        @Override
+        public SendResult sendDirect(Message msg, MessageQueue mq,
+            SendCallback sendCallback) {
+            beSendMessage = msg;
+            beSendMessageQueue = mq;
+
+            final SendResult stubResult = new SendResult();
+            stubResult.setMsgId("123");
+            if (sendCallback == null) {
+                return stubResult;
+            }
+            sendCallback.onSuccess(stubResult);
+            return stubResult;
+        }
+    }
+
+    /**
+     * Sends five messages through the asynchronous path and checks that the producer
+     * received a single MessageBatch equal to a batch built from the same five messages.
+     *
+     * <p>The accumulator is shut down in a finally block; without that, the two guard
+     * threads started by start() kept running after the test finished.
+     */
+    @Test
+    public void testProduceAccumulator_async() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+        MockMQProducer mockMQProducer = new MockMQProducer();
+
+        ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+        produceAccumulator.start();
+        try {
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+            List<Message> messages = new ArrayList<Message>();
+            messages.add(new Message("testTopic", "1".getBytes()));
+            messages.add(new Message("testTopic", "22".getBytes()));
+            messages.add(new Message("testTopic", "333".getBytes()));
+            messages.add(new Message("testTopic", "4444".getBytes()));
+            messages.add(new Message("testTopic", "55555".getBytes()));
+            for (Message message : messages) {
+                produceAccumulator.send(message, new SendCallback() {
+                    final CountDownLatch finalCountDownLatch = countDownLatch;
+
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        finalCountDownLatch.countDown();
+                    }
+
+                    @Override
+                    public void onException(Throwable e) {
+                        finalCountDownLatch.countDown();
+                    }
+                }, mockMQProducer);
+            }
+            assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+            assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue();
+
+            MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage;
+            MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
+            messageBatch2.setBody(messageBatch2.encode());
+
+            assertThat(compareMessageBatch(messageBatch1, messageBatch2)).isTrue();
+        } finally {
+            // Stop the guard threads so they do not outlive the test.
+            produceAccumulator.shutdown();
+        }
+    }
+
+    /**
+     * Sends five messages through the synchronous path from five threads and checks that
+     * the producer received a MessageBatch with the expected topic and body length.
+     *
+     * <p>The accumulator is shut down in a finally block; without that, the two guard
+     * threads started by start() kept running after the test finished.
+     */
+    @Test
+    public void testProduceAccumulator_sync() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+        final MockMQProducer mockMQProducer = new MockMQProducer();
+
+        final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+        produceAccumulator.start();
+        try {
+            List<Message> messages = new ArrayList<Message>();
+            messages.add(new Message("testTopic", "1".getBytes()));
+            messages.add(new Message("testTopic", "22".getBytes()));
+            messages.add(new Message("testTopic", "333".getBytes()));
+            messages.add(new Message("testTopic", "4444".getBytes()));
+            messages.add(new Message("testTopic", "55555".getBytes()));
+            final CountDownLatch countDownLatch = new CountDownLatch(messages.size());
+
+            // Each synchronous send blocks until its batch is sent, so every message gets its own thread.
+            for (final Message message : messages) {
+                new Thread(new Runnable() {
+                    final ProduceAccumulator finalProduceAccumulator = produceAccumulator;
+                    final CountDownLatch finalCountDownLatch = countDownLatch;
+                    final MockMQProducer finalMockMQProducer = mockMQProducer;
+                    final Message finalMessage = message;
+
+                    @Override
+                    public void run() {
+                        try {
+                            finalProduceAccumulator.send(finalMessage, finalMockMQProducer);
+                            finalCountDownLatch.countDown();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }).start();
+            }
+            assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+            assertThat(mockMQProducer.beSendMessage instanceof MessageBatch).isTrue();
+
+            MessageBatch messageBatch1 = (MessageBatch) mockMQProducer.beSendMessage;
+            MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
+            messageBatch2.setBody(messageBatch2.encode());
+
+            assertThat(messageBatch1.getTopic()).isEqualTo(messageBatch2.getTopic());
+            // The execution order is uncertain, just compare the length
+            assertThat(messageBatch1.getBody().length).isEqualTo(messageBatch2.getBody().length);
+        } finally {
+            // Stop the guard threads so they do not outlive the test.
+            produceAccumulator.shutdown();
+        }
+    }
+
+    /**
+     * Checks that an explicitly chosen MessageQueue is passed through to the producer on
+     * both the synchronous and the asynchronous path.
+     *
+     * <p>The accumulator is shut down in a finally block; without that, the two guard
+     * threads started by start() kept running after the test finished.
+     */
+    @Test
+    public void testProduceAccumulator_sendWithMessageQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+        MockMQProducer mockMQProducer = new MockMQProducer();
+
+        MessageQueue messageQueue = new MessageQueue("topicTest", "brokerTest", 0);
+        final ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+        produceAccumulator.start();
+        try {
+            Message message = new Message("testTopic", "1".getBytes());
+            produceAccumulator.send(message, messageQueue, mockMQProducer);
+            assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+            produceAccumulator.send(message, messageQueue, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    countDownLatch.countDown();
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    countDownLatch.countDown();
+                }
+            }, mockMQProducer);
+            assertThat(countDownLatch.await(3000L, TimeUnit.MILLISECONDS)).isTrue();
+            assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+        } finally {
+            // Stop the guard threads so they do not outlive the test.
+            produceAccumulator.shutdown();
+        }
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a423048c5c..30369b8f37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements 
Iterable<Message> {
     private static final long serialVersionUID = 621335151046335557L;
     private final List<Message> messages;
 
-    private MessageBatch(List<Message> messages) {
+    public MessageBatch(List<Message> messages) {
         this.messages = messages;
     }
 


Reply via email to