This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c13d1c7c171 [client][c++] add getLastMessageIdAsync in Consumer
(#16182)
c13d1c7c171 is described below
commit c13d1c7c17166750461913dc1395c53a90a84bc5
Author: komalatammal <[email protected]>
AuthorDate: Thu Jun 23 09:58:22 2022 -0400
[client][c++] add getLastMessageIdAsync in Consumer (#16182)
### Motivation
Add getLastMessageId method to C++ Consumer.cc to address the missing part
in this PR https://github.com/apache/pulsar/pull/15993
### Modifications
Expose methods for getting the last message ID on `Consumer`, the C++
client's consumer class.
---
pulsar-client-cpp/include/pulsar/Consumer.h | 11 +++++++++++
.../include/pulsar/ConsumerConfiguration.h | 1 +
pulsar-client-cpp/include/pulsar/Reader.h | 1 -
pulsar-client-cpp/include/pulsar/ReaderConfiguration.h | 1 +
pulsar-client-cpp/lib/Consumer.cc | 17 +++++++++++++++++
5 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h
b/pulsar-client-cpp/include/pulsar/Consumer.h
index e82d2c07fbc..f1e180aea2d 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -390,6 +390,17 @@ class PULSAR_PUBLIC Consumer {
*/
bool isConnected() const;
+ /**
+ * Asynchronously get the ID of the last available message, or a message ID with -1 as the entryId
+ * if the topic is empty.
+ *
+ * If the consumer is not initialized, the callback is invoked with ResultConsumerNotInitialized and
+ * a default-constructed MessageId.
+ *
+ * @param callback invoked with the Result of the operation and the resulting MessageId
+ */
+ void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+
+ /**
+ * Get the ID of the last available message, or a message ID with -1 as the entryId if the topic is
+ * empty.
+ *
+ * This is the Result-returning counterpart of getLastMessageIdAsync().
+ *
+ * @param messageId output parameter intended to receive the last message ID
+ * @return the Result of the operation
+ */
+ Result getLastMessageId(MessageId& messageId);
+
private:
ConsumerImplBasePtr impl_;
explicit Consumer(ConsumerImplBasePtr);
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 0898b95736a..b326ca8fb31 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -40,6 +40,7 @@ class PulsarWrapper;
/// Callback definition for non-data operation
typedef std::function<void(Result result)> ResultCallback;
typedef std::function<void(Result, const Message& msg)> ReceiveCallback;
+/// Callback definition for getting the last message ID: receives the Result and a MessageId
+typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
/// Callback definition for MessageListener
typedef std::function<void(Consumer consumer, const Message& msg)>
MessageListener;
diff --git a/pulsar-client-cpp/include/pulsar/Reader.h
b/pulsar-client-cpp/include/pulsar/Reader.h
index 04d6fb86c78..554788e8cd6 100644
--- a/pulsar-client-cpp/include/pulsar/Reader.h
+++ b/pulsar-client-cpp/include/pulsar/Reader.h
@@ -29,7 +29,6 @@ class PulsarFriend;
class ReaderImpl;
typedef std::function<void(Result result, bool hasMessageAvailable)>
HasMessageAvailableCallback;
-typedef std::function<void(Result result, MessageId messageId)>
GetLastMessageIdCallback;
/**
* A Reader can be used to scan through all the messages currently available
in a topic.
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 3d0af205f98..5b88553534a 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -35,6 +35,7 @@ class PulsarWrapper;
/// Callback definition for non-data operation
typedef std::function<void(Result result)> ResultCallback;
+/// Callback definition for getting the last message ID: receives the Result and a MessageId
+typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
/// Callback definition for MessageListener
typedef std::function<void(Reader reader, const Message& msg)> ReaderListener;
diff --git a/pulsar-client-cpp/lib/Consumer.cc
b/pulsar-client-cpp/lib/Consumer.cc
index 3dcd3b54bcc..5d163629128 100644
--- a/pulsar-client-cpp/lib/Consumer.cc
+++ b/pulsar-client-cpp/lib/Consumer.cc
@@ -250,4 +250,21 @@ Result Consumer::seek(uint64_t timestamp) {
bool Consumer::isConnected() const { return impl_ && impl_->isConnected(); }
+/**
+ * Asynchronously fetch the last message ID and hand it to `callback`.
+ *
+ * If the consumer has no implementation (`impl_` is null), `callback` is invoked right away with
+ * ResultConsumerNotInitialized and a default-constructed MessageId.
+ *
+ * @param callback receives the Result and the MessageId taken from the GetLastMessageIdResponse
+ */
+void Consumer::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+    if (!impl_) {
+        callback(ResultConsumerNotInitialized, MessageId());
+        return;
+    }
+    // Forward to the implementation object. An unqualified getLastMessageIdAsync(...) call here
+    // would name this very member function again instead of delegating the request.
+    // NOTE(review): assumes impl_'s getLastMessageIdAsync accepts a
+    // (Result, const GetLastMessageIdResponse&) callback - confirm against ConsumerImplBase.
+    impl_->getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) {
+        callback(result, response.getLastMessageId());
+    });
+}
+
+/**
+ * Result-returning wrapper around getLastMessageIdAsync().
+ *
+ * @param messageId passed to the future's get() so it can receive the last message ID
+ * @return whatever Result the future's get() reports
+ */
+Result Consumer::getLastMessageId(MessageId& messageId) {
+    Promise<Result, MessageId> lastIdPromise;
+
+    // The callback adapter is built from the promise; the result is then read back through the
+    // promise's future.
+    getLastMessageIdAsync(WaitForCallbackValue<MessageId>(lastIdPromise));
+    return lastIdPromise.getFuture().get(messageId);
+}
+
} // namespace pulsar