This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c42239d72edbdb802767ddcc8cd119c002548f29 Author: Cong Zhao <[email protected]> AuthorDate: Sat Aug 13 10:36:51 2022 +0800 [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940) ### Motivation Currently, many locks are used to ensure the atomicity of `state_` in `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl`. We can use an atomic `state_` instead of the locks to improve performance. ### Modifications Use an atomic `state_` instead of the lock to improve performance. (cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267) --- pulsar-client-cpp/lib/ConsumerImpl.cc | 75 +++++---------------- pulsar-client-cpp/lib/HandlerBase.cc | 11 ++-- pulsar-client-cpp/lib/HandlerBase.h | 2 +- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 71 ++++++-------------- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 5 +- pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 77 ++++++---------------- pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 3 +- .../lib/PatternMultiTopicsConsumerImpl.cc | 5 +- pulsar-client-cpp/lib/ProducerImpl.cc | 63 +++++++----------- 9 files changed, 91 insertions(+), 221 deletions(-) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index b5b5ceb046f..19ef3501d6c 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -160,10 +160,7 @@ void ConsumerImpl::start() { } void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - Lock lock(mutex_); - const auto state = state_; - lock.unlock(); - if (state == Closed) { + if (state_ == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); return; } @@ -202,7 +199,6 @@ void ConsumerImpl::connectionFailed(Result result) { ConsumerImplPtr ptr = shared_from_this(); if (consumerCreatedPromise_.setFailed(result)) { - Lock lock(mutex_); state_ = Failed; } } @@ -274,15 +270,15 @@ void 
ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO(getName() << "Unsubscribing"); - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); callback(ResultAlreadyClosed); LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and " "then call unsubscribe"); return; } + Lock lock(mutex_); + ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_); @@ -303,7 +299,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) { if (result == ResultOk) { - Lock lock(mutex_); state_ = Closed; LOG_INFO(getName() << "Unsubscribed successfully"); } else { @@ -749,12 +744,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed - Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } - stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { @@ -772,12 +765,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { } Result ConsumerImpl::receiveHelper(Message& msg) { - { - Lock lock(mutex_); - if (state_ != Ready) { - return ResultAlreadyClosed; - } + if (state_ != Ready) { + return ResultAlreadyClosed; } + if (messageListener_) { LOG_ERROR(getName() << "Can not receive when a listener has been set"); return ResultInvalidConfiguration; @@ -804,11 +795,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { return ResultInvalidConfiguration; } - { - Lock lock(mutex_); - if (state_ != Ready) { - return ResultAlreadyClosed; - } + if (state_ != Ready) { + return ResultAlreadyClosed; } if (messageListener_) { @@ -983,13 +971,10 @@ void ConsumerImpl::disconnectConsumer() { } void 
ConsumerImpl::closeAsync(ResultCallback callback) { - Lock lock(mutex_); - // Keep a reference to ensure object is kept alive ConsumerImplPtr ptr = shared_from_this(); if (state_ != Ready) { - lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -1007,7 +992,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { state_ = Closed; - lock.unlock(); // If connection is gone, also the consumer is closed on the broker side if (callback) { callback(ResultOk); @@ -1018,7 +1002,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { ClientImplPtr client = client_.lock(); if (!client) { state_ = Closed; - lock.unlock(); // Client was already destroyed if (callback) { callback(ResultOk); @@ -1026,8 +1009,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { return; } - // Lock is no longer required - lock.unlock(); int requestId = client->newRequestId(); Future<Result, ResponseData> future = cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); @@ -1043,9 +1024,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) { if (result == ResultOk) { - Lock lock(mutex_); state_ = Closed; - lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1065,22 +1044,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI const std::string& ConsumerImpl::getName() const { return consumerStr_; } void ConsumerImpl::shutdown() { - Lock lock(mutex_); state_ = Closed; - lock.unlock(); consumerCreatedPromise_.setFailed(ResultAlreadyClosed); } -bool ConsumerImpl::isClosed() { - Lock lock(mutex_); - return state_ == Closed; -} +bool ConsumerImpl::isClosed() { return state_ == Closed; } -bool ConsumerImpl::isOpen() { - Lock lock(mutex_); - return state_ == Ready; -} +bool ConsumerImpl::isOpen() { return state_ == Ready; } Result 
ConsumerImpl::pauseMessageListener() { if (!messageListener_) { @@ -1143,14 +1114,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) { int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { - Lock lock(mutex_); if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") - lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } + Lock lock(mutex_); if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; @@ -1210,16 +1180,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) { } void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { - Lock lock(mutex_); - if (state_ == Closed || state_ == Closing) { - lock.unlock(); + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } - lock.unlock(); this->ackGroupingTrackerPtr_->flushAndClean(); ClientConnectionPtr cnx = getCnx().lock(); @@ -1243,16 +1211,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { } void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - Lock lock(mutex_); - if (state_ == Closed || state_ == Closing) { - lock.unlock(); + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } - lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1316,16 +1282,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback } void 
ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { - Lock lock(mutex_); - if (state_ == Closed || state_ == Closing) { - lock.unlock(); + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed, MessageId()); } return; } - lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1371,10 +1335,7 @@ void ConsumerImpl::trackMessage(const MessageId& messageId) { } } -bool ConsumerImpl::isConnected() const { - Lock lock(mutex_); - return !getCnx().expired() && state_ == Ready; -} +bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index d7025ad004b..5d2244f7552 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, HandlerBase::~HandlerBase() { timer_->cancel(); } void HandlerBase::start() { - Lock lock(mutex_); // guard against concurrent state changes such as closing - if (state_ == NotStarted) { - state_ = Pending; - lock.unlock(); - + State state = NotStarted; + if (state_.compare_exchange_strong(state, Pending)) { grabCnx(); } } @@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con return; } - Lock lock(handler->mutex_); State state = handler->state_; ClientConnectionPtr currentConnection = handler->connection_.lock(); @@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) { } void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { - if (handler->state_ == Pending || handler->state_ == Ready) { + const auto state = handler->state_.load(); + if (state == Pending || state == Ready) { 
TimeDuration delay = handler->backoff_.next(); LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index eeb8ebe1c5e..1184746da21 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -105,7 +105,7 @@ class HandlerBase { Failed }; - State state_; + std::atomic<State> state_; Backoff backoff_; uint64_t epoch_; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 0ae86d5879a..293ece4c838 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -56,7 +56,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std void MultiTopicsConsumerImpl::start() { if (topics_.empty()) { - if (compareAndSetState(Pending, Ready)) { + MultiTopicsConsumerState state = Pending; + if (state_.compare_exchange_strong(state, Ready)) { LOG_DEBUG("No topics passed in when create MultiTopicsConsumer."); multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); return; @@ -84,14 +85,15 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c (*topicsNeedCreate)--; if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. 
Error - " << result); } LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); if (topicsNeedCreate->load() == 0) { - if (compareAndSetState(Pending, Ready)) { + MultiTopicsConsumerState state = Pending; + if (state_.compare_exchange_strong(state, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); } else { @@ -115,7 +117,8 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s return topicPromise->getFuture(); } - if (state_ == Closed || state_ == Closing) { + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR("MultiTopicsConsumer already closed when subscribe."); topicPromise->setFailed(ResultAlreadyClosed); return topicPromise->getFuture(); @@ -222,15 +225,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing"); - Lock lock(mutex_); - if (state_ == Closing || state_ == Closed) { + const auto state = state_.load(); + if (state == Closing || state == Closed) { LOG_INFO(consumerStr_ << " already closed"); - lock.unlock(); callback(ResultAlreadyClosed); return; } state_ = Closing; - lock.unlock(); std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0); auto self = shared_from_this(); @@ -254,7 +255,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, (*consumerUnsubed)++; if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " << result << " subscription - " << subscriptionName_); } @@ -266,7 +267,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, unAckedMessageTrackerPtr_->clear(); Result result1 = (state_ != Failed) ? 
ResultOk : ResultUnknownError; - setState(Closed); + state_ = Closed; callback(result1); return; } @@ -281,7 +282,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, return; } - if (state_ == Closing || state_ == Closed) { + const auto state = state_.load(); + if (state == Closing || state == Closed) { LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - " << subscriptionName_); callback(ResultAlreadyClosed); @@ -318,7 +320,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( (*consumerUnsubed)++; if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " << result << " topicPartitionName - " << topicPartitionName); } @@ -350,7 +352,8 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( } void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { - if (state_ == Closing || state_ == Closed) { + const auto state = state_.load(); + if (state == Closing || state == Closed) { LOG_ERROR("TopicsConsumer already closed " << " topic" << topic_ << " consumer - " << consumerStr_); if (callback) { @@ -359,7 +362,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { return; } - setState(Closing); + state_ = Closing; auto self = shared_from_this(); int numConsumers = 0; @@ -373,7 +376,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { if (numConsumers == 0) { LOG_DEBUG("TopicsConsumer have no consumers to close " << " topic" << topic_ << " subscription - " << subscriptionName_); - setState(Closed); + state_ = Closed; if (callback) { callback(ResultAlreadyClosed); } @@ -395,7 +398,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri numberTopicPartitions_->fetch_sub(1); if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << 
" with error - " << result); } @@ -457,18 +460,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) { } Result MultiTopicsConsumerImpl::receive(Message& msg) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); return ResultAlreadyClosed; } if (messageListener_) { - lock.unlock(); LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } - lock.unlock(); messages_.pop(msg); unAckedMessageTrackerPtr_->add(msg.getMessageId()); @@ -476,19 +475,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) { } Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); return ResultAlreadyClosed; } if (messageListener_) { - lock.unlock(); LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } - lock.unlock(); if (messages_.pop(msg, std::chrono::milliseconds(timeout))) { unAckedMessageTrackerPtr_->add(msg.getMessageId()); return ResultOk; @@ -501,12 +496,10 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed - Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } - stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (messages_.pop(msg, std::chrono::milliseconds(0))) { @@ -571,30 +564,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; } const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } -void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) { - Lock lock(mutex_); - state_ = state; -} - -bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect, - MultiTopicsConsumerState update) { - Lock lock(mutex_); - if (state_ == expect) { - state_ = update; - return true; - } else { - return false; - } -} - void MultiTopicsConsumerImpl::shutdown() {} bool 
MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; } -bool MultiTopicsConsumerImpl::isOpen() { - Lock lock(mutex_); - return state_ == Ready; -} +bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; } void MultiTopicsConsumerImpl::receiveMessages() { const auto receiverQueueSize = conf_.getReceiverQueueSize(); @@ -644,12 +618,11 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } + Lock lock(mutex_); MultiTopicsBrokerConsumerStatsPtr statsPtr = std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load()); LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load()); @@ -715,11 +688,9 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl } bool MultiTopicsConsumerImpl::isConnected() const { - Lock lock(mutex_); if (state_ != Ready) { return false; } - lock.unlock(); return consumers_ .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); }) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index 98b2f318af9..d2ede4a8770 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -99,7 +99,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::map<std::string, int> topicsPartitions_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - MultiTopicsConsumerState state_ = Pending; + std::atomic<MultiTopicsConsumerState> state_{Pending}; BlockingQueue<Message> messages_; ExecutorServicePtr listenerExecutor_; MessageListener messageListener_; @@ -111,9 +111,6 
@@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::queue<ReceiveCallback> pendingReceives_; /* methods */ - void setState(MultiTopicsConsumerState state); - bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update); - void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback); diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index e43b5090e43..0dc9135be59 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -68,13 +68,10 @@ const std::string& PartitionedConsumerImpl::getSubscriptionName() const { return const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; } Result PartitionedConsumerImpl::receive(Message& msg) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); return ResultAlreadyClosed; } // See comments in `receive(Message&, int)` - lock.unlock(); if (messageListener_) { LOG_ERROR("Can not receive when a listener has been set"); @@ -87,15 +84,9 @@ Result PartitionedConsumerImpl::receive(Message& msg) { } Result PartitionedConsumerImpl::receive(Message& msg, int timeout) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); return ResultAlreadyClosed; } - // We unlocked `mutex_` here to avoid starvation of methods which are trying to acquire `mutex_`. - // In addition, `messageListener_` won't change once constructed, `BlockingQueue::pop` and - // `UnAckedMessageTracker::add` are thread-safe, so they don't need `mutex_` to achieve thread-safety. 
- lock.unlock(); if (messageListener_) { LOG_ERROR("Can not receive when a listener has been set"); @@ -114,12 +105,10 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed - Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } - stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (messages_.pop(msg, std::chrono::milliseconds(0))) { @@ -134,29 +123,23 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) { void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing"); // change state to Closing, so that no Ready state operation is permitted during unsubscribe - setState(Closing); + state_ = Closing; // do not accept un subscribe until we have subscribe to all of the partitions of a topic // it's a logical single topic so it should behave like a single topic, even if it's sharded - Lock lock(mutex_); - if (state_ != Ready) { - lock.unlock(); - unsigned int index = 0; - for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); - consumer++) { - LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_ - << " for Topic - " << topicName_->toString()); - (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync, - shared_from_this(), std::placeholders::_1, index++, - callback)); - } + unsigned int index = 0; + for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); + consumer++) { + LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_ + << " for Topic - " << topicName_->toString()); + (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync, + shared_from_this(), std::placeholders::_1, index++, + callback)); } } 
void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback) { - Lock lock(mutex_); if (state_ == Failed) { - lock.unlock(); // we have already informed the client that unsubcribe has failed so, ignore this callbacks // or do we still go ahead and check how many could we close successfully? LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State for consumerIndex - " @@ -164,9 +147,8 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int << subscriptionName_ << " for Topic - " << topicName_->toString()); return; } - lock.unlock(); if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Error Closing one of the parition consumers, consumerIndex - " << consumerIndex); callback(ResultUnknownError); return; @@ -181,7 +163,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int unsubscribedSoFar_++; if (unsubscribedSoFar_ == numPartitions) { LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_); - setState(Closed); + state_ = Closed; callback(ResultOk); return; } @@ -276,7 +258,6 @@ void PartitionedConsumerImpl::start() { void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex) { ResultCallback nullCallbackForCleanup = NULL; - Lock lock(mutex_); if (state_ == Failed) { // one of the consumer creation failed, and we are cleaning up return; @@ -286,7 +267,6 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( if (result != ResultOk) { state_ = Failed; - lock.unlock(); partitionedConsumerCreatedPromise_.setFailed(result); // unsubscribed all of the successfully subscribed partitioned consumers closeAsync(nullCallbackForCleanup); @@ -295,12 +275,13 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( } assert(partitionIndex < numPartitions && 
partitionIndex >= 0); + Lock lock(mutex_); numConsumersCreated_++; + lock.unlock(); if (numConsumersCreated_ == numPartitions) { LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - " << numPartitions << " Partitions."); state_ = Ready; - lock.unlock(); if (partitionsUpdateTimer_) { runPartitionUpdateTask(); } @@ -312,7 +293,6 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex, CloseCallback callback) { - Lock lock(mutex_); if (state_ == Failed) { // we should have already notified the client by callback return; @@ -320,7 +300,6 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, if (result != ResultOk) { state_ = Failed; LOG_ERROR("Closing the consumer failed for partition - " << partitionIndex); - lock.unlock(); partitionedConsumerCreatedPromise_.setFailed(result); if (callback) { callback(result); @@ -328,13 +307,14 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, return; } assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0); + Lock lock(mutex_); if (numConsumersCreated_ > 0) { numConsumersCreated_--; } + lock.unlock(); // closed all successfully if (!numConsumersCreated_) { state_ = Closed; - lock.unlock(); // set the producerCreatedPromise to failure partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError); if (callback) { @@ -344,11 +324,12 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, } } void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { + Lock lock(consumersMutex_); if (consumers_.empty()) { notifyResult(callback); return; } - setState(Closed); + state_ = Closed; unsigned int consumerAlreadyClosed = 0; // close successfully subscribed consumers // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be 
increased @@ -376,29 +357,20 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) { if (closeCallback) { // this means client invoked the closeAsync with a valid callback - setState(Closed); + state_ = Closed; closeCallback(ResultOk); } else { // consumer create failed, closeAsync called to cleanup the successfully created producers - setState(Failed); + state_ = Failed; partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError); } } -void PartitionedConsumerImpl::setState(const PartitionedConsumerState state) { - Lock lock(mutex_); - state_ = state; - lock.unlock(); -} - void PartitionedConsumerImpl::shutdown() {} bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; } -bool PartitionedConsumerImpl::isOpen() { - Lock lock(mutex_); - return state_ == Ready; -} +bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; } void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition()); @@ -502,9 +474,7 @@ const std::string& PartitionedConsumerImpl::getName() const { return partitionSt int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } @@ -513,7 +483,6 @@ void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions); LatchPtr latchPtr = std::make_shared<Latch>(numPartitions); ConsumerList consumerList = consumers_; - lock.unlock(); for (int i = 0; i < consumerList.size(); i++) { consumerList[i]->getBrokerConsumerStatsAsync( std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, 
shared_from_this(), @@ -545,16 +514,13 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c } void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - Lock stateLock(mutex_); if (state_ != Ready) { - stateLock.unlock(); callback(ResultAlreadyClosed); return; } // consumers_ could only be modified when state_ is Ready, so we needn't lock consumersMutex_ here ConsumerList consumerList = consumers_; - stateLock.unlock(); MultiResultCallback multiResultCallback(callback, consumers_.size()); for (ConsumerList::const_iterator i = consumerList.begin(); i != consumerList.end(); i++) { @@ -577,7 +543,6 @@ void PartitionedConsumerImpl::getPartitionMetadata() { void PartitionedConsumerImpl::handleGetPartitions(Result result, const LookupDataResultPtr& lookupDataResult) { - Lock stateLock(mutex_); if (state_ != Ready) { return; } @@ -614,11 +579,9 @@ void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl } bool PartitionedConsumerImpl::isConnected() const { - Lock stateLock(mutex_); if (state_ != Ready) { return false; } - stateLock.unlock(); Lock consumersLock(consumersMutex_); const auto consumers = consumers_; diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 7fa0ccdd1f4..8f4faf09954 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -92,7 +92,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, mutable std::mutex consumersMutex_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - PartitionedConsumerState state_ = Pending; + std::atomic<PartitionedConsumerState> state_{Pending}; unsigned int unsubscribedSoFar_ = 0; BlockingQueue<Message> messages_; ExecutorServicePtr listenerExecutor_; @@ -109,7 +109,6 @@ class PartitionedConsumerImpl : public ConsumerImplBase, unsigned int getNumPartitionsWithLock() const; ConsumerConfiguration 
getSinglePartitionConsumerConfig() const; ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const; - void setState(PartitionedConsumerState state); void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback); void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc index 34e912d4ddc..79ed1969d78 100644 --- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc @@ -55,8 +55,9 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system: return; } - if (state_ != Ready) { - LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_); + const auto state = state_.load(); + if (state != Ready) { + LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state); resetAutoDiscoveryTimer(); return; } diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index a539889ac00..a9868bca97a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -137,13 +137,10 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) { } void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - Lock lock(mutex_); if (state_ == Closed) { - lock.unlock(); LOG_DEBUG(getName() << "connectionOpened : Producer is already closed"); return; } - lock.unlock(); ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); @@ -165,7 +162,6 @@ void ProducerImpl::connectionFailed(Result result) { // so don't change the state and allow reconnections return; } else if (producerCreatedPromise_.setFailed(result)) { - Lock lock(mutex_); state_ = Failed; } } @@ -176,14 +172,15 
@@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r // make sure we're still in the Pending/Ready state, closeAsync could have been invoked // while waiting for this response if using lazy producers - Lock lock(mutex_); - if (state_ != Ready && state_ != Pending) { + const auto state = state_.load(); + if (state != Ready && state != Pending) { LOG_DEBUG("Producer created response received but producer already closed"); failPendingMessages(ResultAlreadyClosed, false); return; } if (result == ResultOk) { + Lock lock(mutex_); // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString()); @@ -218,8 +215,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r producerCreatedPromise_.setValue(shared_from_this()); } else { - lock.unlock(); - // Producer creation failed if (result == ResultTimeout) { // Creating the producer has timed out. 
We need to ensure the broker closes the producer @@ -249,7 +244,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); failPendingMessages(result, true); producerCreatedPromise_.setFailed(result); - Lock lock(mutex_); state_ = Failed; } } @@ -327,9 +321,8 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen void ProducerImpl::flushAsync(FlushCallback callback) { if (batchMessageContainer_) { - Lock lock(mutex_); - if (state_ == Ready) { + Lock lock(mutex_); auto failures = batchMessageAndSend(callback); lock.unlock(); failures.complete(); @@ -343,8 +336,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) { void ProducerImpl::triggerFlush() { if (batchMessageContainer_) { - Lock lock(mutex_); if (state_ == Ready) { + Lock lock(mutex_); auto failures = batchMessageAndSend(); lock.unlock(); failures.complete(); @@ -353,9 +346,7 @@ void ProducerImpl::triggerFlush() { } bool ProducerImpl::isValidProducerState(const SendCallback& callback) const { - Lock lock(mutex_); - const auto state = state_; - lock.unlock(); + const auto state = state_.load(); switch (state) { case HandlerBase::Ready: // OK @@ -614,8 +605,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e LOG_DEBUG(getName() << " - Batch Message Timer expired"); // ignore if the producer is already closing/closed - Lock lock(mutex_); - if (state_ == Pending || state_ == Ready) { + const auto state = state_.load(); + if (state == Pending || state == Ready) { + Lock lock(mutex_); auto failures = batchMessageAndSend(); lock.unlock(); failures.complete(); @@ -632,11 +624,9 @@ void ProducerImpl::printStats() { } void ProducerImpl::closeAsync(CloseCallback callback) { - Lock lock(mutex_); - // if the producer was never started then there is nothing to clean up - if (state_ == NotStarted) { - state_ = Closed; + State expectedState = NotStarted; + 
if (state_.compare_exchange_strong(expectedState, Closed)) { callback(ResultOk); return; } @@ -649,9 +639,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) { // ensure any remaining send callbacks are called before calling the close callback failPendingMessages(ResultAlreadyClosed, false); - if (state_ != Ready && state_ != Pending) { + // TODO maybe we need a loop here to implement CAS for a condition, + // just like Java's `getAndUpdate` method on an atomic variable + const auto state = state_.load(); + if (state != Ready && state != Pending) { state_ = Closed; - lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -664,7 +656,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { state_ = Closed; - lock.unlock(); + if (callback) { callback(ResultOk); } @@ -678,7 +670,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) { ClientImplPtr client = client_.lock(); if (!client) { state_ = Closed; - lock.unlock(); // Client was already destroyed if (callback) { callback(ResultOk); @@ -686,7 +677,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) { return; } - lock.unlock(); int requestId = client->newRequestId(); Future<Result, ResponseData> future = cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); @@ -699,7 +689,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) { void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) { if (result == ResultOk) { - Lock lock(mutex_); state_ = Closed; LOG_INFO(getName() << "Closed producer"); ClientConnectionPtr cnx = getCnx().lock(); @@ -722,10 +711,11 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() uint64_t ProducerImpl::getProducerId() const { return producerId_; } void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { - Lock lock(mutex_); - if (state_ != Pending && state_ != Ready) { + const 
auto state = state_.load(); + if (state != Pending && state != Ready) { return; } + Lock lock(mutex_); if (err == boost::asio::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); @@ -896,22 +886,13 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr return a->getProducerId() < b->getProducerId(); } -bool ProducerImpl::isClosed() { - Lock lock(mutex_); - return state_ == Closed; -} +bool ProducerImpl::isClosed() { return state_ == Closed; } -bool ProducerImpl::isConnected() const { - Lock lock(mutex_); - return !getCnx().expired() && state_ == Ready; -} +bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; } -bool ProducerImpl::isStarted() const { - Lock lock(mutex_); - return state_ != NotStarted; -} +bool ProducerImpl::isStarted() const { return state_ != NotStarted; } void ProducerImpl::startSendTimeoutTimer() { // Initialize the sendTimer only once per producer and only when producer timeout is // configured. Set the timeout as configured value and asynchronously wait for the
