This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a1e2b4a  Fix multi-topics consumer could receive old messages after seek  (#388)
a1e2b4a is described below

commit a1e2b4a2be5bcf9424768da7d2161a42e16f57bf
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Feb 2 19:36:47 2024 +0800

    Fix multi-topics consumer could receive old messages after seek  (#388)
    
    ### Motivation
    
    See https://github.com/apache/pulsar/pull/21945
    
    ### Modifications
    
    In the C++ client, the multi-topics consumer receives messages by
    configuring its internal consumers with a message listener that adds
    messages to `incomingMessages_`. This patch therefore pauses those
    listeners before a seek and resumes them once the seek completes.
    
    Add `MultiTopicsConsumerTest.testSeekToNewerPosition` to verify the fix.
---
 lib/ConsumerImpl.h               |   2 -
 lib/MultiResultCallback.h        |  51 -------------------
 lib/MultiTopicsConsumerImpl.cc   |  38 ++++++++++++--
 lib/MultiTopicsConsumerImpl.h    |   1 +
 tests/MultiTopicsConsumerTest.cc | 105 +++++++++++++++++++++++++++++++++++++++
 tests/ThreadSafeMessages.h       |  75 ++++++++++++++++++++++++++++
 6 files changed, 215 insertions(+), 57 deletions(-)

diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 5612091..524acb8 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -333,8 +333,6 @@ class ConsumerImpl : public ConsumerImplBase {
                                                       const ClientConnectionPtr& cnx, MessageId& messageId);
 
     friend class PulsarFriend;
-
-    // these two declared friend to access setNegativeAcknowledgeEnabledForTesting
     friend class MultiTopicsConsumerImpl;
 
     FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);
diff --git a/lib/MultiResultCallback.h b/lib/MultiResultCallback.h
deleted file mode 100644
index 739bc4a..0000000
--- a/lib/MultiResultCallback.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <pulsar/ConsumerConfiguration.h>  // for ResultCallback
-
-#include <atomic>
-#include <memory>
-
-namespace pulsar {
-
-class MultiResultCallback {
-   public:
-    MultiResultCallback(ResultCallback callback, int numToComplete)
-        : callback_(callback),
-          numToComplete_(numToComplete),
-          numCompletedPtr_(std::make_shared<std::atomic_int>(0)) {}
-
-    void operator()(Result result) {
-        if (result == ResultOk) {
-            if (++(*numCompletedPtr_) == numToComplete_) {
-                callback_(result);
-            }
-        } else {
-            callback_(result);
-        }
-    }
-
-   private:
-    ResultCallback callback_;
-    const int numToComplete_;
-    const std::shared_ptr<std::atomic_int> numCompletedPtr_;
-};
-
-}  // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index af70623..a0854cf 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -28,7 +28,6 @@
 #include "LookupService.h"
 #include "MessageImpl.h"
 #include "MessagesImpl.h"
-#include "MultiResultCallback.h"
 #include "MultiTopicsBrokerConsumerStatsImpl.h"
 #include "TopicName.h"
 #include "UnAckedMessageTrackerDisabled.h"
@@ -521,6 +520,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
 }
 
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
+    if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
+        return;
+    }
     LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
                                                           << " message:" << msg.getDataAsString());
     msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
@@ -907,9 +909,37 @@ void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
         return;
     }
 
-    MultiResultCallback multiResultCallback(callback, consumers_.size());
-    consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
-        consumer->seekAsync(timestamp, multiResultCallback);
+    duringSeek_.store(true, std::memory_order_release);
+    consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
+    unAckedMessageTrackerPtr_->clear();
+    incomingMessages_.clear();
+    incomingMessagesSize_ = 0L;
+
+    auto weakSelf = weak_from_this();
+    auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size());
+    auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) {
+        auto self = weakSelf.lock();
+        if (PULSAR_UNLIKELY(!self)) {
+            callback(result);
+            return;
+        }
+        if (result != ResultOk) {
+            *numConsumersLeft = 0;  // skip the following callbacks
+            callback(result);
+            return;
+        }
+        if (--*numConsumersLeft > 0) {
+            return;
+        }
+        duringSeek_.store(false, std::memory_order_release);
+        listenerExecutor_->postWork([this, self] {
+            consumers_.forEachValue(
+                [](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
+        });
+        callback(ResultOk);
+    };
+    consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) {
+        consumer->seekAsync(timestamp, wrappedCallback);
     });
 }
 
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index c5834ea..9d71a04 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -131,6 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     const Commands::SubscriptionMode subscriptionMode_;
     boost::optional<MessageId> startMessageId_;
     ConsumerInterceptorsPtr interceptors_;
