This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new b5cfde16720 [cherry-pick][branch-2.10] Fix issue where unexpected ack timeout (#18906)
b5cfde16720 is described below
commit b5cfde16720160f3adc7d19bbab35559be4b61d8
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 14 12:26:44 2022 +0800
[cherry-pick][branch-2.10] Fix issue where unexpected ack timeout (#18906)
### Motivation
Cherry-pick https://github.com/apache/pulsar/pull/17503 to release 2.10.3
and run tests.
### Modifications
Cherry-pick https://github.com/apache/pulsar/pull/17503 to release 2.10.3.
---
.../broker/admin/impl/SchemasResourceBase.java | 1 -
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 18 ++++--
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 1 +
pulsar-client-cpp/tests/ConsumerTest.cc | 71 ++++++++++++++++++++++
4 files changed, 85 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index f0d9e2bf241..b9533c6f291 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index c35d84b6db6..18723684afe 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -451,15 +451,14 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
- listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
+
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(), ResultOk,
msg, callback));
} else {
if (messages_.full()) {
lock.unlock();
}
messages_.push(msg);
if (messageListener_) {
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(
std::bind(&MultiTopicsConsumerImpl::internalListener,
shared_from_this(), consumer));
}
@@ -469,7 +468,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
messages_.pop(m);
-
+ unAckedMessageTrackerPtr_->add(m.getMessageId());
try {
messageListener_(Consumer(shared_from_this()), m);
} catch (const std::exception& e) {
@@ -535,11 +534,20 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
- listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed,
msg));
+
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+ shared_from_this(),
ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result,
Message& msg,
+ const
ReceiveCallback& callback) {
+ if (result == ResultOk) {
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ }
+ callback(result, msg);
+}
+
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId,
ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 95c24f68c5b..8769d59b990 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
+ void notifyPendingReceivedCallback(Result result, Message& message, const
ReceiveCallback& callback);
void handleOneTopicSubscribed(Result result, Consumer consumer, const
std::string& topic,
std::shared_ptr<std::atomic<int>>
topicsNeedCreate);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index b1fc11cec8f..e672da33c09 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -475,6 +475,77 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
client.close();
}
+TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
+ ClientConfiguration clientConfig;
+ clientConfig.setMessageListenerThreads(1);
+ Client client(lookupUrl, clientConfig);
+
+ const std::string partitionedTopic =
+ "testPartitionedConsumerUnexpectedAckTimeout" +
std::to_string(time(nullptr));
+ std::string subName = "sub";
+ constexpr int numPartitions = 2;
+ constexpr int numOfMessages = 3;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+ pulsar::Latch latch(numOfMessages);
+ std::vector<Message> messages;
+ std::mutex mtx;
+
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setConsumerType(ConsumerShared);
+ consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ consumerConfig.setTickDurationInMs(tickDurationInMs);
+ consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
+ // acknowledge received messages immediately, so no ack timeout is expected
+ ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
+ ASSERT_EQ(0, msg.getRedeliveryCount());
+
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ messages.emplace_back(msg);
+ }
+
+ if (latch.getCount() > 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(unAckedMessagesTimeoutMs +
tickDurationInMs * 2));
+ latch.countdown();
+ }
+ });
+ ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName,
consumerConfig, consumer));
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setBlockIfQueueFull(true);
+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic,
producerConfig, producer));
+ std::string prefix = "message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+ producer.close();
+
+ bool wasUnblocked = latch.wait(
+ std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs
* 2) * numOfMessages + 5000));
+ ASSERT_TRUE(wasUnblocked);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(5000));
+ // messages are expected not to be redelivered
+ ASSERT_EQ(numOfMessages, messages.size());
+
+ consumer.close();
+ client.close();
+}
+
TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
Client client(lookupUrl);
const std::string nonPartitionedTopic =