This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ee1d7b9  Fix hasMessageAvailable might return true after seeking to latest (#409)
ee1d7b9 is described below

commit ee1d7b9c5b1eed6fe0a279203648a06c30a3feb0
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Mar 11 19:09:18 2024 +0800

    Fix hasMessageAvailable might return true after seeking to latest (#409)
    
    * ### Motivation
    
    After a seek operation is done, `startMessageId` is not updated until
    the reconnection triggered by the seek completes in `connectionOpened`.
    Until then, `hasMessageAvailable` could compare against an outdated
    `startMessageId` and return a wrong value.
    
    ### Modifications
    
    Replace `duringSeek` with a `SeekStatus` field:
    - `NOT_STARTED`: the initial state, or a seek operation has completed. `seek` can only succeed in this state.
    - `IN_PROGRESS`: a seek operation has started, but the client has not yet received the response from the broker.
    - `COMPLETED`: the client has received the seek response, but the seek future has not been completed yet.
    
    After the status becomes `COMPLETED`, if the connection is not ready,
    the status changes from `COMPLETED` to `NOT_STARTED` the next time the
    connection is established, and then the seek future is completed in
    the internal executor.
    
    Add `testHasMessageAvailableAfterSeekToEnd` and `testSeekInProgress`.
---
 lib/ConsumerImpl.cc | 87 ++++++++++++++++++++++++++++++-----------------------
 lib/ConsumerImpl.h  | 31 +++++++++++++++++--
 lib/Synchronized.h  |  5 +++
 tests/ReaderTest.cc | 55 +++++++++++++++++++++++++++++++++
 4 files changed, 139 insertions(+), 39 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index fa33e28..ebc8518 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -236,16 +236,14 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     // sending the subscribe request.
     cnx->registerConsumer(consumerId_, get_shared_this_ptr());
 
-    if (duringSeek_) {
+    if (duringSeek()) {
         ackGroupingTrackerPtr_->flushAndClean();
     }
 
     Lock lockForMessageId(mutexForMessageId_);
-    // Update startMessageId so that we can discard messages after delivery 
restarts
-    const auto startMessageId = clearReceiveQueue();
+    clearReceiveQueue();
     const auto subscribeMessageId =
-        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? 
startMessageId : boost::none;
-    startMessageId_ = startMessageId;
+        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? 
startMessageId_.get() : boost::none;
     lockForMessageId.unlock();
 
     unAckedMessageTrackerPtr_->clear();
@@ -1048,14 +1046,21 @@ void ConsumerImpl::messageProcessed(Message& msg, bool 
track) {
  * Clear the internal receiver queue and returns the message id of what was 
the 1st message in the queue that
  * was
  * not seen by the application
+ * `startMessageId_` is updated so that we can discard messages after delivery 
restarts.
  */
-boost::optional<MessageId> ConsumerImpl::clearReceiveQueue() {
-    bool expectedDuringSeek = true;
-    if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
-        return seekMessageId_.get();
+void ConsumerImpl::clearReceiveQueue() {
+    if (duringSeek()) {
+        startMessageId_ = seekMessageId_.get();
+        SeekStatus expected = SeekStatus::COMPLETED;
+        if (seekStatus_.compare_exchange_strong(expected, 
SeekStatus::NOT_STARTED)) {
+            auto seekCallback = seekCallback_.release();
+            executor_->postWork([seekCallback] { seekCallback(ResultOk); });
+        }
+        return;
     } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
-        return startMessageId_.get();
+        return;
     }
+
     Message nextMessageInQueue;
     if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
         // There was at least one message pending in the queue
@@ -1071,16 +1076,12 @@ boost::optional<MessageId> 
ConsumerImpl::clearReceiveQueue() {
                                            .ledgerId(nextMessageId.ledgerId())
                                            .entryId(nextMessageId.entryId() - 
1)
                                            .build();
-        return previousMessageId;
+        startMessageId_ = previousMessageId;
     } else if (lastDequedMessageId_ != MessageId::earliest()) {
         // If the queue was empty we need to restart from the message just 
after the last one that has been
         // dequeued
         // in the past
-        return lastDequedMessageId_;
-    } else {
-        // No message was received or dequeued by this consumer. Next message 
would still be the
-        // startMessageId
-        return startMessageId_.get();
+        startMessageId_ = lastDequedMessageId_;
     }
 }
 
@@ -1500,18 +1501,15 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
 
-inline bool hasMoreMessages(const MessageId& lastMessageIdInBroker, const 
MessageId& messageId) {
-    return lastMessageIdInBroker > messageId && 
lastMessageIdInBroker.entryId() != -1;
-}
-
 void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback 
callback) {
-    const auto startMessageId = startMessageId_.get();
-    Lock lock(mutexForMessageId_);
-    const auto messageId =
-        (lastDequedMessageId_ == MessageId::earliest()) ? 
startMessageId.value() : lastDequedMessageId_;
-
-    if (messageId == MessageId::latest()) {
-        lock.unlock();
+    bool compareMarkDeletePosition;
+    {
+        std::lock_guard<std::mutex> lock{mutexForMessageId_};
+        compareMarkDeletePosition =
+            (lastDequedMessageId_ == MessageId::earliest()) &&
+            (startMessageId_.get().value_or(MessageId::earliest()) == 
MessageId::latest());
+    }
+    if (compareMarkDeletePosition) {
         auto self = get_shared_this_ptr();
         getLastMessageIdAsync([self, callback](Result result, const 
GetLastMessageIdResponse& response) {
             if (result != ResultOk) {
@@ -1543,16 +1541,15 @@ void 
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
             }
         });
     } else {
-        if (hasMoreMessages(lastMessageIdInBroker_, messageId)) {
-            lock.unlock();
+        if (hasMoreMessages()) {
             callback(ResultOk, true);
             return;
         }
-        lock.unlock();
-
-        getLastMessageIdAsync([callback, messageId](Result result, const 
GetLastMessageIdResponse& response) {
-            callback(result, (result == ResultOk) && 
hasMoreMessages(response.getLastMessageId(), messageId));
-        });
+        auto self = get_shared_this_ptr();
+        getLastMessageIdAsync(
+            [this, self, callback](Result result, const 
GetLastMessageIdResponse& response) {
+                callback(result, (result == ResultOk) && hasMoreMessages());
+            });
     }
 }
 
@@ -1656,9 +1653,18 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
SharedBuffer seek, const Me
         return;
     }
 
+    auto expected = SeekStatus::NOT_STARTED;
+    if (!seekStatus_.compare_exchange_strong(expected, 
SeekStatus::IN_PROGRESS)) {
+        LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << 
timestamp << " when the status is "
+                            << static_cast<int>(expected));
+        callback(ResultNotAllowedError);
+        return;
+    }
+
     const auto originalSeekMessageId = seekMessageId_.get();
     seekMessageId_ = seekId;
-    duringSeek_ = true;
+    seekStatus_ = SeekStatus::IN_PROGRESS;
+    seekCallback_ = std::move(callback);
     if (timestamp > 0) {
         LOG_INFO(getName() << " Seeking subscription to " << timestamp);
     } else {
@@ -1682,12 +1688,19 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
SharedBuffer seek, const Me
                 Lock lock(mutexForMessageId_);
                 lastDequedMessageId_ = MessageId::earliest();
                 lock.unlock();
+                if (getCnx().expired()) {
+                    // It's during reconnection, complete the seek future 
after connection is established
+                    seekStatus_ = SeekStatus::COMPLETED;
+                } else {
+                    startMessageId_ = seekMessageId_.get();
+                    seekCallback_.release()(result);
+                }
             } else {
                 LOG_ERROR(getName() << "Failed to seek: " << result);
                 seekMessageId_ = originalSeekMessageId;
-                duringSeek_ = false;
+                seekStatus_ = SeekStatus::NOT_STARTED;
+                seekCallback_.release()(result);
             }
-            callback(result);
         });
 }
 
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 524acb8..82e323b 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -75,6 +75,13 @@ const static std::string SYSTEM_PROPERTY_REAL_TOPIC = 
"REAL_TOPIC";
 const static std::string PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
 const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
 
