This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new cb82141  Fix ack non-persistent topic will be blocked. (#240)
cb82141 is described below

commit cb821419d1ca8d4bc896946665c0e5643efaf9bf
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Mar 31 11:38:36 2023 +0800

    Fix ack non-persistent topic will be blocked. (#240)
---
 lib/AckGroupingTracker.h | 10 +++++++---
 tests/ConsumerTest.cc    | 23 +++++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h
index 2b48142..97d0d85 100644
--- a/lib/AckGroupingTracker.h
+++ b/lib/AckGroupingTracker.h
@@ -71,21 +71,25 @@ class AckGroupingTracker : public 
std::enable_shared_from_this<AckGroupingTracke
      * @param[in] msgId ID of the message to be ACKed.
      * @param[in] callback the callback that is triggered when the message is 
acknowledged
      */
-    virtual void addAcknowledge(const MessageId& msgId, ResultCallback 
callback) {}
+    virtual void addAcknowledge(const MessageId& msgId, ResultCallback 
callback) { callback(ResultOk); }
 
     /**
      * Adding message ID list into ACK group for individual ACK.
      * @param[in] msgIds of the message to be ACKed.
      * @param[in] callback the callback that is triggered when the messages 
are acknowledged
      */
-    virtual void addAcknowledgeList(const MessageIdList& msgIds, 
ResultCallback callback) {}
+    virtual void addAcknowledgeList(const MessageIdList& msgIds, 
ResultCallback callback) {
+        callback(ResultOk);
+    }
 
     /**
      * Adding message ID into ACK group for cumulative ACK.
      * @param[in] msgId ID of the message to be ACKed.
      * @param[in] callback the callback that is triggered when the message is 
acknowledged
      */
-    virtual void addAcknowledgeCumulative(const MessageId& msgId, 
ResultCallback callback) {}
+    virtual void addAcknowledgeCumulative(const MessageId& msgId, 
ResultCallback callback) {
+        callback(ResultOk);
+    }
 
     /**
      * Flush all the pending grouped ACKs (as flush() does), and stop period 
ACKs sending.
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index b1329e4..0ecf82c 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1118,6 +1118,29 @@ TEST(ConsumerTest, testNegativeAcksTrackerClose) {
     client.close();
 }
 
+TEST(ConsumerTest, testAckNotPersistentTopic) {
+    Client client(lookupUrl);
+    auto topicName = 
"non-persistent://public/default/testAckNotPersistentTopic";
+
+    Consumer consumer;
+    client.subscribe(topicName, "test-sub", consumer);
+
+    Producer producer;
+    client.createProducer(topicName, producer);
+
+    for (int i = 0; i < 10; ++i) {
+        producer.send(MessageBuilder().setContent(std::to_string(i)).build());
+    }
+
+    Message msg;
+    for (int i = 0; i < 10; ++i) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
+    }
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, 
false));
 
 }  // namespace pulsar

Reply via email to