This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e5c9c7  [feat] Reader support readNextAsync interface. (#176)
3e5c9c7 is described below

commit 3e5c9c71e5fd8662eed4430eaa249a2bf7936c66
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jan 19 14:34:40 2023 +0800

    [feat] Reader support readNextAsync interface. (#176)
---
 include/pulsar/Reader.h              |  8 +++++++
 include/pulsar/ReaderConfiguration.h |  1 +
 lib/ConsumerImpl.cc                  |  2 +-
 lib/ConsumerImpl.h                   |  2 +-
 lib/ConsumerImplBase.h               |  2 +-
 lib/MultiTopicsConsumerImpl.cc       |  2 +-
 lib/MultiTopicsConsumerImpl.h        |  2 +-
 lib/Reader.cc                        |  8 +++++++
 lib/ReaderImpl.cc                    |  8 +++++++
 lib/ReaderImpl.h                     |  1 +
 tests/ReaderTest.cc                  | 45 ++++++++++++++++++++++++++++++++++++
 11 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/include/pulsar/Reader.h b/include/pulsar/Reader.h
index 233da4f..4c124c9 100644
--- a/include/pulsar/Reader.h
+++ b/include/pulsar/Reader.h
@@ -29,6 +29,7 @@ class PulsarFriend;
 class ReaderImpl;
 
 typedef std::function<void(Result result, bool hasMessageAvailable)> 
HasMessageAvailableCallback;
+typedef std::function<void(Result result, const Message& message)> 
ReadNextCallback;
 
 /**
  * A Reader can be used to scan through all the messages currently available 
in a topic.
@@ -68,6 +69,13 @@ class PULSAR_PUBLIC Reader {
      */
     Result readNext(Message& msg, int timeoutMs);
 
+    /**
+     * Read asynchronously the next message in the topic.
+     *
+     * @param callback the callback invoked with the Result of the read and the Message that was read
+     */
+    void readNextAsync(ReadNextCallback callback);
+
     /**
      * Close the reader and stop the broker to push more messages
      *
diff --git a/include/pulsar/ReaderConfiguration.h 
b/include/pulsar/ReaderConfiguration.h
index 9ae8e1c..4f6464f 100644
--- a/include/pulsar/ReaderConfiguration.h
+++ b/include/pulsar/ReaderConfiguration.h
@@ -162,6 +162,7 @@ class PULSAR_PUBLIC ReaderConfiguration {
      * Set the internal subscription name.
      *
      * @param internal subscriptionName
+     * Default value is reader-{random string}.
      */
     void setInternalSubscriptionName(std::string internalSubscriptionName);
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index ccd082b..e7f6cf4 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -841,7 +841,7 @@ Result ConsumerImpl::receive(Message& msg) {
     return res;
 }
 
-void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 4832f4e..6e1b8ff 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -94,7 +94,7 @@ class ConsumerImpl : public ConsumerImplBase {
     const std::string& getTopic() const override;
     Result receive(Message& msg) override;
     Result receive(Message& msg, int timeout) override;
-    void receiveAsync(ReceiveCallback& callback) override;
+    void receiveAsync(ReceiveCallback callback) override;
     void unsubscribeAsync(ResultCallback callback) override;
     void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) 
override;
     void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback 
callback) override;
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 5bc7e1b..9cf63a3 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -51,7 +51,7 @@ class ConsumerImplBase : public HandlerBase, public 
std::enable_shared_from_this
     virtual const std::string& getSubscriptionName() const = 0;
     virtual Result receive(Message& msg) = 0;
     virtual Result receive(Message& msg, int timeout) = 0;
-    virtual void receiveAsync(ReceiveCallback& callback) = 0;
+    virtual void receiveAsync(ReceiveCallback callback) = 0;
     void batchReceiveAsync(BatchReceiveCallback callback);
     virtual void unsubscribeAsync(ResultCallback callback) = 0;
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback 
callback) = 0;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index a013566..c7a656c 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -583,7 +583,7 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int 
timeout) {
     }
 }
 
