This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7285318a763fd57559e2b15f637fe2b95d70a1db
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Aug 26 13:59:28 2022 +0800

    Revert "[improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)"
    
    This reverts commit 27a4b17a896080452871f6d300358c3594e4e5d4.
---
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 75 ++++++++++++++++-----
 pulsar-client-cpp/lib/HandlerBase.cc               | 11 ++--
 pulsar-client-cpp/lib/HandlerBase.h                |  2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 71 ++++++++++++++------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  5 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 77 ++++++++++++++++------
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  3 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |  5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              | 59 +++++++++++------
 9 files changed, 218 insertions(+), 90 deletions(-)
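At a glance: the reverted commit had replaced the mutex-protected `state_` fields of the client handlers with `std::atomic<State>` members (read via `load()`, advanced via `compare_exchange_strong`), so this revert puts the `Lock lock(mutex_);` guards back around every state check in the hunks below. The following is a minimal, self-contained sketch of the two styles for orientation only; the handler classes and `main()` are hypothetical and are not part of the Pulsar code base.

#include <atomic>
#include <iostream>
#include <mutex>

enum State { NotStarted, Pending, Ready, Closing, Closed, Failed };

// Style removed by this revert: lock-free reads and CAS-based transitions.
class AtomicStateHandler {
   public:
    bool start() {
        State expected = NotStarted;
        // A single CAS performs the "check then set" transition without a lock.
        return state_.compare_exchange_strong(expected, Pending);
    }
    bool isOpen() const { return state_.load() == Ready; }

   private:
    std::atomic<State> state_{NotStarted};
};

// Style restored by this revert: every access to state_ happens under mutex_.
class LockedStateHandler {
   public:
    bool start() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (state_ == NotStarted) {
            state_ = Pending;
            return true;
        }
        return false;
    }
    bool isOpen() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return state_ == Ready;
    }

   private:
    mutable std::mutex mutex_;
    State state_ = NotStarted;
};

int main() {
    AtomicStateHandler a;
    LockedStateHandler b;
    std::cout << a.start() << ' ' << b.start() << '\n';    // 1 1: both move NotStarted -> Pending
    std::cout << a.isOpen() << ' ' << b.isOpen() << '\n';  // 0 0: neither has reached Ready
    return 0;
}

The trade-off reverted here is contention on `mutex_` versus the extra care an atomic `state_` needs to stay consistent with the other fields that the same mutex also protects.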

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 19ef3501d6c..b5b5ceb046f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -160,7 +160,10 @@ void ConsumerImpl::start() {
 }
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    if (state_ == Closed) {
+    Lock lock(mutex_);
+    const auto state = state_;
+    lock.unlock();
+    if (state == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
         return;
     }
@@ -199,6 +202,7 @@ void ConsumerImpl::connectionFailed(Result result) {
     ConsumerImplPtr ptr = shared_from_this();
 
     if (consumerCreatedPromise_.setFailed(result)) {
+        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -270,15 +274,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
 void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO(getName() << "Unsubscribing");
 
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         callback(ResultAlreadyClosed);
         LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
                                "then call unsubscribe");
         return;
     }
 
-    Lock lock(mutex_);
-
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
@@ -299,6 +303,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
     if (result == ResultOk) {
+        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Unsubscribed successfully");
     } else {
@@ -744,10 +749,12 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
+    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
+    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -765,10 +772,12 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 }
 
 Result ConsumerImpl::receiveHelper(Message& msg) {
-    if (state_ != Ready) {
-        return ResultAlreadyClosed;
+    {
+        Lock lock(mutex_);
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
     }
-
     if (messageListener_) {
         LOG_ERROR(getName() << "Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
@@ -795,8 +804,11 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
         return ResultInvalidConfiguration;
     }
 
-    if (state_ != Ready) {
-        return ResultAlreadyClosed;
+    {
+        Lock lock(mutex_);
+        if (state_ != Ready) {
+            return ResultAlreadyClosed;
+        }
     }
 
     if (messageListener_) {
@@ -971,10 +983,13 @@ void ConsumerImpl::disconnectConsumer() {
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
+    Lock lock(mutex_);
+
     // Keep a reference to ensure object is kept alive
     ConsumerImplPtr ptr = shared_from_this();
 
     if (state_ != Ready) {
+        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -992,6 +1007,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
+        lock.unlock();
         // If connection is gone, also the consumer is closed on the broker side
         if (callback) {
             callback(ResultOk);
@@ -1002,6 +1018,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
+        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -1009,6 +1026,8 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
+    // Lock is no longer required
+    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
@@ -1024,7 +1043,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
     if (result == ResultOk) {
+        Lock lock(mutex_);
         state_ = Closed;
+        lock.unlock();
 
         ClientConnectionPtr cnx = getCnx().lock();
         if (cnx) {
@@ -1044,14 +1065,22 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
+    Lock lock(mutex_);
     state_ = Closed;
+    lock.unlock();
 
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
 }
 
-bool ConsumerImpl::isClosed() { return state_ == Closed; }
+bool ConsumerImpl::isClosed() {
+    Lock lock(mutex_);
+    return state_ == Closed;
+}
 
-bool ConsumerImpl::isOpen() { return state_ == Ready; }
+bool ConsumerImpl::isOpen() {
+    Lock lock(mutex_);
+    return state_ == Ready;
+}
 
 Result ConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
@@ -1114,13 +1143,14 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
 int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
 
 void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
         LOG_ERROR(getName() << "Client connection is not open, please try again later.")
+        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
 
-    Lock lock(mutex_);
     if (brokerConsumerStats_.isValid()) {
         LOG_DEBUG(getName() << "Serving data from cache");
         BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1180,14 +1210,16 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    const auto state = state_.load();
-    if (state == Closed || state == Closing) {
+    Lock lock(mutex_);
+    if (state_ == Closed || state_ == Closing) {
+        lock.unlock();
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
+    lock.unlock();
 
     this->ackGroupingTrackerPtr_->flushAndClean();
     ClientConnectionPtr cnx = getCnx().lock();
@@ -1211,14 +1243,16 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    const auto state = state_.load();
-    if (state == Closed || state == Closing) {
+    Lock lock(mutex_);
+    if (state_ == Closed || state_ == Closing) {
+        lock.unlock();
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
+    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1282,14 +1316,16 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 
 void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
-    const auto state = state_.load();
-    if (state == Closed || state == Closing) {
+    Lock lock(mutex_);
+    if (state_ == Closed || state_ == Closing) {
+        lock.unlock();
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed, MessageId());
         }
         return;
     }
+    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1335,7 +1371,10 @@ void ConsumerImpl::trackMessage(const MessageId& messageId) {
     }
 }
 
-bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
+bool ConsumerImpl::isConnected() const {
+    Lock lock(mutex_);
+    return !getCnx().expired() && state_ == Ready;
+}
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index 5d2244f7552..d7025ad004b 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -43,9 +43,12 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
 void HandlerBase::start() {
+    Lock lock(mutex_);
     // guard against concurrent state changes such as closing
-    State state = NotStarted;
-    if (state_.compare_exchange_strong(state, Pending)) {
+    if (state_ == NotStarted) {
+        state_ = Pending;
+        lock.unlock();
+
         grabCnx();
     }
 }
@@ -94,6 +97,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
         return;
     }
 
+    Lock lock(handler->mutex_);
     State state = handler->state_;
 
     ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -131,8 +135,7 @@ bool HandlerBase::isRetriableError(Result result) {
 }
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
-    const auto state = handler->state_.load();
-    if (state == Pending || state == Ready) {
+    if (handler->state_ == Pending || handler->state_ == Ready) {
         TimeDuration delay = handler->backoff_.next();
 
         LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index 1184746da21..eeb8ebe1c5e 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -105,7 +105,7 @@ class HandlerBase {
         Failed
     };
 
-    std::atomic<State> state_;
+    State state_;
     Backoff backoff_;
     uint64_t epoch_;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 293ece4c838..0ae86d5879a 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -56,8 +56,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        MultiTopicsConsumerState state = Pending;
-        if (state_.compare_exchange_strong(state, Ready)) {
+        if (compareAndSetState(Pending, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
             return;
@@ -85,15 +84,14 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     (*topicsNeedCreate)--;
 
     if (result != ResultOk) {
-        state_ = Failed;
+        setState(Failed);
         LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
     }
 
     LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
 
     if (topicsNeedCreate->load() == 0) {
-        MultiTopicsConsumerState state = Pending;
-        if (state_.compare_exchange_strong(state, Ready)) {
+        if (compareAndSetState(Pending, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
         } else {
@@ -117,8 +115,7 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
         return topicPromise->getFuture();
     }
 
-    const auto state = state_.load();
-    if (state == Closed || state == Closing) {
+    if (state_ == Closed || state_ == Closing) {
         LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
         topicPromise->setFailed(ResultAlreadyClosed);
         return topicPromise->getFuture();
@@ -225,13 +222,15 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
 void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
 
-    const auto state = state_.load();
-    if (state == Closing || state == Closed) {
+    Lock lock(mutex_);
+    if (state_ == Closing || state_ == Closed) {
         LOG_INFO(consumerStr_ << " already closed");
+        lock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
     state_ = Closing;
+    lock.unlock();
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
     auto self = shared_from_this();
@@ -255,7 +254,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        state_ = Failed;
+        setState(Failed);
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " subscription - " << subscriptionName_);
     }
@@ -267,7 +266,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
         unAckedMessageTrackerPtr_->clear();
 
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        state_ = Closed;
+        setState(Closed);
         callback(result1);
         return;
     }
@@ -282,8 +281,7 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
         return;
     }
 
-    const auto state = state_.load();
-    if (state == Closing || state == Closed) {
+    if (state_ == Closing || state_ == Closed) {
         LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
                                                                            << subscriptionName_);
         callback(ResultAlreadyClosed);
@@ -320,7 +318,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        state_ = Failed;
+        setState(Failed);
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
                   << result << " topicPartitionName - " << topicPartitionName);
     }
@@ -352,8 +350,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
-    const auto state = state_.load();
-    if (state == Closing || state == Closed) {
+    if (state_ == Closing || state_ == Closed) {
         LOG_ERROR("TopicsConsumer already closed "
                   << " topic" << topic_ << " consumer - " << consumerStr_);
         if (callback) {
@@ -362,7 +359,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    state_ = Closing;
+    setState(Closing);
 
     auto self = shared_from_this();
     int numConsumers = 0;
@@ -376,7 +373,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
-        state_ = Closed;
+        setState(Closed);
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -398,7 +395,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri
     numberTopicPartitions_->fetch_sub(1);
 
     if (result != ResultOk) {
-        state_ = Failed;
+        setState(Failed);
         LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
                                                                  << result);
     }
@@ -460,14 +457,18 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
+        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
+    lock.unlock();
     messages_.pop(msg);
 
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -475,15 +476,19 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
+        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
 
+    lock.unlock();
     if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
@@ -496,10 +501,12 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
+    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
+    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -564,11 +571,30 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
 
+void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
+    Lock lock(mutex_);
+    state_ = state;
+}
+
+bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
+                                                 MultiTopicsConsumerState update) {
+    Lock lock(mutex_);
+    if (state_ == expect) {
+        state_ = update;
+        return true;
+    } else {
+        return false;
+    }
+}
+
 void MultiTopicsConsumerImpl::shutdown() {}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
+bool MultiTopicsConsumerImpl::isOpen() {
+    Lock lock(mutex_);
+    return state_ == Ready;
+}
 
 void MultiTopicsConsumerImpl::receiveMessages() {
     const auto receiverQueueSize = conf_.getReceiverQueueSize();
@@ -618,11 +644,12 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
 
 void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
-    Lock lock(mutex_);
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
@@ -688,9 +715,11 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool MultiTopicsConsumerImpl::isConnected() const {
+    Lock lock(mutex_);
     if (state_ != Ready) {
         return false;
     }
+    lock.unlock();
 
     return consumers_
         .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d2ede4a8770..98b2f318af9 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -99,7 +99,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    std::atomic<MultiTopicsConsumerState> state_{Pending};
+    MultiTopicsConsumerState state_ = Pending;
     BlockingQueue<Message> messages_;
     ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
@@ -111,6 +111,9 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
+    void setState(MultiTopicsConsumerState state);
+    bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
+
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
     void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 0dc9135be59..e43b5090e43 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -68,10 +68,13 @@ const std::string& PartitionedConsumerImpl::getSubscriptionName() const { return
 const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; }
 
 Result PartitionedConsumerImpl::receive(Message& msg) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         return ResultAlreadyClosed;
     }
     // See comments in `receive(Message&, int)`
+    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -84,9 +87,15 @@ Result PartitionedConsumerImpl::receive(Message& msg) {
 }
 
 Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         return ResultAlreadyClosed;
     }
+    // We unlocked `mutex_` here to avoid starvation of methods which are trying to acquire `mutex_`.
+    // In addition, `messageListener_` won't change once constructed, `BlockingQueue::pop` and
+    // `UnAckedMessageTracker::add` are thread-safe, so they don't need `mutex_` to achieve thread-safety.
+    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -105,10 +114,12 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
+    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
+    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -123,23 +134,29 @@ void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing");
     // change state to Closing, so that no Ready state operation is permitted during unsubscribe
-    state_ = Closing;
+    setState(Closing);
     // do not accept un subscribe until we have subscribe to all of the partitions of a topic
     // it's a logical single topic so it should behave like a single topic, even if it's sharded
-    unsigned int index = 0;
-    for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_
-                                             << " for Topic - " << 
topicName_->toString());
-        
(*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
-                                                shared_from_this(), 
std::placeholders::_1, index++,
-                                                callback));
+    Lock lock(mutex_);
+    if (state_ != Ready) {
+        lock.unlock();
+        unsigned int index = 0;
+        for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+             consumer++) {
+            LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_
+                                                 << " for Topic - " << 
topicName_->toString());
+            
(*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
+                                                    shared_from_this(), 
std::placeholders::_1, index++,
+                                                    callback));
+        }
     }
 }
 
 void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int consumerIndex,
                                                      ResultCallback callback) {
+    Lock lock(mutex_);
     if (state_ == Failed) {
+        lock.unlock();
         // we have already informed the client that unsubcribe has failed so, ignore this callbacks
         // or do we still go ahead and check how many could we close successfully?
         LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State for consumerIndex - "
@@ -147,8 +164,9 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
                   << subscriptionName_ << " for Topic - " << topicName_->toString());
         return;
     }
+    lock.unlock();
     if (result != ResultOk) {
-        state_ = Failed;
+        setState(Failed);
         LOG_ERROR("Error Closing one of the parition consumers, consumerIndex - " << consumerIndex);
         callback(ResultUnknownError);
         return;
@@ -163,7 +181,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result 
result, unsigned int
     unsubscribedSoFar_++;
     if (unsubscribedSoFar_ == numPartitions) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for subscription 
- " << subscriptionName_);
-        state_ = Closed;
+        setState(Closed);
         callback(ResultOk);
         return;
     }
@@ -258,6 +276,7 @@ void PartitionedConsumerImpl::start() {
 void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
     Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned 
int partitionIndex) {
     ResultCallback nullCallbackForCleanup = NULL;
+    Lock lock(mutex_);
     if (state_ == Failed) {
         // one of the consumer creation failed, and we are cleaning up
         return;
@@ -267,6 +286,7 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
 
     if (result != ResultOk) {
         state_ = Failed;
+        lock.unlock();
         partitionedConsumerCreatedPromise_.setFailed(result);
         // unsubscribed all of the successfully subscribed partitioned 
consumers
         closeAsync(nullCallbackForCleanup);
@@ -275,13 +295,12 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
     }
 
     assert(partitionIndex < numPartitions && partitionIndex >= 0);
-    Lock lock(mutex_);
     numConsumersCreated_++;
-    lock.unlock();
     if (numConsumersCreated_ == numPartitions) {
         LOG_INFO("Successfully Subscribed to Partitioned Topic - " << 
topicName_->toString() << " with - "
                                                                    << 
numPartitions << " Partitions.");
         state_ = Ready;
+        lock.unlock();
         if (partitionsUpdateTimer_) {
             runPartitionUpdateTask();
         }
@@ -293,6 +312,7 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
 
 void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result 
result, unsigned int partitionIndex,
                                                                  CloseCallback 
callback) {
+    Lock lock(mutex_);
     if (state_ == Failed) {
         // we should have already notified the client by callback
         return;
@@ -300,6 +320,7 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
     if (result != ResultOk) {
         state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << 
partitionIndex);
+        lock.unlock();
         partitionedConsumerCreatedPromise_.setFailed(result);
         if (callback) {
             callback(result);
@@ -307,14 +328,13 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
         return;
     }
     assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0);
-    Lock lock(mutex_);
     if (numConsumersCreated_ > 0) {
         numConsumersCreated_--;
     }
-    lock.unlock();
     // closed all successfully
     if (!numConsumersCreated_) {
         state_ = Closed;
+        lock.unlock();
         // set the producerCreatedPromise to failure
         partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
         if (callback) {
@@ -324,12 +344,11 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
     }
 }
 void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
-    Lock lock(consumersMutex_);
     if (consumers_.empty()) {
         notifyResult(callback);
         return;
     }
-    state_ = Closed;
+    setState(Closed);
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
     // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
@@ -357,20 +376,29 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
 void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) {
     if (closeCallback) {
         // this means client invoked the closeAsync with a valid callback
-        state_ = Closed;
+        setState(Closed);
         closeCallback(ResultOk);
     } else {
         // consumer create failed, closeAsync called to cleanup the successfully created producers
-        state_ = Failed;
+        setState(Failed);
         partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
     }
 }
 
+void PartitionedConsumerImpl::setState(const PartitionedConsumerState state) {
+    Lock lock(mutex_);
+    state_ = state;
+    lock.unlock();
+}
+
 void PartitionedConsumerImpl::shutdown() {}
 
 bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; }
+bool PartitionedConsumerImpl::isOpen() {
+    Lock lock(mutex_);
+    return state_ == Ready;
+}
 
 void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
@@ -474,7 +502,9 @@ const std::string& PartitionedConsumerImpl::getName() const { return partitionSt
 int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
 
 void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
+    Lock lock(mutex_);
     if (state_ != Ready) {
+        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
@@ -483,6 +513,7 @@ void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
         std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions);
     LatchPtr latchPtr = std::make_shared<Latch>(numPartitions);
     ConsumerList consumerList = consumers_;
+    lock.unlock();
     for (int i = 0; i < consumerList.size(); i++) {
         consumerList[i]->getBrokerConsumerStatsAsync(
             std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, shared_from_this(),
@@ -514,13 +545,16 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
 }
 
 void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
+    Lock stateLock(mutex_);
     if (state_ != Ready) {
+        stateLock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
 
     // consumers_ could only be modified when state_ is Ready, so we needn't lock consumersMutex_ here
     ConsumerList consumerList = consumers_;
+    stateLock.unlock();
 
     MultiResultCallback multiResultCallback(callback, consumers_.size());
     for (ConsumerList::const_iterator i = consumerList.begin(); i != consumerList.end(); i++) {
@@ -543,6 +577,7 @@ void PartitionedConsumerImpl::getPartitionMetadata() {
 
 void PartitionedConsumerImpl::handleGetPartitions(Result result,
                                                   const LookupDataResultPtr& lookupDataResult) {
+    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return;
     }
@@ -579,9 +614,11 @@ void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool PartitionedConsumerImpl::isConnected() const {
+    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return false;
     }
+    stateLock.unlock();
 
     Lock consumersLock(consumersMutex_);
     const auto consumers = consumers_;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 8f4faf09954..7fa0ccdd1f4 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -92,7 +92,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     mutable std::mutex consumersMutex_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    std::atomic<PartitionedConsumerState> state_{Pending};
+    PartitionedConsumerState state_ = Pending;
     unsigned int unsubscribedSoFar_ = 0;
     BlockingQueue<Message> messages_;
     ExecutorServicePtr listenerExecutor_;
@@ -109,6 +109,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     unsigned int getNumPartitionsWithLock() const;
     ConsumerConfiguration getSinglePartitionConsumerConfig() const;
     ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const;
+    void setState(PartitionedConsumerState state);
     void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback);
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 79ed1969d78..34e912d4ddc 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -55,9 +55,8 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
         return;
     }
 
-    const auto state = state_.load();
-    if (state != Ready) {
-        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state);
+    if (state_ != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
         resetAutoDiscoveryTimer();
         return;
     }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 352b36a0b8f..a539889ac00 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -137,10 +137,13 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
 }
 
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
+    Lock lock(mutex_);
     if (state_ == Closed) {
+        lock.unlock();
         LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
         return;
     }
+    lock.unlock();
 
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
@@ -162,6 +165,7 @@ void ProducerImpl::connectionFailed(Result result) {
         // so don't change the state and allow reconnections
         return;
     } else if (producerCreatedPromise_.setFailed(result)) {
+        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -172,15 +176,14 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
 
     // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
     // while waiting for this response if using lazy producers
-    const auto state = state_.load();
-    if (state != Ready && state != Pending) {
+    Lock lock(mutex_);
+    if (state_ != Ready && state_ != Pending) {
         LOG_DEBUG("Producer created response received but producer already closed");
         failPendingMessages(ResultAlreadyClosed, false);
         return;
     }
 
     if (result == ResultOk) {
-        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -215,6 +218,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
+        lock.unlock();
+
         // Producer creation failed
         if (result == ResultTimeout) {
             // Creating the producer has timed out. We need to ensure the broker closes the producer
@@ -244,6 +249,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
                 failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
+                Lock lock(mutex_);
                 state_ = Failed;
             }
         }
@@ -321,8 +327,9 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
+        Lock lock(mutex_);
+
         if (state_ == Ready) {
-            Lock lock(mutex_);
             auto failures = batchMessageAndSend(callback);
             lock.unlock();
             failures.complete();
@@ -336,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
 
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer_) {
+        Lock lock(mutex_);
         if (state_ == Ready) {
-            Lock lock(mutex_);
             auto failures = batchMessageAndSend();
             lock.unlock();
             failures.complete();
@@ -607,9 +614,8 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
 
     // ignore if the producer is already closing/closed
-    const auto state = state_.load();
-    if (state == Pending || state == Ready) {
-        Lock lock(mutex_);
+    Lock lock(mutex_);
+    if (state_ == Pending || state_ == Ready) {
         auto failures = batchMessageAndSend();
         lock.unlock();
         failures.complete();
@@ -626,9 +632,11 @@ void ProducerImpl::printStats() {
 }
 
 void ProducerImpl::closeAsync(CloseCallback callback) {
+    Lock lock(mutex_);
+
     // if the producer was never started then there is nothing to clean up
-    State expectedState = NotStarted;
-    if (state_.compare_exchange_strong(expectedState, Closed)) {
+    if (state_ == NotStarted) {
+        state_ = Closed;
         callback(ResultOk);
         return;
     }
@@ -641,11 +649,9 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // ensure any remaining send callbacks are called before calling the close callback
     failPendingMessages(ResultAlreadyClosed, false);
 
-    // TODO  maybe we need a loop here to implement CAS for a condition,
-    // just like Java's `getAndUpdate` method on an atomic variable
-    const auto state = state_.load();
-    if (state != Ready && state != Pending) {
+    if (state_ != Ready && state_ != Pending) {
         state_ = Closed;
+        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -658,7 +664,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-
+        lock.unlock();
         if (callback) {
             callback(ResultOk);
         }
@@ -672,6 +678,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
+        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -679,6 +686,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
+    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
@@ -691,6 +699,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
 void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
     if (result == ResultOk) {
+        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Closed producer");
         ClientConnectionPtr cnx = getCnx().lock();
@@ -713,11 +722,10 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture()
 uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
 void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
-    const auto state = state_.load();
-    if (state != Pending && state != Ready) {
+    Lock lock(mutex_);
+    if (state_ != Pending && state_ != Ready) {
         return;
     }
-    Lock lock(mutex_);
 
     if (err == boost::asio::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
@@ -888,13 +896,22 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr
     return a->getProducerId() < b->getProducerId();
 }
 
-bool ProducerImpl::isClosed() { return state_ == Closed; }
+bool ProducerImpl::isClosed() {
+    Lock lock(mutex_);
+    return state_ == Closed;
+}
 
-bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
+bool ProducerImpl::isConnected() const {
+    Lock lock(mutex_);
+    return !getCnx().expired() && state_ == Ready;
+}
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
 
-bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
+bool ProducerImpl::isStarted() const {
+    Lock lock(mutex_);
+    return state_ != NotStarted;
+}
 void ProducerImpl::startSendTimeoutTimer() {
     // Initialize the sendTimer only once per producer and only when producer timeout is
     // configured. Set the timeout as configured value and asynchronously wait for the
