This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 673ed2ce70a Revert "[fix][client-c++] Close `messages_` when 
PartitionedConsumer is closed (#16887)"
673ed2ce70a is described below

commit 673ed2ce70a7936473390f174c3a2a7f204a0699
Author: Yunze Xu <[email protected]>
AuthorDate: Sat Aug 6 01:42:49 2022 +0800

    Revert "[fix][client-c++] Close `messages_` when PartitionedConsumer is 
closed (#16887)"
    
    This reverts commit 62a3590028988243372a0ab9f4bcb163bcd95135.
---
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  4 ---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc |  4 ---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  7 ----
 pulsar-client-cpp/tests/ConsumerTest.cc          | 43 ------------------------
 4 files changed, 58 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index ec145a247ae..1f4a6b5b988 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -701,10 +701,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
         messageProcessed(msg);
         return ResultOk;
     } else {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 361a863d247..0ae86d5879a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -493,10 +493,6 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int 
timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
-        lock.lock();
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
         return ResultTimeout;
     }
 }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 48f1e4a398b..e43b5090e43 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -106,10 +106,6 @@ Result PartitionedConsumerImpl::receive(Message& msg, int 
timeout) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
-        lock.lock();
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
         return ResultTimeout;
     }
 }
@@ -432,9 +428,6 @@ void PartitionedConsumerImpl::messageReceived(Consumer 
consumer, const Message&
 
 void PartitionedConsumerImpl::failPendingReceiveCallback() {
     Message msg;
-
-    messages_.close();
-
     Lock lock(pendingReceiveMutex_);
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc 
b/pulsar-client-cpp/tests/ConsumerTest.cc
index 3d9fc291ff7..4680281478b 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -505,47 +505,4 @@ TEST(ConsumerTest, testIsConnected) {
     ASSERT_FALSE(consumer.isConnected());
 }
 
-TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
-    Client client(lookupUrl);
-    const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + 
std::to_string(time(nullptr));
-    constexpr int numPartitions = 2;
-
-    int res =
-        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + 
partitionedTopic + "/partitions",
-                       std::to_string(numPartitions));
-    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
-
-    Consumer consumer;
-    ConsumerConfiguration consumerConfig;
-    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", 
consumerConfig, consumer));
-
-    // send messages
-    ProducerConfiguration producerConfig;
-    Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, 
producerConfig, producer));
-    Message msg = MessageBuilder().setContent("message").build();
-    ASSERT_EQ(ResultOk, producer.send(msg));
-
-    producer.close();
-
-    // receive message on another thread
-    pulsar::Latch latch(1);
-    auto thread = std::thread([&]() {
-        Message msg;
-        ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000));
-        consumer.acknowledge(msg.getMessageId());
-        ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000));
-        latch.countdown();
-    });
-
-    std::this_thread::sleep_for(std::chrono::seconds(1));
-
-    consumer.close();
-
-    bool wasUnblocked = latch.wait(std::chrono::milliseconds(100));
-
-    ASSERT_TRUE(wasUnblocked);
-    thread.join();
-}
-
 }  // namespace pulsar

Reply via email to