This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c2b33fb Allow to specify receiving message await duration (#84)
c2b33fb is described below
commit c2b33fb99f4523a4ef3d29e708b332a6e6e5d8a4
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Jul 28 19:47:21 2022 +0800
Allow to specify receiving message await duration (#84)
---
cpp/include/rocketmq/SimpleConsumer.h | 7 +++++++
cpp/source/client/ReceiveMessageStreamReader.cpp | 7 ++++---
cpp/source/rocketmq/SimpleConsumer.cpp | 1 +
cpp/source/rocketmq/SimpleConsumerImpl.cpp | 5 ++---
cpp/source/rocketmq/include/SimpleConsumerImpl.h | 8 ++++++++
5 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/cpp/include/rocketmq/SimpleConsumer.h b/cpp/include/rocketmq/SimpleConsumer.h
index 0f7030f..0550f73 100644
--- a/cpp/include/rocketmq/SimpleConsumer.h
+++ b/cpp/include/rocketmq/SimpleConsumer.h
@@ -97,6 +97,11 @@ public:
return *this;
}
+ SimpleConsumerBuilder& withAwaitDuration(std::chrono::milliseconds await_duration) {
+ await_duration_ = await_duration;
+ return *this;
+ }
+
SimpleConsumer build();
private:
@@ -106,6 +111,8 @@ private:
Configuration configuration_;
std::unordered_map<std::string, FilterExpression> subscriptions_;
+
+ std::chrono::milliseconds await_duration_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 03204c6..7cdd665 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -17,11 +17,12 @@
#include "ReceiveMessageStreamReader.h"
-#include "apache/rocketmq/v2/definition.pb.h"
+#include <chrono>
+#include "apache/rocketmq/v2/definition.pb.h"
+#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
-#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -38,7 +39,7 @@ ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManag
for (const auto& entry : context_->metadata) {
client_context_.AddMetadata(entry.first, entry.second);
}
- client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout);
+ client_context_.set_deadline(std::chrono::system_clock::now() + context_->timeout + std::chrono::milliseconds(500));
stub_->async()->ReceiveMessage(&client_context_, &request_, this);
result_.source_host = peer_address_;
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp
index 0c902d4..d7e94ae 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -129,6 +129,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
simple_consumer.impl_->withRequestTimeout(configuration_.requestTimeout());
simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
+ simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
for (const auto& entry : subscriptions_) {
simple_consumer.impl_->subscribe(entry.first, entry.second);
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index 6a2dc75..1408d06 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -324,9 +324,8 @@ void SimpleConsumerImpl::receive(std::size_t limit,
callback(ec, result.messages);
};
- auto timeout = absl::ToChronoMilliseconds(config().subscriber.polling_timeout);
- SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", timeout.count());
- manager()->receiveMessage(target, metadata, request, timeout, cb);
+ SPDLOG_DEBUG("ReceiveMessage.polling_timeout: {}ms", MixAll::millisecondsOf(long_polling_duration_));
+ manager()->receiveMessage(target, metadata, request, long_polling_duration_, cb);
}
void SimpleConsumerImpl::wrapAckRequest(const Message& message, AckMessageRequest& request) {
diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
index 9fac1bb..7ef3d8e 100644
--- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h
@@ -16,6 +16,8 @@
*/
#pragma once
+#include <chrono>
+
#include "ClientImpl.h"
#include "rocketmq/FilterExpression.h"
#include "rocketmq/SimpleConsumer.h"
@@ -55,6 +57,10 @@ public:
std::chrono::milliseconds duration,
ChangeInvisibleDurationCallback callback);
+ void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) {
+ long_polling_duration_ = receive_timeout;
+ }
+
protected:
void topicsOfInterest(std::vector<std::string> topics) override;
@@ -72,6 +78,8 @@ private:
static thread_local std::size_t assignment_index_;
+ std::chrono::milliseconds long_polling_duration_{MixAll::DefaultReceiveMessageTimeout};
+
void refreshAssignments0() LOCKS_EXCLUDED(topic_assignments_mtx_, subscriptions_mtx_);
void refreshAssignments() LOCKS_EXCLUDED(subscriptions_mtx_);