This is an automated email from the ASF dual-hosted git repository. ifplusor pushed a commit to branch re_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 3bd226ee201a8e924e88420ba35de911f530ac60 Author: James Yin <[email protected]> AuthorDate: Thu Mar 11 01:32:53 2021 +0800 fix: leak of InvokeCallback --- src/MQClientAPIImpl.cpp | 32 ++++++++++---------------------- src/MQClientAPIImpl.h | 8 ++++---- src/common/SendCallbackWrap.cpp | 4 +--- src/transport/ResponseFuture.cpp | 11 +++++------ src/transport/ResponseFuture.h | 11 ++++++++--- src/transport/TcpRemotingClient.cpp | 36 +++++++++++++++--------------------- src/transport/TcpRemotingClient.h | 6 +++--- 7 files changed, 46 insertions(+), 62 deletions(-) diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp index 91c6dc1..e5372d0 100644 --- a/src/MQClientAPIImpl.cpp +++ b/src/MQClientAPIImpl.cpp @@ -159,21 +159,16 @@ void MQClientAPIImpl::sendMessageAsync(const std::string& addr, int64_t timeoutMillis, int retryTimesWhenSendFailed, DefaultMQProducerImplPtr producer) { - // delete in future - auto* cbw = new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request), sendCallback, - topicPublishInfo, instance, retryTimesWhenSendFailed, 0, producer); - - try { - sendMessageAsyncImpl(cbw, timeoutMillis); - } catch (RemotingException& e) { - deleteAndZero(cbw); - throw; - } + std::unique_ptr<InvokeCallback> cbw( + new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request), sendCallback, + topicPublishInfo, instance, retryTimesWhenSendFailed, 0, producer)); + sendMessageAsyncImpl(cbw, timeoutMillis); } -void MQClientAPIImpl::sendMessageAsyncImpl(SendCallbackWrap* cbw, int64_t timeoutMillis) { - const auto& addr = cbw->getAddr(); - auto& request = cbw->getRemotingCommand(); +void MQClientAPIImpl::sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis) { + auto* scbw = static_cast<SendCallbackWrap*>(cbw.get()); + const auto& addr = scbw->getAddr(); + auto& request = scbw->getRemotingCommand(); remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis); } @@ -261,15 +256,8 @@ void MQClientAPIImpl::pullMessageAsync(const std::string& addr, RemotingCommand& request, int timeoutMillis, PullCallback* pullCallback) { - // delete in future - auto* cbw = new PullCallbackWrap(pullCallback, this); - - try { - remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis); - } catch (RemotingException& e) { - deleteAndZero(cbw); - throw; - } + std::unique_ptr<InvokeCallback> cbw(new PullCallbackWrap(pullCallback, this)); + remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis); } PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis) { diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h index 143a6a5..535f33d 100644 --- a/src/MQClientAPIImpl.h +++ b/src/MQClientAPIImpl.h @@ -20,8 +20,8 @@ #include "CommunicationMode.h" #include "DefaultMQProducerImpl.h" #include "KVTable.h" -#include "MQException.h" #include "MQClientInstance.h" +#include "MQException.h" #include "MQMessageExt.h" #include "PullCallback.h" #include "SendCallback.h" @@ -29,8 +29,8 @@ #include "TopicConfig.h" #include "TopicList.h" #include "TopicPublishInfo.hpp" -#include "protocol/body/TopicRouteData.hpp" #include "protocol/body/LockBatchRequestBody.hpp" +#include "protocol/body/TopicRouteData.hpp" #include "protocol/body/UnlockBatchRequestBody.hpp" #include "protocol/header/CommandHeader.h" #include "protocol/heartbeat/HeartbeatData.hpp" @@ -40,7 +40,7 @@ namespace rocketmq { class TcpRemotingClient; class ClientRemotingProcessor; class RPCHook; -class SendCallbackWrap; +class InvokeCallback; /** * wrap all RPC API @@ -182,7 +182,7 @@ class MQClientAPIImpl { int retryTimesWhenSendFailed, DefaultMQProducerImplPtr producer); - void sendMessageAsyncImpl(SendCallbackWrap* cbw, int64_t timeoutMillis); + void sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis); PullResult* pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis); diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp index 92fd6e1..562b30f 100644 --- a/src/common/SendCallbackWrap.cpp +++ b/src/common/SendCallbackWrap.cpp @@ -156,9 +156,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture, // resend addr_ = std::move(addr); broker_name_ = std::move(retryBrokerName); - instance_->getMQClientAPIImpl()->sendMessageAsyncImpl(this, timeoutMillis); - - responseFuture->releaseInvokeCallback(); // for avoid delete this SendCallbackWrap + instance_->getMQClientAPIImpl()->sendMessageAsyncImpl(responseFuture->invoke_callback(), timeoutMillis); return; } catch (MQException& e1) { producer->updateFaultItem(broker_name_, 3000, true); diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp index 9384135..a2887be 100755 --- a/src/transport/ResponseFuture.cpp +++ b/src/transport/ResponseFuture.cpp @@ -20,11 +20,14 @@ namespace rocketmq { -ResponseFuture::ResponseFuture(int requestCode, int opaque, int64_t timeoutMillis, InvokeCallback* invokeCallback) +ResponseFuture::ResponseFuture(int requestCode, + int opaque, + int64_t timeoutMillis, + std::unique_ptr<InvokeCallback> invokeCallback) : request_code_(requestCode), opaque_(opaque), timeout_millis_(timeoutMillis), - invoke_callback_(invokeCallback), + invoke_callback_(std::move(invokeCallback)), begin_timestamp_(UtilAll::currentTimeMillis()), send_request_ok_(false), response_command_(nullptr), @@ -47,10 +50,6 @@ bool ResponseFuture::hasInvokeCallback() { return invoke_callback_ != nullptr; } -InvokeCallback* ResponseFuture::releaseInvokeCallback() { - return invoke_callback_.release(); -} - void ResponseFuture::executeInvokeCallback() noexcept { if (invoke_callback_ != nullptr) { invoke_callback_->operationComplete(this); diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h index 56ffc67..f950743 100755 --- a/src/transport/ResponseFuture.h +++ b/src/transport/ResponseFuture.h @@ -17,9 +17,10 @@ #ifndef ROCKETMQ_TRANSPORT_RESPONSEFUTURE_H_ #define ROCKETMQ_TRANSPORT_RESPONSEFUTURE_H_ -#include "concurrent/latch.hpp" +#include <memory> #include "InvokeCallback.h" #include "RemotingCommand.h" +#include "concurrent/latch.hpp" namespace rocketmq { @@ -28,13 +29,15 @@ typedef std::shared_ptr<ResponseFuture> ResponseFuturePtr; class ResponseFuture { public: - ResponseFuture(int requestCode, int opaque, int64_t timeoutMillis, InvokeCallback* invokeCallback = nullptr); + ResponseFuture(int requestCode, + int opaque, + int64_t timeoutMillis, + std::unique_ptr<InvokeCallback> invokeCallback = nullptr); virtual ~ResponseFuture(); void releaseThreadCondition(); bool hasInvokeCallback(); - InvokeCallback* releaseInvokeCallback(); void executeInvokeCallback() noexcept; @@ -59,6 +62,8 @@ class ResponseFuture { inline bool send_request_ok() const { return send_request_ok_; } inline void set_send_request_ok(bool sendRequestOK = true) { send_request_ok_ = sendRequestOK; }; + inline std::unique_ptr<InvokeCallback>& invoke_callback() { return invoke_callback_; } + private: int request_code_; int opaque_; diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp index c6c222d..655a811 100644 --- a/src/transport/TcpRemotingClient.cpp +++ b/src/transport/TcpRemotingClient.cpp @@ -279,7 +279,7 @@ std::unique_ptr<RemotingCommand> TcpRemotingClient::invokeSyncImpl(TcpTransportP void TcpRemotingClient::invokeAsync(const std::string& addr, RemotingCommand& request, - InvokeCallback* invokeCallback, + std::unique_ptr<InvokeCallback>& invokeCallback, int64_t timeoutMillis) { auto beginStartTime = UtilAll::currentTimeMillis(); auto channel = GetTransport(addr); @@ -304,33 +304,27 @@ void TcpRemotingClient::invokeAsync(const std::string& addr, void TcpRemotingClient::invokeAsyncImpl(TcpTransportPtr channel, RemotingCommand& request, int64_t timeoutMillis, - InvokeCallback* invokeCallback) { + std::unique_ptr<InvokeCallback>& invokeCallback) { int code = request.code(); int opaque = request.opaque(); // delete in callback - auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis, invokeCallback); + auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis, std::move(invokeCallback)); putResponseFuture(channel, opaque, responseFuture); - try { - if (SendCommand(channel, request)) { - responseFuture->set_send_request_ok(true); - } else { - // requestFail - responseFuture = popResponseFuture(channel, opaque); - if (responseFuture != nullptr) { - responseFuture->set_send_request_ok(false); - if (responseFuture->hasInvokeCallback()) { - handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture)); - } + if (SendCommand(channel, request)) { + responseFuture->set_send_request_ok(true); + } else { + // request fail + responseFuture = popResponseFuture(channel, opaque); + if (responseFuture != nullptr) { + responseFuture->set_send_request_ok(false); + if (responseFuture->hasInvokeCallback()) { + handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture)); } - - LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort()); } - } catch (const std::exception& e) { - LOG_WARN_NEW("send a request command to channel <{}> Exception.\n{}", channel->getPeerAddrAndPort(), e.what()); - THROW_MQEXCEPTION(RemotingSendRequestException, "send request to <" + channel->getPeerAddrAndPort() + "> failed", - -1); + + LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort()); } } @@ -553,7 +547,7 @@ bool TcpRemotingClient::CloseNameServerTransport(TcpTransportPtr channel) { return removeItemFromTable; } -bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) { +bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept { auto package = msg.encode(); return channel->sendMessage(package->array(), package->size()); } diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h index 3ffbb66..7d86060 100755 --- a/src/transport/TcpRemotingClient.h +++ b/src/transport/TcpRemotingClient.h @@ -52,7 +52,7 @@ class TcpRemotingClient { void invokeAsync(const std::string& addr, RemotingCommand& request, - InvokeCallback* invokeCallback, + std::unique_ptr<InvokeCallback>& invokeCallback, int64_t timeoutMillis); void invokeOneway(const std::string& addr, RemotingCommand& request); @@ -62,7 +62,7 @@ class TcpRemotingClient { std::vector<std::string> getNameServerAddressList() const { return namesrv_addr_list_; } private: - static bool SendCommand(TcpTransportPtr channel, RemotingCommand& msg); + static bool SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept; void channelClosed(TcpTransportPtr channel); @@ -88,7 +88,7 @@ class TcpRemotingClient { void invokeAsyncImpl(TcpTransportPtr channel, RemotingCommand& request, int64_t timeoutMillis, - InvokeCallback* invokeCallback); + std::unique_ptr<InvokeCallback>& invokeCallback); void invokeOnewayImpl(TcpTransportPtr channel, RemotingCommand& request); // rpc hook
