This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new b5cfde16720 [cherry-pick][branch-2.10] Fix issue where unexpected ack 
timeout (#18906)
b5cfde16720 is described below

commit b5cfde16720160f3adc7d19bbab35559be4b61d8
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 14 12:26:44 2022 +0800

    [cherry-pick][branch-2.10] Fix issue where unexpected ack timeout (#18906)
    
    ### Motivation
    Cherry-pick https://github.com/apache/pulsar/pull/17503 to release 2.10.3 
and run tests.
    
    ### Modifications
    
    Cherry-pick https://github.com/apache/pulsar/pull/17503 to release 2.10.3.
---
 .../broker/admin/impl/SchemasResourceBase.java     |  1 -
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 18 ++++--
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  1 +
 pulsar-client-cpp/tests/ConsumerTest.cc            | 71 ++++++++++++++++++++++
 4 files changed, 85 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
index f0d9e2bf241..b9533c6f291 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Clock;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.MediaType;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index c35d84b6db6..18723684afe 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -451,15 +451,14 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer 
consumer, const Message&
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         lock.unlock();
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
+        
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+                                              shared_from_this(), ResultOk, 
msg, callback));
     } else {
         if (messages_.full()) {
             lock.unlock();
         }
         messages_.push(msg);
         if (messageListener_) {
-            unAckedMessageTrackerPtr_->add(msg.getMessageId());
             listenerExecutor_->postWork(
                 std::bind(&MultiTopicsConsumerImpl::internalListener, 
shared_from_this(), consumer));
         }
@@ -469,7 +468,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer 
consumer, const Message&
 void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
     Message m;
     messages_.pop(m);
-
+    unAckedMessageTrackerPtr_->add(m.getMessageId());
     try {
         messageListener_(Consumer(shared_from_this()), m);
     } catch (const std::exception& e) {
@@ -535,11 +534,20 @@ void 
MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, 
msg));
+        
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
+                                              shared_from_this(), 
ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, 
Message& msg,
+                                                            const 
ReceiveCallback& callback) {
+    if (result == ResultOk) {
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    }
+    callback(result, msg);
+}
+
 void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, 
ResultCallback callback) {
     if (state_ != Ready) {
         callback(ResultAlreadyClosed);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 95c24f68c5b..8769d59b990 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void internalListener(Consumer consumer);
     void receiveMessages();
     void failPendingReceiveCallback();
+    void notifyPendingReceivedCallback(Result result, Message& message, const 
ReceiveCallback& callback);
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const 
std::string& topic,
                                   std::shared_ptr<std::atomic<int>> 
topicsNeedCreate);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc 
b/pulsar-client-cpp/tests/ConsumerTest.cc
index b1fc11cec8f..e672da33c09 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -475,6 +475,77 @@ TEST(ConsumerTest, 
testPartitionedConsumerUnAckedMessageRedelivery) {
     client.close();
 }
 
+TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
+    ClientConfiguration clientConfig;
+    clientConfig.setMessageListenerThreads(1);
+    Client client(lookupUrl, clientConfig);
+
+    const std::string partitionedTopic =
+        "testPartitionedConsumerUnexpectedAckTimeout" + 
std::to_string(time(nullptr));
+    std::string subName = "sub";
+    constexpr int numPartitions = 2;
+    constexpr int numOfMessages = 3;
+    constexpr int unAckedMessagesTimeoutMs = 10000;
+    constexpr int tickDurationInMs = 1000;
+    pulsar::Latch latch(numOfMessages);
+    std::vector<Message> messages;
+    std::mutex mtx;
+
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + 
partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setConsumerType(ConsumerShared);
+    consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    consumerConfig.setTickDurationInMs(tickDurationInMs);
+    consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
+        // acknowledge received messages immediately, so no ack timeout is 
expected
+        ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
+        ASSERT_EQ(0, msg.getRedeliveryCount());
+
+        {
+            std::lock_guard<std::mutex> lock(mtx);
+            messages.emplace_back(msg);
+        }
+
+        if (latch.getCount() > 0) {
+            std::this_thread::sleep_for(
+                std::chrono::milliseconds(unAckedMessagesTimeoutMs + 
tickDurationInMs * 2));
+            latch.countdown();
+        }
+    });
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, 
consumerConfig, consumer));
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setBlockIfQueueFull(true);
+    
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, 
producerConfig, producer));
+    std::string prefix = "message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+    producer.close();
+
+    bool wasUnblocked = latch.wait(
+        std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs 
* 2) * numOfMessages + 5000));
+    ASSERT_TRUE(wasUnblocked);
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
+    // messages are expected not to be redelivered
+    ASSERT_EQ(numOfMessages, messages.size());
+
+    consumer.close();
+    client.close();
+}
+
 TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
     Client client(lookupUrl);
     const std::string nonPartitionedTopic =

Reply via email to