This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 17fcf2a  Fix hasMessageAvailable incorrectly returns true when read to 
latest after seeking by timestamp (#498)
17fcf2a is described below

commit 17fcf2af1ec81c1ab372c23a0716c92872d2c0d8
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 21 19:30:18 2025 +0800

    Fix hasMessageAvailable incorrectly returns true when read to latest after 
seeking by timestamp (#498)
---
 lib/ConsumerImpl.cc | 25 +++++++++++++++++--------
 lib/ConsumerImpl.h  |  1 +
 tests/ReaderTest.cc | 10 ++++++++++
 3 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index f423ccf..401d0a1 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1113,7 +1113,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool 
track) {
  */
 void ConsumerImpl::clearReceiveQueue() {
     if (duringSeek()) {
-        if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+        if (hasSoughtByTimestamp()) {
+            // Invalidate startMessageId_ so that isPriorBatchIndex and 
isPriorEntryIndex checks will be
+            // skipped, and hasMessageAvailableAsync won't use startMessageId_ 
in compare.
+            startMessageId_ = boost::none;
+        } else {
             startMessageId_ = seekMessageId_.get();
         }
         SeekStatus expected = SeekStatus::COMPLETED;
@@ -1578,10 +1582,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const 
HasMessageAvailableCallback& c
     {
         std::lock_guard<std::mutex> lock{mutexForMessageId_};
         compareMarkDeletePosition =
-            (lastDequedMessageId_ == MessageId::earliest()) &&
-            (startMessageId_.get().value_or(MessageId::earliest()) == 
MessageId::latest());
-    }
-    if (compareMarkDeletePosition || 
hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+            // there is no message received by consumer, so we cannot compare 
the last position with the last
+            // received position
+            lastDequedMessageId_ == MessageId::earliest() &&
+            // If the start message id is latest, we should seek to the actual 
last message first.
+            (startMessageId_.get().value_or(MessageId::earliest()) == 
MessageId::latest() ||
+             // If there is a previous seek operation by timestamp, the start 
message id will be incorrect, so
+             // we cannot compare the start position with the last position.
+             hasSoughtByTimestamp());
+    }
+    if (compareMarkDeletePosition) {
         auto self = get_shared_this_ptr();
         getLastMessageIdAsync([self, callback](Result result, const 
GetLastMessageIdResponse& response) {
             if (result != ResultOk) {
@@ -1600,8 +1610,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const 
HasMessageAvailableCallback& c
                     callback(ResultOk, false);
                 }
             };
-            if (self->config_.isStartMessageIdInclusive() &&
-                !self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+            if (self->config_.isStartMessageIdInclusive() && 
!self->hasSoughtByTimestamp()) {
                 self->seekAsync(response.getLastMessageId(), [callback, 
handleResponse](Result result) {
                     if (result != ResultOk) {
                         callback(result, {});
@@ -1766,7 +1775,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
const SharedBuffer& seek, c
                     // It's during reconnection, complete the seek future 
after connection is established
                     seekStatus_ = SeekStatus::COMPLETED;
                 } else {
-                    if 
(!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+                    if (!hasSoughtByTimestamp()) {
                         startMessageId_ = seekMessageId_.get();
                     }
                     seekCallback_.release()(result);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index ce0c415..055b487 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -266,6 +266,7 @@ class ConsumerImpl : public ConsumerImplBase {
     Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
     std::atomic<bool> hasSoughtByTimestamp_{false};
 
+    bool hasSoughtByTimestamp() const { return 
hasSoughtByTimestamp_.load(std::memory_order_acquire); }
     bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
 
     class ChunkedMessageCtx {
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index fb0db03..0371bac 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -885,6 +885,16 @@ TEST_F(ReaderSeekTest, 
testHasMessageAvailableAfterSeekTimestamp) {
         createReader(reader, msgId);
         ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
         EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+
+        bool hasMessageAvailable;
+        while (true) {
+            ASSERT_EQ(ResultOk, 
reader.hasMessageAvailable(hasMessageAvailable));
+            if (!hasMessageAvailable) {
+                break;
+            }
+            Message msg;
+            ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+        }
     }
 
     // Test `hasMessageAvailableAsync` will complete immediately if the 
incoming message queue is non-empty

Reply via email to