This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new afd4f56d [ISSUE #928] Fix C++ push consumer handle error code and
change demo log level (#932)
afd4f56d is described below
commit afd4f56d0d310852fbea836f6c1e8b5eec7bae2f
Author: lizhimins <[email protected]>
AuthorDate: Thu Jan 23 11:41:11 2025 +0800
[ISSUE #928] Fix C++ push consumer handle error code and change demo log
level (#932)
---
cpp/examples/ExampleFifoProducer.cpp | 6 +++---
cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp | 5 +++++
cpp/source/rocketmq/SimpleConsumerImpl.cpp | 4 ++--
3 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp
index 1e7829d4..1876ebb1 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -105,8 +105,8 @@ int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
auto& logger = getLogger();
- logger.setConsoleLevel(Level::Debug);
- logger.setLevel(Level::Debug);
+ logger.setConsoleLevel(Level::Info);
+ logger.setLevel(Level::Info);
logger.init();
// Access Key/Secret pair may be acquired from management console
@@ -172,7 +172,7 @@ int main(int argc, char* argv[]) {
semaphore->acquire();
producer.send(std::move(message), callback);
- std::cout << "Cached No." << i << " message" << std::endl;
+ // std::cout << "Cached No." << i << " message" << std::endl;
}
} catch (...) {
std::cerr << "Ah...No!!!" << std::endl;
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1c96b095..f68c9f88 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -51,6 +51,11 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
return;
}
+ if (ec == ErrorCode::NoContent) {
+ checkThrottleThenReceive();
+ return;
+ }
+
if (ec) {
SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message());
receiveMessageLater(std::chrono::seconds (1));
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index df060793..e0a78eeb 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -95,11 +95,11 @@ void SimpleConsumerImpl::start() {
}
};
- // refer java sdk: set refresh interval to 30 seconds
+ // refer java sdk: set refresh interval to 5 seconds
// org.apache.rocketmq.client.java.impl.ClientImpl#startUp
refresh_assignment_task_ = manager()->getScheduler()->schedule(
refresh_assignment_task, "RefreshAssignmentTask",
- std::chrono::minutes(5), std::chrono::seconds(5));
+ std::chrono::seconds(5), std::chrono::seconds(5));
client_manager_->addClientObserver(shared_from_this());
}