This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a1e2b4a Fix multi-topics consumer could receive old messages after
seek (#388)
a1e2b4a is described below
commit a1e2b4a2be5bcf9424768da7d2161a42e16f57bf
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Feb 2 19:36:47 2024 +0800
Fix multi-topics consumer could receive old messages after seek (#388)
### Motivation
See https://github.com/apache/pulsar/pull/21945
### Modifications
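In the C++ client, the multi-topics consumer receives messages by
configuring its internal consumers with a message listener that adds
messages to `incomingMessages_`. This patch therefore pauses the
listeners before the seek and resumes them once every internal consumer
has finished seeking. While the seek is in progress, `incomingMessages_`
and the unacked message tracker are cleared, and any message that still
reaches the listener is dropped.
Add `MultiTopicsConsumerTest.testSeekToNewerPosition` to verify the fix.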
---
lib/ConsumerImpl.h | 2 -
lib/MultiResultCallback.h | 51 -------------------
lib/MultiTopicsConsumerImpl.cc | 38 ++++++++++++--
lib/MultiTopicsConsumerImpl.h | 1 +
tests/MultiTopicsConsumerTest.cc | 105 +++++++++++++++++++++++++++++++++++++++
tests/ThreadSafeMessages.h | 75 ++++++++++++++++++++++++++++
6 files changed, 215 insertions(+), 57 deletions(-)
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 5612091..524acb8 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -333,8 +333,6 @@ class ConsumerImpl : public ConsumerImplBase {
const
ClientConnectionPtr& cnx, MessageId& messageId);
friend class PulsarFriend;
-
- // these two declared friend to access
setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
FRIEND_TEST(ConsumerTest, testRedeliveryOfDecryptionFailedMessages);
diff --git a/lib/MultiResultCallback.h b/lib/MultiResultCallback.h
deleted file mode 100644
index 739bc4a..0000000
--- a/lib/MultiResultCallback.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <pulsar/ConsumerConfiguration.h> // for ResultCallback
-
-#include <atomic>
-#include <memory>
-
-namespace pulsar {
-
-class MultiResultCallback {
- public:
- MultiResultCallback(ResultCallback callback, int numToComplete)
- : callback_(callback),
- numToComplete_(numToComplete),
- numCompletedPtr_(std::make_shared<std::atomic_int>(0)) {}
-
- void operator()(Result result) {
- if (result == ResultOk) {
- if (++(*numCompletedPtr_) == numToComplete_) {
- callback_(result);
- }
- } else {
- callback_(result);
- }
- }
-
- private:
- ResultCallback callback_;
- const int numToComplete_;
- const std::shared_ptr<std::atomic_int> numCompletedPtr_;
-};
-
-} // namespace pulsar
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index af70623..a0854cf 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -28,7 +28,6 @@
#include "LookupService.h"
#include "MessageImpl.h"
#include "MessagesImpl.h"
-#include "MultiResultCallback.h"
#include "MultiTopicsBrokerConsumerStatsImpl.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
@@ -521,6 +520,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback
originalCallback) {
}
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const
Message& msg) {
+ if (PULSAR_UNLIKELY(duringSeek_.load(std::memory_order_acquire))) {
+ return;
+ }
LOG_DEBUG("Received Message from one of the topic - " <<
consumer.getTopic()
<< " message:" <<
msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
@@ -907,9 +909,37 @@ void MultiTopicsConsumerImpl::seekAsync(uint64_t
timestamp, ResultCallback callb
return;
}
- MultiResultCallback multiResultCallback(callback, consumers_.size());
-    consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr
consumer) {
- consumer->seekAsync(timestamp, multiResultCallback);
+ duringSeek_.store(true, std::memory_order_release);
+ consumers_.forEachValue([](const ConsumerImplPtr& consumer) {
consumer->pauseMessageListener(); });
+ unAckedMessageTrackerPtr_->clear();
+ incomingMessages_.clear();
+ incomingMessagesSize_ = 0L;
+
+ auto weakSelf = weak_from_this();
+ auto numConsumersLeft =
std::make_shared<std::atomic<int64_t>>(consumers_.size());
+ auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result
result) {
+ auto self = weakSelf.lock();
+ if (PULSAR_UNLIKELY(!self)) {
+ callback(result);
+ return;
+ }
+ if (result != ResultOk) {
+ *numConsumersLeft = 0; // skip the following callbacks
+ callback(result);
+ return;
+ }
+ if (--*numConsumersLeft > 0) {
+ return;
+ }
+ duringSeek_.store(false, std::memory_order_release);
+ listenerExecutor_->postWork([this, self] {
+ consumers_.forEachValue(
+ [](const ConsumerImplPtr& consumer) {
consumer->resumeMessageListener(); });
+ });
+ callback(ResultOk);
+ };
+ consumers_.forEachValue([timestamp, &wrappedCallback](const
ConsumerImplPtr& consumer) {
+ consumer->seekAsync(timestamp, wrappedCallback);
});
}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index c5834ea..9d71a04 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -131,6 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
const Commands::SubscriptionMode subscriptionMode_;
boost::optional<MessageId> startMessageId_;
ConsumerInterceptorsPtr interceptors_;
+ std::atomic_bool duringSeek_{false};
/* methods */
void handleSinglePartitionConsumerCreated(Result result,
ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
new file mode 100644
index 0000000..5aae1eb
--- /dev/null
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <chrono>
+
+#include "ThreadSafeMessages.h"
+#include "lib/LogUtils.h"
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+extern std::string unique_str();
+
+TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
+ const std::string topicPrefix =
"multi-topics-consumer-seek-to-newer-position";
+ Client client{lookupUrl};
+ std::vector<std::string> topics{topicPrefix + unique_str(), topicPrefix +
unique_str()};
+ Producer producer1;
+ ASSERT_EQ(ResultOk, client.createProducer(topics[0], producer1));
+ Producer producer2;
+ ASSERT_EQ(ResultOk, client.createProducer(topics[1], producer2));
+ producer1.send(MessageBuilder().setContent("1-0").build());
+ producer2.send(MessageBuilder().setContent("2-0").build());
+ producer1.send(MessageBuilder().setContent("1-1").build());
+ producer2.send(MessageBuilder().setContent("2-1").build());
+
+ Consumer consumer;
+ ConsumerConfiguration conf;
+ conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
+ std::vector<int64_t> timestamps;
+ Message msg;
+ for (int i = 0; i < 4; i++) {
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+ timestamps.emplace_back(msg.getPublishTimestamp());
+ }
+ std::sort(timestamps.begin(), timestamps.end());
+ const auto timestamp = timestamps[2];
+ consumer.close();
+
+ ThreadSafeMessages messages{2};
+
+ // Test synchronous receive after seek
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-2", conf, consumer));
+ consumer.seek(timestamp);
+ for (int i = 0; i < 2; i++) {
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+ messages.add(msg);
+ }
+ ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1",
"2-1"}));
+ consumer.close();
+
+ // Test asynchronous receive after seek
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-3", conf, consumer));
+ messages.clear();
+ consumer.seek(timestamp);
+ for (int i = 0; i < 2; i++) {
+ consumer.receiveAsync([&messages](Result result, const Message& msg) {
+ if (result == ResultOk) {
+ messages.add(msg);
+ } else {
+ LOG_ERROR("Failed to receive: " << result);
+ }
+ });
+ }
+ ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
+ ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1",
"2-1"}));
+ consumer.close();
+
+ // Test message listener
+ conf.setMessageListener([&messages](Consumer consumer, Message msg) {
messages.add(msg); });
+ messages.clear();
+ messages.setMinNumMsgs(4);
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub-4", conf, consumer));
+ ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
+ ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-0",
"1-1", "2-0", "2-1"}));
+ messages.clear();
+ messages.setMinNumMsgs(2);
+ consumer.seek(timestamp);
+ ASSERT_TRUE(messages.wait(std::chrono::seconds(3)));
+ ASSERT_EQ(messages.getSortedValues(), (std::vector<std::string>{"1-1",
"2-1"}));
+
+ client.close();
+}
diff --git a/tests/ThreadSafeMessages.h b/tests/ThreadSafeMessages.h
new file mode 100644
index 0000000..f30dbf0
--- /dev/null
+++ b/tests/ThreadSafeMessages.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/Message.h>
+
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace pulsar {
+
+// When we receive messages in the message listener or the callback of
receiveAsync(), we need to verify the
+// received messages in the test thread. This class is a helper class for
thread-safe access to the messages.
+class ThreadSafeMessages {
+ public:
+ ThreadSafeMessages(size_t minNumMsgs) : minNumMsgs_(minNumMsgs) {}
+
+ template <typename Duration>
+ bool wait(Duration duration) {
+ std::unique_lock<std::mutex> lock{mutex_};
+ return cond_.wait_for(lock, duration, [this] { return msgs_.size() >=
minNumMsgs_; });
+ }
+
+ void add(const Message& msg) {
+ std::lock_guard<std::mutex> lock{mutex_};
+ msgs_.emplace_back(msg);
+ if (msgs_.size() >= minNumMsgs_) {
+ cond_.notify_all();
+ }
+ }
+
+ void clear() {
+ std::lock_guard<std::mutex> lock{mutex_};
+ msgs_.clear();
+ }
+
+ std::vector<std::string> getSortedValues() const {
+ std::unique_lock<std::mutex> lock{mutex_};
+ std::vector<std::string> values(msgs_.size());
+ std::transform(msgs_.cbegin(), msgs_.cend(), values.begin(),
+ [](const Message& msg) { return msg.getDataAsString();
});
+ lock.unlock();
+ std::sort(values.begin(), values.end());
+ return values;
+ }
+
+ void setMinNumMsgs(size_t minNumMsgs) noexcept { minNumMsgs_ = minNumMsgs;
}
+
+ private:
+ std::atomic_size_t minNumMsgs_;
+ std::vector<Message> msgs_;
+ mutable std::mutex mutex_;
+ mutable std::condition_variable cond_;
+};
+
+} // namespace pulsar