+enum class SeekStatus : std::uint8_t
+{
+    NOT_STARTED,
+    IN_PROGRESS,
+    COMPLETED
+};
+
 class ConsumerImpl : public ConsumerImplBase {
    public:
     ConsumerImpl(const ClientImplPtr client, const std::string& topic, const 
std::string& subscriptionName,
@@ -193,7 +200,7 @@ class ConsumerImpl : public ConsumerImplBase {
                                        const DeadlineTimerPtr& timer,
                                        BrokerGetLastMessageIdCallback 
callback);
 
-    boost::optional<MessageId> clearReceiveQueue();
+    void clearReceiveQueue();
     void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& 
seekId, long timestamp,
                            ResultCallback callback);
     void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack 
cb);
@@ -239,10 +246,13 @@ class ConsumerImpl : public ConsumerImplBase {
     MessageId lastDequedMessageId_{MessageId::earliest()};
     MessageId lastMessageIdInBroker_{MessageId::earliest()};
 
-    std::atomic_bool duringSeek_{false};
+    std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
+    Synchronized<ResultCallback> seekCallback_{[](Result) {}};
     Synchronized<boost::optional<MessageId>> startMessageId_;
     Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
 
+    bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
+
     class ChunkedMessageCtx {
        public:
         ChunkedMessageCtx() : totalChunks_(0) {}
@@ -332,6 +342,23 @@ class ConsumerImpl : public ConsumerImplBase {
                                                       const 
proto::MessageIdData& messageIdData,
                                                       const 
ClientConnectionPtr& cnx, MessageId& messageId);
 
+    bool hasMoreMessages() const {
+        std::lock_guard<std::mutex> lock{mutexForMessageId_};
+        if (lastMessageIdInBroker_.entryId() == -1L) {
+            return false;
+        }
+
+        const auto inclusive = config_.isStartMessageIdInclusive();
+        if (lastDequedMessageId_ == MessageId::earliest()) {
+            // If startMessageId_ is none, use latest so that this method will 
return false
+            const auto startMessageId = 
startMessageId_.get().value_or(MessageId::latest());
+            return inclusive ? (lastMessageIdInBroker_ >= startMessageId)
+                             : (lastMessageIdInBroker_ > startMessageId);
+        } else {
+            return lastMessageIdInBroker_ > lastDequedMessageId_;
+        }
+    }
+
     friend class PulsarFriend;
     friend class MultiTopicsConsumerImpl;
 
diff --git a/lib/Synchronized.h b/lib/Synchronized.h
index a98c08d..5449a9f 100644
--- a/lib/Synchronized.h
+++ b/lib/Synchronized.h
@@ -30,6 +30,11 @@ class Synchronized {
         return value_;
     }
 
+    T&& release() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return std::move(value_);
+    }
+
     Synchronized& operator=(const T& value) {
         std::lock_guard<std::mutex> lock(mutex_);
         value_ = value;
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 723972d..9d2fe5f 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -752,6 +752,8 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
     producer.close();
 }
 
+class ReaderSeekTest : public ::testing::TestWithParam<bool> {};
+
 TEST(ReaderSeekTest, testStartAtLatestMessageId) {
     Client client(serviceUrl);
 
@@ -784,4 +786,57 @@ TEST(ReaderSeekTest, testStartAtLatestMessageId) {
     producer.close();
 }
 
+TEST(ReaderTest, testSeekInProgress) {
+    Client client(serviceUrl);
+    const auto topic = "test-seek-in-progress-" + 
std::to_string(time(nullptr));
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, 
reader));
+
+    reader.seekAsync(MessageId::earliest(), [](Result) {});
+    Promise<Result, Result> promise;
+    reader.seekAsync(MessageId::earliest(), [promise](Result result) { 
promise.setValue(result); });
+    Result result;
+    promise.getFuture().get(result);
+    ASSERT_EQ(result, ResultNotAllowedError);
+    client.close();
+}
+
+TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
+    Client client(serviceUrl);
+    const auto topic = "test-has-message-available-after-seek-to-end-" + 
std::to_string(time(nullptr));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, 
reader));
+
+    producer.send(MessageBuilder().setContent("msg-0").build());
+    producer.send(MessageBuilder().setContent("msg-1").build());
+
+    bool hasMessageAvailable;
+    if (GetParam()) {
+        // Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been 
initialized
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    }
+
+    ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    ASSERT_FALSE(hasMessageAvailable);
+
+    producer.send(MessageBuilder().setContent("msg-2").build());
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    ASSERT_TRUE(hasMessageAvailable);
+
+    Message msg;
+    ASSERT_EQ(ResultOk, reader.readNext(msg, 1000));
+    ASSERT_EQ("msg-2", msg.getDataAsString());
+
+    // Test the 2nd seek
+    ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
+    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    ASSERT_FALSE(hasMessageAvailable);
+
+    client.close();
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
+INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, 
false));

Reply via email to