This is an automated email from the ASF dual-hosted git repository. ifplusor pushed a commit to branch re_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 06470c70b529a8f893311e72edb0d945b88147d8
Author: James Yin <[email protected]>
AuthorDate: Fri Mar 26 18:29:30 2021 +0800

    fix: auto delete callback
---
 include/PullCallback.h                 | 23 +++++------------------
 include/RequestCallback.h              | 23 +++++------------------
 include/SendCallback.h                 | 23 +++++------------------
 src/common/PullCallbackWrap.cpp        | 24 ++++++++++++++++++++++++
 src/common/SendCallbackWrap.cpp        | 24 ++++++++++++++++++++++++
 src/producer/RequestResponseFuture.cpp | 24 ++++++++++++++++++++++++
 6 files changed, 87 insertions(+), 54 deletions(-)

diff --git a/include/PullCallback.h b/include/PullCallback.h
index 08deaca..841b598 100755
--- a/include/PullCallback.h
+++ b/include/PullCallback.h
@@ -22,7 +22,7 @@
 
 namespace rocketmq {
 
-enum PullCallbackType { PULL_CALLBACK_TYPE_SIMPLE = 0, PULL_CALLBACK_TYPE_AUTO_DELETE = 1 };
+enum class PullCallbackType { kSimple, kAutoDelete };
 
 /**
  * PullCallback - callback interface for async pull
@@ -34,24 +34,11 @@ class ROCKETMQCLIENT_API PullCallback {
   virtual void onSuccess(std::unique_ptr<PullResult> pull_result) = 0;
   virtual void onException(MQException& e) noexcept = 0;
 
-  virtual PullCallbackType getPullCallbackType() const { return PULL_CALLBACK_TYPE_SIMPLE; }
+  virtual PullCallbackType getPullCallbackType() const { return PullCallbackType::kSimple; }
 
  public:
-  inline void invokeOnSuccess(std::unique_ptr<PullResult> pull_result) {
-    auto type = getPullCallbackType();
-    onSuccess(std::move(pull_result));
-    if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
-      delete this;
-    }
-  }
-
-  inline void invokeOnException(MQException& exception) noexcept {
-    auto type = getPullCallbackType();
-    onException(exception);
-    if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
-      delete this;
-    }
-  }
+  void invokeOnSuccess(std::unique_ptr<PullResult> pull_result) noexcept;
+  void invokeOnException(MQException& exception) noexcept;
 };
 
 /**
@@ -61,7 +48,7 @@ class ROCKETMQCLIENT_API PullCallback {
  */
 class ROCKETMQCLIENT_API AutoDeletePullCallback : public PullCallback {
  public:
-  PullCallbackType getPullCallbackType() const override final { return PULL_CALLBACK_TYPE_AUTO_DELETE; }
+  PullCallbackType getPullCallbackType() const override final { return PullCallbackType::kAutoDelete; }
 };
 
 }  // namespace rocketmq
diff --git a/include/RequestCallback.h b/include/RequestCallback.h
index c666ccc..493e681 100644
--- a/include/RequestCallback.h
+++ b/include/RequestCallback.h
@@ -22,7 +22,7 @@
 
 namespace rocketmq {
 
-enum RequestCallbackType { REQUEST_CALLBACK_TYPE_SIMPLE = 0, REQUEST_CALLBACK_TYPE_AUTO_DELETE = 1 };
+enum class RequestCallbackType { kSimple, kAutoDelete };
 
 /**
  * RequestCallback - callback interface for async request
@@ -34,24 +34,11 @@ class ROCKETMQCLIENT_API RequestCallback {
   virtual void onSuccess(MQMessage message) = 0;
   virtual void onException(MQException& e) noexcept = 0;
 
-  virtual RequestCallbackType getRequestCallbackType() const { return REQUEST_CALLBACK_TYPE_SIMPLE; }
+  virtual RequestCallbackType getRequestCallbackType() const { return RequestCallbackType::kSimple; }
 
  public:
-  inline void invokeOnSuccess(MQMessage message) {
-    auto type = getRequestCallbackType();
-    onSuccess(std::move(message));
-    if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
-      delete this;
-    }
-  }
-
-  inline void invokeOnException(MQException& exception) noexcept {
-    auto type = getRequestCallbackType();
-    onException(exception);
-    if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
-      delete this;
-    }
-  }
+  void invokeOnSuccess(MQMessage message) noexcept;
+  void invokeOnException(MQException& exception) noexcept;
 };
 
 /**
@@ -61,7 +48,7 @@ class ROCKETMQCLIENT_API RequestCallback {
  */
 class ROCKETMQCLIENT_API AutoDeleteRequestCallback : public RequestCallback {
  public:
-  RequestCallbackType getRequestCallbackType() const override final { return REQUEST_CALLBACK_TYPE_AUTO_DELETE; }
+  RequestCallbackType getRequestCallbackType() const override final { return RequestCallbackType::kAutoDelete; }
 };
 
 }  // namespace rocketmq
