This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2e761b8a0ddfc6a0097f85f2f326f41aefb43a44 Author: Cong Zhao <[email protected]> AuthorDate: Wed Aug 3 09:56:21 2022 +0800 [fix][client-c++] Close `messages_` when PartitionedConsumer is closed (#16887) (cherry picked from commit 7c8cd7bdefbe277af07768b5d2fdb01809bf9404) --- pulsar-client-cpp/lib/ConsumerImpl.cc | 4 +++ pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 +++ pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 7 ++++ pulsar-client-cpp/tests/ConsumerTest.cc | 43 ++++++++++++++++++++++++ 4 files changed, 58 insertions(+) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 84583d521cc..598d2c1ae19 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -721,6 +721,10 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { messageProcessed(msg); return ResultOk; } else { + Lock lock(mutex_); + if (state_ != Ready) { + return ResultAlreadyClosed; + } return ResultTimeout; } } diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 0ae86d5879a..361a863d247 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -493,6 +493,10 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { unAckedMessageTrackerPtr_->add(msg.getMessageId()); return ResultOk; } else { + lock.lock(); + if (state_ != Ready) { + return ResultAlreadyClosed; + } return ResultTimeout; } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index e43b5090e43..48f1e4a398b 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -106,6 +106,10 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) { unAckedMessageTrackerPtr_->add(msg.getMessageId()); return ResultOk; } else { + lock.lock(); + if (state_ != Ready) { + return ResultAlreadyClosed; + } return ResultTimeout; } } @@ -428,6 +432,9 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& void PartitionedConsumerImpl::failPendingReceiveCallback() { Message msg; + + messages_.close(); + Lock lock(pendingReceiveMutex_); while (!pendingReceives_.empty()) { ReceiveCallback callback = pendingReceives_.front(); diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc index b61c15a8866..adb94a4ad13 100644 --- a/pulsar-client-cpp/tests/ConsumerTest.cc +++ b/pulsar-client-cpp/tests/ConsumerTest.cc @@ -720,4 +720,47 @@ TEST(ConsumerTest, testIsConnected) { ASSERT_FALSE(consumer.isConnected()); } +TEST(ConsumerTest, testPartitionsWithCloseUnblock) { + Client client(lookupUrl); + const std::string partitionedTopic = "testPartitionsWithCloseUnblock" + std::to_string(time(nullptr)); + constexpr int numPartitions = 2; + + int res = + makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", + std::to_string(numPartitions)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + Consumer consumer; + ConsumerConfiguration consumerConfig; + ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, "SubscriptionName", consumerConfig, consumer)); + + // send messages + ProducerConfiguration producerConfig; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer)); + Message msg = MessageBuilder().setContent("message").build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + + producer.close(); + + // receive message on another thread + pulsar::Latch latch(1); + auto thread = std::thread([&]() { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 10 * 1000)); + consumer.acknowledge(msg.getMessageId()); + ASSERT_EQ(ResultAlreadyClosed, consumer.receive(msg, 10 * 1000)); + latch.countdown(); + }); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + consumer.close(); + + bool wasUnblocked = latch.wait(std::chrono::milliseconds(100)); + + ASSERT_TRUE(wasUnblocked); + thread.join(); +} + } // namespace pulsar
