This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c0eef81e6a5c7b0f409b501a677dec58336d2a41 Author: Yunze Xu <[email protected]> AuthorDate: Wed Jul 6 23:21:24 2022 +0800 [fix][C++ client] Fix the close of Client might stuck or return a wrong result (#16285) Fixes https://github.com/apache/pulsar/issues/15976 ### Motivation Currently even if the producer, consumer, or reader failed to create, it would still be added to the producers or consumers in `Client`. `Client::close` first closes the internal producers and consumers, if the producers or consumers to close include failed producers or consumers, `Client::close` would return `ResultAlreadyClosed`. Even worse, closing a failed partitioned producer might stuck. It also makes the Python test `test_listener_name_client` flaky because `client.close()` will throw an exception if the underlying `Client::close` call in C++ client doesn't return `ResultOk`. ### Modifications - Only adding the created producer or consumer to the internal list of `Client` after the creation succeeded. - Add `ClientTest.testWrongListener` to verify when producer, consumer, reader failed to create, the internal producer list and consumer list are both empty. And `client.close()` would return `ResultOk`. (cherry picked from commit e23d312c04da1d82d35f9e2faf8a446f8e8a4eeb) --- pulsar-client-cpp/lib/ClientImpl.cc | 41 ++++++++++++++++++++--------------- pulsar-client-cpp/lib/ReaderImpl.cc | 21 ++++++++++-------- pulsar-client-cpp/lib/ReaderImpl.h | 4 +--- pulsar-client-cpp/tests/ClientTest.cc | 32 +++++++++++++++++++++++++++ 4 files changed, 69 insertions(+), 29 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 32c4a4d067f..d2955cf223f 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -174,9 +174,6 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul producer->getProducerCreatedFuture().addListener( std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1, std::placeholders::_2, callback, producer)); - Lock lock(mutex_); - producers_.push_back(producer); - lock.unlock(); producer->start(); } else { LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " @@ -187,7 +184,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr, CreateProducerCallback callback, ProducerImplBasePtr producer) { - callback(result, Producer(producer)); + if (result == ResultOk) { + Lock lock(mutex_); + producers_.push_back(producer); + lock.unlock(); + callback(result, Producer(producer)); + } else { + callback(result, {}); + } } void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId, @@ -230,10 +234,13 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf, getListenerExecutorProvider()->get(), callback); - reader->start(startMessageId); - - Lock lock(mutex_); - consumers_.push_back(reader->getConsumer()); + ConsumerImplBasePtr consumer = reader->getConsumer().lock(); + auto self = shared_from_this(); + reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) { + Lock lock(mutex_); + consumers_.push_back(weakConsumerPtr); + lock.unlock(); + }); } void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, @@ -280,9 +287,6 @@ void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const Nam consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, std::placeholders::_2, callback, consumer)); - Lock lock(mutex_); - consumers_.push_back(consumer); - lock.unlock(); consumer->start(); } else { LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result); @@ -306,6 +310,7 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st return; } } + lock.unlock(); if (topicNamePtr) { std::string randomName = generateRandomName(); @@ -320,8 +325,6 @@ void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const st consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, std::placeholders::_2, callback, consumer)); - consumers_.push_back(consumer); - lock.unlock(); consumer->start(); } @@ -378,9 +381,6 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr consumer->getConsumerCreatedFuture().addListener( std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1, std::placeholders::_2, callback, consumer)); - Lock lock(mutex_); - consumers_.push_back(consumer); - lock.unlock(); consumer->start(); } else { LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString() @@ -391,7 +391,14 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, SubscribeCallback callback, ConsumerImplBasePtr consumer) { - callback(result, Consumer(consumer)); + if (result == ResultOk) { + Lock lock(mutex_); + consumers_.push_back(consumer); + lock.unlock(); + callback(result, Consumer(consumer)); + } else { + callback(result, {}); + } } Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) { diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc index 9401c120119..c660c01ab2a 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.cc +++ b/pulsar-client-cpp/lib/ReaderImpl.cc @@ -35,7 +35,8 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback) : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {} -void ReaderImpl::start(const MessageId& startMessageId) { +void ReaderImpl::start(const MessageId& startMessageId, + std::function<void(const ConsumerImplBaseWeakPtr&)> callback) { ConsumerConfiguration consumerConf; consumerConf.setConsumerType(ConsumerExclusive); consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize()); @@ -79,19 +80,21 @@ void ReaderImpl::start(const MessageId& startMessageId) { client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId)); consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_)); - consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated, - shared_from_this(), std::placeholders::_1, - std::placeholders::_2)); + auto self = shared_from_this(); + consumer_->getConsumerCreatedFuture().addListener( + [this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) { + if (result == ResultOk) { + readerCreatedCallback_(result, Reader(self)); + callback(weakConsumerPtr); + } else { + readerCreatedCallback_(result, {}); + } + }); consumer_->start(); } const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); } -void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) { - auto self = shared_from_this(); - readerCreatedCallback_(result, Reader(self)); -} - Result ReaderImpl::readNext(Message& msg) { Result res = consumer_->receive(msg); acknowledgeIfNecessary(res, msg); diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h index 6de6c02e460..b0d8a6bc40a 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.h +++ b/pulsar-client-cpp/lib/ReaderImpl.h @@ -42,7 +42,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback); - void start(const MessageId& startMessageId); + void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback); const std::string& getTopic() const; @@ -65,8 +65,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> bool isConnected() const; private: - void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer); - void messageListener(Consumer consumer, const Message& msg); void acknowledgeIfNecessary(Result result, const Message& msg); diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 364e170f896..dd073fb6bfc 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -223,3 +223,35 @@ TEST(ClientTest, testReferenceCount) { ASSERT_EQ(readerWeakPtr.use_count(), 0); client.close(); } + +TEST(ClientTest, testWrongListener) { + const std::string topic = "client-test-wrong-listener-" + std::to_string(time(nullptr)); + auto httpCode = makePutRequest( + "http://localhost:8080/admin/v2/persistent/public/default/" + topic + "/partitions", "3"); + LOG_INFO("create " << topic << ": " << httpCode); + + Client client(lookupUrl, ClientConfiguration().setListenerName("test")); + Producer producer; + ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer)); + ASSERT_EQ(ResultProducerNotInitialized, producer.close()); + + Consumer consumer; + ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer)); + ASSERT_EQ(ResultConsumerNotInitialized, consumer.close()); + + ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0); + ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); + ASSERT_EQ(ResultOk, client.close()); + + // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the + // creation of Reader would fail with ResultConnectError. + client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); + + // Currently Reader can only read a non-partitioned topic in C++ client + Reader reader; + ASSERT_EQ(ResultServiceUnitNotReady, + client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader)); + ASSERT_EQ(ResultConsumerNotInitialized, reader.close()); + ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); + ASSERT_EQ(ResultOk, client.close()); +}
