This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6e840ed11552e1f50c2d0214a6c90f204c437d6f Author: Cong Zhao <[email protected]> AuthorDate: Sat Aug 13 10:36:51 2022 +0800 [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940) ### Motivation Currently, many locks are used to ensure the atomicity of `state_` in `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl`. We can use an atomic `state_` instead of these locks to improve performance. ### Modifications Use an atomic `state_` instead of the lock to improve performance. (cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267) --- pulsar-client-cpp/lib/ConsumerImpl.cc | 75 ++++++---------------- pulsar-client-cpp/lib/HandlerBase.cc | 11 ++-- pulsar-client-cpp/lib/HandlerBase.h | 2 +- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 71 ++++++-------------- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 5 +- .../lib/PatternMultiTopicsConsumerImpl.cc | 5 +- pulsar-client-cpp/lib/ProducerImpl.cc | 59 ++++++----------- 7 files changed, 69 insertions(+), 159 deletions(-) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 1f4a6b5b988..7c1c89d93f8 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -157,10 +157,7 @@ void ConsumerImpl::start() { } void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - Lock lock(mutex_); - const auto state = state_; - lock.unlock(); - if (state == Closed) { + if (state_ == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); return; } @@ -198,7 +195,6 @@ void ConsumerImpl::connectionFailed(Result result) { ConsumerImplPtr ptr = shared_from_this(); if (consumerCreatedPromise_.setFailed(result)) { - Lock lock(mutex_); state_ = Failed; } } @@ -271,15 +267,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { 
LOG_INFO(getName() << "Unsubscribing"); - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); callback(ResultAlreadyClosed); LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and " "then call unsubscribe"); return; } + Lock lock(mutex_); + ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_); @@ -300,7 +296,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) { void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) { if (result == ResultOk) { - Lock lock(mutex_); state_ = Closed; LOG_INFO(getName() << "Unsubscribed successfully"); } else { @@ -630,12 +625,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed - Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } - stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) { @@ -653,12 +646,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { } Result ConsumerImpl::receiveHelper(Message& msg) { - { - Lock lock(mutex_); - if (state_ != Ready) { - return ResultAlreadyClosed; - } + if (state_ != Ready) { + return ResultAlreadyClosed; } + if (messageListener_) { LOG_ERROR(getName() << "Can not receive when a listener has been set"); return ResultInvalidConfiguration; @@ -685,11 +676,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { return ResultInvalidConfiguration; } - { - Lock lock(mutex_); - if (state_ != Ready) { - return ResultAlreadyClosed; - } + if (state_ != Ready) { + return ResultAlreadyClosed; } if (messageListener_) { @@ -864,13 +852,10 @@ void ConsumerImpl::disconnectConsumer() { } void ConsumerImpl::closeAsync(ResultCallback callback) { - Lock lock(mutex_); - // Keep a reference to ensure object is kept alive ConsumerImplPtr ptr = 
shared_from_this(); if (state_ != Ready) { - lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -888,7 +873,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { state_ = Closed; - lock.unlock(); // If connection is gone, also the consumer is closed on the broker side if (callback) { callback(ResultOk); @@ -899,7 +883,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { ClientImplPtr client = client_.lock(); if (!client) { state_ = Closed; - lock.unlock(); // Client was already destroyed if (callback) { callback(ResultOk); @@ -907,8 +890,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { return; } - // Lock is no longer required - lock.unlock(); int requestId = client->newRequestId(); Future<Result, ResponseData> future = cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId); @@ -924,9 +905,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) { if (result == ResultOk) { - Lock lock(mutex_); state_ = Closed; - lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -946,22 +925,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI const std::string& ConsumerImpl::getName() const { return consumerStr_; } void ConsumerImpl::shutdown() { - Lock lock(mutex_); state_ = Closed; - lock.unlock(); consumerCreatedPromise_.setFailed(ResultAlreadyClosed); } -bool ConsumerImpl::isClosed() { - Lock lock(mutex_); - return state_ == Closed; -} +bool ConsumerImpl::isClosed() { return state_ == Closed; } -bool ConsumerImpl::isOpen() { - Lock lock(mutex_); - return state_ == Ready; -} +bool ConsumerImpl::isOpen() { return state_ == Ready; } Result ConsumerImpl::pauseMessageListener() { if (!messageListener_) { @@ -1024,14 +995,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& 
messageIds) { int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { - Lock lock(mutex_); if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") - lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } + Lock lock(mutex_); if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; @@ -1091,16 +1061,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) { } void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { - Lock lock(mutex_); - if (state_ == Closed || state_ == Closing) { - lock.unlock(); + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } - lock.unlock(); this->ackGroupingTrackerPtr_->flushAndClean(); ClientConnectionPtr cnx = getCnx().lock(); @@ -1124,16 +1092,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { } void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - Lock lock(mutex_); - if (state_ == Closed || state_ == Closing) { - lock.unlock(); + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed); } return; } - lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1197,16 +1163,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback } void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { - Lock lock(mutex_); - if (state_ == Closed || state_ == Closing) { - lock.unlock(); + const auto state = 
state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR(getName() << "Client connection already closed."); if (callback) { callback(ResultAlreadyClosed, MessageId()); } return; } - lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -1252,10 +1216,7 @@ void ConsumerImpl::trackMessage(const Message& msg) { } } -bool ConsumerImpl::isConnected() const { - Lock lock(mutex_); - return !getCnx().expired() && state_ == Ready; -} +bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; } diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index d7025ad004b..5d2244f7552 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, HandlerBase::~HandlerBase() { timer_->cancel(); } void HandlerBase::start() { - Lock lock(mutex_); // guard against concurrent state changes such as closing - if (state_ == NotStarted) { - state_ = Pending; - lock.unlock(); - + State state = NotStarted; + if (state_.compare_exchange_strong(state, Pending)) { grabCnx(); } } @@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con return; } - Lock lock(handler->mutex_); State state = handler->state_; ClientConnectionPtr currentConnection = handler->connection_.lock(); @@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) { } void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { - if (handler->state_ == Pending || handler->state_ == Ready) { + const auto state = handler->state_.load(); + if (state == Pending || state == Ready) { TimeDuration delay = handler->backoff_.next(); LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) diff --git 
a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index eeb8ebe1c5e..1184746da21 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -105,7 +105,7 @@ class HandlerBase { Failed }; - State state_; + std::atomic<State> state_; Backoff backoff_; uint64_t epoch_; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 87166d0d360..d4b48681039 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -63,7 +63,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std void MultiTopicsConsumerImpl::start() { if (topics_.empty()) { - if (compareAndSetState(Pending, Ready)) { + MultiTopicsConsumerState state = Pending; + if (state_.compare_exchange_strong(state, Ready)) { LOG_DEBUG("No topics passed in when create MultiTopicsConsumer."); multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); return; @@ -91,14 +92,15 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c (*topicsNeedCreate)--; if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. 
Error - " << result); } LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); if (topicsNeedCreate->load() == 0) { - if (compareAndSetState(Pending, Ready)) { + MultiTopicsConsumerState state = Pending; + if (state_.compare_exchange_strong(state, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); } else { @@ -122,7 +124,8 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s return topicPromise->getFuture(); } - if (state_ == Closed || state_ == Closing) { + const auto state = state_.load(); + if (state == Closed || state == Closing) { LOG_ERROR("MultiTopicsConsumer already closed when subscribe."); topicPromise->setFailed(ResultAlreadyClosed); return topicPromise->getFuture(); @@ -238,15 +241,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) { LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing"); - Lock lock(mutex_); - if (state_ == Closing || state_ == Closed) { + const auto state = state_.load(); + if (state == Closing || state == Closed) { LOG_INFO(consumerStr_ << " already closed"); - lock.unlock(); callback(ResultAlreadyClosed); return; } state_ = Closing; - lock.unlock(); std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0); auto self = shared_from_this(); @@ -270,7 +271,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, (*consumerUnsubed)++; if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " << result << " subscription - " << subscriptionName_); } @@ -282,7 +283,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, unAckedMessageTrackerPtr_->clear(); Result result1 = (state_ != Failed) ? 
ResultOk : ResultUnknownError; - setState(Closed); + state_ = Closed; callback(result1); return; } @@ -301,7 +302,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, int numberPartitions = it->second; lock.unlock(); - if (state_ == Closing || state_ == Closed) { + const auto state = state_.load(); + if (state == Closing || state == Closed) { LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - " << subscriptionName_); callback(ResultAlreadyClosed); @@ -337,7 +339,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( (*consumerUnsubed)++; if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " << result << " topicPartitionName - " << topicPartitionName); } @@ -369,7 +371,8 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( } void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { - if (state_ == Closing || state_ == Closed) { + const auto state = state_.load(); + if (state == Closing || state == Closed) { LOG_ERROR("TopicsConsumer already closed " << " topic" << topic_ << " consumer - " << consumerStr_); if (callback) { @@ -378,7 +381,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { return; } - setState(Closing); + state_ = Closing; auto self = shared_from_this(); int numConsumers = 0; @@ -392,7 +395,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { if (numConsumers == 0) { LOG_DEBUG("TopicsConsumer have no consumers to close " << " topic" << topic_ << " subscription - " << subscriptionName_); - setState(Closed); + state_ = Closed; if (callback) { callback(ResultAlreadyClosed); } @@ -414,7 +417,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri numberTopicPartitions_->fetch_sub(1); if (result != ResultOk) { - setState(Failed); + state_ = Failed; LOG_ERROR("Closing the consumer failed for 
partition - " << topicPartitionName << " with error - " << result); } @@ -476,18 +479,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) { } Result MultiTopicsConsumerImpl::receive(Message& msg) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); return ResultAlreadyClosed; } if (messageListener_) { - lock.unlock(); LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } - lock.unlock(); messages_.pop(msg); unAckedMessageTrackerPtr_->add(msg.getMessageId()); @@ -495,19 +494,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) { } Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); return ResultAlreadyClosed; } if (messageListener_) { - lock.unlock(); LOG_ERROR("Can not receive when a listener has been set"); return ResultInvalidConfiguration; } - lock.unlock(); if (messages_.pop(msg, std::chrono::milliseconds(timeout))) { unAckedMessageTrackerPtr_->add(msg.getMessageId()); return ResultOk; @@ -520,12 +515,10 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { Message msg; // fail the callback if consumer is closing or closed - Lock stateLock(mutex_); if (state_ != Ready) { callback(ResultAlreadyClosed, msg); return; } - stateLock.unlock(); Lock lock(pendingReceiveMutex_); if (messages_.pop(msg, std::chrono::milliseconds(0))) { @@ -590,30 +583,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; } const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } -void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) { - Lock lock(mutex_); - state_ = state; -} - -bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect, - MultiTopicsConsumerState update) { - Lock lock(mutex_); - if (state_ == expect) { - state_ = update; - return true; - } else { - return false; - } -} - void 
MultiTopicsConsumerImpl::shutdown() {} bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; } -bool MultiTopicsConsumerImpl::isOpen() { - Lock lock(mutex_); - return state_ == Ready; -} +bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; } void MultiTopicsConsumerImpl::receiveMessages() { const auto receiverQueueSize = conf_.getReceiverQueueSize(); @@ -663,12 +637,11 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { - Lock lock(mutex_); if (state_ != Ready) { - lock.unlock(); callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } + Lock lock(mutex_); MultiTopicsBrokerConsumerStatsPtr statsPtr = std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load()); LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load()); @@ -742,11 +715,9 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl } bool MultiTopicsConsumerImpl::isConnected() const { - Lock lock(mutex_); if (state_ != Ready) { return false; } - lock.unlock(); return consumers_ .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); }) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index f69c56343e4..bfd92582c75 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -106,7 +106,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::map<std::string, int> topicsPartitions_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; - MultiTopicsConsumerState state_ = Pending; + std::atomic<MultiTopicsConsumerState> state_{Pending}; BlockingQueue<Message> messages_; const ExecutorServicePtr listenerExecutor_; 
MessageListener messageListener_; @@ -120,9 +120,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, std::queue<ReceiveCallback> pendingReceives_; /* methods */ - void setState(MultiTopicsConsumerState state); - bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update); - void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback); diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc index 8e92fd318d6..1183adfc4f1 100644 --- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc @@ -55,8 +55,9 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system: return; } - if (state_ != Ready) { - LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_); + const auto state = state_.load(); + if (state != Ready) { + LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state); resetAutoDiscoveryTimer(); return; } diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index e15d388ef64..a446c2b4b25 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -136,13 +136,10 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) { } void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { - Lock lock(mutex_); if (state_ == Closed) { - lock.unlock(); LOG_DEBUG(getName() << "connectionOpened : Producer is already closed"); return; } - lock.unlock(); ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); @@ -164,7 +161,6 @@ void ProducerImpl::connectionFailed(Result result) { // so don't change the state and allow reconnections 
return; } else if (producerCreatedPromise_.setFailed(result)) { - Lock lock(mutex_); state_ = Failed; } } @@ -175,14 +171,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r // make sure we're still in the Pending/Ready state, closeAsync could have been invoked // while waiting for this response if using lazy producers - Lock lock(mutex_); - if (state_ != Ready && state_ != Pending) { + const auto state = state_.load(); + if (state != Ready && state != Pending) { LOG_DEBUG("Producer created response received but producer already closed"); failPendingMessages(ResultAlreadyClosed, false); return; } if (result == ResultOk) { + Lock lock(mutex_); // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString()); @@ -217,8 +214,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r producerCreatedPromise_.setValue(shared_from_this()); } else { - lock.unlock(); - // Producer creation failed if (result == ResultTimeout) { // Creating the producer has timed out. 
We need to ensure the broker closes the producer @@ -248,7 +243,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); failPendingMessages(result, true); producerCreatedPromise_.setFailed(result); - Lock lock(mutex_); state_ = Failed; } } @@ -334,9 +328,8 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send void ProducerImpl::flushAsync(FlushCallback callback) { if (batchMessageContainer_) { - Lock lock(mutex_); - if (state_ == Ready) { + Lock lock(mutex_); auto failures = batchMessageAndSend(callback); lock.unlock(); failures.complete(); @@ -350,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) { void ProducerImpl::triggerFlush() { if (batchMessageContainer_) { - Lock lock(mutex_); if (state_ == Ready) { + Lock lock(mutex_); auto failures = batchMessageAndSend(); lock.unlock(); failures.complete(); @@ -551,8 +544,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e LOG_DEBUG(getName() << " - Batch Message Timer expired"); // ignore if the producer is already closing/closed - Lock lock(mutex_); - if (state_ == Pending || state_ == Ready) { + const auto state = state_.load(); + if (state == Pending || state == Ready) { + Lock lock(mutex_); auto failures = batchMessageAndSend(); lock.unlock(); failures.complete(); @@ -569,11 +563,9 @@ void ProducerImpl::printStats() { } void ProducerImpl::closeAsync(CloseCallback callback) { - Lock lock(mutex_); - // if the producer was never started then there is nothing to clean up - if (state_ == NotStarted) { - state_ = Closed; + State expectedState = NotStarted; + if (state_.compare_exchange_strong(expectedState, Closed)) { callback(ResultOk); return; } @@ -586,9 +578,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) { // ensure any remaining send callbacks are called before calling the close callback 
failPendingMessages(ResultAlreadyClosed, false); - if (state_ != Ready && state_ != Pending) { + // TODO maybe we need a loop here to implement CAS for a condition, + // just like Java's `getAndUpdate` method on an atomic variable + const auto state = state_.load(); + if (state != Ready && state != Pending) { state_ = Closed; - lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } @@ -601,7 +595,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) { ClientConnectionPtr cnx = getCnx().lock(); if (!cnx) { state_ = Closed; - lock.unlock(); + if (callback) { callback(ResultOk); } @@ -615,7 +609,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) { ClientImplPtr client = client_.lock(); if (!client) { state_ = Closed; - lock.unlock(); // Client was already destroyed if (callback) { callback(ResultOk); @@ -623,7 +616,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) { return; } - lock.unlock(); int requestId = client->newRequestId(); Future<Result, ResponseData> future = cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId); @@ -636,7 +628,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) { void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) { if (result == ResultOk) { - Lock lock(mutex_); state_ = Closed; LOG_INFO(getName() << "Closed producer"); ClientConnectionPtr cnx = getCnx().lock(); @@ -659,10 +650,11 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() uint64_t ProducerImpl::getProducerId() const { return producerId_; } void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { - Lock lock(mutex_); - if (state_ != Pending && state_ != Ready) { + const auto state = state_.load(); + if (state != Pending && state != Ready) { return; } + Lock lock(mutex_); if (err == boost::asio::error::operation_aborted) { LOG_DEBUG(getName() << "Timer cancelled: " << err.message()); @@ -837,22 +829,13 @@ bool 
ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr return a->getProducerId() < b->getProducerId(); } -bool ProducerImpl::isClosed() { - Lock lock(mutex_); - return state_ == Closed; -} +bool ProducerImpl::isClosed() { return state_ == Closed; } -bool ProducerImpl::isConnected() const { - Lock lock(mutex_); - return !getCnx().expired() && state_ == Ready; -} +bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; } uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; } -bool ProducerImpl::isStarted() const { - Lock lock(mutex_); - return state_ != NotStarted; -} +bool ProducerImpl::isStarted() const { return state_ != NotStarted; } void ProducerImpl::startSendTimeoutTimer() { // Initialize the sendTimer only once per producer and only when producer timeout is // configured. Set the timeout as configured value and asynchronously wait for the
