This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c42239d72edbdb802767ddcc8cd119c002548f29
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Aug 13 10:36:51 2022 +0800

    [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
    
    ### Motivation
    
    Currently, many locks are taken only to guarantee atomic access to `state_` in `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl`. We can make `state_` an atomic variable instead and drop these locks to improve performance.
    
    ### Modifications
    
    Use an atomic `state_` instead of the locks to improve performance.
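    
    For illustration, here is a minimal standalone sketch of the pattern (the
    `Handler` class, its members, and `main` below are made up for this example
    and are not the actual Pulsar classes): a mutex-guarded check-then-set on a
    plain `state_` becomes a single lock-free compare-and-swap once the field is
    a `std::atomic`, and simple state reads become plain atomic loads.
    
    ```cpp
    #include <atomic>
    #include <iostream>
    
    // Hypothetical state enum modelled on HandlerBase::State.
    enum State { NotStarted, Pending, Ready, Closing, Closed, Failed };
    
    class Handler {
       public:
        // Before: lock mutex_, check state_ == NotStarted, set Pending, unlock.
        // After: one atomic compare-and-swap performs the check-then-set.
        void start() {
            State expected = NotStarted;
            if (state_.compare_exchange_strong(expected, Pending)) {
                // grabCnx() would run here in the real code path.
            }
        }
    
        // Simple reads no longer need the mutex at all.
        bool isOpen() const { return state_ == Ready; }
    
       private:
        std::atomic<State> state_{NotStarted};
    };
    
    int main() {
        Handler h;
        h.start();  // NotStarted -> Pending
        h.start();  // no-op: the CAS fails, state stays Pending
        std::cout << std::boolalpha << h.isOpen() << std::endl;  // false
        return 0;
    }
    ```
    
    Compound operations that touch other shared fields besides `state_` still
    take the mutex; only the pure state reads and single-field transitions drop it.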
    
    (cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 75 +++++----------------
 pulsar-client-cpp/lib/HandlerBase.cc               | 11 ++--
 pulsar-client-cpp/lib/HandlerBase.h                |  2 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 71 ++++++--------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  5 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 77 ++++++----------------
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  3 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          |  5 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              | 63 +++++++-----------
 9 files changed, 91 insertions(+), 221 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index b5b5ceb046f..19ef3501d6c 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -160,10 +160,7 @@ void ConsumerImpl::start() {
 }
 
 void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
-    const auto state = state_;
-    lock.unlock();
-    if (state == Closed) {
+    if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already 
closed");
         return;
     }
@@ -202,7 +199,6 @@ void ConsumerImpl::connectionFailed(Result result) {
     ConsumerImplPtr ptr = shared_from_this();
 
     if (consumerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -274,15 +270,15 @@ void ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result r
 void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO(getName() << "Unsubscribing");
 
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultAlreadyClosed);
         LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, 
please call subscribe again and "
                                "then call unsubscribe");
         return;
     }
 
+    Lock lock(mutex_);
+
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << 
consumerId_);
@@ -303,7 +299,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback 
callback) {
 
 void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Unsubscribed successfully");
     } else {
@@ -749,12 +744,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& 
callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -772,12 +765,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& 
callback) {
 }
 
 Result ConsumerImpl::receiveHelper(Message& msg) {
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
+
     if (messageListener_) {
         LOG_ERROR(getName() << "Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
@@ -804,11 +795,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
         return ResultInvalidConfiguration;
     }
 
-    {
-        Lock lock(mutex_);
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
+    if (state_ != Ready) {
+        return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
@@ -983,13 +971,10 @@ void ConsumerImpl::disconnectConsumer() {
 }
 
 void ConsumerImpl::closeAsync(ResultCallback callback) {
-    Lock lock(mutex_);
-
     // Keep a reference to ensure object is kept alive
     ConsumerImplPtr ptr = shared_from_this();
 
     if (state_ != Ready) {
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -1007,7 +992,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
         // If connection is gone, also the consumer is closed on the broker 
side
         if (callback) {
             callback(ResultOk);
@@ -1018,7 +1002,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -1026,8 +1009,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    // Lock is no longer required
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId);
@@ -1043,9 +1024,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
 void ConsumerImpl::handleClose(Result result, ResultCallback callback, 
ConsumerImplPtr consumer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
-        lock.unlock();
 
         ClientConnectionPtr cnx = getCnx().lock();
         if (cnx) {
@@ -1065,22 +1044,14 @@ void ConsumerImpl::handleClose(Result result, 
ResultCallback callback, ConsumerI
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
 
 void ConsumerImpl::shutdown() {
-    Lock lock(mutex_);
     state_ = Closed;
-    lock.unlock();
 
     consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
 }
 
-bool ConsumerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool ConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool ConsumerImpl::isOpen() { return state_ == Ready; }
 
 Result ConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
@@ -1143,14 +1114,13 @@ void ConsumerImpl::redeliverMessages(const 
std::set<MessageId>& messageIds) {
 int ConsumerImpl::getNumOfPrefetchedMessages() const { return 
incomingMessages_.size(); }
 
 void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback 
callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         LOG_ERROR(getName() << "Client connection is not open, please try 
again later.")
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
 
+    Lock lock(mutex_);
     if (brokerConsumerStats_.isValid()) {
         LOG_DEBUG(getName() << "Serving data from cache");
         BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1210,16 +1180,14 @@ void ConsumerImpl::handleSeek(Result result, 
ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     this->ackGroupingTrackerPtr_->flushAndClean();
     ClientConnectionPtr cnx = getCnx().lock();
@@ -1243,16 +1211,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, 
ResultCallback callback) {
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed);
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1316,16 +1282,14 @@ void 
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
 }
 
 void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback 
callback) {
-    Lock lock(mutex_);
-    if (state_ == Closed || state_ == Closing) {
-        lock.unlock();
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
         if (callback) {
             callback(ResultAlreadyClosed, MessageId());
         }
         return;
     }
-    lock.unlock();
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
@@ -1371,10 +1335,7 @@ void ConsumerImpl::trackMessage(const MessageId& 
messageId) {
     }
 }
 
-bool ConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ 
== Ready; }
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 
1 : 0; }
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index d7025ad004b..5d2244f7552 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const 
std::string& topic,
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
 void HandlerBase::start() {
-    Lock lock(mutex_);
     // guard against concurrent state changes such as closing
-    if (state_ == NotStarted) {
-        state_ = Pending;
-        lock.unlock();
-
+    State state = NotStarted;
+    if (state_.compare_exchange_strong(state, Pending)) {
         grabCnx();
     }
 }
@@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, 
ClientConnectionWeakPtr con
         return;
     }
 
-    Lock lock(handler->mutex_);
     State state = handler->state_;
 
     ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) {
 }
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
-    if (handler->state_ == Pending || handler->state_ == Ready) {
+    const auto state = handler->state_.load();
+    if (state == Pending || state == Ready) {
         TimeDuration delay = handler->backoff_.next();
 
         LOG_INFO(handler->getName() << "Schedule reconnection in " << 
(delay.total_milliseconds() / 1000.0)
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index eeb8ebe1c5e..1184746da21 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -105,7 +105,7 @@ class HandlerBase {
         Failed
     };
 
-    State state_;
+    std::atomic<State> state_;
     Backoff backoff_;
     uint64_t epoch_;
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 0ae86d5879a..293ece4c838 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -56,7 +56,8 @@ 
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
 
 void MultiTopicsConsumerImpl::start() {
     if (topics_.empty()) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
             return;
@@ -84,14 +85,15 @@ void 
MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
     (*topicsNeedCreate)--;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Failed when subscribed to topic " << topic << " in 
TopicsConsumer. Error - " << result);
     }
 
     LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
 
     if (topicsNeedCreate->load() == 0) {
-        if (compareAndSetState(Pending, Ready)) {
+        MultiTopicsConsumerState state = Pending;
+        if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
             multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
         } else {
@@ -115,7 +117,8 @@ Future<Result, Consumer> 
MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
         return topicPromise->getFuture();
     }
 
-    if (state_ == Closed || state_ == Closing) {
+    const auto state = state_.load();
+    if (state == Closed || state == Closing) {
         LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
         topicPromise->setFailed(ResultAlreadyClosed);
         return topicPromise->getFuture();
@@ -222,15 +225,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
 void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] 
Unsubscribing");
 
-    Lock lock(mutex_);
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_INFO(consumerStr_ << " already closed");
-        lock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
     state_ = Closing;
-    lock.unlock();
 
     std::shared_ptr<std::atomic<int>> consumerUnsubed = 
std::make_shared<std::atomic<int>>(0);
     auto self = shared_from_this();
@@ -254,7 +255,7 @@ void 
MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, 
result: "
                   << result << " subscription - " << subscriptionName_);
     }
@@ -266,7 +267,7 @@ void 
MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
         unAckedMessageTrackerPtr_->clear();
 
         Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
-        setState(Closed);
+        state_ = Closed;
         callback(result1);
         return;
     }
@@ -281,7 +282,8 @@ void 
MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
         return;
     }
 
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << 
topic << " subscription - "
                                                                            << 
subscriptionName_);
         callback(ResultAlreadyClosed);
@@ -318,7 +320,7 @@ void 
MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
     (*consumerUnsubed)++;
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, 
result: "
                   << result << " topicPartitionName - " << topicPartitionName);
     }
@@ -350,7 +352,8 @@ void 
MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
-    if (state_ == Closing || state_ == Closed) {
+    const auto state = state_.load();
+    if (state == Closing || state == Closed) {
         LOG_ERROR("TopicsConsumer already closed "
                   << " topic" << topic_ << " consumer - " << consumerStr_);
         if (callback) {
@@ -359,7 +362,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback 
callback) {
         return;
     }
 
-    setState(Closing);
+    state_ = Closing;
 
     auto self = shared_from_this();
     int numConsumers = 0;
@@ -373,7 +376,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback 
callback) {
     if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << 
subscriptionName_);
-        setState(Closed);
+        state_ = Closed;
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -395,7 +398,7 @@ void 
MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri
     numberTopicPartitions_->fetch_sub(1);
 
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << 
topicPartitionName << " with error - "
                                                                  << result);
     }
@@ -457,18 +460,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer 
consumer) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
-    lock.unlock();
     messages_.pop(msg);
 
     unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -476,19 +475,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
 }
 
 Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
 
     if (messageListener_) {
-        lock.unlock();
         LOG_ERROR("Can not receive when a listener has been set");
         return ResultInvalidConfiguration;
     }
 
-    lock.unlock();
     if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
@@ -501,12 +496,10 @@ void 
MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -571,30 +564,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() 
const { return topic_; }
 
 const std::string& MultiTopicsConsumerImpl::getName() const { return 
consumerStr_; }
 
-void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-}
-
-bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState 
expect,
-                                                 MultiTopicsConsumerState 
update) {
-    Lock lock(mutex_);
-    if (state_ == expect) {
-        state_ = update;
-        return true;
-    } else {
-        return false;
-    }
-}
-
 void MultiTopicsConsumerImpl::shutdown() {}
 
 bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool MultiTopicsConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
 
 void MultiTopicsConsumerImpl::receiveMessages() {
     const auto receiverQueueSize = conf_.getReceiverQueueSize();
@@ -644,12 +618,11 @@ void 
MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return 
messages_.size(); }
 
 void 
MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback
 callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
+    Lock lock(mutex_);
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         
std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
     LatchPtr latchPtr = 
std::make_shared<Latch>(numberTopicPartitions_->load());
@@ -715,11 +688,9 @@ void 
MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool MultiTopicsConsumerImpl::isConnected() const {
-    Lock lock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    lock.unlock();
 
     return consumers_
         .findFirstValueIf([](const ConsumerImplPtr& consumer) { return 
!consumer->isConnected(); })
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 98b2f318af9..d2ede4a8770 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -99,7 +99,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    MultiTopicsConsumerState state_ = Pending;
+    std::atomic<MultiTopicsConsumerState> state_{Pending};
     BlockingQueue<Message> messages_;
     ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
@@ -111,9 +111,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
-    void setState(MultiTopicsConsumerState state);
-    bool compareAndSetState(MultiTopicsConsumerState expect, 
MultiTopicsConsumerState update);
-
     void handleSinglePartitionConsumerCreated(Result result, 
ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
     void handleSingleConsumerClose(Result result, std::string 
topicPartitionName, CloseCallback callback);
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index e43b5090e43..0dc9135be59 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -68,13 +68,10 @@ const std::string& 
PartitionedConsumerImpl::getSubscriptionName() const { return
 const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; }
 
 Result PartitionedConsumerImpl::receive(Message& msg) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
     // See comments in `receive(Message&, int)`
-    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -87,15 +84,9 @@ Result PartitionedConsumerImpl::receive(Message& msg) {
 }
 
 Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         return ResultAlreadyClosed;
     }
-    // We unlocked `mutex_` here to avoid starvation of methods which are 
trying to acquire `mutex_`.
-    // In addition, `messageListener_` won't change once constructed, 
`BlockingQueue::pop` and
-    // `UnAckedMessageTracker::add` are thread-safe, so they don't need 
`mutex_` to achieve thread-safety.
-    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -114,12 +105,10 @@ void 
PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     Message msg;
 
     // fail the callback if consumer is closing or closed
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         callback(ResultAlreadyClosed, msg);
         return;
     }
-    stateLock.unlock();
 
     Lock lock(pendingReceiveMutex_);
     if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -134,29 +123,23 @@ void 
PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
 void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] 
Unsubscribing");
     // change state to Closing, so that no Ready state operation is permitted 
during unsubscribe
-    setState(Closing);
+    state_ = Closing;
     // do not accept un subscribe until we have subscribe to all of the 
partitions of a topic
     // it's a logical single topic so it should behave like a single topic, 
even if it's sharded
-    Lock lock(mutex_);
-    if (state_ != Ready) {
-        lock.unlock();
-        unsigned int index = 0;
-        for (ConsumerList::const_iterator consumer = consumers_.begin(); 
consumer != consumers_.end();
-             consumer++) {
-            LOG_DEBUG("Unsubcribing Consumer - " << index << " for 
Subscription - " << subscriptionName_
-                                                 << " for Topic - " << 
topicName_->toString());
-            
(*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
-                                                    shared_from_this(), 
std::placeholders::_1, index++,
-                                                    callback));
-        }
+    unsigned int index = 0;
+    for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer 
!= consumers_.end();
+         consumer++) {
+        LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - 
" << subscriptionName_
+                                             << " for Topic - " << 
topicName_->toString());
+        
(*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
+                                                shared_from_this(), 
std::placeholders::_1, index++,
+                                                callback));
     }
 }
 
 void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned 
int consumerIndex,
                                                      ResultCallback callback) {
-    Lock lock(mutex_);
     if (state_ == Failed) {
-        lock.unlock();
         // we have already informed the client that unsubcribe has failed so, 
ignore this callbacks
         // or do we still go ahead and check how many could we close 
successfully?
         LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State 
for consumerIndex - "
@@ -164,9 +147,8 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result 
result, unsigned int
                   << subscriptionName_ << " for Topic - " << 
topicName_->toString());
         return;
     }
-    lock.unlock();
     if (result != ResultOk) {
-        setState(Failed);
+        state_ = Failed;
         LOG_ERROR("Error Closing one of the parition consumers, consumerIndex 
- " << consumerIndex);
         callback(ResultUnknownError);
         return;
@@ -181,7 +163,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result 
result, unsigned int
     unsubscribedSoFar_++;
     if (unsubscribedSoFar_ == numPartitions) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for subscription 
- " << subscriptionName_);
-        setState(Closed);
+        state_ = Closed;
         callback(ResultOk);
         return;
     }
@@ -276,7 +258,6 @@ void PartitionedConsumerImpl::start() {
 void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
     Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned 
int partitionIndex) {
     ResultCallback nullCallbackForCleanup = NULL;
-    Lock lock(mutex_);
     if (state_ == Failed) {
         // one of the consumer creation failed, and we are cleaning up
         return;
@@ -286,7 +267,6 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
 
     if (result != ResultOk) {
         state_ = Failed;
-        lock.unlock();
         partitionedConsumerCreatedPromise_.setFailed(result);
         // unsubscribed all of the successfully subscribed partitioned 
consumers
         closeAsync(nullCallbackForCleanup);
@@ -295,12 +275,13 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
     }
 
     assert(partitionIndex < numPartitions && partitionIndex >= 0);
+    Lock lock(mutex_);
     numConsumersCreated_++;
+    lock.unlock();
     if (numConsumersCreated_ == numPartitions) {
         LOG_INFO("Successfully Subscribed to Partitioned Topic - " << 
topicName_->toString() << " with - "
                                                                    << 
numPartitions << " Partitions.");
         state_ = Ready;
-        lock.unlock();
         if (partitionsUpdateTimer_) {
             runPartitionUpdateTask();
         }
@@ -312,7 +293,6 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
 
 void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result 
result, unsigned int partitionIndex,
                                                                  CloseCallback 
callback) {
-    Lock lock(mutex_);
     if (state_ == Failed) {
         // we should have already notified the client by callback
         return;
@@ -320,7 +300,6 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
     if (result != ResultOk) {
         state_ = Failed;
         LOG_ERROR("Closing the consumer failed for partition - " << 
partitionIndex);
-        lock.unlock();
         partitionedConsumerCreatedPromise_.setFailed(result);
         if (callback) {
             callback(result);
@@ -328,13 +307,14 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
         return;
     }
     assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0);
+    Lock lock(mutex_);
     if (numConsumersCreated_ > 0) {
         numConsumersCreated_--;
     }
+    lock.unlock();
     // closed all successfully
     if (!numConsumersCreated_) {
         state_ = Closed;
-        lock.unlock();
         // set the producerCreatedPromise to failure
         partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
         if (callback) {
@@ -344,11 +324,12 @@ void 
PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
     }
 }
 void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
+    Lock lock(consumersMutex_);
     if (consumers_.empty()) {
         notifyResult(callback);
         return;
     }
-    setState(Closed);
+    state_ = Closed;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
     // Here we don't need `consumersMutex` to protect `consumers_`, because 
`consumers_` can only be increased
@@ -376,29 +357,20 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback 
callback) {
 void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) {
     if (closeCallback) {
         // this means client invoked the closeAsync with a valid callback
-        setState(Closed);
+        state_ = Closed;
         closeCallback(ResultOk);
     } else {
         // consumer create failed, closeAsync called to cleanup the 
successfully created producers
-        setState(Failed);
+        state_ = Failed;
         partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
     }
 }
 
-void PartitionedConsumerImpl::setState(const PartitionedConsumerState state) {
-    Lock lock(mutex_);
-    state_ = state;
-    lock.unlock();
-}
-
 void PartitionedConsumerImpl::shutdown() {}
 
 bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; }
 
-bool PartitionedConsumerImpl::isOpen() {
-    Lock lock(mutex_);
-    return state_ == Ready;
-}
+bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; }
 
 void PartitionedConsumerImpl::messageReceived(Consumer consumer, const 
Message& msg) {
     LOG_DEBUG("Received Message from one of the partition - " << 
msg.impl_->messageId.partition());
@@ -502,9 +474,7 @@ const std::string& PartitionedConsumerImpl::getName() const 
{ return partitionSt
 int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return 
messages_.size(); }
 
 void 
PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback
 callback) {
-    Lock lock(mutex_);
     if (state_ != Ready) {
-        lock.unlock();
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
@@ -513,7 +483,6 @@ void 
PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
         std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions);
     LatchPtr latchPtr = std::make_shared<Latch>(numPartitions);
     ConsumerList consumerList = consumers_;
-    lock.unlock();
     for (int i = 0; i < consumerList.size(); i++) {
         consumerList[i]->getBrokerConsumerStatsAsync(
             std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, 
shared_from_this(),
@@ -545,16 +514,13 @@ void PartitionedConsumerImpl::seekAsync(const MessageId& 
msgId, ResultCallback c
 }
 
 void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback 
callback) {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
-        stateLock.unlock();
         callback(ResultAlreadyClosed);
         return;
     }
 
     // consumers_ could only be modified when state_ is Ready, so we needn't 
lock consumersMutex_ here
     ConsumerList consumerList = consumers_;
-    stateLock.unlock();
 
     MultiResultCallback multiResultCallback(callback, consumers_.size());
     for (ConsumerList::const_iterator i = consumerList.begin(); i != 
consumerList.end(); i++) {
@@ -577,7 +543,6 @@ void PartitionedConsumerImpl::getPartitionMetadata() {
 
 void PartitionedConsumerImpl::handleGetPartitions(Result result,
                                                   const LookupDataResultPtr& 
lookupDataResult) {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return;
     }
@@ -614,11 +579,9 @@ void 
PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
 }
 
 bool PartitionedConsumerImpl::isConnected() const {
-    Lock stateLock(mutex_);
     if (state_ != Ready) {
         return false;
     }
-    stateLock.unlock();
 
     Lock consumersLock(consumersMutex_);
     const auto consumers = consumers_;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 7fa0ccdd1f4..8f4faf09954 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -92,7 +92,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     mutable std::mutex consumersMutex_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
-    PartitionedConsumerState state_ = Pending;
+    std::atomic<PartitionedConsumerState> state_{Pending};
     unsigned int unsubscribedSoFar_ = 0;
     BlockingQueue<Message> messages_;
     ExecutorServicePtr listenerExecutor_;
@@ -109,7 +109,6 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     unsigned int getNumPartitionsWithLock() const;
     ConsumerConfiguration getSinglePartitionConsumerConfig() const;
     ConsumerImplPtr newInternalConsumer(unsigned int partition, const 
ConsumerConfiguration& config) const;
-    void setState(PartitionedConsumerState state);
     void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, 
ResultCallback callback);
     void handleSinglePartitionConsumerCreated(Result result, 
ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 34e912d4ddc..79ed1969d78 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -55,8 +55,9 @@ void 
PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
         return;
     }
 
-    if (state_ != Ready) {
-        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " 
<< state_);
+    const auto state = state_.load();
+    if (state != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " 
<< state);
         resetAutoDiscoveryTimer();
         return;
     }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index a539889ac00..a9868bca97a 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -137,13 +137,10 @@ void ProducerImpl::refreshEncryptionKey(const 
boost::system::error_code& ec) {
 }
 
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
-    Lock lock(mutex_);
     if (state_ == Closed) {
-        lock.unlock();
         LOG_DEBUG(getName() << "connectionOpened : Producer is already 
closed");
         return;
     }
-    lock.unlock();
 
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
@@ -165,7 +162,6 @@ void ProducerImpl::connectionFailed(Result result) {
         // so don't change the state and allow reconnections
         return;
     } else if (producerCreatedPromise_.setFailed(result)) {
-        Lock lock(mutex_);
         state_ = Failed;
     }
 }
@@ -176,14 +172,15 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
 
     // make sure we're still in the Pending/Ready state, closeAsync could have 
been invoked
     // while waiting for this response if using lazy producers
-    Lock lock(mutex_);
-    if (state_ != Ready && state_ != Pending) {
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already 
closed");
         failPendingMessages(ResultAlreadyClosed, false);
         return;
     }
 
     if (result == ResultOk) {
+        Lock lock(mutex_);
         // We are now reconnected to broker and clear to send messages. 
Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << 
cnx->cnxString());
@@ -218,8 +215,6 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
-        lock.unlock();
-
         // Producer creation failed
         if (result == ResultTimeout) {
             // Creating the producer has timed out. We need to ensure the 
broker closes the producer
@@ -249,7 +244,6 @@ void ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result r
                 LOG_ERROR(getName() << "Failed to create producer: " << 
strResult(result));
                 failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
-                Lock lock(mutex_);
                 state_ = Failed;
             }
         }
@@ -327,9 +321,8 @@ void ProducerImpl::setMessageMetadata(const Message& msg, 
const uint64_t& sequen
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
-
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend(callback);
             lock.unlock();
             failures.complete();
@@ -343,8 +336,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
 
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer_) {
-        Lock lock(mutex_);
         if (state_ == Ready) {
+            Lock lock(mutex_);
             auto failures = batchMessageAndSend();
             lock.unlock();
             failures.complete();
@@ -353,9 +346,7 @@ void ProducerImpl::triggerFlush() {
 }
 
 bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
-    Lock lock(mutex_);
-    const auto state = state_;
-    lock.unlock();
+    const auto state = state_.load();
     switch (state) {
         case HandlerBase::Ready:
             // OK
@@ -614,8 +605,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const 
boost::system::error_code& e
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
 
     // ignore if the producer is already closing/closed
-    Lock lock(mutex_);
-    if (state_ == Pending || state_ == Ready) {
+    const auto state = state_.load();
+    if (state == Pending || state == Ready) {
+        Lock lock(mutex_);
         auto failures = batchMessageAndSend();
         lock.unlock();
         failures.complete();
@@ -632,11 +624,9 @@ void ProducerImpl::printStats() {
 }
 
 void ProducerImpl::closeAsync(CloseCallback callback) {
-    Lock lock(mutex_);
-
     // if the producer was never started then there is nothing to clean up
-    if (state_ == NotStarted) {
-        state_ = Closed;
+    State expectedState = NotStarted;
+    if (state_.compare_exchange_strong(expectedState, Closed)) {
         callback(ResultOk);
         return;
     }
@@ -649,9 +639,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // ensure any remaining send callbacks are called before calling the close 
callback
     failPendingMessages(ResultAlreadyClosed, false);
 
-    if (state_ != Ready && state_ != Pending) {
+    // TODO  maybe we need a loop here to implement CAS for a condition,
+    // just like Java's `getAndUpdate` method on an atomic variable
+    const auto state = state_.load();
+    if (state != Ready && state != Pending) {
         state_ = Closed;
-        lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
@@ -664,7 +656,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         state_ = Closed;
-        lock.unlock();
+
         if (callback) {
             callback(ResultOk);
         }
@@ -678,7 +670,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     ClientImplPtr client = client_.lock();
     if (!client) {
         state_ = Closed;
-        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
@@ -686,7 +677,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
         return;
     }
 
-    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId);
@@ -699,7 +689,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
 void ProducerImpl::handleClose(Result result, ResultCallback callback, 
ProducerImplPtr producer) {
     if (result == ResultOk) {
-        Lock lock(mutex_);
         state_ = Closed;
         LOG_INFO(getName() << "Closed producer");
         ClientConnectionPtr cnx = getCnx().lock();
@@ -722,10 +711,11 @@ Future<Result, ProducerImplBaseWeakPtr> 
ProducerImpl::getProducerCreatedFuture()
 uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
 void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
-    Lock lock(mutex_);
-    if (state_ != Pending && state_ != Ready) {
+    const auto state = state_.load();
+    if (state != Pending && state != Ready) {
         return;
     }
+    Lock lock(mutex_);
 
     if (err == boost::asio::error::operation_aborted) {
         LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
@@ -896,22 +886,13 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& 
a, const ProducerImplPtr
     return a->getProducerId() < b->getProducerId();
 }
 
-bool ProducerImpl::isClosed() {
-    Lock lock(mutex_);
-    return state_ == Closed;
-}
+bool ProducerImpl::isClosed() { return state_ == Closed; }
 
-bool ProducerImpl::isConnected() const {
-    Lock lock(mutex_);
-    return !getCnx().expired() && state_ == Ready;
-}
+bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ 
== Ready; }
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 
1 : 0; }
 
-bool ProducerImpl::isStarted() const {
-    Lock lock(mutex_);
-    return state_ != NotStarted;
-}
+bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
 void ProducerImpl::startSendTimeoutTimer() {
     // Initialize the sendTimer only once per producer and only when producer 
timeout is
     // configured. Set the timeout as configured value and asynchronously wait 
for the