+    std::atomic_bool duringSeek_{false};
 
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
new file mode 100644
index 0000000..5aae1eb
--- /dev/null
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <chrono>
+
+#include "ThreadSafeMessages.h"
+#include "lib/LogUtils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+extern std::string unique_str();
+
+TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
+    const std::string topicPrefix = "multi-topics-consumer-seek-to-newer-position";
+    Client client{lookupUrl};
+    std::vector<std::string> topics{topicPrefix + unique_str(), topicPrefix + unique_str()};
+    Producer producer1;
+    ASSERT_EQ(ResultOk, client.createProducer(topics[0], producer1));
+    Producer producer2;
+    ASSERT_EQ(ResultOk, client.createProducer(topics[1], producer2));
+    producer1.send(MessageBuilder().setContent("1-0").build());
+    producer2.send(MessageBuilder().setContent("2-0").build());
+    producer1.send(MessageBuilder().setContent("1-1").build());
+    producer2.send(MessageBuilder().setContent("2-1").build());
+
+    Consumer consumer;
+    ConsumerConfiguration conf;
+    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
+    std::vector<int64_t> timestamps;
+    Message msg;
+    for (int i = 0; i < 4; i++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        timestamps.emplace_back(msg.getPublishTimestamp());
+    }
+    std::sort(timestamps.begin(), timestamps.end());
+    const auto timestamp = timestamps[2];
+    consumer.close();
+
+    ThreadSafeMessages messages{2};
+
+    // Test synchronous receive after seek
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-2", conf, consumer));
+    consumer.seek(timestamp);
+    for (int i = 0; i < 2; i++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        messages.add(msg);
+    }
+    ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
+    consumer.close();
+
+    // Test asynchronous receive after seek
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-3", conf, consumer));
+    messages.clear();
+    consumer.seek(timestamp);
+    for (int i = 0; i < 2; i++) {
+        consumer.receiveAsync([&messages](Result result, const Message& msg) {
+            if (result == ResultOk) {
+                messages.add(msg);
+            } else {
+                LOG_ERROR("Failed to receive: " << result);
+            }
+        });
+    }
+    ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
+    ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
+    consumer.close();
+
+    // Test message listener
+    conf.setMessageListener([&messages](Consumer consumer, Message msg) { messages.add(msg); });
+    messages.clear();
+    messages.setMinNumMsgs(4);
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-4", conf, consumer));
+    ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
+    ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-0", "1-1", "2-0", "2-1"}));
+    messages.clear();
+    messages.setMinNumMsgs(2);
+    consumer.seek(timestamp);
+    ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
+    ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1", "2-1"}));
+
+    client.close();
+}
diff --git a/tests/ThreadSafeMessages.h b/tests/ThreadSafeMessages.h
new file mode 100644
index 0000000..f30dbf0
--- /dev/null
+++ b/tests/ThreadSafeMessages.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Message.h>
+
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace pulsar {
+
+// When we receive messages in the message listener or the callback of receiveAsync(), we need to verify the
+// received messages in the test thread. This class is a helper class for thread-safe access to the messages.
+class ThreadSafeMessages {
+   public:
+    ThreadSafeMessages(size_t minNumMsgs) : minNumMsgs_(minNumMsgs) {}
+
+    template <typename Duration>
+    bool wait(Duration duration) {
+        std::unique_lock<std::mutex> lock{mutex_};
+        return cond_.wait_for(lock, duration, [this] { return msgs_.size() >= minNumMsgs_; });
+    }
+
+    void add(const Message& msg) {
+        std::lock_guard<std::mutex> lock{mutex_};
+        msgs_.emplace_back(msg);
+        if (msgs_.size() >= minNumMsgs_) {
+            cond_.notify_all();
+        }
+    }
+
+    void clear() {
+        std::lock_guard<std::mutex> lock{mutex_};
+        msgs_.clear();
+    }
+
+    std::vector<std::string> getSortedValues() const {
+        std::unique_lock<std::mutex> lock{mutex_};
+        std::vector<std::string> values(msgs_.size());
+        std::transform(msgs_.cbegin(), msgs_.cend(), values.begin(),
+                       [](const Message& msg) { return msg.getDataAsString(); });
+        lock.unlock();
+        std::sort(values.begin(), values.end());
+        return values;
+    }
+
+    void setMinNumMsgs(size_t minNumMsgs) noexcept { minNumMsgs_ = minNumMsgs; }
+
+   private:
+    std::atomic_size_t minNumMsgs_;
+    std::vector<Message> msgs_;
+    mutable std::mutex mutex_;
+    mutable std::condition_variable cond_;
+};
+
+}  // namespace pulsar

Reply via email to