This is an automated email from the ASF dual-hosted git repository. ifplusor pushed a commit to branch re_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit cebb202022a280c7da890f4a15f09559db058b55
Author: James Yin <[email protected]>
AuthorDate: Thu Mar 11 18:29:26 2021 +0800

    fix: leak of AsyncPullCallback
---
 src/consumer/DefaultMQPushConsumerImpl.cpp | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 79bcdf5..11a12a1 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -478,7 +478,9 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
                                                    false); // class filter
 
   try {
-    auto* callback = new AsyncPullCallback(shared_from_this(), pull_request, subscription_data);
+    std::unique_ptr<AsyncPullCallback> callback(
+        new AsyncPullCallback(shared_from_this(), pull_request, subscription_data));
+
     pull_api_wrapper_->pullKernelImpl(message_queue, // mq
                                       subExpression, // subExpression
                                       subscription_data->expression_type(), // expressionType
@@ -490,7 +492,9 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
                                       BROKER_SUSPEND_MAX_TIME_MILLIS, // brokerSuspendMaxTimeMillis
                                       CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // timeoutMillis
                                       CommunicationMode::ASYNC, // communicationMode
-                                      callback); // pullCallback
+                                      callback.get()); // pullCallback
+
+    (void)callback.release();
   } catch (MQException& e) {
     LOG_ERROR_NEW("pullKernelImpl exception: {}", e.what());
     executePullRequestLater(pull_request, getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());
