This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3e5c9c7 [feat] Reader support readNextAsync interface. (#176)
3e5c9c7 is described below
commit 3e5c9c71e5fd8662eed4430eaa249a2bf7936c66
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jan 19 14:34:40 2023 +0800
[feat] Reader support readNextAsync interface. (#176)
---
include/pulsar/Reader.h | 8 +++++++
include/pulsar/ReaderConfiguration.h | 1 +
lib/ConsumerImpl.cc | 2 +-
lib/ConsumerImpl.h | 2 +-
lib/ConsumerImplBase.h | 2 +-
lib/MultiTopicsConsumerImpl.cc | 2 +-
lib/MultiTopicsConsumerImpl.h | 2 +-
lib/Reader.cc | 8 +++++++
lib/ReaderImpl.cc | 8 +++++++
lib/ReaderImpl.h | 1 +
tests/ReaderTest.cc | 45 ++++++++++++++++++++++++++++++++++++
11 files changed, 76 insertions(+), 5 deletions(-)
diff --git a/include/pulsar/Reader.h b/include/pulsar/Reader.h
index 233da4f..4c124c9 100644
--- a/include/pulsar/Reader.h
+++ b/include/pulsar/Reader.h
@@ -29,6 +29,7 @@ class PulsarFriend;
class ReaderImpl;
typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
+typedef std::function<void(Result result, const Message& message)> ReadNextCallback;
/**
* A Reader can be used to scan through all the messages currently available in a topic.
@@ -68,6 +69,13 @@ class PULSAR_PUBLIC Reader {
*/
Result readNext(Message& msg, int timeoutMs);
+ /**
+ * Read asynchronously the next message in the topic.
+ *
+ * @param callback
+ */
+ void readNextAsync(ReadNextCallback callback);
+
/**
* Close the reader and stop the broker to push more messages
*
diff --git a/include/pulsar/ReaderConfiguration.h b/include/pulsar/ReaderConfiguration.h
index 9ae8e1c..4f6464f 100644
--- a/include/pulsar/ReaderConfiguration.h
+++ b/include/pulsar/ReaderConfiguration.h
@@ -162,6 +162,7 @@ class PULSAR_PUBLIC ReaderConfiguration {
* Set the internal subscription name.
*
* @param internal subscriptionName
+ * Default value is reader-{random string}.
*/
void setInternalSubscriptionName(std::string internalSubscriptionName);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index ccd082b..e7f6cf4 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -841,7 +841,7 @@ Result ConsumerImpl::receive(Message& msg) {
return res;
}
-void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+void ConsumerImpl::receiveAsync(ReceiveCallback callback) {
Message msg;
// fail the callback if consumer is closing or closed
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 4832f4e..6e1b8ff 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -94,7 +94,7 @@ class ConsumerImpl : public ConsumerImplBase {
const std::string& getTopic() const override;
Result receive(Message& msg) override;
Result receive(Message& msg, int timeout) override;
- void receiveAsync(ReceiveCallback& callback) override;
+ void receiveAsync(ReceiveCallback callback) override;
void unsubscribeAsync(ResultCallback callback) override;
void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 5bc7e1b..9cf63a3 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -51,7 +51,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
virtual const std::string& getSubscriptionName() const = 0;
virtual Result receive(Message& msg) = 0;
virtual Result receive(Message& msg, int timeout) = 0;
- virtual void receiveAsync(ReceiveCallback& callback) = 0;
+ virtual void receiveAsync(ReceiveCallback callback) = 0;
void batchReceiveAsync(BatchReceiveCallback callback);
virtual void unsubscribeAsync(ResultCallback callback) = 0;
virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index a013566..c7a656c 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -583,7 +583,7 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
}
}
-void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
+void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback callback) {
Message msg;
// fail the callback if consumer is closing or closed
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index da42b74..50cdecf 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -65,7 +65,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const std::string& getTopic() const override;
Result receive(Message& msg) override;
Result receive(Message& msg, int timeout) override;
- void receiveAsync(ReceiveCallback& callback) override;
+ void receiveAsync(ReceiveCallback callback) override;
void unsubscribeAsync(ResultCallback callback) override;
void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
void acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) override;
diff --git a/lib/Reader.cc b/lib/Reader.cc
index 261c0fa..c02fb2e 100644
--- a/lib/Reader.cc
+++ b/lib/Reader.cc
@@ -49,6 +49,14 @@ Result Reader::readNext(Message& msg, int timeoutMs) {
return impl_->readNext(msg, timeoutMs);
}
+void Reader::readNextAsync(ReadNextCallback callback) {
+ if (!impl_) {
+ return callback(ResultConsumerNotInitialized, {});
+ }
+
+ impl_->readNextAsync(callback);
+}
+
Result Reader::close() {
Promise<bool, Result> promise;
closeAsync(WaitForCallback(promise));
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index a80e2e5..da1d95e 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -111,6 +111,14 @@ Result ReaderImpl::readNext(Message& msg, int timeoutMs) {
return res;
}
+void ReaderImpl::readNextAsync(ReceiveCallback callback) {
+ auto self = shared_from_this();
+ consumer_->receiveAsync([self, callback](Result result, const Message& message) {
+ self->acknowledgeIfNecessary(result, message);
+ callback(result, message);
+ });
+}
+
void ReaderImpl::messageListener(Consumer consumer, const Message& msg) {
readerListener_(Reader(shared_from_this()), msg);
acknowledgeIfNecessary(ResultOk, msg);
diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h
index ed16c7d..e216241 100644
--- a/lib/ReaderImpl.h
+++ b/lib/ReaderImpl.h
@@ -67,6 +67,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
Result readNext(Message& msg);
Result readNext(Message& msg, int timeoutMs);
+ void readNextAsync(ReceiveCallback callback);
void closeAsync(ResultCallback callback);
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index b88da99..eefe1bc 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -25,6 +25,7 @@
#include "HttpHelper.h"
#include "PulsarFriend.h"
+#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
@@ -68,6 +69,50 @@ TEST(ReaderTest, testSimpleReader) {
client.close();
}
+TEST(ReaderTest, testAsyncRead) {
+ Client client(serviceUrl);
+
+ std::string topicName = "persistent://public/default/test-simple-reader" + std::to_string(time(nullptr));
+
+ ReaderConfiguration readerConf;
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ for (int i = 0; i < 10; i++) {
+ std::string content = "my-message-" + std::to_string(i);
+ Message msg = MessageBuilder().setContent(content).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ reader.readNextAsync([i](Result result, const Message& msg) {
+ ASSERT_EQ(ResultOk, result);
+ std::string content = msg.getDataAsString();
+ std::string expected = "my-message-" + std::to_string(i);
+ ASSERT_EQ(expected, content);
+ });
+ }
+
+ waitUntil(
+ std::chrono::seconds(5),
+ [&]() {
+ bool hasMsg;
+ reader.hasMessageAvailable(hasMsg);
+ return !hasMsg;
+ },
+ 1000);
+ bool hasMsg;
+ reader.hasMessageAvailable(hasMsg);
+ ASSERT_FALSE(hasMsg);
+
+ producer.close();
+ reader.close();
+ client.close();
+}
+
TEST(ReaderTest, testReaderAfterMessagesWerePublished) {
Client client(serviceUrl);