This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3b8ebf879c4 [fix][cpp] Fix issue where unexpected ack timeout occurred
(#17503)
3b8ebf879c4 is described below
commit 3b8ebf879c418ef8605531c0764005691045d207
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Sep 15 13:12:43 2022 +0900
[fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
(cherry picked from commit a98f025a35935a7a27db3f0919aa6f4453e4fb02)
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 17 ++++--
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 1 +
pulsar-client-cpp/tests/ConsumerTest.cc | 71 ++++++++++++++++++++++++
3 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index c35d84b6db6..c78c12eaec0 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -451,8 +451,8 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer
consumer, const Message&
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
- listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
+
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultOk,
msg, callback));
} else {
if (messages_.full()) {
lock.unlock();
@@ -469,7 +469,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer
consumer, const Message&
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
messages_.pop(m);
-
+ unAckedMessageTrackerPtr_->add(m.getMessageId());
try {
messageListener_(Consumer(shared_from_this()), m);
} catch (const std::exception& e) {
@@ -535,11 +535,20 @@ void
MultiTopicsConsumerImpl::failPendingReceiveCallback() {
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
- listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed,
msg));
+
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(),
ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result,
Message& msg,
+ const
ReceiveCallback& callback) {
+ if (result == ResultOk) {
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ }
+ callback(result, msg);
+}
+
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId,
ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 95c24f68c5b..8769d59b990 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
+ void notifyPendingReceivedCallback(Result result, Message& message, const
ReceiveCallback& callback);
void handleOneTopicSubscribed(Result result, Consumer consumer, const
std::string& topic,
std::shared_ptr<std::atomic<int>>
topicsNeedCreate);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc
b/pulsar-client-cpp/tests/ConsumerTest.cc
index b1fc11cec8f..e672da33c09 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -475,6 +475,77 @@ TEST(ConsumerTest,
testPartitionedConsumerUnAckedMessageRedelivery) {
client.close();
}
+TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
+ ClientConfiguration clientConfig;
+ clientConfig.setMessageListenerThreads(1);
+ Client client(lookupUrl, clientConfig);
+
+ const std::string partitionedTopic =
+ "testPartitionedConsumerUnexpectedAckTimeout" +
std::to_string(time(nullptr));
+ std::string subName = "sub";
+ constexpr int numPartitions = 2;
+ constexpr int numOfMessages = 3;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+ pulsar::Latch latch(numOfMessages);
+ std::vector<Message> messages;
+ std::mutex mtx;
+
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setConsumerType(ConsumerShared);
+ consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ consumerConfig.setTickDurationInMs(tickDurationInMs);
+ consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
+ // acknowledge received messages immediately, so no ack timeout is
expected
+ ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
+ ASSERT_EQ(0, msg.getRedeliveryCount());
+
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ messages.emplace_back(msg);
+ }
+
+ if (latch.getCount() > 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(unAckedMessagesTimeoutMs +
tickDurationInMs * 2));
+ latch.countdown();
+ }
+ });
+ ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName,
consumerConfig, consumer));
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setBlockIfQueueFull(true);
+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic,
producerConfig, producer));
+ std::string prefix = "message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+ producer.close();
+
+ bool wasUnblocked = latch.wait(
+ std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs
* 2) * numOfMessages + 5000));
+ ASSERT_TRUE(wasUnblocked);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(5000));
+ // messages are expected not to be redelivered
+ ASSERT_EQ(numOfMessages, messages.size());
+
+ consumer.close();
+ client.close();
+}
+
TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
Client client(lookupUrl);
const std::string nonPartitionedTopic =