This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 20cfa08 [C++] Fix hasMessageAvailable returns wrong value for last
message (#13883)
20cfa08 is described below
commit 20cfa08580b4d31a9dbda50fb7fa0c398fea0bb4
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jan 24 12:06:00 2022 +0800
[C++] Fix hasMessageAvailable returns wrong value for last message (#13883)
In the C++ client, there is a corner case: when a reader's start message ID
is the last message of a topic, `hasMessageAvailable` returns true. However, it
should return false because the start message ID is exclusive, and in this case
`readNext` would never return a message unless new messages arrive.
The current C++ implementation of `hasMessageAvailable` was written long ago
and has many problems, so this PR ports the Java implementation of
`hasMessageAvailable` to the C++ client.
After these modifications, `hasMessageAvailable` needs to access
`startMessageId_`, and it is called from a different thread than
`connectionOpened`, which might modify `startMessageId_`. Therefore a common
mutex, `mutexForMessageId_`, is used to protect access to `startMessageId_`,
`lastDequedMessageId_` and `lastMessageIdInBroker_`.
To fix the original tests when `startMessageId` is latest, this PR adds a
`GetLastMessageIdResponse` class as the response to a `GetLastMessageId`
request. The `GetLastMessageIdResponse` contains the
`consumer_mark_delete_position` introduced in
https://github.com/apache/pulsar/pull/9652, which is compared with
`last_message_id` when `startMessageId` is latest.
This change adds the tests `ReaderTest#testHasMessageAvailableWhenCreated` and
`MessageIdTest#testCompareLedgerAndEntryId`.
(cherry picked from commit e50493ea17dd5f2f9d4527d74cc4f40e12439df2)
The conflicts were resolved by:
- Removing the new fields introduced in
https://github.com/apache/pulsar/pull/13627
---
pulsar-client-cpp/lib/ClientConnection.cc | 23 +++--
pulsar-client-cpp/lib/ClientConnection.h | 5 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 112 +++++++++++++--------
pulsar-client-cpp/lib/ConsumerImpl.h | 23 ++---
pulsar-client-cpp/lib/GetLastMessageIdResponse.h | 56 +++++++++++
.../MessageIdTest.cc => lib/MessageIdUtil.h} | 33 +++---
pulsar-client-cpp/lib/ReaderImpl.cc | 4 +-
pulsar-client-cpp/tests/MessageIdTest.cc | 22 ++++
pulsar-client-cpp/tests/ReaderTest.cc | 45 +++++++++
9 files changed, 239 insertions(+), 84 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index 3ad6f40..24cf11c 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -37,6 +37,7 @@
#include "ProducerImpl.h"
#include "ConsumerImpl.h"
#include "checksum/ChecksumProvider.h"
+#include "MessageIdUtil.h"
DECLARE_LOG_OBJECT()
@@ -1072,7 +1073,7 @@ void ClientConnection::handleIncomingCommand() {
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
- Promise<Result, MessageId> getLastMessageIdPromise
= it->second;
+ auto getLastMessageIdPromise = it->second;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
@@ -1191,15 +1192,18 @@ void ClientConnection::handleIncomingCommand() {
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
- Promise<Result, MessageId> getLastMessageIdPromise =
it->second;
+ auto getLastMessageIdPromise = it->second;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
- MessageIdData messageIdData =
getLastMessageIdResponse.last_message_id();
- MessageId messageId =
MessageId(messageIdData.partition(), messageIdData.ledgerid(),
-
messageIdData.entryid(), messageIdData.batch_index());
-
- getLastMessageIdPromise.setValue(messageId);
+ if
(getLastMessageIdResponse.has_consumer_mark_delete_position()) {
+ getLastMessageIdPromise.setValue(
+
{toMessageId(getLastMessageIdResponse.last_message_id()),
+
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
+ } else {
+ getLastMessageIdPromise.setValue(
+
{toMessageId(getLastMessageIdResponse.last_message_id())});
+ }
} else {
lock.unlock();
LOG_WARN(
@@ -1608,9 +1612,10 @@ Commands::ChecksumType
ClientConnection::getChecksumType() const {
return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c :
Commands::None;
}
-Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t
consumerId, uint64_t requestId) {
+Future<Result, GetLastMessageIdResponse>
ClientConnection::newGetLastMessageId(uint64_t consumerId,
+
uint64_t requestId) {
Lock lock(mutex_);
- Promise<Result, MessageId> promise;
+ Promise<Result, GetLastMessageIdResponse> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
diff --git a/pulsar-client-cpp/lib/ClientConnection.h
b/pulsar-client-cpp/lib/ClientConnection.h
index 48e6d57..7ca5f37 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -46,6 +46,7 @@
#include <set>
#include <lib/BrokerConsumerStatsImpl.h>
#include "lib/PeriodicTask.h"
+#include "lib/GetLastMessageIdResponse.h"
using namespace pulsar;
@@ -156,7 +157,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t
consumerId, uint64_t requestId);
- Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId,
uint64_t requestId);
+ Future<Result, GetLastMessageIdResponse> newGetLastMessageId(uint64_t
consumerId, uint64_t requestId);
Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const
std::string& nsName, uint64_t requestId);
@@ -306,7 +307,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>>
PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;
- typedef std::map<long, Promise<Result, MessageId>>
PendingGetLastMessageIdRequestsMap;
+ typedef std::map<long, Promise<Result, GetLastMessageIdResponse>>
PendingGetLastMessageIdRequestsMap;
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>>
PendingGetNamespaceTopicsMap;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 77c0fa9..84583d5 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -25,6 +25,7 @@
#include "pulsar/Result.h"
#include "pulsar/MessageId.h"
#include "Utils.h"
+#include "MessageIdUtil.h"
#include "AckGroupingTracker.h"
#include "AckGroupingTrackerEnabled.h"
#include "AckGroupingTrackerDisabled.h"
@@ -51,7 +52,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
- startMessageId_(startMessageId),
// This is the initial capacity of the queue
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)),
availablePermits_(0),
@@ -63,7 +63,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
readCompacted_(conf.isReadCompacted()),
- lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
+ startMessageId_(startMessageId) {
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " <<
consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
@@ -159,8 +159,9 @@ void ConsumerImpl::start() {
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
- if (state_ == Closed) {
- lock.unlock();
+ const auto state = state_;
+ lock.unlock();
+ if (state == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already
closed");
return;
}
@@ -169,23 +170,24 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
// sending the subscribe request.
cnx->registerConsumer(consumerId_, shared_from_this());
+ Lock lockForMessageId(mutexForMessageId_);
Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
- unAckedMessageTrackerPtr_->clear();
- batchAcknowledgementTracker_.clear();
-
if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
// Update startMessageId so that we can discard messages after delivery
// restarts
startMessageId_ = firstMessageInQueue;
}
+ const auto startMessageId = startMessageId_;
+ lockForMessageId.unlock();
- lock.unlock();
+ unAckedMessageTrackerPtr_->clear();
+ batchAcknowledgementTracker_.clear();
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_, subscriptionMode_,
- startMessageId_, readCompacted_, config_.getProperties(),
config_.getSchema(), getInitialPosition(),
+ startMessageId, readCompacted_, config_.getProperties(),
config_.getSchema(), getInitialPosition(),
config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(),
config_.getPriorityLevel());
cnx->sendRequestWithId(cmd, requestId)
@@ -440,6 +442,9 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
batchAcknowledgementTracker_.receivedMessage(batchedMessage);
LOG_DEBUG("Received Batch messages of size - " << batchSize
<< " -- msgId: " <<
batchedMessage.getMessageId());
+ Lock lock(mutexForMessageId_);
+ const auto startMessageId = startMessageId_;
+ lock.unlock();
int skippedMessages = 0;
@@ -449,14 +454,14 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.getTopicName());
- if (startMessageId_.is_present()) {
+ if (startMessageId.is_present()) {
const MessageId& msgId = msg.getMessageId();
// If we are receiving a batch message, we need to discard
messages that were prior
// to the startMessageId
- if (msgId.ledgerId() == startMessageId_.value().ledgerId() &&
- msgId.entryId() == startMessageId_.value().entryId() &&
- msgId.batchIndex() <= startMessageId_.value().batchIndex()) {
+ if (msgId.ledgerId() == startMessageId.value().ledgerId() &&
+ msgId.entryId() == startMessageId.value().entryId() &&
+ msgId.batchIndex() <= startMessageId.value().batchIndex()) {
LOG_DEBUG(getName() << "Ignoring message from before the
startMessageId"
<< msg.getMessageId());
++skippedMessages;
@@ -587,7 +592,7 @@ void ConsumerImpl::internalListener() {
trackMessage(msg);
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
- lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
+ lastDequedMessageId_ = msg.getMessageId();
messageListener_(Consumer(shared_from_this()), msg);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
@@ -721,8 +726,9 @@ Result ConsumerImpl::receiveHelper(Message& msg, int
timeout) {
}
void ConsumerImpl::messageProcessed(Message& msg, bool track) {
- Lock lock(mutex_);
- lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
+ Lock lock(mutexForMessageId_);
+ lastDequedMessageId_ = msg.getMessageId();
+ lock.unlock();
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
@@ -754,11 +760,11 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
previousMessageId = MessageId(-1, nextMessageId.ledgerId(),
nextMessageId.entryId() - 1, -1);
}
return Optional<MessageId>::of(previousMessageId);
- } else if (lastDequedMessage_.is_present()) {
+ } else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just
after the last one that has been
// dequeued
// in the past
- return lastDequedMessage_;
+ return Optional<MessageId>::of(lastDequedMessageId_);
} else {
// No message was received or dequeued by this consumer. Next message
would still be the
// startMessageId
@@ -1094,6 +1100,9 @@ void ConsumerImpl::brokerConsumerStatsListener(Result
res, BrokerConsumerStatsIm
void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
if (result == ResultOk) {
+ Lock lock(mutexForMessageId_);
+ lastDequedMessageId_ = MessageId::earliest();
+ lock.unlock();
LOG_INFO(getName() << "Seek successfully");
} else {
LOG_ERROR(getName() << "Failed to seek: " << strResult(result));
@@ -1168,37 +1177,42 @@ void ConsumerImpl::seekAsync(uint64_t timestamp,
ResultCallback callback) {
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
+inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const
MessageId& messageId) {
+ return lastMessageIdInBroker > messageId &&
lastMessageIdInBroker.entryId() != -1;
+}
+
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback
callback) {
- MessageId lastDequed = this->lastMessageIdDequed();
- MessageId lastInBroker = this->lastMessageIdInBroker();
- if (lastInBroker > lastDequed && lastInBroker.entryId() != -1) {
- callback(ResultOk, true);
- return;
- }
+ Lock lock(mutexForMessageId_);
+ const auto messageId =
+ (lastDequedMessageId_ == MessageId::earliest()) ?
startMessageId_.value() : lastDequedMessageId_;
- getLastMessageIdAsync([lastDequed, callback](Result result, MessageId
messageId) {
- if (result == ResultOk) {
- if (messageId > lastDequed && messageId.entryId() != -1) {
- callback(ResultOk, true);
+ if (messageId == MessageId::latest()) {
+ lock.unlock();
+ getLastMessageIdAsync([callback](Result result, const
GetLastMessageIdResponse& response) {
+ if (result != ResultOk) {
+ callback(result, {});
+ return;
+ }
+ if (response.hasMarkDeletePosition() &&
response.getLastMessageId().entryId() >= 0) {
+ // We only care about comparing ledger ids and entry ids as
mark delete position doesn't have
+ // other ids such as batch index
+ callback(ResultOk,
compareLedgerAndEntryId(response.getMarkDeletePosition(),
+
response.getLastMessageId()) < 0);
} else {
callback(ResultOk, false);
}
- } else {
- callback(result, false);
- }
- });
-}
-
-void ConsumerImpl::brokerGetLastMessageIdListener(Result res, MessageId
messageId,
-
BrokerGetLastMessageIdCallback callback) {
- Lock lock(mutex_);
- if (messageId > lastMessageIdInBroker()) {
- lastMessageInBroker_ = Optional<MessageId>::of(messageId);
- lock.unlock();
- callback(res, messageId);
+ });
} else {
+ if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
+ lock.unlock();
+ callback(ResultOk, true);
+ return;
+ }
lock.unlock();
- callback(res, lastMessageIdInBroker());
+
+ getLastMessageIdAsync([callback, messageId](Result result, const
GetLastMessageIdResponse& response) {
+ callback(result, (result == ResultOk) &&
hasMoreMessages(response.getLastMessageId(), messageId));
+ });
}
}
@@ -1222,9 +1236,19 @@ void
ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
LOG_DEBUG(getName() << " Sending getLastMessageId Command for
Consumer - " << getConsumerId()
<< ", requestId - " << requestId);
+ auto self = shared_from_this();
cnx->newGetLastMessageId(consumerId_, requestId)
-
.addListener(std::bind(&ConsumerImpl::brokerGetLastMessageIdListener,
shared_from_this(),
- std::placeholders::_1,
std::placeholders::_2, callback));
+ .addListener([this, self, callback](Result result, const
GetLastMessageIdResponse& response) {
+ if (result == ResultOk) {
+ LOG_DEBUG(getName() << "getLastMessageId: " <<
response);
+ Lock lock(mutexForMessageId_);
+ lastMessageIdInBroker_ = response.getLastMessageId();
+ lock.unlock();
+ } else {
+ LOG_ERROR(getName() << "Failed to getLastMessageId: "
<< result);
+ }
+ callback(result, response);
+ });
} else {
LOG_ERROR(getName() << " Operation not supported since server
protobuf version "
<< cnx->getServerProtocolVersion() << " is
older than proto::v12");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 0754a89..80d96d0 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -33,6 +33,7 @@
#include "lib/UnAckedMessageTrackerDisabled.h"
#include "MessageCrypto.h"
#include "AckGroupingTracker.h"
+#include "GetLastMessageIdResponse.h"
#include "CompressionCodec.h"
#include <boost/dynamic_bitset.hpp>
@@ -53,7 +54,7 @@ class ExecutorService;
class ConsumerImpl;
class BatchAcknowledgementTracker;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
-typedef std::function<void(Result result, MessageId messageId)>
BrokerGetLastMessageIdCallback;
+typedef std::function<void(Result, const GetLastMessageIdResponse&)>
BrokerGetLastMessageIdCallback;
enum ConsumerTopicType
{
@@ -191,10 +192,8 @@ class ConsumerImpl : public ConsumerImplBase,
bool hasParent_;
ConsumerTopicType consumerTopicType_;
- Commands::SubscriptionMode subscriptionMode_;
- Optional<MessageId> startMessageId_;
+ const Commands::SubscriptionMode subscriptionMode_;
- Optional<MessageId> lastDequedMessage_;
UnboundedBlockingQueue<Message> incomingMessages_;
std::queue<ReceiveCallback> pendingReceives_;
std::atomic_int availablePermits_;
@@ -215,17 +214,11 @@ class ConsumerImpl : public ConsumerImplBase,
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
- Optional<MessageId> lastMessageInBroker_;
- void brokerGetLastMessageIdListener(Result res, MessageId messageId,
- BrokerGetLastMessageIdCallback
callback);
-
- const MessageId& lastMessageIdDequed() {
- return lastDequedMessage_.is_present() ? lastDequedMessage_.value() :
MessageId::earliest();
- }
-
- const MessageId& lastMessageIdInBroker() {
- return lastMessageInBroker_.is_present() ?
lastMessageInBroker_.value() : MessageId::earliest();
- }
+ // Make the access to `startMessageId_`, `lastDequedMessageId_` and
`lastMessageIdInBroker_` thread safe
+ mutable std::mutex mutexForMessageId_;
+ Optional<MessageId> startMessageId_;
+ MessageId lastDequedMessageId_{MessageId::earliest()};
+ MessageId lastMessageIdInBroker_{MessageId::earliest()};
friend class PulsarFriend;
diff --git a/pulsar-client-cpp/lib/GetLastMessageIdResponse.h
b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h
new file mode 100644
index 0000000..0acb783
--- /dev/null
+++ b/pulsar-client-cpp/lib/GetLastMessageIdResponse.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/MessageId.h>
+#include <iostream>
+
+namespace pulsar {
+
+class GetLastMessageIdResponse {
+ friend std::ostream& operator<<(std::ostream& os, const
GetLastMessageIdResponse& response) {
+ os << "lastMessageId: " << response.lastMessageId_;
+ if (response.hasMarkDeletePosition_) {
+ os << ", markDeletePosition: " << response.markDeletePosition_;
+ }
+ return os;
+ }
+
+ public:
+ GetLastMessageIdResponse() = default;
+
+ GetLastMessageIdResponse(const MessageId& lastMessageId)
+ : lastMessageId_(lastMessageId), hasMarkDeletePosition_{false} {}
+
+ GetLastMessageIdResponse(const MessageId& lastMessageId, const MessageId&
markDeletePosition)
+ : lastMessageId_(lastMessageId),
+ markDeletePosition_(markDeletePosition),
+ hasMarkDeletePosition_(true) {}
+
+ const MessageId& getLastMessageId() const noexcept { return
lastMessageId_; }
+ const MessageId& getMarkDeletePosition() const noexcept { return
markDeletePosition_; }
+ bool hasMarkDeletePosition() const noexcept { return
hasMarkDeletePosition_; }
+
+ private:
+ MessageId lastMessageId_;
+ MessageId markDeletePosition_;
+ bool hasMarkDeletePosition_;
+};
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc
b/pulsar-client-cpp/lib/MessageIdUtil.h
similarity index 52%
copy from pulsar-client-cpp/tests/MessageIdTest.cc
copy to pulsar-client-cpp/lib/MessageIdUtil.h
index 06c2528..d6f80a1 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/lib/MessageIdUtil.h
@@ -17,21 +17,28 @@
* under the License.
*/
#include <pulsar/MessageId.h>
-#include "PulsarFriend.h"
+#include "PulsarApi.pb.h"
-#include <gtest/gtest.h>
+namespace pulsar {
-#include <string>
-
-using namespace pulsar;
-
-TEST(MessageIdTest, testSerialization) {
- MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3);
-
- std::string serialized;
- msgId.serialize(serialized);
+inline MessageId toMessageId(const proto::MessageIdData& messageIdData) {
+ return MessageId{messageIdData.partition(),
static_cast<int64_t>(messageIdData.ledgerid()),
+ static_cast<int64_t>(messageIdData.entryid()),
messageIdData.batch_index()};
+}
- MessageId deserialized = MessageId::deserialize(serialized);
+namespace internal {
+template <typename T>
+static int compare(T lhs, T rhs) {
+ return (lhs < rhs) ? -1 : ((lhs == rhs) ? 0 : 1);
+}
+} // namespace internal
- ASSERT_EQ(msgId, deserialized);
+inline int compareLedgerAndEntryId(const MessageId& lhs, const MessageId& rhs)
{
+ auto result = internal::compare(lhs.ledgerId(), rhs.ledgerId());
+ if (result != 0) {
+ return result;
+ }
+ return internal::compare(lhs.entryId(), rhs.entryId());
}
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc
b/pulsar-client-cpp/lib/ReaderImpl.cc
index 48f5d58..0a7b321 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -139,7 +139,9 @@ void ReaderImpl::seekAsync(uint64_t timestamp,
ResultCallback callback) {
}
void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) {
- consumer_->getLastMessageIdAsync(callback);
+ consumer_->getLastMessageIdAsync([callback](Result result, const
GetLastMessageIdResponse& response) {
+ callback(result, response.getLastMessageId());
+ });
}
ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return
readerImplWeakPtr_; }
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc
b/pulsar-client-cpp/tests/MessageIdTest.cc
index 06c2528..55fa181 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/tests/MessageIdTest.cc
@@ -17,6 +17,7 @@
* under the License.
*/
#include <pulsar/MessageId.h>
+#include "lib/MessageIdUtil.h"
#include "PulsarFriend.h"
#include <gtest/gtest.h>
@@ -35,3 +36,24 @@ TEST(MessageIdTest, testSerialization) {
ASSERT_EQ(msgId, deserialized);
}
+
+TEST(MessageIdTest, testCompareLedgerAndEntryId) {
+ MessageId id1(-1, 2L, 1L, 0);
+ MessageId id2(-1, 2L, 1L, 1);
+ MessageId id3(-1, 2L, 2L, 0);
+ MessageId id4(-1, 3L, 0L, 0);
+ ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0);
+ ASSERT_EQ(compareLedgerAndEntryId(id1, id2), 0);
+
+ ASSERT_EQ(compareLedgerAndEntryId(id1, id3), -1);
+ ASSERT_EQ(compareLedgerAndEntryId(id3, id1), 1);
+
+ ASSERT_EQ(compareLedgerAndEntryId(id1, id4), -1);
+ ASSERT_EQ(compareLedgerAndEntryId(id4, id1), 1);
+
+ ASSERT_EQ(compareLedgerAndEntryId(id2, id4), -1);
+ ASSERT_EQ(compareLedgerAndEntryId(id4, id2), 1);
+
+ ASSERT_EQ(compareLedgerAndEntryId(id3, id4), -1);
+ ASSERT_EQ(compareLedgerAndEntryId(id4, id3), 1);
+}
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc
b/pulsar-client-cpp/tests/ReaderTest.cc
index d95038b..8cd535c 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -26,6 +26,7 @@
#include <time.h>
#include <string>
+#include <lib/Latch.h>
#include <lib/LogUtils.h>
DECLARE_LOG_OBJECT()
@@ -577,3 +578,47 @@ TEST(ReaderTest, testIsConnected) {
ASSERT_EQ(ResultOk, reader.close());
ASSERT_FALSE(reader.isConnected());
}
+
+TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
+ const std::string topic = "testHasMessageAvailableWhenCreated-" +
std::to_string(time(nullptr));
+ Client client(serviceUrl);
+
+ ProducerConfiguration producerConf;
+ producerConf.setBatchingMaxMessages(3);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+
+ std::vector<MessageId> messageIds;
+ constexpr int numMessages = 7;
+ Latch latch(numMessages);
+ for (int i = 0; i < numMessages; i++) {
+ producer.sendAsync(MessageBuilder().setContent("msg-" +
std::to_string(i)).build(),
+ [i, &messageIds, &latch](Result result, const
MessageId& messageId) {
+ if (result == ResultOk) {
+ LOG_INFO("Send " << i << " to " <<
messageId);
+ messageIds.emplace_back(messageId);
+ } else {
+ LOG_ERROR("Failed to send " << i << ": " <<
messageId);
+ }
+ latch.countdown();
+ });
+ }
+ latch.wait(std::chrono::seconds(3));
+ ASSERT_EQ(messageIds.size(), numMessages);
+
+ Reader reader;
+ bool hasMessageAvailable;
+
+ for (size_t i = 0; i < messageIds.size() - 1; i++) {
+ ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {},
reader));
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ EXPECT_TRUE(hasMessageAvailable);
+ }
+
+ // The start message ID is exclusive by default, so when we start at the
last message, there should be no
+ // message available.
+ ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {},
reader));
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ EXPECT_FALSE(hasMessageAvailable);
+ client.close();
+}