This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new af993d28e2 [ISSUE #3717][RIP-27] Auto batching in producer
af993d28e2 is described below
commit af993d28e20922d91862f0911e59f748dcb64e6a
Author: guyinyou <[email protected]>
AuthorDate: Fri Jul 21 09:31:56 2023 +0800
[ISSUE #3717][RIP-27] Auto batching in producer
Co-authored-by: guyinyou <[email protected]>
---
.../rocketmq/client/impl/MQClientManager.java | 21 +-
.../impl/producer/DefaultMQProducerImpl.java | 36 ++
.../client/producer/DefaultMQProducer.java | 501 +++++++++++++-------
.../rocketmq/client/producer/MQProducer.java | 24 +-
.../client/producer/ProduceAccumulator.java | 510 +++++++++++++++++++++
.../client/producer/DefaultMQProducerTest.java | 38 +-
.../client/producer/ProduceAccumulatorTest.java | 176 +++++++
.../rocketmq/common/message/MessageBatch.java | 2 +-
8 files changed, 1133 insertions(+), 175 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 49186633fa..02eaa66e99 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.producer.ProduceAccumulator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -31,6 +32,9 @@ public class MQClientManager {
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable
=
new ConcurrentHashMap<>();
+ private ConcurrentMap<String/* clientId */, ProduceAccumulator>
accumulatorTable =
+ new ConcurrentHashMap<String, ProduceAccumulator>();
+
private MQClientManager() {
@@ -43,7 +47,6 @@ public class MQClientManager {
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig
clientConfig) {
return getOrCreateMQClientInstance(clientConfig, null);
}
-
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig
clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
@@ -62,6 +65,22 @@ public class MQClientManager {
return instance;
}
+ public ProduceAccumulator getOrCreateProduceAccumulator(final ClientConfig
clientConfig) {
+ String clientId = clientConfig.buildMQClientId();
+ ProduceAccumulator accumulator = this.accumulatorTable.get(clientId);
+ if (null == accumulator) {
+ accumulator = new ProduceAccumulator(clientId);
+ ProduceAccumulator prev =
this.accumulatorTable.putIfAbsent(clientId, accumulator);
+ if (prev != null) {
+ accumulator = prev;
+ log.warn("Returned Previous ProduceAccumulator for
clientId:[{}]", clientId);
+ } else {
+ log.info("Created new ProduceAccumulator for clientId:[{}]",
clientId);
+ }
+ }
+
+ return accumulator;
+ }
public void removeClientFactory(final String clientId) {
this.factoryTable.remove(clientId);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4eb0e69247..3f4c6e5f7a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -573,6 +573,42 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
}
+ public MessageQueue invokeMessageQueueSelector(Message msg,
MessageQueueSelector selector, Object arg,
+ final long timeout) throws MQClientException,
RemotingTooMuchRequestException {
+ long beginStartTime = System.currentTimeMillis();
+ this.makeSureStateOK();
+ Validators.checkMessage(msg, this.defaultMQProducer);
+
+ TopicPublishInfo topicPublishInfo =
this.tryToFindTopicPublishInfo(msg.getTopic());
+ if (topicPublishInfo != null && topicPublishInfo.ok()) {
+ MessageQueue mq = null;
+ try {
+ List<MessageQueue> messageQueueList =
+
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
+ Message userMessage = MessageAccessor.cloneMessage(msg);
+ String userTopic =
NamespaceUtil.withoutNamespace(userMessage.getTopic(),
mQClientFactory.getClientConfig().getNamespace());
+ userMessage.setTopic(userTopic);
+
+ mq =
mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList,
userMessage, arg));
+ } catch (Throwable e) {
+ throw new MQClientException("select message queue threw
exception.", e);
+ }
+
+ long costTime = System.currentTimeMillis() - beginStartTime;
+ if (timeout < costTime) {
+ throw new RemotingTooMuchRequestException("sendSelectImpl call
timeout");
+ }
+ if (mq != null) {
+ return mq;
+ } else {
+ throw new MQClientException("select message queue return
null.", null);
+ }
+ }
+
+ validateNameServerSetting();
+ throw new MQClientException("No route info for this topic, " +
msg.getTopic(), null);
+ }
+
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,
final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo,
lastBrokerName);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 6e9ffed8c0..c5b1b52230 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
@@ -38,6 +39,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
@@ -49,10 +51,10 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
* This class is the entry point for applications intending to send messages.
</p>
- *
+ * <p>
* It's fine to tune fields which exposes getter/setter methods, but keep in
mind, all of them should work well out of
* box for most scenarios. </p>
- *
+ * <p>
* This class aggregates various <code>send</code> methods to deliver messages
to broker(s). Each of them has pros and
* cons; you'd better understand strengths and weakness of them before
actually coding. </p>
*
@@ -78,9 +80,9 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Producer group conceptually aggregates all producer instances of
exactly same role, which is particularly
* important when transactional messages are involved. </p>
- *
+ * <p>
* For non-transactional messages, it does not matter as long as it's
unique per process. </p>
- *
+ * <p>
* See <a
href="https://rocketmq.apache.org/docs/introduction/02concepts">core
concepts</a> for more discussion.
*/
private String producerGroup;
@@ -107,14 +109,14 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Maximum number of retry to perform internally before claiming sending
failure in synchronous mode. </p>
- *
+ * <p>
* This may potentially cause message duplication which is up to
application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* Maximum number of retry to perform internally before claiming sending
failure in asynchronous mode. </p>
- *
+ * <p>
* This may potentially cause message duplication which is up to
application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
@@ -134,6 +136,15 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
private TraceDispatcher traceDispatcher = null;
+ /**
+ * Switch flag instance for automatic batch message
+ */
+ private boolean autoBatch = false;
+ /**
+ * Instance for batching message automatically
+ */
+ private ProduceAccumulator produceAccumulator = null;
+
/**
* Indicate whether to block message when asynchronous sending traffic is
too heavy.
*/
@@ -179,11 +190,11 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Constructor specifying producer group.
*
- * @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting
command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If
you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace,
final String customizedTraceTopic) {
@@ -193,7 +204,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Constructor specifying producer group.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
*/
public DefaultMQProducer(final String namespace, final String
producerGroup) {
@@ -204,7 +215,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* Constructor specifying both producer group and RPC hook.
*
* @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param rpcHook RPC hook to execute per each remoting command
execution.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this(null, producerGroup, rpcHook);
@@ -213,20 +224,21 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Constructor specifying namespace, producer group and RPC hook.
*
- * @param namespace Namespace for this MQ Producer instance.
+ * @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param rpcHook RPC hook to execute per each remoting command
execution.
*/
public DefaultMQProducer(final String namespace, final String
producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
/**
* Constructor specifying producer group and enabled msg trace flag.
*
- * @param producerGroup Producer group, see the name-sake field.
+ * @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQProducer(final String producerGroup, boolean
enableMsgTrace) {
@@ -236,10 +248,10 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Constructor specifying producer group, enabled msgTrace flag and
customized trace topic name.
*
- * @param producerGroup Producer group, see the name-sake field.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If
you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean
enableMsgTrace, final String customizedTraceTopic) {
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
@@ -249,18 +261,19 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* Constructor specifying namespace, producer group, RPC hook, enabled
msgTrace flag and customized trace topic
* name.
*
- * @param namespace Namespace for this MQ Producer instance.
- * @param producerGroup Producer group, see the name-sake field.
- * @param rpcHook RPC hook to execute per each remoting command execution.
- * @param enableMsgTrace Switch flag instance for message trace.
+ * @param namespace Namespace for this MQ Producer instance.
+ * @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting
command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If
you don't config,you can use the default
- * trace topic name.
+ * trace topic name.
*/
public DefaultMQProducer(final String namespace, final String
producerGroup, RPCHook rpcHook,
boolean enableMsgTrace, final String customizedTraceTopic) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
//if client open the message trace feature
if (enableMsgTrace) {
try {
@@ -297,6 +310,9 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.start();
+ }
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(),
this.getAccessChannel());
@@ -312,6 +328,9 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
+ if (this.produceAccumulator != null) {
+ this.produceAccumulator.shutdown();
+ }
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
@@ -329,6 +348,26 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
return
this.defaultMQProducerImpl.fetchPublishMessageQueues(withNamespace(topic));
}
+ private boolean canBatch(Message msg) {
+ // produceAccumulator is full
+ if (!produceAccumulator.tryAddMessage(msg)) {
+ return false;
+ }
+ // delay message do not support batch processing
+ if (msg.getDelayTimeLevel() > 0) {
+ return false;
+ }
+ // retry message do not support batch processing
+ if (msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ return false;
+ }
+ // message which have been assigned to producer group do not support
batch processing
+ if
(msg.getProperties().containsKey(MessageConst.PROPERTY_PRODUCER_GROUP)) {
+ return false;
+ }
+ return true;
+ }
+
/**
* Send message in synchronous mode. This method returns only when the
sending procedure totally completes. </p>
*
@@ -339,28 +378,32 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* @param msg Message to send.
* @return {@link SendResult} instance to inform senders details of the
deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status,
message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, null, null);
+ } else {
+ return sendDirect(msg, null, null);
+ }
}
/**
* Same to {@link #send(Message)} with send timeout specified in addition.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the
deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status,
message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -372,34 +415,42 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Send message to broker asynchronously. </p>
- *
+ * <p>
* This method returns immediately. On sending completion,
<code>sendCallback</code> will be executed. </p>
- *
+ * <p>
* Similar to {@link #send(Message)}, internal implementation would
potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure,
which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either
successful or unsuccessful.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg,
SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, sendCallback);
+ try {
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, null, sendCallback);
+ } else {
+ sendDirect(msg, null, sendCallback);
+ }
+ } catch (Throwable e) {
+ sendCallback.onException(e);
+ }
}
/**
* Same to {@link #send(Message, SendCallback)} with send timeout
specified in addition.
*
- * @param msg message to send.
+ * @param msg message to send.
* @param sendCallback Callback to execute.
- * @param timeout send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -414,8 +465,8 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* acknowledgement from broker before return. Obviously, it has maximums
throughput yet potentials of message loss.
*
* @param msg Message to send.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -428,32 +479,37 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* Same to {@link #send(Message)} with target message queue specified in
addition.
*
* @param msg Message to send.
- * @param mq Target message queue.
+ * @param mq Target message queue.
* @return {@link SendResult} instance to inform senders details of the
deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status,
message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq));
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, mq, null);
+ } else {
+ return sendDirect(msg, mq, null);
+ }
}
/**
* Same to {@link #send(Message)} with target message queue and send
timeout specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param timeout send timeout.
* @return {@link SendResult} instance to inform senders details of the
deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status,
message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -466,29 +522,38 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with target message queue
specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either
successful or unsuccessful.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, queueWithNamespace(mq),
sendCallback);
+ mq = queueWithNamespace(mq);
+ try {
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, mq, sendCallback);
+ } else {
+ sendDirect(msg, mq, sendCallback);
+ }
+ } catch (MQBrokerException e) {
+ // ignore
+ }
}
/**
* Same to {@link #send(Message, SendCallback)} with target message queue
and send timeout specified.
*
- * @param msg Message to send.
- * @param mq Target message queue.
+ * @param msg Message to send.
+ * @param mq Target message queue.
* @param sendCallback Callback to execute on sending completed, either
successful or unsuccessful.
- * @param timeout Send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout Send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -502,9 +567,9 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* Same to {@link #sendOneway(Message)} with target message queue
specified.
*
* @param msg Message to send.
- * @param mq Target message queue.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param mq Target message queue.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -517,35 +582,41 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #send(Message)} with message queue selector specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which we get target
message queue to deliver message to.
- * @param arg Argument to work along with message queue selector.
+ * @param arg Argument to work along with message queue selector.
* @return {@link SendResult} instance to inform senders details of the
deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status,
message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object
arg)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- return this.defaultMQProducerImpl.send(msg, selector, arg);
+ MessageQueue mq =
this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg,
this.getSendMsgTimeout());
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ return sendByAccumulator(msg, mq, null);
+ } else {
+ return sendDirect(msg, mq, null);
+ }
}
/**
* Same to {@link #send(Message, MessageQueueSelector, Object)} with send
timeout specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which we get target
message queue to deliver message to.
- * @param arg Argument to work along with message queue selector.
- * @param timeout Send timeout.
+ * @param arg Argument to work along with message queue selector.
+ * @param timeout Send timeout.
* @return {@link SendResult} instance to inform senders details of the
deliverable, say Message ID of the message,
* {@link SendStatus} indicating broker storage/replication status,
message queue sent to, etc.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any error with broker.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any error with broker.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -558,31 +629,41 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #send(Message, SendCallback)} with message queue
selector specified.
*
- * @param msg Message to send.
- * @param selector Message selector through which to get target message
queue.
- * @param arg Argument used along with message queue selector.
+ * @param msg Message to send.
+ * @param selector Message selector through which to get target
message queue.
+ * @param arg Argument used along with message queue selector.
* @param sendCallback callback to execute on sending completion.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
public void send(Message msg, MessageQueueSelector selector, Object arg,
SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+ try {
+ MessageQueue mq =
this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg,
this.getSendMsgTimeout());
+ mq = queueWithNamespace(mq);
+ if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
+ sendByAccumulator(msg, mq, sendCallback);
+ } else {
+ sendDirect(msg, mq, sendCallback);
+ }
+ } catch (Throwable e) {
+ sendCallback.onException(e);
+ }
}
/**
* Same to {@link #send(Message, MessageQueueSelector, Object,
SendCallback)} with timeout specified.
*
- * @param msg Message to send.
- * @param selector Message selector through which to get target message
queue.
- * @param arg Argument used along with message queue selector.
+ * @param msg Message to send.
+ * @param selector Message selector through which to get target
message queue.
+ * @param arg Argument used along with message queue selector.
* @param sendCallback callback to execute on sending completion.
- * @param timeout Send timeout.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout Send timeout.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -592,6 +673,42 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback,
timeout);
}
+ public SendResult sendDirect(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
+ // send in sync mode
+ if (sendCallback == null) {
+ if (mq == null) {
+ return this.defaultMQProducerImpl.send(msg);
+ } else {
+ return this.defaultMQProducerImpl.send(msg, mq);
+ }
+ } else {
+ if (mq == null) {
+ this.defaultMQProducerImpl.send(msg, sendCallback);
+ } else {
+ this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+ }
+ return null;
+ }
+ }
+
+ public SendResult sendByAccumulator(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
+ // check whether it can batch
+ if (!canBatch(msg)) {
+ return sendDirect(msg, mq, sendCallback);
+ } else {
+ Validators.checkMessage(msg, this);
+ MessageClientIDSetter.setUniqID(msg);
+ if (sendCallback == null) {
+ return this.produceAccumulator.send(msg, mq, this);
+ } else {
+ this.produceAccumulator.send(msg, mq, sendCallback, this);
+ return null;
+ }
+ }
+ }
+
/**
* Send request message in synchronous mode. This method returns only when
the consumer consume the request message and reply a message. </p>
*
@@ -599,13 +716,13 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a
result, multiple messages may be potentially
* delivered to broker(s). It's up to the application developers to
resolve potential duplication issue.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param timeout request timeout
* @return reply message
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -618,18 +735,18 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Request asynchronously. </p>
* This method returns immediately. On receiving reply message,
<code>requestCallback</code> will be executed. </p>
- *
+ * <p>
* Similar to {@link #request(Message, long)}, internal implementation
would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure,
which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param requestCallback callback to execute on request completion.
- * @param timeout request timeout
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout request timeout
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final RequestCallback
requestCallback, final long timeout)
@@ -641,15 +758,15 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #request(Message, long)} with message queue selector
specified.
*
- * @param msg request message to send
+ * @param msg request message to send
* @param selector message queue selector, through which we get target
message queue to deliver message to.
- * @param arg argument to work along with message queue selector.
- * @param timeout timeout of request.
+ * @param arg argument to work along with message queue selector.
+ * @param timeout timeout of request.
* @return reply message
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -663,15 +780,15 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #request(Message, RequestCallback, long)} with target
message selector specified.
*
- * @param msg requst message to send
- * @param selector message queue selector, through which we get target
message queue to deliver message to.
- * @param arg argument to work along with message queue selector.
+ * @param msg requst message to send
+ * @param selector message queue selector, through which we get
target message queue to deliver message to.
+ * @param arg argument to work along with message queue
selector.
* @param requestCallback callback to execute on request completion.
- * @param timeout timeout of request.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueueSelector
selector, final Object arg,
@@ -684,13 +801,13 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #request(Message, long)} with target message queue
specified in addition.
*
- * @param msg request message to send
- * @param mq target message queue.
+ * @param msg request message to send
+ * @param mq target message queue.
* @param timeout request timeout
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
- * @throws MQBrokerException if there is any broker error.
- * @throws InterruptedException if the thread is interrupted.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
* @throws RequestTimeoutException if request timeout.
*/
@Override
@@ -703,14 +820,14 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #request(Message, RequestCallback, long)} with target
message queue specified.
*
- * @param msg request message to send
- * @param mq target message queue.
+ * @param msg request message to send
+ * @param mq target message queue.
* @param requestCallback callback to execute on request completion.
- * @param timeout timeout of request.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the thread is interrupted.
- * @throws MQBrokerException if there is any broker error.
+ * @throws MQBrokerException if there is any broker error.
*/
@Override
public void request(final Message msg, final MessageQueue mq, final
RequestCallback requestCallback, long timeout)
@@ -722,11 +839,11 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Same to {@link #sendOneway(Message)} with message queue selector
specified.
*
- * @param msg Message to send.
+ * @param msg Message to send.
* @param selector Message queue selector, through which to determine
target message queue to deliver message
- * @param arg Argument used along with message queue selector.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @param arg Argument used along with message queue selector.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Override
@@ -739,9 +856,9 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* This method is to send transactional messages.
*
- * @param msg Transactional message to send.
+ * @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
- * @param arg Argument used along with local transaction executor.
+ * @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@@ -769,15 +886,16 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
- * @param key accessKey
- * @param newTopic topic name
- * @param queueNum topic's queue number
+ * @param key accessKey
+ * @param newTopic topic name
+ * @param queueNum topic's queue number
* @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum,
Map<String, String> attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum,
+ Map<String, String> attributes) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}
@@ -785,23 +903,24 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
* Create a topic on broker. This method will be removed in a certain
version after April 5, 2020, so please do not
* use this method.
*
- * @param key accessKey
- * @param newTopic topic name
- * @param queueNum topic's queue number
+ * @param key accessKey
+ * @param newTopic topic name
+ * @param queueNum topic's queue number
* @param topicSysFlag topic system flag
* @param attributes
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
- public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag, Map<String, String> attributes) throws MQClientException {
+ public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag,
+ Map<String, String> attributes) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic),
queueNum, topicSysFlag);
}
/**
* Search consume queue offset of the given time stamp.
*
- * @param mq Instance of MessageQueue
+ * @param mq Instance of MessageQueue
* @param timestamp from when in milliseconds.
* @return Consume queue offset.
* @throws MQClientException if there is any client error.
@@ -813,7 +932,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Query maximum offset of the given message queue.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -828,7 +947,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Query minimum offset of the given message queue.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -843,7 +962,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Query the earliest message store time.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
* @param mq Instance of MessageQueue
@@ -858,14 +977,14 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Query message of the given offset message ID.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
* @param offsetMsgId message id
* @return Message specified.
- * @throws MQBrokerException if there is any broker error.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@@ -877,16 +996,16 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Query message by key.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
- * @param topic message topic
- * @param key message key index word
+ * @param topic message topic
+ * @param key message key index word
* @param maxNum max message number
- * @param begin from when
- * @param end to when
+ * @param begin from when
+ * @param end to when
* @return QueryResult instance contains matched messages.
- * @throws MQClientException if there is any client error.
+ * @throws MQClientException if there is any client error.
* @throws InterruptedException if the thread is interrupted.
*/
@Deprecated
@@ -898,15 +1017,15 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
/**
* Query message of the given message ID.
- *
+ * <p>
* This method will be removed in a certain version after April 5, 2020,
so please do not use this method.
*
* @param topic Topic
* @param msgId Message ID
* @return Message specified.
- * @throws MQBrokerException if there is any broker error.
- * @throws MQClientException if there is any client error.
- * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@@ -945,7 +1064,8 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
@Override
- public void send(Collection<Message> msgs, SendCallback sendCallback)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
+ public void send(Collection<Message> msgs,
+ SendCallback sendCallback) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}
@@ -963,7 +1083,8 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
@Override
public void send(Collection<Message> msgs, MessageQueue mq,
- SendCallback sendCallback, long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException {
+ SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq),
sendCallback, timeout);
}
@@ -1012,6 +1133,62 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
return msgBatch;
}
+ public int getBatchMaxDelayMs() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getBatchMaxDelayMs();
+ }
+
+ public void batchMaxDelayMs(int holdMs) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ }
+ this.produceAccumulator.batchMaxDelayMs(holdMs);
+ }
+
+ public long getBatchMaxBytes() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getBatchMaxBytes();
+ }
+
+ public void batchMaxBytes(long holdSize) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ }
+ this.produceAccumulator.batchMaxBytes(holdSize);
+ }
+
+ public long getTotalBatchMaxBytes() {
+ if (this.produceAccumulator == null) {
+ return 0;
+ }
+ return produceAccumulator.getTotalBatchMaxBytes();
+ }
+
+ public void totalBatchMaxBytes(long totalHoldSize) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ }
+ this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
+ }
+
+ public boolean getAutoBatch() {
+ if (this.produceAccumulator == null) {
+ return false;
+ }
+ return this.autoBatch;
+ }
+
+ public void setAutoBatch(boolean autoBatch) {
+ if (this.produceAccumulator == null) {
+ throw new UnsupportedOperationException("The currently constructed
producer does not support autoBatch");
+ }
+ this.autoBatch = autoBatch;
+ }
+
public String getProducerGroup() {
return producerGroup;
}
@@ -1130,7 +1307,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
}
public boolean isEnableBackpressureForAsyncMode() {
- return enableBackpressureForAsyncMode;
+ return enableBackpressureForAsyncMode;
}
public void setEnableBackpressureForAsyncMode(boolean
enableBackpressureForAsyncMode) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index f70ddb283d..78657e623e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -40,7 +40,7 @@ public interface MQProducer extends MQAdmin {
RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback) throws
MQClientException,
- RemotingException, InterruptedException;
+ RemotingException, InterruptedException, MQBrokerException;
void send(final Message msg, final SendCallback sendCallback, final long
timeout)
throws MQClientException, RemotingException, InterruptedException;
@@ -99,19 +99,23 @@ public interface MQProducer extends MQAdmin {
SendResult send(final Collection<Message> msgs, final MessageQueue mq,
final long timeout)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
-
- void send(final Collection<Message> msgs, final SendCallback sendCallback)
throws MQClientException, RemotingException, MQBrokerException,
+
+ void send(final Collection<Message> msgs,
+ final SendCallback sendCallback) throws MQClientException,
RemotingException, MQBrokerException,
InterruptedException;
-
- void send(final Collection<Message> msgs, final SendCallback sendCallback,
final long timeout) throws MQClientException, RemotingException,
+
+ void send(final Collection<Message> msgs, final SendCallback sendCallback,
+ final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
-
- void send(final Collection<Message> msgs, final MessageQueue mq, final
SendCallback sendCallback) throws MQClientException, RemotingException,
+
+ void send(final Collection<Message> msgs, final MessageQueue mq,
+ final SendCallback sendCallback) throws MQClientException,
RemotingException,
MQBrokerException, InterruptedException;
-
- void send(final Collection<Message> msgs, final MessageQueue mq, final
SendCallback sendCallback, final long timeout) throws MQClientException,
+
+ void send(final Collection<Message> msgs, final MessageQueue mq, final
SendCallback sendCallback,
+ final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
-
+
//for rpc
Message request(final Message msg, final long timeout) throws
RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
new file mode 100644
index 0000000000..46dfcf71d2
--- /dev/null
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProduceAccumulator {
+ // totalHoldSize normal value
+ private long totalHoldSize = 32 * 1024 * 1024;
+ // holdSize normal value
+ private long holdSize = 32 * 1024;
+ // holdMs normal value
+ private int holdMs = 10;
+ private final Logger log =
LoggerFactory.getLogger(DefaultMQProducer.class);
+ private final GuardForSyncSendService guardThreadForSyncSend;
+ private final GuardForAsyncSendService guardThreadForAsyncSend;
+ private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+ private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+ private AtomicLong currentlyHoldSize = new AtomicLong(0);
+ private final String instanceName;
+
+ public ProduceAccumulator(String instanceName) {
+ this.instanceName = instanceName;
+ this.guardThreadForSyncSend = new
GuardForSyncSendService(this.instanceName);
+ this.guardThreadForAsyncSend = new
GuardForAsyncSendService(this.instanceName);
+ }
+
+ private class GuardForSyncSendService extends ServiceThread {
+ private final String serviceName;
+
+ public GuardForSyncSendService(String clientInstanceName) {
+ serviceName = String.format("Client_%s_GuardForSyncSend",
clientInstanceName);
+ }
+
+ @Override public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.doWork();
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception.
", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+ private void doWork() throws InterruptedException {
+ Collection<MessageAccumulation> values = syncSendBatchs.values();
+ final int sleepTime = Math.max(1, holdMs / 2);
+ for (MessageAccumulation v : values) {
+ v.wakeup();
+ synchronized (v) {
+ synchronized (v.closed) {
+ if (v.messagesSize.get() == 0) {
+ v.closed.set(true);
+ syncSendBatchs.remove(v.aggregateKey, v);
+ } else {
+ v.notify();
+ }
+ }
+ }
+ }
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ private class GuardForAsyncSendService extends ServiceThread {
+ private final String serviceName;
+
+ public GuardForAsyncSendService(String clientInstanceName) {
+ serviceName = String.format("Client_%s_GuardForAsyncSend",
clientInstanceName);
+ }
+
+ @Override public String getServiceName() {
+ return serviceName;
+ }
+
+ @Override public void run() {
+ log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.doWork();
+ } catch (Exception e) {
+ log.warn(this.getServiceName() + " service has exception.
", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+ private void doWork() throws Exception {
+ Collection<MessageAccumulation> values = asyncSendBatchs.values();
+ final int sleepTime = Math.max(1, holdMs / 2);
+ for (MessageAccumulation v : values) {
+ if (v.readyToSend()) {
+ v.send(null);
+ }
+ synchronized (v.closed) {
+ if (v.messagesSize.get() == 0) {
+ v.closed.set(true);
+ asyncSendBatchs.remove(v.aggregateKey, v);
+ }
+ }
+ }
+ Thread.sleep(sleepTime);
+ }
+ }
+
+ void start() {
+ guardThreadForSyncSend.start();
+ guardThreadForAsyncSend.start();
+ }
+
+ void shutdown() {
+ guardThreadForSyncSend.shutdown();
+ guardThreadForAsyncSend.shutdown();
+ }
+
+ int getBatchMaxDelayMs() {
+ return holdMs;
+ }
+
+ void batchMaxDelayMs(int holdMs) {
+ if (holdMs <= 0 || holdMs > 30 * 1000) {
+ throw new IllegalArgumentException(String.format("batchMaxDelayMs
expect between 1ms and 30s, but get %d!", holdMs));
+ }
+ this.holdMs = holdMs;
+ }
+
+ long getBatchMaxBytes() {
+ return holdSize;
+ }
+
+ void batchMaxBytes(long holdSize) {
+ if (holdSize <= 0 || holdSize > 2 * 1024 * 1024) {
+ throw new IllegalArgumentException(String.format("batchMaxBytes
expect between 1B and 2MB, but get %d!", holdSize));
+ }
+ this.holdSize = holdSize;
+ }
+
+ long getTotalBatchMaxBytes() {
+ return holdSize;
+ }
+
+ void totalBatchMaxBytes(long totalHoldSize) {
+ if (totalHoldSize <= 0) {
+ throw new
IllegalArgumentException(String.format("totalBatchMaxBytes must bigger then 0,
but get %d!", totalHoldSize));
+ }
+ this.totalHoldSize = totalHoldSize;
+ }
+
+ private MessageAccumulation getOrCreateSyncSendBatch(AggregateKey
aggregateKey,
+ DefaultMQProducer defaultMQProducer) {
+ MessageAccumulation batch = syncSendBatchs.get(aggregateKey);
+ if (batch != null) {
+ return batch;
+ }
+ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+ MessageAccumulation previous =
syncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+ return previous == null ? batch : previous;
+ }
+
+ private MessageAccumulation getOrCreateAsyncSendBatch(AggregateKey
aggregateKey,
+ DefaultMQProducer defaultMQProducer) {
+ MessageAccumulation batch = asyncSendBatchs.get(aggregateKey);
+ if (batch != null) {
+ return batch;
+ }
+ batch = new MessageAccumulation(aggregateKey, defaultMQProducer);
+ MessageAccumulation previous =
asyncSendBatchs.putIfAbsent(aggregateKey, batch);
+
+ return previous == null ? batch : previous;
+ }
+
+ SendResult send(Message msg,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException,
MQBrokerException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg);
+ while (true) {
+ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey,
defaultMQProducer);
+ int index = batch.add(msg);
+ if (index == -1) {
+ syncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return batch.sendResults[index];
+ }
+ }
+ }
+
+ SendResult send(Message msg, MessageQueue mq,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException,
MQBrokerException, RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg, mq);
+ while (true) {
+ MessageAccumulation batch = getOrCreateSyncSendBatch(partitionKey,
defaultMQProducer);
+ int index = batch.add(msg);
+ if (index == -1) {
+ syncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return batch.sendResults[index];
+ }
+ }
+ }
+
+ void send(Message msg, SendCallback sendCallback,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException,
RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg);
+ while (true) {
+ MessageAccumulation batch =
getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+ if (!batch.add(msg, sendCallback)) {
+ asyncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return;
+ }
+ }
+ }
+
+ void send(Message msg, MessageQueue mq,
+ SendCallback sendCallback,
+ DefaultMQProducer defaultMQProducer) throws InterruptedException,
RemotingException, MQClientException {
+ AggregateKey partitionKey = new AggregateKey(msg, mq);
+ while (true) {
+ MessageAccumulation batch =
getOrCreateAsyncSendBatch(partitionKey, defaultMQProducer);
+ if (!batch.add(msg, sendCallback)) {
+ asyncSendBatchs.remove(partitionKey, batch);
+ } else {
+ return;
+ }
+ }
+ }
+
+ boolean tryAddMessage(Message message) {
+ synchronized (currentlyHoldSize) {
+ if (currentlyHoldSize.get() < totalHoldSize) {
+ currentlyHoldSize.addAndGet(message.getBody().length);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private class AggregateKey {
+ public String topic = null;
+ public MessageQueue mq = null;
+ public boolean waitStoreMsgOK = false;
+ public String tag = null;
+
+ public AggregateKey(Message message) {
+ this(message.getTopic(), null, message.isWaitStoreMsgOK(),
message.getTags());
+ }
+
+ public AggregateKey(Message message, MessageQueue mq) {
+ this(message.getTopic(), mq, message.isWaitStoreMsgOK(),
message.getTags());
+ }
+
+ public AggregateKey(String topic, MessageQueue mq, boolean
waitStoreMsgOK, String tag) {
+ this.topic = topic;
+ this.mq = mq;
+ this.waitStoreMsgOK = waitStoreMsgOK;
+ this.tag = tag;
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ AggregateKey key = (AggregateKey) o;
+ return waitStoreMsgOK == key.waitStoreMsgOK &&
topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag,
key.tag);
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(topic, mq, waitStoreMsgOK, tag);
+ }
+ }
+
+ private class MessageAccumulation {
+ private final DefaultMQProducer defaultMQProducer;
+ private LinkedList<Message> messages;
+ private LinkedList<SendCallback> sendCallbacks;
+ private Set<String> keys;
+ private AtomicBoolean closed;
+ private SendResult[] sendResults;
+ private AggregateKey aggregateKey;
+ private AtomicInteger messagesSize;
+ private int count;
+ private long createTime;
+
+ public MessageAccumulation(AggregateKey aggregateKey,
DefaultMQProducer defaultMQProducer) {
+ this.defaultMQProducer = defaultMQProducer;
+ this.messages = new LinkedList<Message>();
+ this.sendCallbacks = new LinkedList<SendCallback>();
+ this.keys = new HashSet<String>();
+ this.closed = new AtomicBoolean(false);
+ this.messagesSize = new AtomicInteger(0);
+ this.aggregateKey = aggregateKey;
+ this.count = 0;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ private boolean readyToSend() {
+ if (this.messagesSize.get() > holdSize
+ || System.currentTimeMillis() >= this.createTime + holdMs) {
+ return true;
+ }
+ return false;
+ }
+
+ public int add(
+ Message msg) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
+ int ret = -1;
+ synchronized (this.closed) {
+ if (this.closed.get()) {
+ return ret;
+ }
+ ret = this.count++;
+ this.messages.add(msg);
+ messagesSize.addAndGet(msg.getBody().length);
+ String msgKeys = msg.getKeys();
+ if (msgKeys != null) {
+
this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
+ }
+ }
+ synchronized (this) {
+ while (!this.closed.get()) {
+ if (readyToSend()) {
+ this.send();
+ break;
+ } else {
+ this.wait();
+ }
+ }
+ return ret;
+ }
+ }
+
+ public boolean add(Message msg,
+ SendCallback sendCallback) throws InterruptedException,
RemotingException, MQClientException {
+ synchronized (this.closed) {
+ if (this.closed.get()) {
+ return false;
+ }
+ this.count++;
+ this.messages.add(msg);
+ this.sendCallbacks.add(sendCallback);
+ messagesSize.getAndAdd(msg.getBody().length);
+ }
+ if (readyToSend()) {
+ this.send(sendCallback);
+ }
+ return true;
+
+ }
+
+ public synchronized void wakeup() {
+ if (this.closed.get()) {
+ return;
+ }
+ this.notify();
+ }
+
+ private MessageBatch batch() {
+ MessageBatch messageBatch = new MessageBatch(this.messages);
+ messageBatch.setTopic(this.aggregateKey.topic);
+ messageBatch.setWaitStoreMsgOK(this.aggregateKey.waitStoreMsgOK);
+ messageBatch.setKeys(this.keys);
+ messageBatch.setTags(this.aggregateKey.tag);
+ MessageClientIDSetter.setUniqID(messageBatch);
+ messageBatch.setBody(MessageDecoder.encodeMessages(this.messages));
+ return messageBatch;
+ }
+
+ private void splitSendResults(SendResult sendResult) {
+ if (sendResult == null) {
+ throw new IllegalArgumentException("sendResult is null");
+ }
+ boolean isBatchConsumerQueue =
!sendResult.getMsgId().contains(",");
+ this.sendResults = new SendResult[this.count];
+ if (!isBatchConsumerQueue) {
+ String[] msgIds = sendResult.getMsgId().split(",");
+ String[] offsetMsgIds = sendResult.getOffsetMsgId().split(",");
+ if (offsetMsgIds.length != this.count || msgIds.length !=
this.count) {
+ throw new IllegalArgumentException("sendResult is
illegal");
+ }
+ for (int i = 0; i < this.count; i++) {
+ this.sendResults[i] = new
SendResult(sendResult.getSendStatus(), msgIds[i],
+ sendResult.getMessageQueue(),
sendResult.getQueueOffset() + i,
+ sendResult.getTransactionId(), offsetMsgIds[i],
sendResult.getRegionId());
+ }
+ } else {
+ for (int i = 0; i < this.count; i++) {
+ this.sendResults[i] = sendResult;
+ }
+ }
+ }
+
+ private void send() throws InterruptedException, MQClientException,
MQBrokerException, RemotingException {
+ synchronized (this.closed) {
+ if (this.closed.getAndSet(true)) {
+ return;
+ }
+ }
+ MessageBatch messageBatch = this.batch();
+ SendResult sendResult = null;
+ try {
+ if (defaultMQProducer != null) {
+ sendResult = defaultMQProducer.sendDirect(messageBatch,
aggregateKey.mq, null);
+ this.splitSendResults(sendResult);
+ } else {
+ throw new IllegalArgumentException("defaultMQProducer is
null, can not send message");
+ }
+ } finally {
+ currentlyHoldSize.addAndGet(-messagesSize.get());
+ this.notifyAll();
+ }
+ }
+
+ private void send(SendCallback sendCallback) {
+ synchronized (this.closed) {
+ if (this.closed.getAndSet(true)) {
+ return;
+ }
+ }
+ MessageBatch messageBatch = this.batch();
+ SendResult sendResult = null;
+ try {
+ if (defaultMQProducer != null) {
+ final int size = messagesSize.get();
+ defaultMQProducer.sendDirect(messageBatch,
aggregateKey.mq, new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult)
{
+ try {
+ splitSendResults(sendResult);
+ int i = 0;
+ Iterator<SendCallback> it =
sendCallbacks.iterator();
+ while (it.hasNext()) {
+ SendCallback v = it.next();
+ v.onSuccess(sendResults[i++]);
+ }
+ if (i != count) {
+ throw new
IllegalArgumentException("sendResult is illegal");
+ }
+ currentlyHoldSize.addAndGet(-size);
+ } catch (Exception e) {
+ onException(e);
+ }
+ }
+
+ @Override public void onException(Throwable e) {
+ for (SendCallback v : sendCallbacks) {
+ v.onException(e);
+ }
+ currentlyHoldSize.addAndGet(-size);
+ }
+ });
+ } else {
+ throw new IllegalArgumentException("defaultMQProducer is
null, can not send message");
+ }
+ } catch (Exception e) {
+ for (SendCallback v : sendCallbacks) {
+ v.onException(e);
+ }
+ }
+ }
+ }
+}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 658f22ab0d..d4153c7cd9 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -250,7 +250,7 @@ public class DefaultMQProducerTest {
@Test
public void testBatchSendMessageAsync()
- throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
+ throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(4);
@@ -504,6 +504,42 @@ public class DefaultMQProducerTest {
assertThat(cc.get()).isEqualTo(1);
}
+ @Test
+ public void testBatchSendMessageAsync_Success() throws RemotingException,
InterruptedException, MQBrokerException, MQClientException {
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
+ producer.setAutoBatch(true);
+ producer.send(message, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+
assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ countDownLatch.countDown();
+ }
+ });
+
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ producer.setAutoBatch(false);
+ }
+
+ @Test
+ public void testBatchSendMessageSync_Success() throws RemotingException,
InterruptedException, MQBrokerException, MQClientException {
+ producer.setAutoBatch(true);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(),
anyLong())).thenReturn(createTopicRoute());
+ SendResult sendResult = producer.send(message);
+
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+ producer.setAutoBatch(false);
+ }
+
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
new file mode 100644
index 0000000000..7074fae243
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/ProduceAccumulatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ProduceAccumulatorTest {
+ private boolean compareMessageBatch(MessageBatch a, MessageBatch b) {
+ if (!a.getTopic().equals(b.getTopic())) {
+ return false;
+ }
+ if (!Arrays.equals(a.getBody(), b.getBody())) {
+ return false;
+ }
+ return true;
+ }
+
+ private class MockMQProducer extends DefaultMQProducer {
+ private Message beSendMessage = null;
+ private MessageQueue beSendMessageQueue = null;
+
+ @Override
+ public SendResult sendDirect(Message msg, MessageQueue mq,
+ SendCallback sendCallback) {
+ this.beSendMessage = msg;
+ this.beSendMessageQueue = mq;
+
+ SendResult sendResult = new SendResult();
+ sendResult.setMsgId("123");
+ if (sendCallback != null) {
+ sendCallback.onSuccess(sendResult);
+ }
+ return sendResult;
+ }
+ }
+
+ @Test
+ public void testProduceAccumulator_async() throws MQBrokerException,
RemotingException, InterruptedException, MQClientException {
+ MockMQProducer mockMQProducer = new MockMQProducer();
+
+ ProduceAccumulator produceAccumulator = new ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ List<Message> messages = new ArrayList<Message>();
+ messages.add(new Message("testTopic", "1".getBytes()));
+ messages.add(new Message("testTopic", "22".getBytes()));
+ messages.add(new Message("testTopic", "333".getBytes()));
+ messages.add(new Message("testTopic", "4444".getBytes()));
+ messages.add(new Message("testTopic", "55555".getBytes()));
+ for (Message message : messages) {
+ produceAccumulator.send(message, new SendCallback() {
+ final CountDownLatch finalCountDownLatch = countDownLatch;
+
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ finalCountDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ finalCountDownLatch.countDown();
+ }
+ }, mockMQProducer);
+ }
+ assertThat(countDownLatch.await(3000L,
TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessage instanceof
MessageBatch).isTrue();
+
+ MessageBatch messageBatch1 = (MessageBatch)
mockMQProducer.beSendMessage;
+ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
+ messageBatch2.setBody(messageBatch2.encode());
+
+ assertThat(compareMessageBatch(messageBatch1, messageBatch2)).isTrue();
+ }
+
+ @Test
+ public void testProduceAccumulator_sync() throws MQBrokerException,
RemotingException, InterruptedException, MQClientException {
+ final MockMQProducer mockMQProducer = new MockMQProducer();
+
+ final ProduceAccumulator produceAccumulator = new
ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ List<Message> messages = new ArrayList<Message>();
+ messages.add(new Message("testTopic", "1".getBytes()));
+ messages.add(new Message("testTopic", "22".getBytes()));
+ messages.add(new Message("testTopic", "333".getBytes()));
+ messages.add(new Message("testTopic", "4444".getBytes()));
+ messages.add(new Message("testTopic", "55555".getBytes()));
+ final CountDownLatch countDownLatch = new
CountDownLatch(messages.size());
+
+ for (final Message message : messages) {
+ new Thread(new Runnable() {
+ final ProduceAccumulator finalProduceAccumulator =
produceAccumulator;
+ final CountDownLatch finalCountDownLatch = countDownLatch;
+ final MockMQProducer finalMockMQProducer = mockMQProducer;
+ final Message finalMessage = message;
+
+ @Override
+ public void run() {
+ try {
+ finalProduceAccumulator.send(finalMessage,
finalMockMQProducer);
+ finalCountDownLatch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ }
+ assertThat(countDownLatch.await(3000L,
TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessage instanceof
MessageBatch).isTrue();
+
+ MessageBatch messageBatch1 = (MessageBatch)
mockMQProducer.beSendMessage;
+ MessageBatch messageBatch2 = MessageBatch.generateFromList(messages);
+ messageBatch2.setBody(messageBatch2.encode());
+
+
assertThat(messageBatch1.getTopic()).isEqualTo(messageBatch2.getTopic());
+ // The execution order is uncertain, just compare the length
+
assertThat(messageBatch1.getBody().length).isEqualTo(messageBatch2.getBody().length);
+ }
+
+ @Test
+ public void testProduceAccumulator_sendWithMessageQueue() throws
MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ MockMQProducer mockMQProducer = new MockMQProducer();
+
+ MessageQueue messageQueue = new MessageQueue("topicTest",
"brokerTest", 0);
+ final ProduceAccumulator produceAccumulator = new
ProduceAccumulator("test");
+ produceAccumulator.start();
+
+ Message message = new Message("testTopic", "1".getBytes());
+ produceAccumulator.send(message, messageQueue, mockMQProducer);
+ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ produceAccumulator.send(message, messageQueue, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ countDownLatch.countDown();
+ }
+ }, mockMQProducer);
+ assertThat(countDownLatch.await(3000L,
TimeUnit.MILLISECONDS)).isTrue();
+ assertThat(mockMQProducer.beSendMessageQueue).isEqualTo(messageQueue);
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
index a423048c5c..30369b8f37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java
@@ -27,7 +27,7 @@ public class MessageBatch extends Message implements
Iterable<Message> {
private static final long serialVersionUID = 621335151046335557L;
private final List<Message> messages;
- private MessageBatch(List<Message> messages) {
+ public MessageBatch(List<Message> messages) {
this.messages = messages;
}