This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ee1d7b9 Fix hasMessageAvailable might return true after seeking to
latest (#409)
ee1d7b9 is described below
commit ee1d7b9c5b1eed6fe0a279203648a06c30a3feb0
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Mar 11 19:09:18 2024 +0800
Fix hasMessageAvailable might return true after seeking to latest (#409)
* ### Motivation
After a seek operation is done, `startMessageId` will not be updated
until the reconnection caused by the seek completes in `connectionOpened`.
Before it is updated, `hasMessageAvailable` could compare against an
outdated `startMessageId` and return a wrong value.
### Modifications
Replace `duringSeek` with a `SeekStatus` field:
- `NOT_STARTED`: the initial status, or the previous seek operation is done.
`seek` can only succeed in this status.
- `IN_PROGRESS`: a seek operation has started, but the client has not yet
received the response from the broker.
- `COMPLETED`: the client has received the seek response, but the seek
future has not been completed yet.
If the connection is not ready when the seek response arrives, the status
becomes `COMPLETED`. The next time the connection is established, the status
changes from `COMPLETED` to `NOT_STARTED`, and the seek future is then
completed in the internal executor.
Add `testHasMessageAvailableAfterSeekToEnd` and `testSeekInProgress`.
---
lib/ConsumerImpl.cc | 87 ++++++++++++++++++++++++++++++-----------------------
lib/ConsumerImpl.h | 31 +++++++++++++++++--
lib/Synchronized.h | 5 +++
tests/ReaderTest.cc | 55 +++++++++++++++++++++++++++++++++
4 files changed, 139 insertions(+), 39 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index fa33e28..ebc8518 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -236,16 +236,14 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const
ClientConnectionPtr& c
// sending the subscribe request.
cnx->registerConsumer(consumerId_, get_shared_this_ptr());
- if (duringSeek_) {
+ if (duringSeek()) {
ackGroupingTrackerPtr_->flushAndClean();
}
Lock lockForMessageId(mutexForMessageId_);
- // Update startMessageId so that we can discard messages after delivery
restarts
- const auto startMessageId = clearReceiveQueue();
+ clearReceiveQueue();
const auto subscribeMessageId =
- (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ?
startMessageId : boost::none;
- startMessageId_ = startMessageId;
+ (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ?
startMessageId_.get() : boost::none;
lockForMessageId.unlock();
unAckedMessageTrackerPtr_->clear();
@@ -1048,14 +1046,21 @@ void ConsumerImpl::messageProcessed(Message& msg, bool
track) {
* Clear the internal receiver queue and returns the message id of what was
the 1st message in the queue that
* was
* not seen by the application
+ * `startMessageId_` is updated so that we can discard messages after delivery
restarts.
*/
-boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
- bool expectedDuringSeek = true;
- if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
- return seekMessageId_.get();
+void ConsumerImpl::clearReceiveQueue() {
+ if (duringSeek()) {
+ startMessageId_ = seekMessageId_.get();
+ SeekStatus expected = SeekStatus::COMPLETED;
+ if (seekStatus_.compare_exchange_strong(expected,
SeekStatus::NOT_STARTED)) {
+ auto seekCallback = seekCallback_.release();
+ executor_->postWork([seekCallback] { seekCallback(ResultOk); });
+ }
+ return;
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
- return startMessageId_.get();
+ return;
}
+
Message nextMessageInQueue;
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
// There was at least one message pending in the queue
@@ -1071,16 +1076,12 @@ boost::optional<MessageId>
ConsumerImpl::clearReceiveQueue() {
.ledgerId(nextMessageId.ledgerId())
.entryId(nextMessageId.entryId() -
1)
.build();
- return previousMessageId;
+ startMessageId_ = previousMessageId;
} else if (lastDequedMessageId_ != MessageId::earliest()) {
// If the queue was empty we need to restart from the message just
after the last one that has been
// dequeued
// in the past
- return lastDequedMessageId_;
- } else {
- // No message was received or dequeued by this consumer. Next message
would still be the
- // startMessageId
- return startMessageId_.get();
+ startMessageId_ = lastDequedMessageId_;
}
}
@@ -1500,18 +1501,15 @@ void ConsumerImpl::seekAsync(uint64_t timestamp,
ResultCallback callback) {
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
-inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const
MessageId& messageId) {
- return lastMessageIdInBroker > messageId &&
lastMessageIdInBroker.entryId() != -1;
-}
-
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback
callback) {
- const auto startMessageId = startMessageId_.get();
- Lock lock(mutexForMessageId_);
- const auto messageId =
- (lastDequedMessageId_ == MessageId::earliest()) ?
startMessageId.value() : lastDequedMessageId_;
-
- if (messageId == MessageId::latest()) {
- lock.unlock();
+ bool compareMarkDeletePosition;
+ {
+ std::lock_guard<std::mutex> lock{mutexForMessageId_};
+ compareMarkDeletePosition =
+ (lastDequedMessageId_ == MessageId::earliest()) &&
+ (startMessageId_.get().value_or(MessageId::earliest()) ==
MessageId::latest());
+ }
+ if (compareMarkDeletePosition) {
auto self = get_shared_this_ptr();
getLastMessageIdAsync([self, callback](Result result, const
GetLastMessageIdResponse& response) {
if (result != ResultOk) {
@@ -1543,16 +1541,15 @@ void
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
}
});
} else {
- if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
- lock.unlock();
+ if (hasMoreMessages()) {
callback(ResultOk, true);
return;
}
- lock.unlock();
-
- getLastMessageIdAsync([callback, messageId](Result result, const
GetLastMessageIdResponse& response) {
- callback(result, (result == ResultOk) &&
hasMoreMessages(response.getLastMessageId(), messageId));
- });
+ auto self = get_shared_this_ptr();
+ getLastMessageIdAsync(
+ [this, self, callback](Result result, const
GetLastMessageIdResponse& response) {
+ callback(result, (result == ResultOk) && hasMoreMessages());
+ });
}
}
@@ -1656,9 +1653,18 @@ void ConsumerImpl::seekAsyncInternal(long requestId,
SharedBuffer seek, const Me
return;
}
+ auto expected = SeekStatus::NOT_STARTED;
+ if (!seekStatus_.compare_exchange_strong(expected,
SeekStatus::IN_PROGRESS)) {
+ LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " <<
timestamp << " when the status is "
+ << static_cast<int>(expected));
+ callback(ResultNotAllowedError);
+ return;
+ }
+
const auto originalSeekMessageId = seekMessageId_.get();
seekMessageId_ = seekId;
- duringSeek_ = true;
+ seekStatus_ = SeekStatus::IN_PROGRESS;
+ seekCallback_ = std::move(callback);
if (timestamp > 0) {
LOG_INFO(getName() << " Seeking subscription to " << timestamp);
} else {
@@ -1682,12 +1688,19 @@ void ConsumerImpl::seekAsyncInternal(long requestId,
SharedBuffer seek, const Me
Lock lock(mutexForMessageId_);
lastDequedMessageId_ = MessageId::earliest();
lock.unlock();
+ if (getCnx().expired()) {
+ // It's during reconnection, complete the seek future
after connection is established
+ seekStatus_ = SeekStatus::COMPLETED;
+ } else {
+ startMessageId_ = seekMessageId_.get();
+ seekCallback_.release()(result);
+ }
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
seekMessageId_ = originalSeekMessageId;
- duringSeek_ = false;
+ seekStatus_ = SeekStatus::NOT_STARTED;
+ seekCallback_.release()(result);
}
- callback(result);
});
}
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 524acb8..82e323b 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -75,6 +75,13 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC =
"REAL_TOPIC";
const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
+/**
+ * Progress of the current seek operation of a ConsumerImpl.
+ *
+ * The enumerator values are spelled out because the status is logged as an
+ * integer when a seek is rejected.
+ */
+enum class SeekStatus : std::uint8_t {
+    NOT_STARTED = 0,  // initial status, or the previous seek is done; a new seek may start
+    IN_PROGRESS = 1,  // a seek was started and its broker response has not arrived yet
+    COMPLETED = 2     // the seek response arrived but the seek callback has not run yet
+};
+
class ConsumerImpl : public ConsumerImplBase {
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const
std::string& subscriptionName,
@@ -193,7 +200,7 @@ class ConsumerImpl : public ConsumerImplBase {
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback
callback);
- boost::optional<MessageId> clearReceiveQueue();
+ void clearReceiveQueue();
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId&
seekId, long timestamp,
ResultCallback callback);
void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack
cb);
@@ -239,10 +246,13 @@ class ConsumerImpl : public ConsumerImplBase {
MessageId lastDequedMessageId_{MessageId::earliest()};
MessageId lastMessageIdInBroker_{MessageId::earliest()};
- std::atomic_bool duringSeek_{false};
+ std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
+ Synchronized<ResultCallback> seekCallback_{[](Result) {}};
Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
+    // True while a seek is in flight or awaiting completion, i.e. whenever
+    // seekStatus_ is IN_PROGRESS or COMPLETED rather than NOT_STARTED.
+    bool duringSeek() const {
+        const SeekStatus status = seekStatus_.load();
+        return status != SeekStatus::NOT_STARTED;
+    }
+
class ChunkedMessageCtx {
public:
ChunkedMessageCtx() : totalChunks_(0) {}
@@ -332,6 +342,23 @@ class ConsumerImpl : public ConsumerImplBase {
const
proto::MessageIdData& messageIdData,
const
ClientConnectionPtr& cnx, MessageId& messageId);
+    /**
+     * Whether the broker's last message id (as last recorded in lastMessageIdInBroker_)
+     * lies beyond the reader's current position.
+     *
+     * The position is lastDequedMessageId_ once anything has been dequeued; before that it
+     * is startMessageId_, compared inclusively or exclusively per
+     * config_.isStartMessageIdInclusive(). Holds mutexForMessageId_ for the whole check.
+     */
+    bool hasMoreMessages() const {
+        std::lock_guard<std::mutex> guard{mutexForMessageId_};
+
+        // An entryId of -1 for the broker-side id is treated as "nothing to read".
+        if (lastMessageIdInBroker_.entryId() == -1L) {
+            return false;
+        }
+
+        // Something was already dequeued: compare against that position.
+        if (lastDequedMessageId_ != MessageId::earliest()) {
+            return lastMessageIdInBroker_ > lastDequedMessageId_;
+        }
+
+        // Nothing dequeued yet: compare against the start position. If startMessageId_ is
+        // none, fall back to latest so that the result is false.
+        const MessageId startId = startMessageId_.get().value_or(MessageId::latest());
+        if (config_.isStartMessageIdInclusive()) {
+            return lastMessageIdInBroker_ >= startId;
+        }
+        return lastMessageIdInBroker_ > startId;
+    }
+
friend class PulsarFriend;
friend class MultiTopicsConsumerImpl;
diff --git a/lib/Synchronized.h b/lib/Synchronized.h
index a98c08d..5449a9f 100644
--- a/lib/Synchronized.h
+++ b/lib/Synchronized.h
@@ -30,6 +30,11 @@ class Synchronized {
return value_;
}
+    /**
+     * Move the stored value out and return it.
+     *
+     * The value is returned by value, not as T&&, so that the move happens while
+     * mutex_ is held. Returning a reference would release the lock before the caller
+     * moves from (or invokes) value_, leaving that access unsynchronized with get()
+     * and operator=. After the call, value_ is in a moved-from state.
+     */
+    T release() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        T released = std::move(value_);
+        return released;
+    }
+
Synchronized& operator=(const T& value) {
std::lock_guard<std::mutex> lock(mutex_);
value_ = value;
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 723972d..9d2fe5f 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -752,6 +752,8 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
producer.close();
}
+// Value-parameterized fixture for the TEST_P(ReaderSeekTest, ...) cases; GetParam() is a bool.
+struct ReaderSeekTest : ::testing::TestWithParam<bool> {};
+
TEST(ReaderSeekTest, testStartAtLatestMessageId) {
Client client(serviceUrl);
@@ -784,4 +786,57 @@ TEST(ReaderSeekTest, testStartAtLatestMessageId) {
producer.close();
}
+// A second seek issued while the first one is still in progress must be rejected
+// with ResultNotAllowedError, and must not disturb the first seek.
+TEST(ReaderTest, testSeekInProgress) {
+    Client client(serviceUrl);
+    const auto topic = "test-seek-in-progress-" + std::to_string(time(nullptr));
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
+
+    Promise<Result, Result> firstSeekPromise;
+    reader.seekAsync(MessageId::earliest(),
+                     [firstSeekPromise](Result result) { firstSeekPromise.setValue(result); });
+    // Issued immediately, before the first seek's response can arrive.
+    Promise<Result, Result> secondSeekPromise;
+    reader.seekAsync(MessageId::earliest(),
+                     [secondSeekPromise](Result result) { secondSeekPromise.setValue(result); });
+
+    Result result;
+    secondSeekPromise.getFuture().get(result);
+    ASSERT_EQ(result, ResultNotAllowedError);
+
+    // Wait for the first seek so it is not still pending when the client is closed,
+    // and verify that the rejected second seek did not break it.
+    firstSeekPromise.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+    client.close();
+}
+
+// hasMessageAvailable must report false right after seeking to latest, even if
+// messages were produced before the seek, and true once a new message arrives.
+TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
+    Client client(serviceUrl);
+    const auto topic = "test-has-message-available-after-seek-to-end-" + std::to_string(time(nullptr));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
+
+    // Check send results so a failed send is reported here, not as a confusing
+    // hasMessageAvailable mismatch further down.
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-0").build()));
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-1").build()));
+
+    bool hasMessageAvailable = false;
+    if (GetParam()) {
+        // Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been initialized.
+        // The reader starts at earliest and has read nothing, so messages are available.
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+        ASSERT_TRUE(hasMessageAvailable);
+    }
+
+    ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    ASSERT_FALSE(hasMessageAvailable);
+
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-2").build()));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    ASSERT_TRUE(hasMessageAvailable);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, reader.readNext(msg, 1000));
+    ASSERT_EQ("msg-2", msg.getDataAsString());
+
+    // Test the 2nd seek
+    ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    ASSERT_FALSE(hasMessageAvailable);
+
+    client.close();
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
+// Runs every TEST_P(ReaderSeekTest, ...) case twice: with GetParam() == true and == false.
+INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true,
false));