This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch cpp/visibility-timeout in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 5d111fa4eca0aa87b52762fd8586b883e76c4f15 Author: lizhimins <[email protected]> AuthorDate: Fri Jul 11 10:14:35 2025 +0800 [ISSUE #1035] [C++] Support continuous visibility timeout adjustment for SimpleConsumer --- cpp/examples/ExampleSimpleConsumer.cpp | 34 +++++++++++++++--------- cpp/include/rocketmq/SimpleConsumer.h | 6 ++--- cpp/source/client/ClientManagerImpl.cpp | 7 ++--- cpp/source/client/include/ClientManager.h | 2 +- cpp/source/client/include/ClientManagerImpl.h | 2 +- cpp/source/rocketmq/PushConsumerImpl.cpp | 7 ++++- cpp/source/rocketmq/SimpleConsumer.cpp | 13 +++++---- cpp/source/rocketmq/SimpleConsumerImpl.cpp | 15 ++++++++--- cpp/source/rocketmq/include/SimpleConsumerImpl.h | 4 +-- 9 files changed, 58 insertions(+), 32 deletions(-) diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index 2d93d239..d89d0b13 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -21,6 +21,7 @@ #include "rocketmq/ErrorCode.h" #include "rocketmq/Logger.h" #include "rocketmq/SimpleConsumer.h" +#include "spdlog/spdlog.h" using namespace ROCKETMQ_NAMESPACE; @@ -58,34 +59,43 @@ int main(int argc, char* argv[]) { .subscribe(FLAGS_topic, tag) .withAwaitDuration(std::chrono::seconds(10)) .build(); + std::size_t total = 0; // Should use while (true) instead for (int j = 0; j < 30; j++) { std::vector<MessageConstSharedPtr> messages; std::error_code ec; simple_consumer.receive(4, std::chrono::seconds(15), ec, messages); + if (ec) { std::cerr << "Failed to receive messages. Cause: " << ec.message() << std::endl; + } else { + std::cout << "Received " << messages.size() << " messages" << std::endl; } - std::cout << "Received " << messages.size() << " messages" << std::endl; - std::size_t i = 0; - for (const auto& message : messages) { - std::cout << "Received a message[topic=" << message->topic() - << ", message-id=" << message->id() - << ", receipt-handle='" << message->extension().receipt_handle - << "']" << std::endl; + std::string receipt_handle = message->extension().receipt_handle; + SPDLOG_INFO("Receive message, topic={}, message-id={}, receipt-handle={}]", message->topic(), message->id(), receipt_handle); - if (++i % 2 == 0) { + if (total++ % 2 == 0) { + // Consume message successfully then ack it simple_consumer.ack(*message, ec); if (ec) { - std::cerr << "Failed to ack message. Cause: " << ec.message() << std::endl; + SPDLOG_ERROR("Failed to ack message. Cause: {}", ec.message()); + } else { + SPDLOG_INFO("Ack message, topic={}, message-id={}, receipt-handle={}]", message->topic(), message->id(), receipt_handle); } } else { - simple_consumer.changeInvisibleDuration(*message, std::chrono::seconds(3), ec); - if (ec) { - std::cerr << "Failed to change invisible duration of message. Cause: " << ec.message() << std::endl; + // Extend the message consumption time by modifying the invisible duration API + for (int k = 0; k < 3; k++) { + simple_consumer.changeInvisibleDuration( + *message, receipt_handle, std::chrono::seconds(15), ec); + if (ec) { + SPDLOG_WARN("Failed to change invisible duration of message. Cause: ", ec.message()); + } else { + SPDLOG_INFO("Change invisible duration, topic={}, message-id={}, times={}, receipt-handle={}]", + message->topic(), message->id(), k, receipt_handle); + } } } } diff --git a/cpp/include/rocketmq/SimpleConsumer.h b/cpp/include/rocketmq/SimpleConsumer.h index cb489c91..dff1158a 100644 --- a/cpp/include/rocketmq/SimpleConsumer.h +++ b/cpp/include/rocketmq/SimpleConsumer.h @@ -35,7 +35,7 @@ using ReceiveCallback = std::function<void(const std::error_code&, const std::ve using AckCallback = std::function<void(const std::error_code&)>; -using ChangeInvisibleDurationCallback = std::function<void(const std::error_code&)>; +using ChangeInvisibleDurationCallback = std::function<void(const std::error_code&, std::string& receipt_handle)>; class SimpleConsumerImpl; @@ -60,9 +60,9 @@ public: void asyncAck(const Message& message, AckCallback callback); - void changeInvisibleDuration(const Message& message, std::chrono::milliseconds duration, std::error_code& ec); + void changeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, std::error_code& ec); - void asyncChangeInvisibleDuration(const Message& message, + void asyncChangeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, ChangeInvisibleDurationCallback callback); diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 7c3498e9..adcd0708 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -1100,7 +1100,8 @@ void ClientManagerImpl::changeInvisibleDuration( const Metadata& metadata, const ChangeInvisibleDurationRequest& request, std::chrono::milliseconds timeout, - const std::function<void(const std::error_code&)>& completion_callback) { + const std::function<void(const std::error_code&, const ChangeInvisibleDurationResponse&)>& completion_callback) { + RpcClientSharedPtr client = getRpcClient(target_host); assert(client); auto invocation_context = new InvocationContext<ChangeInvisibleDurationResponse>(); @@ -1118,7 +1119,7 @@ void ClientManagerImpl::changeInvisibleDuration( SPDLOG_WARN("Failed to write Nack request to wire. gRPC-code: {}, gRPC-message: {}", invocation_context->status.error_code(), invocation_context->status.error_message()); std::error_code ec = ErrorCode::RequestTimeout; - completion_callback(ec); + completion_callback(ec, invocation_context->response); return; } @@ -1185,7 +1186,7 @@ void ClientManagerImpl::changeInvisibleDuration( break; } } - completion_callback(ec); + completion_callback(ec, invocation_context->response); }; invocation_context->callback = callback; client->asyncChangeInvisibleDuration(request, invocation_context); diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h index c67c6ea8..d6e20ace 100644 --- a/cpp/source/client/include/ClientManager.h +++ b/cpp/source/client/include/ClientManager.h @@ -70,7 +70,7 @@ public: virtual void changeInvisibleDuration(const std::string& target_host, const Metadata& metadata, const ChangeInvisibleDurationRequest&, std::chrono::milliseconds timeout, - const std::function<void(const std::error_code&)>&) = 0; + const std::function<void(const std::error_code&, const ChangeInvisibleDurationResponse&)>&) = 0; virtual void forwardMessageToDeadLetterQueue( const std::string& target_host, const Metadata& metadata, const ForwardMessageToDeadLetterQueueRequest& request, diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h index cd862154..c6e60064 100644 --- a/cpp/source/client/include/ClientManagerImpl.h +++ b/cpp/source/client/include/ClientManagerImpl.h @@ -148,7 +148,7 @@ public: const Metadata& metadata, const ChangeInvisibleDurationRequest&, std::chrono::milliseconds timeout, - const std::function<void(const std::error_code&)>&) override; + const std::function<void(const std::error_code&, const ChangeInvisibleDurationResponse&)>&) override; void forwardMessageToDeadLetterQueue(const std::string& target_host, const Metadata& metadata, diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index 4ba038b2..45f40541 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -413,8 +413,13 @@ void PushConsumerImpl::nack(const Message& message, const std::function<void(con request.set_message_id(message.id()); request.mutable_invisible_duration()->CopyFrom( google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count())); + + auto cb = + [callback](const std::error_code& ec, const ChangeInvisibleDurationResponse& response) { + callback(ec); + }; client_manager_->changeInvisibleDuration(target_host, metadata, request, - absl::ToChronoMilliseconds(client_config_.request_timeout), callback); + absl::ToChronoMilliseconds(client_config_.request_timeout), cb); } void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message, diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp b/cpp/source/rocketmq/SimpleConsumer.cpp index 8acf16ac..a6a834a6 100644 --- a/cpp/source/rocketmq/SimpleConsumer.cpp +++ b/cpp/source/rocketmq/SimpleConsumer.cpp @@ -89,22 +89,25 @@ void SimpleConsumer::asyncAck(const Message& message, AckCallback callback) { impl_->ackAsync(message, callback); } -void SimpleConsumer::changeInvisibleDuration(const Message& message, +void SimpleConsumer::changeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, std::error_code& ec) { auto mtx = std::make_shared<absl::Mutex>(); auto cv = std::make_shared<absl::CondVar>(); bool completed = false; - auto callback = [&, mtx, cv](const std::error_code& code) { + + auto callback = + [&, mtx, cv](const std::error_code& code, std::string& server_receipt_handle) { { absl::MutexLock lk(mtx.get()); completed = true; + receipt_handle = server_receipt_handle; ec = code; } cv->Signal(); }; - impl_->changeInvisibleDuration(message, duration, callback); + impl_->changeInvisibleDuration(message, receipt_handle, duration, callback); { absl::MutexLock lk(mtx.get()); @@ -114,10 +117,10 @@ void SimpleConsumer::changeInvisibleDuration(const Message& message, } } -void SimpleConsumer::asyncChangeInvisibleDuration(const Message& message, +void SimpleConsumer::asyncChangeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, ChangeInvisibleDurationCallback callback) { - impl_->changeInvisibleDuration(message, duration, callback); + impl_->changeInvisibleDuration(message, receipt_handle, duration, callback); } SimpleConsumer SimpleConsumerBuilder::build() { diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp index 2441bb2d..25803429 100644 --- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp +++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp @@ -409,9 +409,9 @@ void SimpleConsumerImpl::ackAsync(const Message& message, AckCallback callback) absl::ToChronoMilliseconds(client_config_.request_timeout), callback); } -void SimpleConsumerImpl::changeInvisibleDuration(const Message& message, +void SimpleConsumerImpl::changeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, - ChangeInvisibleDurationCallback callback) { + const ChangeInvisibleDurationCallback callback) { Metadata metadata; Signature::sign(client_config_, metadata); @@ -420,11 +420,18 @@ void SimpleConsumerImpl::changeInvisibleDuration(const Message& message, request.mutable_topic()->set_resource_namespace(resourceNamespace()); request.mutable_topic()->set_name(message.topic()); request.set_message_id(message.id()); - request.set_receipt_handle(message.extension().receipt_handle); + request.set_receipt_handle(receipt_handle); auto d = google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count()); request.mutable_invisible_duration()->CopyFrom(d); - manager()->changeInvisibleDuration(message.extension().target_endpoint, metadata, request, duration, callback); + auto cb = + [callback](const std::error_code& ec, const ChangeInvisibleDurationResponse& response) { + std::string server_receipt_handle = response.receipt_handle(); + callback(ec, server_receipt_handle); + }; + + manager()->changeInvisibleDuration( + message.extension().target_endpoint, metadata, request, duration, cb); } ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index 39e6523e..0bcb7fca 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -53,9 +53,9 @@ public: void ackAsync(const Message& message, AckCallback callback); - void changeInvisibleDuration(const Message& message, + void changeInvisibleDuration(const Message& message, std::string& receipt_handle, std::chrono::milliseconds duration, - ChangeInvisibleDurationCallback callback); + const ChangeInvisibleDurationCallback callback); void withReceiveMessageTimeout(std::chrono::milliseconds receive_timeout) { long_polling_duration_ = receive_timeout;
