This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new cb82141 Fix ack non-persistent topic will be blocked. (#240)
cb82141 is described below
commit cb821419d1ca8d4bc896946665c0e5643efaf9bf
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Mar 31 11:38:36 2023 +0800
Fix ack non-persistent topic will be blocked. (#240)
---
lib/AckGroupingTracker.h | 10 +++++++---
tests/ConsumerTest.cc | 23 +++++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/lib/AckGroupingTracker.h b/lib/AckGroupingTracker.h
index 2b48142..97d0d85 100644
--- a/lib/AckGroupingTracker.h
+++ b/lib/AckGroupingTracker.h
@@ -71,21 +71,25 @@ class AckGroupingTracker : public
std::enable_shared_from_this<AckGroupingTracke
* @param[in] msgId ID of the message to be ACKed.
* @param[in] callback the callback that is triggered when the message is
acknowledged
*/
- virtual void addAcknowledge(const MessageId& msgId, ResultCallback
callback) {}
+ virtual void addAcknowledge(const MessageId& msgId, ResultCallback
callback) { callback(ResultOk); }
/**
* Adding message ID list into ACK group for individual ACK.
* @param[in] msgIds of the message to be ACKed.
* @param[in] callback the callback that is triggered when the messages
are acknowledged
*/
- virtual void addAcknowledgeList(const MessageIdList& msgIds,
ResultCallback callback) {}
+ virtual void addAcknowledgeList(const MessageIdList& msgIds,
ResultCallback callback) {
+ callback(ResultOk);
+ }
/**
* Adding message ID into ACK group for cumulative ACK.
* @param[in] msgId ID of the message to be ACKed.
* @param[in] callback the callback that is triggered when the message is
acknowledged
*/
- virtual void addAcknowledgeCumulative(const MessageId& msgId,
ResultCallback callback) {}
+ virtual void addAcknowledgeCumulative(const MessageId& msgId,
ResultCallback callback) {
+ callback(ResultOk);
+ }
/**
* Flush all the pending grouped ACKs (as flush() does), and stop period
ACKs sending.
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index b1329e4..0ecf82c 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1118,6 +1118,29 @@ TEST(ConsumerTest, testNegativeAcksTrackerClose) {
client.close();
}
+TEST(ConsumerTest, testAckNotPersistentTopic) {
+ Client client(lookupUrl);
+ auto topicName =
"non-persistent://public/default/testAckNotPersistentTopic";
+
+ Consumer consumer;
+ client.subscribe(topicName, "test-sub", consumer);
+
+ Producer producer;
+ client.createProducer(topicName, producer);
+
+ for (int i = 0; i < 10; ++i) {
+ producer.send(MessageBuilder().setContent(std::to_string(i)).build());
+ }
+
+ Message msg;
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_EQ(ResultOk, consumer.receive(msg));
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
+ }
+
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true,
false));
} // namespace pulsar