This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.7
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 95b5e71f72c2c7fcb4aa19e192dbf84da8053054
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jun 11 05:48:50 2025 +0800

    Avoid getLastMessageId RPC when calling hasMessageAvailable after seek by 
timestamp (#491)
    
    (cherry picked from commit 15e0b00d1115c05832f84491bd0a0f92560ee918)
---
 lib/ConsumerImpl.cc |  5 +++++
 tests/ReaderTest.cc | 30 +++++++++++++++++++++++-------
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index f429bfb..595183e 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1561,6 +1561,10 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
 
 void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback 
callback) {
+    if (!incomingMessages_.empty()) {
+        callback(ResultOk, true);
+        return;
+    }
     bool compareMarkDeletePosition;
     {
         std::lock_guard<std::mutex> lock{mutexForMessageId_};
@@ -1726,6 +1730,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
SharedBuffer seek, const Se
         hasSoughtByTimestamp_.store(true, std::memory_order_release);
     } else {
         seekMessageId_ = *boost::get<MessageId>(&seekArg);
+        hasSoughtByTimestamp_.store(false, std::memory_order_release);
     }
     seekStatus_ = SeekStatus::IN_PROGRESS;
     seekCallback_ = std::move(callback);
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index ad81949..3e9dd39 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -21,7 +21,9 @@
 #include <pulsar/Reader.h>
 #include <time.h>
 
+#include <future>
 #include <string>
+#include <thread>
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
@@ -850,7 +852,7 @@ TEST_P(ReaderSeekTest, 
testHasMessageAvailableAfterSeekToEnd) {
     ASSERT_FALSE(hasMessageAvailable);
 }
 
-TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
+TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
     using namespace std::chrono;
     const auto topic = "test-has-message-available-after-seek-timestamp-" + 
std::to_string(time(nullptr));
     Producer producer;
@@ -862,12 +864,10 @@ TEST_P(ReaderSeekTest, 
testHasMessageAvailableAfterSeekTimestamp) {
 
     auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) 
{
         ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
-        if (GetParam()) {
-            if (msgId == MessageId::earliest()) {
-                EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
-            } else {
-                EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
-            }
+        if (msgId == MessageId::earliest()) {
+            EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+        } else {
+            EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
         }
     };
 
@@ -886,6 +886,22 @@ TEST_P(ReaderSeekTest, 
testHasMessageAvailableAfterSeekTimestamp) {
         ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
         EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
     }
+
+    // Test `hasMessageAvailableAsync` will complete immediately if the 
incoming message queue is non-empty
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, 
reader));
+    reader.seek(timestampBeforeSend);
+    std::promise<std::thread::id> threadIdPromise;
+
+    waitUntil(seconds(3),
+              [&reader] { return 
PulsarFriend::getConsumer(reader)->getNumOfPrefetchedMessages() > 0; });
+    reader.hasMessageAvailableAsync([&threadIdPromise](Result result, bool 
hasMessageAvailable) {
+        ASSERT_EQ(ResultOk, result);
+        ASSERT_TRUE(hasMessageAvailable);
+        threadIdPromise.set_value(std::this_thread::get_id());
+    });
+    auto threadId = threadIdPromise.get_future().get();
+    ASSERT_EQ(threadId, std::this_thread::get_id());
 }
 
 TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {

Reply via email to