This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit e3f6267b08d9eb750bfb35bc1a910e25219505a4 Author: Li Zhanhui <[email protected]> AuthorDate: Thu Jul 28 19:41:42 2022 +0800 Allow to specify receiving message await duration --- cpp/include/rocketmq/SimpleConsumer.h | 7 +++++++ cpp/source/client/ReceiveMessageStreamReader.cpp | 7 ++++--- cpp/source/rocketmq/SimpleConsumer.cpp | 1 + cpp/source/rocketmq/SimpleConsumerImpl.cpp | 5 ++--- cpp/source/rocketmq/include/SimpleConsumerImpl.h | 8 ++++++++ 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/cpp/include/rocketmq/SimpleConsumer.h b/cpp/include/rocketmq/SimpleConsumer.h index 0f7030f..0550f73 100644 --- a/cpp/include/rocketmq/SimpleConsumer.h +++ b/cpp/include/rocketmq/SimpleConsumer.h @@ -97,6 +97,11 @@ public: return *this; } + SimpleConsumerBuilder& withAwaitDuration(std::chrono::milliseconds await_duration) { + await_duration_ = await_duration; + return *this; + } + SimpleConsumer build(); private: @@ -106,6 +111,8 @@ private: Configuration configuration_; std::unordered_map<std::string, FilterExpression> subscriptions_; + + std::chrono::milliseconds await_duration_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp index 03204c6..7cdd665 100644 --- a/cpp/source/client/ReceiveMessageStreamReader.cpp +++ b/cpp/source/client/ReceiveMessageStreamReader.cpp @@ -17,11 +17,12 @@ #include "ReceiveMessageStreamReader.h" -#include "apache/rocketmq/v2/definition.pb.h" +#include <chrono> +#include "apache/rocketmq/v2/definition.pb.h" +#include "rocketmq/ErrorCode.h" #include "rocketmq/Logger.h" #include "spdlog/spdlog.h" -#include "rocketmq/ErrorCode.h" ROCKETMQ_NAMESPACE_BEGIN @@ -38,7 +39,7 @@ ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManag for (const auto& entry : context_->metadata) { client_context_.AddMetadata(entry.first, entry.second); } - client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout); + client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout + std::chrono::milliseconds(500)); stub_->async()->ReceiveMessage(&client_context_, &request_, this); result_.source_host = peer_address_; diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp index 0c902d4..d7e94ae 100644 --- a/cpp/source/rocketmq/SimpleConsumer.cpp +++ b/cpp/source/rocketmq/SimpleConsumer.cpp @@ -129,6 +129,7 @@ SimpleConsumer SimpleConsumerBuilder::build() { simple_consumer.impl_->withRequestTimeout(configuration_.requestTimeout()); simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints())); simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider()); + simple_consumer.impl_->withReceiveMessageTimeout(await_duration_); for (const auto& entry : subscriptions_) { simple_consumer.impl_->subscribe(entry.first, entry.second); diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp index 6a2dc75..1408d06 100644 --- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp +++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp @@ -324,9 +324,8 @@ void SimpleConsumerImpl::receive(std::size_t limit, callback(ec, result.messages); }; - auto timeout = absl::ToChronoMilliseconds(config().subscriber.polling_timeout); - SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", timeout.count()); - manager()->receiveMessage(target, metadata, request, timeout, cb); + SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", MixAll::millisecondsOf(long_polling_duration_)); + manager()->receiveMessage(target, metadata, request, long_polling_duration_, cb); } void SimpleConsumerImpl::wrapAckRequest(const Message& message, AckMessageRequest& request) { diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index 9fac1bb..7ef3d8e 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -16,6 +16,8 @@ */ #pragma once +#include <chrono> + #include "ClientImpl.h" #include "rocketmq/FilterExpression.h" #include "rocketmq/SimpleConsumer.h" @@ -55,6 +57,10 @@ public: std::chrono::milliseconds duration, ChangeInvisibleDurationCallback callback); + void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) { + long_polling_duration_ = receive_timeout; + } + protected: void topicsOfInterest(std::vector<std::string> topics) override; @@ -72,6 +78,8 @@ private: static thread_local std::size_t assignment_index_; + std::chrono::milliseconds long_polling_duration_{MixAll::DefaultReceiveMessageTimeout}; + void refreshAssignments0() LOCKS_EXCLUDED(topic_assignments_mtx_, subscriptions_mtx_); void refreshAssignments() LOCKS_EXCLUDED(subscriptions_mtx_);
