This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 34fcd2a  Treat invalid receipt handle on ack as a success (#47)
34fcd2a is described below

commit 34fcd2aeab286685c1755c659feba3546a286851
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Jul 14 13:58:24 2022 +0800

    Treat invalid receipt handle on ack as a success (#47)
---
 cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp 
b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index bb56e71..722347e 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -22,6 +22,7 @@
 #include "PushConsumerImpl.h"
 #include "Tag.h"
 #include "ThreadPoolImpl.h"
+#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -85,8 +86,21 @@ void ConsumeMessageServiceImpl::ack(const Message& message, 
std::function<void(c
 
   std::weak_ptr<PushConsumerImpl> client(consumer_);
   const auto& topic = message.topic();
-  auto callback = [cb, client, topic](const std::error_code& ec) {
+
+  const auto& message_id = message.id();
+  const auto& receipt_handle = message.extension().receipt_handle;
+
+  auto callback = [cb, client, topic, message_id, receipt_handle](const 
std::error_code& ec) {
     auto consumer = client.lock();
+
+    // If the receipt_handle was already expired, it is safe to treat it as 
success.
+    if (ec == ErrorCode::InvalidReceiptHandle) {
+      SPDLOG_WARN("Broker complained bad receipt handle on ack 
message[MsgId={}, ReceiptHandle={}]", message_id,
+                  receipt_handle);
+      cb(ErrorCode::Success);
+      return;
+    }
+
     cb(ec);
   };
 

Reply via email to