This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 17fcf2a Fix hasMessageAvailable incorrectly returns true when read to
latest after seeking by timestamp (#498)
17fcf2a is described below
commit 17fcf2af1ec81c1ab372c23a0716c92872d2c0d8
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 21 19:30:18 2025 +0800
Fix hasMessageAvailable incorrectly returns true when read to latest after
seeking by timestamp (#498)
---
lib/ConsumerImpl.cc | 25 +++++++++++++++++--------
lib/ConsumerImpl.h | 1 +
tests/ReaderTest.cc | 10 ++++++++++
3 files changed, 28 insertions(+), 8 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index f423ccf..401d0a1 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1113,7 +1113,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
*/
void ConsumerImpl::clearReceiveQueue() {
if (duringSeek()) {
- if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+ if (hasSoughtByTimestamp()) {
+ // Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
+ // skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
+ startMessageId_ = boost::none;
+ } else {
startMessageId_ = seekMessageId_.get();
}
SeekStatus expected = SeekStatus::COMPLETED;
@@ -1578,10 +1582,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
{
std::lock_guard<std::mutex> lock{mutexForMessageId_};
compareMarkDeletePosition =
- (lastDequedMessageId_ == MessageId::earliest()) &&
- (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
- }
- if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+ // there is no message received by consumer, so we cannot compare the last position with the last
+ // received position
+ lastDequedMessageId_ == MessageId::earliest() &&
+ // If the start message id is latest, we should seek to the actual last message first.
+ (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest() ||
+ // If there is a previous seek operation by timestamp, the start message id will be incorrect, so
+ // we cannot compare the start position with the last position.
+ hasSoughtByTimestamp());
+ }
+ if (compareMarkDeletePosition) {
auto self = get_shared_this_ptr();
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result != ResultOk) {
@@ -1600,8 +1610,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
callback(ResultOk, false);
}
};
- if (self->config_.isStartMessageIdInclusive() &&
- !self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+ if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp()) {
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
if (result != ResultOk) {
callback(result, {});
@@ -1766,7 +1775,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {
- if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+ if (!hasSoughtByTimestamp()) {
startMessageId_ = seekMessageId_.get();
}
seekCallback_.release()(result);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index ce0c415..055b487 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -266,6 +266,7 @@ class ConsumerImpl : public ConsumerImplBase {
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
std::atomic<bool> hasSoughtByTimestamp_{false};
+ bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); }
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
class ChunkedMessageCtx {
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index fb0db03..0371bac 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -885,6 +885,16 @@ TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
createReader(reader, msgId);
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+
+ bool hasMessageAvailable;
+ while (true) {
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+ if (!hasMessageAvailable) {
+ break;
+ }
+ Message msg;
+ ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+ }
}
// Test `hasMessageAvailableAsync` will complete immediately if the incoming message queue is non-empty