This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/re_dev by this push:
new 726b7dc refactor: invoke async send in executor
726b7dc is described below
commit 726b7dc424e798d917d4f29b2639242633e3bd56
Author: James Yin <[email protected]>
AuthorDate: Wed Mar 10 15:34:49 2021 +0800
refactor: invoke async send in executor
---
include/DefaultMQProducerConfig.h | 25 +++---
include/DefaultMQProducerConfigProxy.h | 41 +++++----
src/producer/DefaultMQProducerConfigImpl.hpp | 34 +++++---
src/producer/DefaultMQProducerImpl.cpp | 119 ++++++++++++++++-----------
src/producer/DefaultMQProducerImpl.h | 2 +
5 files changed, 136 insertions(+), 85 deletions(-)
diff --git a/include/DefaultMQProducerConfig.h
b/include/DefaultMQProducerConfig.h
index 46f9c2f..67c7231 100644
--- a/include/DefaultMQProducerConfig.h
+++ b/include/DefaultMQProducerConfig.h
@@ -32,36 +32,39 @@ class ROCKETMQCLIENT_API DefaultMQProducerConfig : virtual
public MQClientConfig
public:
virtual ~DefaultMQProducerConfig() = default;
- // if msgbody size larger than maxMsgBodySize, exception will be throwed
+ virtual int async_send_thread_nums() const = 0;
+ virtual void set_async_send_thread_nums(int async_send_thread_nums) = 0;
+
+ // if the message body size is larger than max_message_size, an exception will be thrown
virtual int max_message_size() const = 0;
- virtual void set_max_message_size(int maxMessageSize) = 0;
+ virtual void set_max_message_size(int max_message_size) = 0;
/*
- * if msgBody size is large than m_compressMsgBodyOverHowmuch
- * rocketmq cpp will compress msgBody according to compressLevel
+ * if msgBody size is larger than compress_msg_body_over_howmuch,
+ * sdk will compress message body according to compress_level
*/
virtual int compress_msg_body_over_howmuch() const = 0;
- virtual void set_compress_msg_body_over_howmuch(int
compressMsgBodyOverHowmuch) = 0;
+ virtual void set_compress_msg_body_over_howmuch(int
compress_msg_body_over_howmuch) = 0;
virtual int compress_level() const = 0;
- virtual void set_compress_level(int compressLevel) = 0;
+ virtual void set_compress_level(int compress_level) = 0;
// set and get timeout of per msg
virtual int send_msg_timeout() const = 0;
- virtual void set_send_msg_timeout(int sendMsgTimeout) = 0;
+ virtual void set_send_msg_timeout(int send_msg_timeout) = 0;
// set msg max retry times, default retry times is 5
virtual int retry_times() const = 0;
- virtual void set_retry_times(int times) = 0;
+ virtual void set_retry_times(int retry_times) = 0;
virtual int retry_times_for_async() const = 0;
- virtual void set_retry_times_for_async(int times) = 0;
+ virtual void set_retry_times_for_async(int retry_times) = 0;
virtual bool retry_another_broker_when_not_store_ok() const = 0;
- virtual void set_retry_another_broker_when_not_store_ok(bool
retryAnotherBrokerWhenNotStoreOK) = 0;
+ virtual void set_retry_another_broker_when_not_store_ok(bool
retry_another_broker_when_not_store_ok) = 0;
virtual bool send_latency_fault_enable() const { return false; };
- virtual void set_send_latency_fault_enable(bool sendLatencyFaultEnable){};
+ virtual void set_send_latency_fault_enable(bool send_latency_fault_enable){};
};
} // namespace rocketmq
diff --git a/include/DefaultMQProducerConfigProxy.h
b/include/DefaultMQProducerConfigProxy.h
index b879b43..8870e98 100644
--- a/include/DefaultMQProducerConfigProxy.h
+++ b/include/DefaultMQProducerConfigProxy.h
@@ -32,70 +32,79 @@ class ROCKETMQCLIENT_API DefaultMQProducerConfigProxy :
public MQClientConfigPro
DefaultMQProducerConfigProxy(DefaultMQProducerConfigPtr producerConfig) :
MQClientConfigProxy(producerConfig) {}
virtual ~DefaultMQProducerConfigProxy() = default;
+ int async_send_thread_nums() const override {
+ return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->async_send_thread_nums();
+ }
+
+ void set_async_send_thread_nums(int async_send_thread_nums) override {
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_async_send_thread_nums(async_send_thread_nums);
+ }
+
int max_message_size() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size();
}
- void set_max_message_size(int maxMessageSize) override {
-
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_max_message_size(maxMessageSize);
+ void set_max_message_size(int max_message_size) override {
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_max_message_size(max_message_size);
}
int compress_msg_body_over_howmuch() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->compress_msg_body_over_howmuch();
}
- void set_compress_msg_body_over_howmuch(int compressMsgBodyOverHowmuch)
override {
+ void set_compress_msg_body_over_howmuch(int compress_msg_body_over_howmuch)
override {
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())
- ->set_compress_msg_body_over_howmuch(compressMsgBodyOverHowmuch);
+ ->set_compress_msg_body_over_howmuch(compress_msg_body_over_howmuch);
}
int compress_level() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->compress_level();
}
- void set_compress_level(int compressLevel) override {
-
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_compress_level(compressLevel);
+ void set_compress_level(int compress_level) override {
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_compress_level(compress_level);
}
int send_msg_timeout() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->send_msg_timeout();
}
- void set_send_msg_timeout(int sendMsgTimeout) override {
-
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_send_msg_timeout(sendMsgTimeout);
+ void set_send_msg_timeout(int send_msg_timeout) override {
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_send_msg_timeout(send_msg_timeout);
}
int retry_times() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->retry_times();
}
- void set_retry_times(int times) override {
-
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times(times);
+ void set_retry_times(int retry_times) override {
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times(retry_times);
}
int retry_times_for_async() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->retry_times_for_async();
}
- void set_retry_times_for_async(int times) override {
-
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times_for_async(times);
+ void set_retry_times_for_async(int retry_times) override {
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_retry_times_for_async(retry_times);
}
bool retry_another_broker_when_not_store_ok() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->retry_another_broker_when_not_store_ok();
}
- void set_retry_another_broker_when_not_store_ok(bool
retryAnotherBrokerWhenNotStoreOK) override {
+ void set_retry_another_broker_when_not_store_ok(bool
retry_another_broker_when_not_store_ok) override {
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())
-
->set_retry_another_broker_when_not_store_ok(retryAnotherBrokerWhenNotStoreOK);
+
->set_retry_another_broker_when_not_store_ok(retry_another_broker_when_not_store_ok);
}
bool send_latency_fault_enable() const override {
return
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->send_latency_fault_enable();
}
- void set_send_latency_fault_enable(bool sendLatencyFaultEnable) override {
-
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->set_send_latency_fault_enable(sendLatencyFaultEnable);
+ void set_send_latency_fault_enable(bool send_latency_fault_enable) override {
+ dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())
+ ->set_send_latency_fault_enable(send_latency_fault_enable);
}
inline DefaultMQProducerConfigPtr real_config() const {
diff --git a/src/producer/DefaultMQProducerConfigImpl.hpp
b/src/producer/DefaultMQProducerConfigImpl.hpp
index 5b1f78d..00f3eff 100644
--- a/src/producer/DefaultMQProducerConfigImpl.hpp
+++ b/src/producer/DefaultMQProducerConfigImpl.hpp
@@ -18,6 +18,7 @@
#define ROCKETMQ_PRODUCER_DEFAULTMQPRODUCERCONFIGIMPL_HPP_
#include <algorithm> // std::min, std::max
+#include <thread>
#include "DefaultMQProducerConfig.h"
#include "MQClientConfigImpl.hpp"
@@ -30,7 +31,8 @@ namespace rocketmq {
class DefaultMQProducerConfigImpl : virtual public DefaultMQProducerConfig,
public MQClientConfigImpl {
public:
DefaultMQProducerConfigImpl()
- : max_message_size_(1024 * 1024 * 4), // 4MB
+ : async_send_thread_nums_(std::min(4,
(int)std::thread::hardware_concurrency())),
+ max_message_size_(1024 * 1024 * 4), // 4MB
compress_msg_body_over_howmuch_(1024 * 4), // 4KB
compress_level_(5),
send_msg_timeout_(3000),
@@ -40,36 +42,44 @@ class DefaultMQProducerConfigImpl : virtual public
DefaultMQProducerConfig, publ
virtual ~DefaultMQProducerConfigImpl() = default;
+ int async_send_thread_nums() const override { return
async_send_thread_nums_; }
+ void set_async_send_thread_nums(int async_send_thread_nums) override {
+ async_send_thread_nums_ = async_send_thread_nums;
+ }
+
int max_message_size() const override { return max_message_size_; }
- void set_max_message_size(int maxMessageSize) override { max_message_size_ =
maxMessageSize; }
+ void set_max_message_size(int max_message_size) override { max_message_size_
= max_message_size; }
int compress_msg_body_over_howmuch() const override { return
compress_msg_body_over_howmuch_; }
- void set_compress_msg_body_over_howmuch(int compressMsgBodyOverHowmuch)
override {
- compress_msg_body_over_howmuch_ = compressMsgBodyOverHowmuch;
+ void set_compress_msg_body_over_howmuch(int compress_msg_body_over_howmuch)
override {
+ compress_msg_body_over_howmuch_ = compress_msg_body_over_howmuch;
}
int compress_level() const override { return compress_level_; }
- void set_compress_level(int compressLevel) override {
- if ((compressLevel >= 0 && compressLevel <= 9) || compressLevel == -1) {
- compress_level_ = compressLevel;
+ void set_compress_level(int compress_level) override {
+ if ((compress_level >= 0 && compress_level <= 9) || compress_level == -1) {
+ compress_level_ = compress_level;
}
}
int send_msg_timeout() const override { return send_msg_timeout_; }
- void set_send_msg_timeout(int sendMsgTimeout) override { send_msg_timeout_ =
sendMsgTimeout; }
+ void set_send_msg_timeout(int send_msg_timeout) override { send_msg_timeout_
= send_msg_timeout; }
int retry_times() const override { return retry_times_; }
- void set_retry_times(int times) override { retry_times_ =
std::min(std::max(0, times), 15); }
+ void set_retry_times(int retry_times) override { retry_times_ =
std::min(std::max(0, retry_times), 15); }
int retry_times_for_async() const override { return retry_times_for_async_; }
- void set_retry_times_for_async(int times) override { retry_times_for_async_
= std::min(std::max(0, times), 15); }
+ void set_retry_times_for_async(int retry_times) override {
+ retry_times_for_async_ = std::min(std::max(0, retry_times), 15);
+ }
bool retry_another_broker_when_not_store_ok() const override { return
retry_another_broker_when_not_store_ok_; }
- void set_retry_another_broker_when_not_store_ok(bool
retryAnotherBrokerWhenNotStoreOK) override {
- retry_another_broker_when_not_store_ok_ = retryAnotherBrokerWhenNotStoreOK;
+ void set_retry_another_broker_when_not_store_ok(bool
retry_another_broker_when_not_store_ok) override {
+ retry_another_broker_when_not_store_ok_ =
retry_another_broker_when_not_store_ok;
}
protected:
+ int async_send_thread_nums_;
int max_message_size_; // default: 4 MB
int compress_msg_body_over_howmuch_; // default: 4 KB
int compress_level_;
diff --git a/src/producer/DefaultMQProducerImpl.cpp
b/src/producer/DefaultMQProducerImpl.cpp
index 20704f8..ade43d1 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -29,14 +29,14 @@
#include "CorrelationIdUtil.hpp"
#include "Logging.h"
#include "MQClientAPIImpl.h"
-#include "MQException.h"
#include "MQClientInstance.h"
#include "MQClientManager.h"
-#include "MessageDecoder.h"
+#include "MQException.h"
#include "MQFaultStrategy.h"
#include "MQProtos.h"
#include "MessageBatch.h"
#include "MessageClientIDSetter.h"
+#include "MessageDecoder.h"
#include "MessageSysFlag.h"
#include "RequestFutureTable.h"
#include "TopicPublishInfo.hpp"
@@ -92,7 +92,10 @@
DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducerConfigPtr config)
: DefaultMQProducerImpl(config, nullptr) {}
DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducerConfigPtr
config, RPCHookPtr rpcHook)
- : MQClientImpl(config, rpcHook), mq_fault_strategy_(new
MQFaultStrategy()), check_transaction_executor_(nullptr) {}
+ : MQClientImpl(config, rpcHook),
+ mq_fault_strategy_(new MQFaultStrategy()),
+ async_send_executor_(nullptr),
+ check_transaction_executor_(nullptr) {}
DefaultMQProducerImpl::~DefaultMQProducerImpl() = default;
@@ -120,11 +123,19 @@ void DefaultMQProducerImpl::start() {
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->group_name(),
this);
if (!registerOK) {
service_state_ = CREATE_JUST;
- THROW_MQEXCEPTION(MQClientException, "The producer group[" +
client_config_->group_name() +
- "] has been created before,
specify another name please.",
+ THROW_MQEXCEPTION(MQClientException,
+ "The producer group[" + client_config_->group_name()
+
+ "] has been created before, specify another name
please.",
-1);
}
+ if (nullptr == async_send_executor_) {
+ async_send_executor_.reset(new thread_pool_executor(
+ "AsyncSendThread",
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->async_send_thread_nums(),
+ false));
+ }
+ async_send_executor_->startup();
+
client_instance_->start();
LOG_INFO_NEW("the producer [{}] start OK.",
client_config_->group_name());
@@ -147,6 +158,9 @@ void DefaultMQProducerImpl::shutdown() {
switch (service_state_) {
case RUNNING: {
LOG_INFO("DefaultMQProducerImpl shutdown");
+
+ async_send_executor_->shutdown();
+
client_instance_->unregisterProducer(client_config_->group_name());
client_instance_->shutdown();
@@ -209,15 +223,18 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
SendCallback* sendCallback) noe
}
void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback,
long timeout) noexcept {
- try {
- (void)sendDefaultImpl(msg.getMessageImpl(), ASYNC, sendCallback, timeout);
- } catch (MQException& e) {
- LOG_ERROR_NEW("send failed, exception:{}", e.what());
- sendCallback->invokeOnException(e);
- } catch (std::exception& e) {
- LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
- exit(-1);
- }
+ auto msg_impl = msg.getMessageImpl();
+ async_send_executor_->submit([this, msg_impl, sendCallback, timeout] {
+ try {
+ (void)sendDefaultImpl(msg_impl, ASYNC, sendCallback, timeout);
+ } catch (MQException& e) {
+ LOG_ERROR_NEW("send failed, exception:{}", e.what());
+ sendCallback->invokeOnException(e);
+ } catch (std::exception& e) {
+ LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
+ exit(-1);
+ }
+ });
}
void DefaultMQProducerImpl::send(MQMessage& msg, const MQMessageQueue& mq,
SendCallback* sendCallback) noexcept {
@@ -228,26 +245,30 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
const MQMessageQueue& mq,
SendCallback* sendCallback,
long timeout) noexcept {
- try {
- Validators::checkMessage(msg,
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
+ auto msg_impl = msg.getMessageImpl();
+ async_send_executor_->submit([this, msg_impl, mq, sendCallback, timeout] {
+ try {
+ Validators::checkMessage(*msg_impl,
+
dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
- if (msg.topic() != mq.topic()) {
- THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's
topic", -1);
- }
+ if (msg_impl->topic() != mq.topic()) {
+ THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's
topic", -1);
+ }
- try {
- sendKernelImpl(msg.getMessageImpl(), mq, ASYNC, sendCallback, nullptr,
timeout);
- } catch (MQBrokerException& e) {
- std::string info = std::string("unknown exception, ") + e.what();
- THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+ try {
+ sendKernelImpl(msg_impl, mq, ASYNC, sendCallback, nullptr, timeout);
+ } catch (MQBrokerException& e) {
+ std::string info = std::string("unknown exception, ") + e.what();
+ THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+ }
+ } catch (MQException& e) {
+ LOG_ERROR_NEW("send failed, exception:{}", e.what());
+ sendCallback->invokeOnException(e);
+ } catch (std::exception& e) {
+ LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
+ exit(-1);
}
- } catch (MQException& e) {
- LOG_ERROR_NEW("send failed, exception:{}", e.what());
- sendCallback->invokeOnException(e);
- } catch (std::exception& e) {
- LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
- exit(-1);
- }
+ });
}
void DefaultMQProducerImpl::sendOneway(MQMessage& msg) {
@@ -303,20 +324,23 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
void* arg,
SendCallback* sendCallback,
long timeout) noexcept {
- try {
+ auto msg_impl = msg.getMessageImpl();
+ async_send_executor_->submit([this, msg_impl, selector, arg, sendCallback,
timeout] {
try {
- sendSelectImpl(msg.getMessageImpl(), selector, arg, ASYNC, sendCallback,
timeout);
- } catch (MQBrokerException& e) {
- std::string info = std::string("unknown exception, ") + e.what();
- THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+ try {
+ sendSelectImpl(msg_impl, selector, arg, ASYNC, sendCallback, timeout);
+ } catch (MQBrokerException& e) {
+ std::string info = std::string("unknown exception, ") + e.what();
+ THROW_MQEXCEPTION(MQClientException, info, e.GetError());
+ }
+ } catch (MQException& e) {
+ LOG_ERROR_NEW("send failed, exception:{}", e.what());
+ sendCallback->invokeOnException(e);
+ } catch (std::exception& e) {
+ LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
+ exit(-1);
}
- } catch (MQException& e) {
- LOG_ERROR_NEW("send failed, exception:{}", e.what());
- sendCallback->invokeOnException(e);
- } catch (std::exception& e) {
- LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
- exit(-1);
- }
+ });
}
void DefaultMQProducerImpl::sendOneway(MQMessage& msg, MessageQueueSelector*
selector, void* arg) {
@@ -375,7 +399,10 @@ void DefaultMQProducerImpl::send(std::vector<MQMessage>&
msgs, const MQMessageQu
send(batchMessage, mq, sendCallback);
}
-void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs, const
MQMessageQueue& mq, SendCallback* sendCallback, long timeout) {
+void DefaultMQProducerImpl::send(std::vector<MQMessage>& msgs,
+ const MQMessageQueue& mq,
+ SendCallback* sendCallback,
+ long timeout) {
MQMessage batchMessage(batch(msgs));
send(batchMessage, mq, sendCallback, timeout);
}
@@ -655,8 +682,8 @@ SendResult*
DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
}
std::string info = "Send [" + UtilAll::to_string(times) + "] times, still
failed, cost [" +
- UtilAll::to_string(UtilAll::currentTimeMillis() -
beginTimestampFirst) + "]ms, Topic: " +
- msg->topic();
+ UtilAll::to_string(UtilAll::currentTimeMillis() -
beginTimestampFirst) +
+ "]ms, Topic: " + msg->topic();
THROW_MQEXCEPTION(MQClientException, info, -1);
}
diff --git a/src/producer/DefaultMQProducerImpl.h
b/src/producer/DefaultMQProducerImpl.h
index da96df5..57eef51 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -22,6 +22,7 @@
#include "MQClientImpl.h"
#include "MQProducerInner.h"
#include "MessageBatch.h"
+#include "concurrent/executor.hpp"
namespace rocketmq {
@@ -178,6 +179,7 @@ class DefaultMQProducerImpl : public
std::enable_shared_from_this<DefaultMQProdu
private:
std::unique_ptr<MQFaultStrategy> mq_fault_strategy_;
+ std::unique_ptr<thread_pool_executor> async_send_executor_;
std::unique_ptr<thread_pool_executor> check_transaction_executor_;
};