This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/cpp_dev by this push:
new ca9e15c Treat invalid receipt handle on ack as a success
ca9e15c is described below
commit ca9e15c2d3c0d362b1f8e2cc483dd08b49b2ac46
Author: Li Zhanhui <[email protected]>
AuthorDate: Thu Jul 14 12:56:30 2022 +0800
Treat invalid receipt handle on ack as a success
---
cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index bb56e71..722347e 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -22,6 +22,7 @@
#include "PushConsumerImpl.h"
#include "Tag.h"
#include "ThreadPoolImpl.h"
+#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -85,8 +86,21 @@ void ConsumeMessageServiceImpl::ack(const Message& message, std::function<void(c
std::weak_ptr<PushConsumerImpl> client(consumer_);
const auto& topic = message.topic();
- auto callback = [cb, client, topic](const std::error_code& ec) {
+
+ const auto& message_id = message.id();
+ const auto& receipt_handle = message.extension().receipt_handle;
+
+ auto callback = [cb, client, topic, message_id, receipt_handle](const std::error_code& ec) {
auto consumer = client.lock();
+
+ // If the receipt_handle was already expired, it is safe to treat it as success.
+ if (ec == ErrorCode::InvalidReceiptHandle) {
+ SPDLOG_WARN("Broker complained bad receipt handle on ack message[MsgId={}, ReceiptHandle={}]", message_id,
+ receipt_handle);
+ cb(ErrorCode::Success);
+ return;
+ }
+
cb(ec);
};