-void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index da42b74..50cdecf 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -65,7 +65,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     const std::string& getTopic() const override;
     Result receive(Message& msg) override;
     Result receive(Message& msg, int timeout) override;
-    void receiveAsync(ReceiveCallback& callback) override;
+    void receiveAsync(ReceiveCallback callback) override;
     void unsubscribeAsync(ResultCallback callback) override;
     void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) 
override;
     void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback 
callback) override;
diff --git a/lib/Reader.cc b/lib/Reader.cc
index 261c0fa..c02fb2e 100644
--- a/lib/Reader.cc
+++ b/lib/Reader.cc
@@ -49,6 +49,14 @@ Result Reader::readNext(Message& msg, int timeoutMs) {
     return impl_->readNext(msg, timeoutMs);
 }
 
+void Reader::readNextAsync(ReadNextCallback callback) {
+    if (!impl_) {
+        return callback(ResultConsumerNotInitialized, {});
+    }
+
+    impl_->readNextAsync(callback);
+}
+
 Result Reader::close() {
     Promise<bool, Result> promise;
     closeAsync(WaitForCallback(promise));
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index a80e2e5..da1d95e 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -111,6 +111,14 @@ Result ReaderImpl::readNext(Message& msg, int timeoutMs) {
     return res;
 }
 
+void ReaderImpl::readNextAsync(ReceiveCallback callback) {
+    auto self = shared_from_this();
+    consumer_->receiveAsync([self, callback](Result result, const Message& 
message) {
+        self->acknowledgeIfNecessary(result, message);
+        callback(result, message);
+    });
+}
+
 void ReaderImpl::messageListener(Consumer consumer, const Message& msg) {
     readerListener_(Reader(shared_from_this()), msg);
     acknowledgeIfNecessary(ResultOk, msg);
diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h
index ed16c7d..e216241 100644
--- a/lib/ReaderImpl.h
+++ b/lib/ReaderImpl.h
@@ -67,6 +67,7 @@ class PULSAR_PUBLIC ReaderImpl : public 
std::enable_shared_from_this<ReaderImpl>
 
     Result readNext(Message& msg);
     Result readNext(Message& msg, int timeoutMs);
+    void readNextAsync(ReceiveCallback callback);
 
     void closeAsync(ResultCallback callback);
 
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index b88da99..eefe1bc 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -25,6 +25,7 @@
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "WaitUtils.h"
 #include "lib/ClientConnection.h"
 #include "lib/Latch.h"
 #include "lib/LogUtils.h"
@@ -68,6 +69,50 @@ TEST(ReaderTest, testSimpleReader) {
     client.close();
 }
 
+TEST(ReaderTest, testAsyncRead) {
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/test-simple-reader" + 
std::to_string(time(nullptr));
+
+    ReaderConfiguration readerConf;
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), 
readerConf, reader));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    for (int i = 0; i < 10; i++) {
+        std::string content = "my-message-" + std::to_string(i);
+        Message msg = MessageBuilder().setContent(content).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    for (int i = 0; i < 10; i++) {
+        reader.readNextAsync([i](Result result, const Message& msg) {
+            ASSERT_EQ(ResultOk, result);
+            std::string content = msg.getDataAsString();
+            std::string expected = "my-message-" + std::to_string(i);
+            ASSERT_EQ(expected, content);
+        });
+    }
+
+    waitUntil(
+        std::chrono::seconds(5),
+        [&]() {
+            bool hasMsg;
+            reader.hasMessageAvailable(hasMsg);
+            return !hasMsg;
+        },
+        1000);
+    bool hasMsg;
+    reader.hasMessageAvailable(hasMsg);
+    ASSERT_FALSE(hasMsg);
+
+    producer.close();
+    reader.close();
+    client.close();
+}
+
 TEST(ReaderTest, testReaderAfterMessagesWerePublished) {
     Client client(serviceUrl);
 

Reply via email to