This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.7
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit fb3cbd4dfab81416ca95f232af7a4f369605a7fd
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jun 11 05:49:18 2025 +0800

    Fix acknowledgeCumulative never returns when accepting an invalid message 
id for a multi-topics consumer (#492)
    
    (cherry picked from commit 9e119cee53199da5694cd59491d097399ba67a1f)
---
 lib/MultiTopicsConsumerImpl.cc   | 28 +++++++++++++++++-----------
 tests/MultiTopicsConsumerTest.cc | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 11 deletions(-)

diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 8a43173..ff793fb 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -650,6 +650,14 @@ void 
MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const
     callback(result, msg);
 }
 
+static void logErrorTopicNameForAcknowledge(const std::string& topic) {
+    if (topic.empty()) {
+        LOG_ERROR("MessageId without a topic name cannot be acknowledged for a 
multi-topics consumer");
+    } else {
+        LOG_ERROR("Message of topic: " << topic << " not in consumers");
+    }
+}
+
 void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, 
ResultCallback callback) {
     if (state_ != Ready) {
         interceptors_->onAcknowledge(Consumer(shared_from_this()), 
ResultAlreadyClosed, msgId);
@@ -658,19 +666,14 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const 
MessageId& msgId, ResultCal
     }
 
     const std::string& topicPartitionName = msgId.getTopicName();
-    if (topicPartitionName.empty()) {
-        LOG_ERROR("MessageId without a topic name cannot be acknowledged for a 
multi-topics consumer");
-        callback(ResultOperationNotSupported);
-        return;
-    }
     auto optConsumer = consumers_.find(topicPartitionName);
 
     if (optConsumer) {
         unAckedMessageTrackerPtr_->remove(msgId);
         optConsumer.value()->acknowledgeAsync(msgId, callback);
     } else {
-        LOG_ERROR("Message of topic: " << topicPartitionName << " not in 
unAckedMessageTracker");
-        callback(ResultUnknownError);
+        logErrorTopicNameForAcknowledge(topicPartitionName);
+        callback(ResultOperationNotSupported);
     }
 }
 
@@ -684,7 +687,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const 
MessageIdList& messageIdLis
     for (const MessageId& messageId : messageIdList) {
         auto topicName = messageId.getTopicName();
         if (topicName.empty()) {
-            LOG_ERROR("MessageId without a topic name cannot be acknowledged 
for a multi-topics consumer");
+            logErrorTopicNameForAcknowledge(topicName);
             callback(ResultOperationNotSupported);
             return;
         }
@@ -710,18 +713,21 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const 
MessageIdList& messageIdLis
             unAckedMessageTrackerPtr_->remove(kv.second);
             optConsumer.value()->acknowledgeAsync(kv.second, cb);
         } else {
-            LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
-            callback(ResultUnknownError);
+            logErrorTopicNameForAcknowledge(kv.first);
+            callback(ResultOperationNotSupported);
         }
     }
 }
 
 void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& 
msgId, ResultCallback callback) {
-    msgId.getTopicName();
+    const auto& topic = msgId.getTopicName();
     auto optConsumer = consumers_.find(msgId.getTopicName());
     if (optConsumer) {
         unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
         optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
+    } else {
+        logErrorTopicNameForAcknowledge(topic);
+        callback(ResultOperationNotSupported);
     }
 }
 
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index 5aae1eb..6b9f3b3 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -103,3 +103,42 @@ TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
 
     client.close();
 }
+
+TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {
+    const std::string topicPrefix = "multi-topics-consumer-ack-invalid-msg-id";
+    Client client{lookupUrl};
+    std::vector<std::string> topics(2);
+    for (size_t i = 0; i < topics.size(); i++) {
+        Producer producer;
+        auto topic = topicPrefix + unique_str();
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + 
std::to_string(i)).build()));
+        topics[i] = std::move(topic);
+    }
+
+    Consumer consumer;
+    ConsumerConfiguration conf;
+    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
+
+    std::vector<MessageId> msgIds(topics.size());
+    for (size_t i = 0; i < topics.size(); i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        std::string serialized;
+        msg.getMessageId().serialize(serialized);
+        msgIds[i] = MessageId::deserialize(serialized);
+    }
+
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
+    ASSERT_EQ(ResultOperationNotSupported, 
consumer.acknowledgeCumulative(msgIds[1]));
+
+    msgIds[0].setTopicName("invalid-topic");
+    msgIds[1].setTopicName("invalid-topic");
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
+    ASSERT_EQ(ResultOperationNotSupported, 
consumer.acknowledgeCumulative(msgIds[1]));
+
+    client.close();
+}

Reply via email to