This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 60fe05c672d9e7502cf96e9df46ea3fb4ed97c90
Author: VadimMolodyh <[email protected]>
AuthorDate: Tue Aug 24 14:49:17 2021 +0300

    [pulsar-client-cpp] Expose getLastMessageId in the Reader API (#11723)
    
    ### Motivation
    
    The changes are trivial. getLastMessageIdAsync is already implemented in
    the ConsumerImpl class, but it is only used internally to check whether any
    messages are available in the topic. It is really helpful to have it exposed
    in the Reader API, e.g., to read all messages currently available in the
    topic: get the last message ID and then read all messages up to that ID.
    (hasMessageAvailable is not helpful because it potentially might always
    return 'false' if new me [...]
    
    ### Modifications
    
    Trivial changes to the ReaderImpl and Reader classes to expose getLastMessageId.
    
    (cherry picked from commit 640e63b232cf24c186088d2019201836f0c5b5ad)
---
 pulsar-client-cpp/include/pulsar/Reader.h | 12 ++++++++++++
 pulsar-client-cpp/lib/Reader.cc           | 15 +++++++++++++++
 pulsar-client-cpp/lib/ReaderImpl.cc       |  4 ++++
 pulsar-client-cpp/lib/ReaderImpl.h        |  2 ++
 4 files changed, 33 insertions(+)

diff --git a/pulsar-client-cpp/include/pulsar/Reader.h 
b/pulsar-client-cpp/include/pulsar/Reader.h
index 727b012..04d6fb8 100644
--- a/pulsar-client-cpp/include/pulsar/Reader.h
+++ b/pulsar-client-cpp/include/pulsar/Reader.h
@@ -29,6 +29,7 @@ class PulsarFriend;
 class ReaderImpl;
 
 typedef std::function<void(Result result, bool hasMessageAvailable)> 
HasMessageAvailableCallback;
+typedef std::function<void(Result result, MessageId messageId)> 
GetLastMessageIdCallback;
 
 /**
  * A Reader can be used to scan through all the messages currently available 
in a topic.
@@ -137,6 +138,17 @@ class PULSAR_PUBLIC Reader {
      */
     bool isConnected() const;
 
+    /**
+     * Asynchronously get an ID of the last available message or a message ID 
with -1 as an entryId if the
+     * topic is empty.
+     */
+    void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+
+    /**
+     * Get an ID of the last available message or a message ID with -1 as an 
entryId if the topic is empty.
+     */
+    Result getLastMessageId(MessageId& messageId);
+
    private:
     typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
     ReaderImplPtr impl_;
diff --git a/pulsar-client-cpp/lib/Reader.cc b/pulsar-client-cpp/lib/Reader.cc
index 3327558..fa48536 100644
--- a/pulsar-client-cpp/lib/Reader.cc
+++ b/pulsar-client-cpp/lib/Reader.cc
@@ -117,4 +117,19 @@ Result Reader::seek(uint64_t timestamp) {
 
 bool Reader::isConnected() const { return impl_ && impl_->isConnected(); }
 
+void Reader::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+    if (!impl_) {
+        callback(ResultConsumerNotInitialized, MessageId());
+        return;
+    }
+    impl_->getLastMessageIdAsync(callback);
+}
+
+Result Reader::getLastMessageId(MessageId& messageId) {
+    Promise<Result, MessageId> promise;
+
+    getLastMessageIdAsync(WaitForCallbackValue<MessageId>(promise));
+    return promise.getFuture().get(messageId);
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc 
b/pulsar-client-cpp/lib/ReaderImpl.cc
index ccde6e7..48f5d58 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -138,6 +138,10 @@ void ReaderImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
     consumer_->seekAsync(timestamp, callback);
 }
 
+void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
+    consumer_->getLastMessageIdAsync(callback);
+}
+
 ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return 
readerImplWeakPtr_; }
 
 bool ReaderImpl::isConnected() const { return consumer_->isConnected(); }
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h 
b/pulsar-client-cpp/lib/ReaderImpl.h
index 7c7a556..a546ae8 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -60,6 +60,8 @@ class PULSAR_PUBLIC ReaderImpl : public 
std::enable_shared_from_this<ReaderImpl>
     void seekAsync(const MessageId& msgId, ResultCallback callback);
     void seekAsync(uint64_t timestamp, ResultCallback callback);
 
+    void getLastMessageIdAsync(GetLastMessageIdCallback callback);
+
     ReaderImplWeakPtr getReaderImplWeakPtr();
 
     bool isConnected() const;

Reply via email to