This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e50493e  [C++] Fix hasMessageAvailable returns wrong value for last 
message (#13883)
e50493e is described below

commit e50493ea17dd5f2f9d4527d74cc4f40e12439df2
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jan 24 12:06:00 2022 +0800

    [C++] Fix hasMessageAvailable returns wrong value for last message (#13883)
    
    ### Motivation
    
    In the C++ client, there is a corner case: when a reader's start message ID 
is the last message of a topic, `hasMessageAvailable` returns true. However, it 
should return false because the start message ID is exclusive and in this case 
`readNext` would never return a message unless new messages arrive.
    
    ### Modifications
    
    The current C++ implementation of `hasMessageAvailable` was written long 
ago and has many problems, so this PR ports the Java implementation of 
`hasMessageAvailable` to the C++ client.
    
    After the modifications, `hasMessageAvailable` needs to access 
`startMessageId_`, but it is called in a different thread from 
`connectionOpened`, which might modify `startMessageId_`. Therefore, a common 
mutex `mutexForMessageId_` is used to protect the access to `startMessageId_`, 
`lastDequedMessageId_` and `lastMessageIdInBroker_`.
    
    To keep the original tests passing when `startMessageId` is latest, this PR 
adds a `GetLastMessageIdResponse` class as the response of the 
`GetLastMessageId` request. The `GetLastMessageIdResponse` contains the 
`consumer_mark_delete_position` introduced in 
https://github.com/apache/pulsar/pull/9652, which is compared with 
`last_message_id` when `startMessageId` is latest.
    
    ### Verifying this change
    
    This change adds the tests `ReaderTest#testHasMessageAvailableWhenCreated` 
and `MessageIdTest#testCompareLedgerAndEntryId`.
---
 pulsar-client-cpp/lib/ClientConnection.cc          |  23 +++--
 pulsar-client-cpp/lib/ClientConnection.h           |   5 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 112 +++++++++++++--------
 pulsar-client-cpp/lib/ConsumerImpl.h               |  23 ++---
 pulsar-client-cpp/lib/GetLastMessageIdResponse.h   |  56 +++++++++++
 .../MessageIdTest.cc => lib/MessageIdUtil.h}       |  33 +++---
 pulsar-client-cpp/lib/ReaderImpl.cc                |   4 +-
 pulsar-client-cpp/tests/MessageIdTest.cc           |  22 ++++
 pulsar-client-cpp/tests/ReaderTest.cc              |  45 +++++++++
 9 files changed, 239 insertions(+), 84 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 79bc1d7..d246bf8 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -37,6 +37,7 @@
 #include "ProducerImpl.h"
 #include "ConsumerImpl.h"
 #include "checksum/ChecksumProvider.h"
+#include "MessageIdUtil.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -1072,7 +1073,7 @@ void ClientConnection::handleIncomingCommand() {
                         PendingGetLastMessageIdRequestsMap::iterator it =
                             
pendingGetLastMessageIdRequests_.find(error.request_id());
                         if (it != pendingGetLastMessageIdRequests_.end()) {
-                            Promise<Result, MessageId> getLastMessageIdPromise 
= it->second;
+                            auto getLastMessageIdPromise = it->second;
                             pendingGetLastMessageIdRequests_.erase(it);
                             lock.unlock();
 
@@ -1191,15 +1192,18 @@ void ClientConnection::handleIncomingCommand() {
                         
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
 
                     if (it != pendingGetLastMessageIdRequests_.end()) {
-                        Promise<Result, MessageId> getLastMessageIdPromise = 
it->second;
+                        auto getLastMessageIdPromise = it->second;
                         pendingGetLastMessageIdRequests_.erase(it);
                         lock.unlock();
 
-                        MessageIdData messageIdData = 
getLastMessageIdResponse.last_message_id();
-                        MessageId messageId = 
MessageId(messageIdData.partition(), messageIdData.ledgerid(),
-                                                        
messageIdData.entryid(), messageIdData.batch_index());
-
-                        getLastMessageIdPromise.setValue(messageId);
+                        if 
(getLastMessageIdResponse.has_consumer_mark_delete_position()) {
+                            getLastMessageIdPromise.setValue(
+                                
{toMessageId(getLastMessageIdResponse.last_message_id()),
+                                 
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
+                        } else {
+                            getLastMessageIdPromise.setValue(
+                                
{toMessageId(getLastMessageIdResponse.last_message_id())});
+                        }
                     } else {
                         lock.unlock();
                         LOG_WARN(
@@ -1610,9 +1614,10 @@ Commands::ChecksumType 
ClientConnection::getChecksumType() const {
     return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : 
Commands::None;
 }
 
-Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t 
consumerId, uint64_t requestId) {
+Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(uint64_t consumerId,
+                                                                               
uint64_t requestId) {
     Lock lock(mutex_);
-    Promise<Result, MessageId> promise;
+    Promise<Result, GetLastMessageIdResponse> promise;
     if (isClosed()) {
         lock.unlock();
         LOG_ERROR(cnxString_ << " Client is not connected to the broker");
diff --git a/pulsar-client-cpp/lib/ClientConnection.h 
b/pulsar-client-cpp/lib/ClientConnection.h
index 48e6d57..7ca5f37 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -46,6 +46,7 @@
 #include <set>
 #include <lib/BrokerConsumerStatsImpl.h>
 #include "lib/PeriodicTask.h"
+#include "lib/GetLastMessageIdResponse.h"
 
 using namespace pulsar;
 
@@ -156,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t 
consumerId, uint64_t requestId);
 
-    Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, 
uint64_t requestId);
+    Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t 
consumerId, uint64_t requestId);
 
     Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const 
std::string& nsName, uint64_t requestId);
 
@@ -306,7 +307,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> 
PendingConsumerStatsMap;
     PendingConsumerStatsMap pendingConsumerStatsMap_;
 
-    typedef std::map<long, Promise<Result, MessageId>> 
PendingGetLastMessageIdRequestsMap;
+    typedef std::map<long, Promise<Result, GetLastMessageIdResponse>> 
PendingGetLastMessageIdRequestsMap;
     PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
 
     typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> 
PendingGetNamespaceTopicsMap;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 4d4a135..fa817a0 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -25,6 +25,7 @@
 #include "pulsar/Result.h"
 #include "pulsar/MessageId.h"
 #include "Utils.h"
+#include "MessageIdUtil.h"
 #include "AckGroupingTracker.h"
 #include "AckGroupingTrackerEnabled.h"
 #include "AckGroupingTrackerDisabled.h"
@@ -51,7 +52,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       hasParent_(hasParent),
       consumerTopicType_(consumerTopicType),
       subscriptionMode_(subscriptionMode),
-      startMessageId_(startMessageId),
       // This is the initial capacity of the queue
       incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
       availablePermits_(0),
@@ -63,7 +63,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       negativeAcksTracker_(client, *this, conf),
       ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
       readCompacted_(conf.isReadCompacted()),
-      lastMessageInBroker_(Optional<MessageId>::of(MessageId())),
+      startMessageId_(startMessageId),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
       
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoOldestChunkedMessageOnQueueFull())
 {
     std::stringstream consumerStrStream;
@@ -161,8 +161,9 @@ void ConsumerImpl::start() {
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     Lock lock(mutex_);
-    if (state_ == Closed) {
-        lock.unlock();
+    const auto state = state_;
+    lock.unlock();
+    if (state == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already 
closed");
         return;
     }
@@ -171,23 +172,24 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     // sending the subscribe request.
     cnx->registerConsumer(consumerId_, shared_from_this());
 
+    Lock lockForMessageId(mutexForMessageId_);
     Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
-    unAckedMessageTrackerPtr_->clear();
-    batchAcknowledgementTracker_.clear();
-
     if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
         // Update startMessageId so that we can discard messages after delivery
         // restarts
         startMessageId_ = firstMessageInQueue;
     }
+    const auto startMessageId = startMessageId_;
+    lockForMessageId.unlock();
 
-    lock.unlock();
+    unAckedMessageTrackerPtr_->clear();
+    batchAcknowledgementTracker_.clear();
 
     ClientImplPtr client = client_.lock();
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
         topic_, subscription_, consumerId_, requestId, getSubType(), 
consumerName_, subscriptionMode_,
-        startMessageId_, readCompacted_, config_.getProperties(), 
config_.getSchema(), getInitialPosition(),
+        startMessageId, readCompacted_, config_.getProperties(), 
config_.getSchema(), getInitialPosition(),
         config_.isReplicateSubscriptionStateEnabled(), 
config_.getKeySharedPolicy(),
         config_.getPriorityLevel());
     cnx->sendRequestWithId(cmd, requestId)
@@ -538,6 +540,9 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     batchAcknowledgementTracker_.receivedMessage(batchedMessage);
     LOG_DEBUG("Received Batch messages of size - " << batchSize
                                                    << " -- msgId: " << 
batchedMessage.getMessageId());
+    Lock lock(mutexForMessageId_);
+    const auto startMessageId = startMessageId_;
+    lock.unlock();
 
     int skippedMessages = 0;
 
@@ -547,14 +552,14 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
         msg.impl_->setRedeliveryCount(redeliveryCount);
         msg.impl_->setTopicName(batchedMessage.getTopicName());
 
-        if (startMessageId_.is_present()) {
+        if (startMessageId.is_present()) {
             const MessageId& msgId = msg.getMessageId();
 
             // If we are receiving a batch message, we need to discard 
messages that were prior
             // to the startMessageId
-            if (msgId.ledgerId() == startMessageId_.value().ledgerId() &&
-                msgId.entryId() == startMessageId_.value().entryId() &&
-                msgId.batchIndex() <= startMessageId_.value().batchIndex()) {
+            if (msgId.ledgerId() == startMessageId.value().ledgerId() &&
+                msgId.entryId() == startMessageId.value().entryId() &&
+                msgId.batchIndex() <= startMessageId.value().batchIndex()) {
                 LOG_DEBUG(getName() << "Ignoring message from before the 
startMessageId"
                                     << msg.getMessageId());
                 ++skippedMessages;
@@ -686,7 +691,7 @@ void ConsumerImpl::internalListener() {
     trackMessage(msg.getMessageId());
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
-        lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
+        lastDequedMessageId_ = msg.getMessageId();
         messageListener_(Consumer(shared_from_this()), msg);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
@@ -820,8 +825,9 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
 }
 
 void ConsumerImpl::messageProcessed(Message& msg, bool track) {
-    Lock lock(mutex_);
-    lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
+    Lock lock(mutexForMessageId_);
+    lastDequedMessageId_ = msg.getMessageId();
+    lock.unlock();
 
     ClientConnectionPtr currentCnx = getCnx().lock();
     if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
@@ -853,11 +859,11 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
             previousMessageId = MessageId(-1, nextMessageId.ledgerId(), 
nextMessageId.entryId() - 1, -1);
         }
         return Optional<MessageId>::of(previousMessageId);
-    } else if (lastDequedMessage_.is_present()) {
+    } else if (lastDequedMessageId_ != MessageId::earliest()) {
         // If the queue was empty we need to restart from the message just 
after the last one that has been
         // dequeued
         // in the past
-        return lastDequedMessage_;
+        return Optional<MessageId>::of(lastDequedMessageId_);
     } else {
         // No message was received or dequeued by this consumer. Next message 
would still be the
         // startMessageId
@@ -1193,6 +1199,9 @@ void ConsumerImpl::brokerConsumerStatsListener(Result 
res, BrokerConsumerStatsIm
 
 void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
     if (result == ResultOk) {
+        Lock lock(mutexForMessageId_);
+        lastDequedMessageId_ = MessageId::earliest();
+        lock.unlock();
         LOG_INFO(getName() << "Seek successfully");
     } else {
         LOG_ERROR(getName() << "Failed to seek: " << strResult(result));
@@ -1267,37 +1276,42 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
 
+inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const 
MessageId& messageId) {
+    return lastMessageIdInBroker > messageId && 
lastMessageIdInBroker.entryId() != -1;
+}
+
 void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback 
callback) {
-    MessageId lastDequed = this->lastMessageIdDequed();
-    MessageId lastInBroker = this->lastMessageIdInBroker();
-    if (lastInBroker > lastDequed && lastInBroker.entryId() != -1) {
-        callback(ResultOk, true);
-        return;
-    }
+    Lock lock(mutexForMessageId_);
+    const auto messageId =
+        (lastDequedMessageId_ == MessageId::earliest()) ? 
startMessageId_.value() : lastDequedMessageId_;
 
-    getLastMessageIdAsync([lastDequed, callback](Result result, MessageId 
messageId) {
-        if (result == ResultOk) {
-            if (messageId > lastDequed && messageId.entryId() != -1) {
-                callback(ResultOk, true);
+    if (messageId == MessageId::latest()) {
+        lock.unlock();
+        getLastMessageIdAsync([callback](Result result, const 
GetLastMessageIdResponse& response) {
+            if (result != ResultOk) {
+                callback(result, {});
+                return;
+            }
+            if (response.hasMarkDeletePosition() && 
response.getLastMessageId().entryId() >= 0) {
+                // We only care about comparing ledger ids and entry ids as 
mark delete position doesn't have
+                // other ids such as batch index
+                callback(ResultOk, 
compareLedgerAndEntryId(response.getMarkDeletePosition(),
+                                                           
response.getLastMessageId()) < 0);
             } else {
                 callback(ResultOk, false);
             }
-        } else {
-            callback(result, false);
-        }
-    });
-}
-
-void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId 
messageId,
-                                                  
BrokerGetLastMessageIdCallback callback) {
-    Lock lock(mutex_);
-    if (messageId > lastMessageIdInBroker()) {
-        lastMessageInBroker_ = Optional<MessageId>::of(messageId);
-        lock.unlock();
-        callback(res, messageId);
+        });
     } else {
+        if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
+            lock.unlock();
+            callback(ResultOk, true);
+            return;
+        }
         lock.unlock();
-        callback(res, lastMessageIdInBroker());
+
+        getLastMessageIdAsync([callback, messageId](Result result, const 
GetLastMessageIdResponse& response) {
+            callback(result, (result == ResultOk) && 
hasMoreMessages(response.getLastMessageId(), messageId));
+        });
     }
 }
 
@@ -1321,9 +1335,19 @@ void 
ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
             LOG_DEBUG(getName() << " Sending getLastMessageId Command for 
Consumer - " << getConsumerId()
                                 << ", requestId - " << requestId);
 
+            auto self = shared_from_this();
             cnx->newGetLastMessageId(consumerId_, requestId)
-                
.addListener(std::bind(&ConsumerImpl::brokerGetLastMessageIdListener, 
shared_from_this(),
-                                       std::placeholders::_1, 
std::placeholders::_2, callback));
+                .addListener([this, self, callback](Result result, const 
GetLastMessageIdResponse& response) {
+                    if (result == ResultOk) {
+                        LOG_DEBUG(getName() << "getLastMessageId: " << 
response);
+                        Lock lock(mutexForMessageId_);
+                        lastMessageIdInBroker_ = response.getLastMessageId();
+                        lock.unlock();
+                    } else {
+                        LOG_ERROR(getName() << "Failed to getLastMessageId: " 
<< result);
+                    }
+                    callback(result, response);
+                });
         } else {
             LOG_ERROR(getName() << " Operation not supported since server 
protobuf version "
                                 << cnx->getServerProtocolVersion() << " is 
older than proto::v12");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 2bdb82f..346d351 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -33,6 +33,7 @@
 #include "lib/UnAckedMessageTrackerDisabled.h"
 #include "MessageCrypto.h"
 #include "AckGroupingTracker.h"
+#include "GetLastMessageIdResponse.h"
 
 #include "CompressionCodec.h"
 #include <boost/dynamic_bitset.hpp>
@@ -54,7 +55,7 @@ class ExecutorService;
 class ConsumerImpl;
 class BatchAcknowledgementTracker;
 typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
-typedef std::function<void(Result result, MessageId messageId)> 
BrokerGetLastMessageIdCallback;
+typedef std::function<void(Result, const GetLastMessageIdResponse&)> 
BrokerGetLastMessageIdCallback;
 
 enum ConsumerTopicType
 {
@@ -193,10 +194,8 @@ class ConsumerImpl : public ConsumerImplBase,
     bool hasParent_;
     ConsumerTopicType consumerTopicType_;
 
-    Commands::SubscriptionMode subscriptionMode_;
-    Optional<MessageId> startMessageId_;
+    const Commands::SubscriptionMode subscriptionMode_;
 
-    Optional<MessageId> lastDequedMessage_;
     UnboundedBlockingQueue<Message> incomingMessages_;
     std::queue<ReceiveCallback> pendingReceives_;
     std::atomic_int availablePermits_;
@@ -217,17 +216,11 @@ class ConsumerImpl : public ConsumerImplBase,
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
 
-    Optional<MessageId> lastMessageInBroker_;
-    void brokerGetLastMessageIdListener(Result res, MessageId messageId,
-                                        BrokerGetLastMessageIdCallback 
callback);
-
-    const MessageId& lastMessageIdDequed() {
-        return lastDequedMessage_.is_present() ? lastDequedMessage_.value() : 
MessageId::earliest();
-    }
-
-    const MessageId& lastMessageIdInBroker() {
-        return lastMessageInBroker_.is_present() ? 
lastMessageInBroker_.value() : MessageId::earliest();
-    }
+    // Make the access to `startMessageId_`, `lastDequedMessageId_` and 
`lastMessageIdInBroker_` thread safe
+    mutable std::mutex mutexForMessageId_;
+    Optional<MessageId> startMessageId_;
+    MessageId lastDequedMessageId_{MessageId::earliest()};
+    MessageId lastMessageIdInBroker_{MessageId::earliest()};
 
     class ChunkedMessageCtx {
        public:
diff --git a/pulsar-client-cpp/lib/GetLastMessageIdResponse.h 
b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h
new file mode 100644
index 0000000..0acb783
--- /dev/null
+++ b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/MessageId.h>
+#include <iostream>
+
+namespace pulsar {
+
+class GetLastMessageIdResponse {
+    friend std::ostream& operator<<(std::ostream& os, const 
GetLastMessageIdResponse& response) {
+        os << "lastMessageId: " << response.lastMessageId_;
+        if (response.hasMarkDeletePosition_) {
+            os << ", markDeletePosition: " << response.markDeletePosition_;
+        }
+        return os;
+    }
+
+   public:
+    GetLastMessageIdResponse() = default;
+
+    GetLastMessageIdResponse(const MessageId& lastMessageId)
+        : lastMessageId_(lastMessageId), hasMarkDeletePosition_{false} {}
+
+    GetLastMessageIdResponse(const MessageId& lastMessageId, const MessageId& 
markDeletePosition)
+        : lastMessageId_(lastMessageId),
+          markDeletePosition_(markDeletePosition),
+          hasMarkDeletePosition_(true) {}
+
+    const MessageId& getLastMessageId() const noexcept { return 
lastMessageId_; }
+    const MessageId& getMarkDeletePosition() const noexcept { return 
markDeletePosition_; }
+    bool hasMarkDeletePosition() const noexcept { return 
hasMarkDeletePosition_; }
+
+   private:
+    MessageId lastMessageId_;
+    MessageId markDeletePosition_;
+    bool hasMarkDeletePosition_;
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc 
b/pulsar-client-cpp/lib/MessageIdUtil.h
similarity index 52%
copy from pulsar-client-cpp/tests/MessageIdTest.cc
copy to pulsar-client-cpp/lib/MessageIdUtil.h
index 06c2528..d6f80a1 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/lib/MessageIdUtil.h
@@ -17,21 +17,28 @@
  * under the License.
  */
 #include <pulsar/MessageId.h>
-#include "PulsarFriend.h"
+#include "PulsarApi.pb.h"
 
-#include <gtest/gtest.h>
+namespace pulsar {
 
-#include <string>
-
-using namespace pulsar;
-
-TEST(MessageIdTest, testSerialization) {
-    MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3);
-
-    std::string serialized;
-    msgId.serialize(serialized);
+inline MessageId toMessageId(const proto::MessageIdData& messageIdData) {
+    return MessageId{messageIdData.partition(), 
static_cast<int64_t>(messageIdData.ledgerid()),
+                     static_cast<int64_t>(messageIdData.entryid()), 
messageIdData.batch_index()};
+}
 
-    MessageId deserialized = MessageId::deserialize(serialized);
+namespace internal {
+template <typename T>
+static int compare(T lhs, T rhs) {
+    return (lhs < rhs) ? -1 : ((lhs == rhs) ? 0 : 1);
+}
+}  // namespace internal
 
-    ASSERT_EQ(msgId, deserialized);
+inline int compareLedgerAndEntryId(const MessageId& lhs, const MessageId& rhs) 
{
+    auto result = internal::compare(lhs.ledgerId(), rhs.ledgerId());
+    if (result != 0) {
+        return result;
+    }
+    return internal::compare(lhs.entryId(), rhs.entryId());
 }
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc 
b/pulsar-client-cpp/lib/ReaderImpl.cc
index 48f5d58..0a7b321 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -139,7 +139,9 @@ void ReaderImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
 }
 
 void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
-    consumer_->getLastMessageIdAsync(callback);
+    consumer_->getLastMessageIdAsync([callback](Result result, const 
GetLastMessageIdResponse& response) {
+        callback(result, response.getLastMessageId());
+    });
 }
 
 ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return 
readerImplWeakPtr_; }
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc 
b/pulsar-client-cpp/tests/MessageIdTest.cc
index 06c2528..55fa181 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/tests/MessageIdTest.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include <pulsar/MessageId.h>
+#include "lib/MessageIdUtil.h"
 #include "PulsarFriend.h"
 
 #include <gtest/gtest.h>
@@ -35,3 +36,24 @@ TEST(MessageIdTest, testSerialization) {
 
     ASSERT_EQ(msgId, deserialized);
 }
+
+TEST(MessageIdTest, testCompareLedgerAndEntryId) {
+    MessageId id1(-1, 2L, 1L, 0);
+    MessageId id2(-1, 2L, 1L, 1);
+    MessageId id3(-1, 2L, 2L, 0);
+    MessageId id4(-1, 3L, 0L, 0);
+    ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0);
+    ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0);
+
+    ASSERT_EQ(compareLedgerAndEntryId(id1, id3), -1);
+    ASSERT_EQ(compareLedgerAndEntryId(id3, id1), 1);
+
+    ASSERT_EQ(compareLedgerAndEntryId(id1, id4), -1);
+    ASSERT_EQ(compareLedgerAndEntryId(id4, id1), 1);
+
+    ASSERT_EQ(compareLedgerAndEntryId(id2, id4), -1);
+    ASSERT_EQ(compareLedgerAndEntryId(id4, id2), 1);
+
+    ASSERT_EQ(compareLedgerAndEntryId(id3, id4), -1);
+    ASSERT_EQ(compareLedgerAndEntryId(id4, id3), 1);
+}
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc 
b/pulsar-client-cpp/tests/ReaderTest.cc
index d95038b..8cd535c 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -26,6 +26,7 @@
 #include <time.h>
 #include <string>
 
+#include <lib/Latch.h>
 #include <lib/LogUtils.h>
 DECLARE_LOG_OBJECT()
 
@@ -577,3 +578,47 @@ TEST(ReaderTest, testIsConnected) {
     ASSERT_EQ(ResultOk, reader.close());
     ASSERT_FALSE(reader.isConnected());
 }
+
+TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
+    const std::string topic = "testHasMessageAvailableWhenCreated-" + 
std::to_string(time(nullptr));
+    Client client(serviceUrl);
+
+    ProducerConfiguration producerConf;
+    producerConf.setBatchingMaxMessages(3);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+
+    std::vector<MessageId> messageIds;
+    constexpr int numMessages = 7;
+    Latch latch(numMessages);
+    for (int i = 0; i < numMessages; i++) {
+        producer.sendAsync(MessageBuilder().setContent("msg-" + 
std::to_string(i)).build(),
+                           [i, &messageIds, &latch](Result result, const 
MessageId& messageId) {
+                               if (result == ResultOk) {
+                                   LOG_INFO("Send " << i << " to " << 
messageId);
+                                   messageIds.emplace_back(messageId);
+                               } else {
+                                   LOG_ERROR("Failed to send " << i << ": " << 
messageId);
+                               }
+                               latch.countdown();
+                           });
+    }
+    latch.wait(std::chrono::seconds(3));
+    ASSERT_EQ(messageIds.size(), numMessages);
+
+    Reader reader;
+    bool hasMessageAvailable;
+
+    for (size_t i = 0; i < messageIds.size() - 1; i++) {
+        ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {}, 
reader));
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+        EXPECT_TRUE(hasMessageAvailable);
+    }
+
+    // The start message ID is exclusive by default, so when we start at the 
last message, there should be no
+    // message available.
+    ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {}, 
reader));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    EXPECT_FALSE(hasMessageAvailable);
+    client.close();
+}

Reply via email to