This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 34fcd2a Treat invalid receipt handle on ack as a success (#47)
34fcd2a is described below
commit 34fcd2aeab286685c1755c659feba3546a286851
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Jul 14 13:58:24 2022 +0800
Treat invalid receipt handle on ack as a success (#47)
---
cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index bb56e71..722347e 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -22,6 +22,7 @@
#include "PushConsumerImpl.h"
#include "Tag.h"
#include "ThreadPoolImpl.h"
+#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -85,8 +86,21 @@ void ConsumeMessageServiceImpl::ack(const Message& message,
std::function<void(c
std::weak_ptr<PushConsumerImpl> client(consumer_);
const auto& topic = message.topic();
- auto callback = [cb, client, topic](const std::error_code& ec) {
+
+ const auto& message_id = message.id();
+ const auto& receipt_handle = message.extension().receipt_handle;
+
+ auto callback = [cb, client, topic, message_id, receipt_handle](const
std::error_code& ec) {
auto consumer = client.lock();
+
+ // If the receipt_handle was already expired, it is safe to treat it as
success.
+ if (ec == ErrorCode::InvalidReceiptHandle) {
+ SPDLOG_WARN("Broker complained bad receipt handle on ack
message[MsgId={}, ReceiptHandle={}]", message_id,
+ receipt_handle);
+ cb(ErrorCode::Success);
+ return;
+ }
+
cb(ec);
};