This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7285318a763fd57559e2b15f637fe2b95d70a1db Author: Yunze Xu <[email protected]> AuthorDate: Fri Aug 26 13:59:28 2022 +0800 Revert "[improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)" This reverts commit 27a4b17a896080452871f6d300358c3594e4e5d4. --- pulsar-client-cpp/lib/ConsumerImpl.cc | 75 ++++++++++++++++----- pulsar-client-cpp/lib/HandlerBase.cc | 11 ++-- pulsar-client-cpp/lib/HandlerBase.h | 2 +- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 71 ++++++++++++++------ pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 5 +- pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 77 ++++++++++++++++------ pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 3 +- .../lib/PatternMultiTopicsConsumerImpl.cc | 5 +- pulsar-client-cpp/lib/ProducerImpl.cc | 59 +++++++++++------ 9 files changed, 218 insertions(+), 90 deletions(-) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 19ef3501d6c..b5b5ceb046f 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -160,7 +160,10 @@ void ConsumerImpl::start() { } void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - if (state_ == Closed) { + Lock lock(mutex_); + const auto state = state_; + lock.unlock(); + if (state == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); return; } @@ -199,6 +202,7 @@ void ConsumerImpl::connectionFailed(Result result) { ConsumerImplPtr ptr = shared_from_this(); if (consumerCreatedPromise_.setFailed(result)) { + Lock lock(mutex_); state_ = Failed; } } @@ -270,15 +274,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO(getName() << "Unsubscribing"); + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); callback(ResultAlreadyClosed); LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and " "then call unsubscribe"); return; } - Lock lock(mutex_); - ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_); @@ -299,6 +303,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) { if (result == ResultOk) { + Lock lock(mutex_); state_ = Closed; LOG_INFO(getName() << "Unsubscribed successfully"); } else { @@ -744,10 +749,12 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed + Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } + stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { @@ -765,10 +772,12 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { } Result ConsumerImpl::receiveHelper(Message& msg) { - if (state_ != Ready) { - return ResultAlreadyClosed; + { + Lock lock(mutex_); + if (state_ != Ready) { + return ResultAlreadyClosed; + } } - if (messageListener_) { LOG_ERROR(getName() << "Can not receive when a listener has been set"); return ResultInvalidConfiguration; @@ -795,8 +804,11 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { return ResultInvalidConfiguration; } - if (state_ != Ready) { - return ResultAlreadyClosed; + { + Lock lock(mutex_); + if (state_ != Ready) { + return ResultAlreadyClosed; + } } if (messageListener_) { @@ -971,10 +983,13 @@ void ConsumerImpl::disconnectConsumer() { } void ConsumerImpl::closeAsync(ResultCallback callback) { + Lock lock(mutex_); + // Keep a reference to ensure object is kept alive ConsumerImplPtr ptr = shared_from_this(); if (state_ != Ready) { + lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -992,6 +1007,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { state_ = Closed; + lock.unlock(); // If connection is gone, also the consumer is closed on the broker side if (callback) { callback(ResultOk); @@ -1002,6 +1018,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { ClientImplPtr client = client_.lock(); if (!client) { state_ = Closed; + lock.unlock(); // Client was already destroyed if (callback) { callback(ResultOk); @@ -1009,6 +1026,8 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { return; } + // Lock is no longer required + lock.unlock(); int requestId = client->newRequestId(); Future<Result, ResponseData> future = cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); @@ -1024,7 +1043,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) { if (result == ResultOk) { + Lock lock(mutex_); state_ = Closed; + lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1044,14 +1065,22 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI const std::string& ConsumerImpl::getName() const { return consumerStr_; } void ConsumerImpl::shutdown() { + Lock lock(mutex_); state_ = Closed; + lock.unlock(); consumerCreatedPromise_.setFailed(ResultAlreadyClosed); } -bool ConsumerImpl::isClosed() { return state_ == Closed; } +bool ConsumerImpl::isClosed() { + Lock lock(mutex_); + return state_ == Closed; +} -bool ConsumerImpl::isOpen() { return state_ == Ready; } +bool ConsumerImpl::isOpen() { + Lock lock(mutex_); + return state_ == Ready; +} Result ConsumerImpl::pauseMessageListener() { if (!messageListener_) { @@ -1114,13 +1143,14 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) { int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") + lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } - Lock lock(mutex_); if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; @@ -1180,14 +1210,16 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) { } void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { - const auto state = state_.load(); - if (state == Closed || state == Closing) { + Lock lock(mutex_); + if (state_ == Closed || state_ == Closing) { + lock.unlock(); LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } + lock.unlock(); this->ackGroupingTrackerPtr_->flushAndClean(); ClientConnectionPtr cnx = getCnx().lock(); @@ -1211,14 +1243,16 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { } void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - const auto state = state_.load(); - if (state == Closed || state == Closing) { + Lock lock(mutex_); + if (state_ == Closed || state_ == Closing) { + lock.unlock(); LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } + lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1282,14 +1316,16 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback } void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { - const auto state = state_.load(); - if (state == Closed || state == Closing) { + Lock lock(mutex_); + if (state_ == Closed || state_ == Closing) { + lock.unlock(); LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed, MessageId()); } return; } + lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1335,7 +1371,10 @@ void ConsumerImpl::trackMessage(const MessageId& messageId) { } } -bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } +bool ConsumerImpl::isConnected() const { + Lock lock(mutex_); + return !getCnx().expired() && state_ == Ready; +} uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index 5d2244f7552..d7025ad004b 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -43,9 +43,12 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, HandlerBase::~HandlerBase() { timer_->cancel(); } void HandlerBase::start() { + Lock lock(mutex_); // guard against concurrent state changes such as closing - State state = NotStarted; - if (state_.compare_exchange_strong(state, Pending)) { + if (state_ == NotStarted) { + state_ = Pending; + lock.unlock(); + grabCnx(); } } @@ -94,6 +97,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con return; } + Lock lock(handler->mutex_); State state = handler->state_; ClientConnectionPtr currentConnection = handler->connection_.lock(); @@ -131,8 +135,7 @@ bool HandlerBase::isRetriableError(Result result) { } void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { - const auto state = handler->state_.load(); - if (state == Pending || state == Ready) { + if (handler->state_ == Pending || handler->state_ == Ready) { TimeDuration delay = handler->backoff_.next(); LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index 1184746da21..eeb8ebe1c5e 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -105,7 +105,7 @@ class HandlerBase { Failed }; - std::atomic<State> state_; + State state_; Backoff backoff_; uint64_t epoch_; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 293ece4c838..0ae86d5879a 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -56,8 +56,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std void MultiTopicsConsumerImpl::start() { if (topics_.empty()) { - MultiTopicsConsumerState state = Pending; - if (state_.compare_exchange_strong(state, Ready)) { + if (compareAndSetState(Pending, Ready)) { LOG_DEBUG("No topics passed in when create MultiTopicsConsumer."); multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); return; @@ -85,15 +84,14 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c (*topicsNeedCreate)--; if (result != ResultOk) { - state_ = Failed; + setState(Failed); LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result); } LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); if (topicsNeedCreate->load() == 0) { - MultiTopicsConsumerState state = Pending; - if (state_.compare_exchange_strong(state, Ready)) { + if (compareAndSetState(Pending, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); } else { @@ -117,8 +115,7 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s return topicPromise->getFuture(); } - const auto state = state_.load(); - if (state == Closed || state == Closing) { + if (state_ == Closed || state_ == Closing) { LOG_ERROR("MultiTopicsConsumer already closed when subscribe."); topicPromise->setFailed(ResultAlreadyClosed); return topicPromise->getFuture(); @@ -225,13 +222,15 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing"); - const auto state = state_.load(); - if (state == Closing || state == Closed) { + Lock lock(mutex_); + if (state_ == Closing || state_ == Closed) { LOG_INFO(consumerStr_ << " already closed"); + lock.unlock(); callback(ResultAlreadyClosed); return; } state_ = Closing; + lock.unlock(); std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0); auto self = shared_from_this(); @@ -255,7 +254,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, (*consumerUnsubed)++; if (result != ResultOk) { - state_ = Failed; + setState(Failed); LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " << result << " subscription - " << subscriptionName_); } @@ -267,7 +266,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, unAckedMessageTrackerPtr_->clear(); Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError; - state_ = Closed; + setState(Closed); callback(result1); return; } @@ -282,8 +281,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, return; } - const auto state = state_.load(); - if (state == Closing || state == Closed) { + if (state_ == Closing || state_ == Closed) { LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - " << subscriptionName_); callback(ResultAlreadyClosed); @@ -320,7 +318,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( (*consumerUnsubed)++; if (result != ResultOk) { - state_ = Failed; + setState(Failed); LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " << result << " topicPartitionName - " << topicPartitionName); } @@ -352,8 +350,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( } void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { - const auto state = state_.load(); - if (state == Closing || state == Closed) { + if (state_ == Closing || state_ == Closed) { LOG_ERROR("TopicsConsumer already closed " << " topic" << topic_ << " consumer - " << consumerStr_); if (callback) { @@ -362,7 +359,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { return; } - state_ = Closing; + setState(Closing); auto self = shared_from_this(); int numConsumers = 0; @@ -376,7 +373,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { if (numConsumers == 0) { LOG_DEBUG("TopicsConsumer have no consumers to close " << " topic" << topic_ << " subscription - " << subscriptionName_); - state_ = Closed; + setState(Closed); if (callback) { callback(ResultAlreadyClosed); } @@ -398,7 +395,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri numberTopicPartitions_->fetch_sub(1); if (result != ResultOk) { - state_ = Failed; + setState(Failed); LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - " << result); } @@ -460,14 +457,18 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) { } Result MultiTopicsConsumerImpl::receive(Message& msg) { + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); return ResultAlreadyClosed; } if (messageListener_) { + lock.unlock(); LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } + lock.unlock(); messages_.pop(msg); unAckedMessageTrackerPtr_->add(msg.getMessageId()); @@ -475,15 +476,19 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) { } Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); return ResultAlreadyClosed; } if (messageListener_) { + lock.unlock(); LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } + lock.unlock(); if (messages_.pop(msg, std::chrono::milliseconds(timeout))) { unAckedMessageTrackerPtr_->add(msg.getMessageId()); return ResultOk; @@ -496,10 +501,12 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed + Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } + stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (messages_.pop(msg, std::chrono::milliseconds(0))) { @@ -564,11 +571,30 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; } const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } +void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) { + Lock lock(mutex_); + state_ = state; +} + +bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect, + MultiTopicsConsumerState update) { + Lock lock(mutex_); + if (state_ == expect) { + state_ = update; + return true; + } else { + return false; + } +} + void MultiTopicsConsumerImpl::shutdown() {} bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; } -bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; } +bool MultiTopicsConsumerImpl::isOpen() { + Lock lock(mutex_); + return state_ == Ready; +} void MultiTopicsConsumerImpl::receiveMessages() { const auto receiverQueueSize = conf_.getReceiverQueueSize(); @@ -618,11 +644,12 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } - Lock lock(mutex_); MultiTopicsBrokerConsumerStatsPtr statsPtr = std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load()); LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load()); @@ -688,9 +715,11 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl } bool MultiTopicsConsumerImpl::isConnected() const { + Lock lock(mutex_); if (state_ != Ready) { return false; } + lock.unlock(); return consumers_ .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); }) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index d2ede4a8770..98b2f318af9 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -99,7 +99,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::map<std::string, int> topicsPartitions_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - std::atomic<MultiTopicsConsumerState> state_{Pending}; + MultiTopicsConsumerState state_ = Pending; BlockingQueue<Message> messages_; ExecutorServicePtr listenerExecutor_; MessageListener messageListener_; @@ -111,6 +111,9 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::queue<ReceiveCallback> pendingReceives_; /* methods */ + void setState(MultiTopicsConsumerState state); + bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update); + void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback); diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 0dc9135be59..e43b5090e43 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -68,10 +68,13 @@ const std::string& PartitionedConsumerImpl::getSubscriptionName() const { return const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; } Result PartitionedConsumerImpl::receive(Message& msg) { + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); return ResultAlreadyClosed; } // See comments in `receive(Message&, int)` + lock.unlock(); if (messageListener_) { LOG_ERROR("Can not receive when a listener has been set"); @@ -84,9 +87,15 @@ Result PartitionedConsumerImpl::receive(Message& msg) { } Result PartitionedConsumerImpl::receive(Message& msg, int timeout) { + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); return ResultAlreadyClosed; } + // We unlocked `mutex_` here to avoid starvation of methods which are trying to acquire `mutex_`. + // In addition, `messageListener_` won't change once constructed, `BlockingQueue::pop` and + // `UnAckedMessageTracker::add` are thread-safe, so they don't need `mutex_` to achieve thread-safety. + lock.unlock(); if (messageListener_) { LOG_ERROR("Can not receive when a listener has been set"); @@ -105,10 +114,12 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed + Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } + stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (messages_.pop(msg, std::chrono::milliseconds(0))) { @@ -123,23 +134,29 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) { void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing"); // change state to Closing, so that no Ready state operation is permitted during unsubscribe - state_ = Closing; + setState(Closing); // do not accept un subscribe until we have subscribe to all of the partitions of a topic // it's a logical single topic so it should behave like a single topic, even if it's sharded - unsigned int index = 0; - for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); - consumer++) { - LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_ - << " for Topic - " << topicName_->toString()); - (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync, - shared_from_this(), std::placeholders::_1, index++, - callback)); + Lock lock(mutex_); + if (state_ != Ready) { + lock.unlock(); + unsigned int index = 0; + for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); + consumer++) { + LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_ + << " for Topic - " << topicName_->toString()); + (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync, + shared_from_this(), std::placeholders::_1, index++, + callback)); + } } } void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback) { + Lock lock(mutex_); if (state_ == Failed) { + lock.unlock(); // we have already informed the client that unsubcribe has failed so, ignore this callbacks // or do we still go ahead and check how many could we close successfully? LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State for consumerIndex - " @@ -147,8 +164,9 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int << subscriptionName_ << " for Topic - " << topicName_->toString()); return; } + lock.unlock(); if (result != ResultOk) { - state_ = Failed; + setState(Failed); LOG_ERROR("Error Closing one of the parition consumers, consumerIndex - " << consumerIndex); callback(ResultUnknownError); return; @@ -163,7 +181,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int unsubscribedSoFar_++; if (unsubscribedSoFar_ == numPartitions) { LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_); - state_ = Closed; + setState(Closed); callback(ResultOk); return; } @@ -258,6 +276,7 @@ void PartitionedConsumerImpl::start() { void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex) { ResultCallback nullCallbackForCleanup = NULL; + Lock lock(mutex_); if (state_ == Failed) { // one of the consumer creation failed, and we are cleaning up return; @@ -267,6 +286,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( if (result != ResultOk) { state_ = Failed; + lock.unlock(); partitionedConsumerCreatedPromise_.setFailed(result); // unsubscribed all of the successfully subscribed partitioned consumers closeAsync(nullCallbackForCleanup); @@ -275,13 +295,12 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( } assert(partitionIndex < numPartitions && partitionIndex >= 0); - Lock lock(mutex_); numConsumersCreated_++; - lock.unlock(); if (numConsumersCreated_ == numPartitions) { LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - " << numPartitions << " Partitions."); state_ = Ready; + lock.unlock(); if (partitionsUpdateTimer_) { runPartitionUpdateTask(); } @@ -293,6 +312,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated( void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex, CloseCallback callback) { + Lock lock(mutex_); if (state_ == Failed) { // we should have already notified the client by callback return; @@ -300,6 +320,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, if (result != ResultOk) { state_ = Failed; LOG_ERROR("Closing the consumer failed for partition - " << partitionIndex); + lock.unlock(); partitionedConsumerCreatedPromise_.setFailed(result); if (callback) { callback(result); @@ -307,14 +328,13 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, return; } assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0); - Lock lock(mutex_); if (numConsumersCreated_ > 0) { numConsumersCreated_--; } - lock.unlock(); // closed all successfully if (!numConsumersCreated_) { state_ = Closed; + lock.unlock(); // set the producerCreatedPromise to failure partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError); if (callback) { @@ -324,12 +344,11 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, } } void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { - Lock lock(consumersMutex_); if (consumers_.empty()) { notifyResult(callback); return; } - state_ = Closed; + setState(Closed); unsigned int consumerAlreadyClosed = 0; // close successfully subscribed consumers // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased @@ -357,20 +376,29 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) { if (closeCallback) { // this means client invoked the closeAsync with a valid callback - state_ = Closed; + setState(Closed); closeCallback(ResultOk); } else { // consumer create failed, closeAsync called to cleanup the successfully created producers - state_ = Failed; + setState(Failed); partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError); } } +void PartitionedConsumerImpl::setState(const PartitionedConsumerState state) { + Lock lock(mutex_); + state_ = state; + lock.unlock(); +} + void PartitionedConsumerImpl::shutdown() {} bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; } -bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; } +bool PartitionedConsumerImpl::isOpen() { + Lock lock(mutex_); + return state_ == Ready; +} void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition()); @@ -474,7 +502,9 @@ const std::string& PartitionedConsumerImpl::getName() const { return partitionSt int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); if (state_ != Ready) { + lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } @@ -483,6 +513,7 @@ void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions); LatchPtr latchPtr = std::make_shared<Latch>(numPartitions); ConsumerList consumerList = consumers_; + lock.unlock(); for (int i = 0; i < consumerList.size(); i++) { consumerList[i]->getBrokerConsumerStatsAsync( std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, shared_from_this(), @@ -514,13 +545,16 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c } void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { + Lock stateLock(mutex_); if (state_ != Ready) { + stateLock.unlock(); callback(ResultAlreadyClosed); return; } // consumers_ could only be modified when state_ is Ready, so we needn't lock consumersMutex_ here ConsumerList consumerList = consumers_; + stateLock.unlock(); MultiResultCallback multiResultCallback(callback, consumers_.size()); for (ConsumerList::const_iterator i = consumerList.begin(); i != consumerList.end(); i++) { @@ -543,6 +577,7 @@ void PartitionedConsumerImpl::getPartitionMetadata() { void PartitionedConsumerImpl::handleGetPartitions(Result result, const LookupDataResultPtr& lookupDataResult) { + Lock stateLock(mutex_); if (state_ != Ready) { return; } @@ -579,9 +614,11 @@ void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl } bool PartitionedConsumerImpl::isConnected() const { + Lock stateLock(mutex_); if (state_ != Ready) { return false; } + stateLock.unlock(); Lock consumersLock(consumersMutex_); const auto consumers = consumers_; diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 8f4faf09954..7fa0ccdd1f4 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -92,7 +92,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, mutable std::mutex consumersMutex_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - std::atomic<PartitionedConsumerState> state_{Pending}; + PartitionedConsumerState state_ = Pending; unsigned int unsubscribedSoFar_ = 0; BlockingQueue<Message> messages_; ExecutorServicePtr listenerExecutor_; @@ -109,6 +109,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, unsigned int getNumPartitionsWithLock() const; ConsumerConfiguration getSinglePartitionConsumerConfig() const; ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const; + void setState(PartitionedConsumerState state); void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback); void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc index 79ed1969d78..34e912d4ddc 100644 --- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc @@ -55,9 +55,8 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system: return; } - const auto state = state_.load(); - if (state != Ready) { - LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state); + if (state_ != Ready) { + LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_); resetAutoDiscoveryTimer(); return; } diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 352b36a0b8f..a539889ac00 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -137,10 +137,13 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) { } void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { + Lock lock(mutex_); if (state_ == Closed) { + lock.unlock(); LOG_DEBUG(getName() << "connectionOpened : Producer is already closed"); return; } + lock.unlock(); ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); @@ -162,6 +165,7 @@ void ProducerImpl::connectionFailed(Result result) { // so don't change the state and allow reconnections return; } else if (producerCreatedPromise_.setFailed(result)) { + Lock lock(mutex_); state_ = Failed; } } @@ -172,15 +176,14 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r // make sure we're still in the Pending/Ready state, closeAsync could have been invoked // while waiting for this response if using lazy producers - const auto state = state_.load(); - if (state != Ready && state != Pending) { + Lock lock(mutex_); + if (state_ != Ready && state_ != Pending) { LOG_DEBUG("Producer created response received but producer already closed"); failPendingMessages(ResultAlreadyClosed, false); return; } if (result == ResultOk) { - Lock lock(mutex_); // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString()); @@ -215,6 +218,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r producerCreatedPromise_.setValue(shared_from_this()); } else { + lock.unlock(); + // Producer creation failed if (result == ResultTimeout) { // Creating the producer has timed out. We need to ensure the broker closes the producer @@ -244,6 +249,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); failPendingMessages(result, true); producerCreatedPromise_.setFailed(result); + Lock lock(mutex_); state_ = Failed; } } @@ -321,8 +327,9 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen void ProducerImpl::flushAsync(FlushCallback callback) { if (batchMessageContainer_) { + Lock lock(mutex_); + if (state_ == Ready) { - Lock lock(mutex_); auto failures = batchMessageAndSend(callback); lock.unlock(); failures.complete(); @@ -336,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) { void ProducerImpl::triggerFlush() { if (batchMessageContainer_) { + Lock lock(mutex_); if (state_ == Ready) { - Lock lock(mutex_); auto failures = batchMessageAndSend(); lock.unlock(); failures.complete(); @@ -607,9 +614,8 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e LOG_DEBUG(getName() << " - Batch Message Timer expired"); // ignore if the producer is already closing/closed - const auto state = state_.load(); - if (state == Pending || state == Ready) { - Lock lock(mutex_); + Lock lock(mutex_); + if (state_ == Pending || state_ == Ready) { auto failures = batchMessageAndSend(); lock.unlock(); failures.complete(); @@ -626,9 +632,11 @@ void ProducerImpl::printStats() { } void ProducerImpl::closeAsync(CloseCallback callback) { + Lock lock(mutex_); + // if the producer was never started then there is nothing to clean up - State expectedState = NotStarted; - if (state_.compare_exchange_strong(expectedState, Closed)) { + if (state_ == NotStarted) { + state_ = Closed; callback(ResultOk); return; } @@ -641,11 +649,9 @@ void ProducerImpl::closeAsync(CloseCallback callback) { // ensure any remaining send callbacks are called before calling the close callback failPendingMessages(ResultAlreadyClosed, false); - // TODO maybe we need a loop here to implement CAS for a condition, - // just like Java's `getAndUpdate` method on an atomic variable - const auto state = state_.load(); - if (state != Ready && state != Pending) { + if (state_ != Ready && state_ != Pending) { state_ = Closed; + lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -658,7 +664,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { state_ = Closed; - + lock.unlock(); if (callback) { callback(ResultOk); } @@ -672,6 +678,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) { ClientImplPtr client = client_.lock(); if (!client) { state_ = Closed; + lock.unlock(); // Client was already destroyed if (callback) { callback(ResultOk); @@ -679,6 +686,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) { return; } + lock.unlock(); int requestId = client->newRequestId(); Future<Result, ResponseData> future = cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); @@ -691,6 +699,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) { void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) { if (result == ResultOk) { + Lock lock(mutex_); state_ = Closed; LOG_INFO(getName() << "Closed producer"); ClientConnectionPtr cnx = getCnx().lock(); @@ -713,11 +722,10 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() uint64_t ProducerImpl::getProducerId() const { return producerId_; } void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { - const auto state = state_.load(); - if (state != Pending && state != Ready) { + Lock lock(mutex_); + if (state_ != Pending && state_ != Ready) { return; } - Lock lock(mutex_); if (err == boost::asio::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); @@ -888,13 +896,22 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr return a->getProducerId() < b->getProducerId(); } -bool ProducerImpl::isClosed() { return state_ == Closed; } +bool ProducerImpl::isClosed() { + Lock lock(mutex_); + return state_ == Closed; +} -bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } +bool ProducerImpl::isConnected() const { + Lock lock(mutex_); + return !getCnx().expired() && state_ == Ready; +} uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; } -bool ProducerImpl::isStarted() const { return state_ != NotStarted; } +bool ProducerImpl::isStarted() const { + Lock lock(mutex_); + return state_ != NotStarted; +} void ProducerImpl::startSendTimeoutTimer() { // Initialize the sendTimer only once per producer and only when producer timeout is // configured. Set the timeout as configured value and asynchronously wait for the