diff --git a/include/SendCallback.h b/include/SendCallback.h
index 9191c25..a301d94 100755
--- a/include/SendCallback.h
+++ b/include/SendCallback.h
@@ -22,7 +22,7 @@
 
 namespace rocketmq {
 
-enum SendCallbackType { SEND_CALLBACK_TYPE_SIMPLE = 0, SEND_CALLBACK_TYPE_AUTO_DELETE = 1 };
+enum class SendCallbackType { kSimple, kAutoDelete };
 
 /**
  * SendCallback - callback interface for async send
@@ -34,24 +34,11 @@ class ROCKETMQCLIENT_API SendCallback {
   virtual void onSuccess(SendResult& sendResult) = 0;
   virtual void onException(MQException& e) noexcept = 0;
 
-  virtual SendCallbackType getSendCallbackType() const { return SEND_CALLBACK_TYPE_SIMPLE; }
+  virtual SendCallbackType getSendCallbackType() const { return SendCallbackType::kSimple; }
 
  public:
-  inline void invokeOnSuccess(SendResult& send_result) {
-    auto type = getSendCallbackType();
-    onSuccess(send_result);
-    if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      delete this;
-    }
-  }
-
-  inline void invokeOnException(MQException& exception) noexcept {
-    auto type = getSendCallbackType();
-    onException(exception);
-    if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      delete this;
-    }
-  }
+  void invokeOnSuccess(SendResult& send_result) noexcept;
+  void invokeOnException(MQException& exception) noexcept;
 };
 
 /**
@@ -62,7 +49,7 @@ class ROCKETMQCLIENT_API SendCallback {
 class ROCKETMQCLIENT_API AutoDeleteSendCallback : public SendCallback  // base interface
 {
  public:
-  SendCallbackType getSendCallbackType() const override final { return SEND_CALLBACK_TYPE_AUTO_DELETE; }
+  SendCallbackType getSendCallbackType() const override final { return SendCallbackType::kAutoDelete; }
 };
 
 }  // namespace rocketmq
diff --git a/src/common/PullCallbackWrap.cpp b/src/common/PullCallbackWrap.cpp
index bc915a4..9128351 100644
--- a/src/common/PullCallbackWrap.cpp
+++ b/src/common/PullCallbackWrap.cpp
@@ -18,6 +18,30 @@
 
 namespace rocketmq {
 
+void PullCallback::invokeOnSuccess(std::unique_ptr<PullResult> pull_result) noexcept {
+  auto type = getPullCallbackType();
+  try {
+    onSuccess(std::move(pull_result));
+  } catch (const std::exception& e) {
+    LOG_WARN_NEW("encounter exception when invoke PullCallback::onSuccess(), {}", e.what());
+  }
+  if (type == PullCallbackType::kAutoDelete) {
+    delete this;
+  }
+}
+
+void PullCallback::invokeOnException(MQException& exception) noexcept {
+  auto type = getPullCallbackType();
+  try {
+    onException(exception);
+  } catch (const std::exception& e) {
+    LOG_WARN_NEW("encounter exception when invoke PullCallback::onException(), {}", e.what());
+  }
+  if (type == PullCallbackType::kAutoDelete) {
+    delete this;
+  }
+}
+
 PullCallbackWrap::PullCallbackWrap(PullCallback* pullCallback, MQClientAPIImpl* pClientAPI)
     : pull_callback_(pullCallback), client_api_impl_(pClientAPI) {}
 
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
index 562b30f..eab2fb5 100644
--- a/src/common/SendCallbackWrap.cpp
+++ b/src/common/SendCallbackWrap.cpp
@@ -32,6 +32,30 @@
 
 namespace rocketmq {
 
+void SendCallback::invokeOnSuccess(SendResult& send_result) noexcept {
+  auto type = getSendCallbackType();
+  try {
+    onSuccess(send_result);
+  } catch (const std::exception& e) {
+    LOG_WARN_NEW("encounter exception when invoke SendCallback::onSuccess(), {}", e.what());
+  }
+  if (type == SendCallbackType::kAutoDelete) {
+    delete this;
+  }
+}
+
+void SendCallback::invokeOnException(MQException& exception) noexcept {
+  auto type = getSendCallbackType();
+  try {
+    onException(exception);
+  } catch (const std::exception& e) {
+    LOG_WARN_NEW("encounter exception when invoke SendCallback::onException(), {}", e.what());
+  }
+  if (type == SendCallbackType::kAutoDelete) {
+    delete this;
+  }
+}
+
 SendCallbackWrap::SendCallbackWrap(const std::string& addr,
                                    const std::string& brokerName,
                                    const MessagePtr msg,
diff --git a/src/producer/RequestResponseFuture.cpp b/src/producer/RequestResponseFuture.cpp
index 615d27b..bb65adc 100644
--- a/src/producer/RequestResponseFuture.cpp
+++ b/src/producer/RequestResponseFuture.cpp
@@ -21,6 +21,30 @@
 
 namespace rocketmq {
 
+void RequestCallback::invokeOnSuccess(MQMessage message) noexcept {
+  auto type = getRequestCallbackType();
+  try {
+    onSuccess(std::move(message));
+  } catch (const std::exception& e) {
+    LOG_WARN_NEW("encounter exception when invoke RequestCallback::onSuccess(), {}", e.what());
+  }
+  if (type == RequestCallbackType::kAutoDelete) {
+    delete this;
+  }
+}
+
+void RequestCallback::invokeOnException(MQException& exception) noexcept {
+  auto type = getRequestCallbackType();
+  try {
+    onException(exception);
+  } catch (const std::exception& e) {
+    LOG_WARN_NEW("encounter exception when invoke RequestCallback::onException(), {}", e.what());
+  }
+  if (type == RequestCallbackType::kAutoDelete) {
+    delete this;
+  }
+}
+
 RequestResponseFuture::RequestResponseFuture(const std::string& correlationId,
                                              long timeoutMillis,
                                              RequestCallback* requestCallback)
