This is an automated email from the ASF dual-hosted git repository.
aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 492d92b Change state_ to closed when resultOk is returned (#5446)
492d92b is described below
commit 492d92bd2bc14f8a4eed22bc9208d86141dc1c95
Author: hrsakai <[email protected]>
AuthorDate: Wed Oct 23 19:39:15 2019 +0900
Change state_ to closed when resultOk is returned (#5446)
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 6 +++++-
pulsar-client-cpp/lib/ProducerImpl.cc | 6 +++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1c616fe..1313a05 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -837,8 +837,12 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
+ LOG_INFO(getName() << "Closing consumer for topic " << topic_);
+ state_ = Closing;
+
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
+ state_ = Closed;
lock.unlock();
// If connection is gone, also the consumer is closed on the broker side
if (callback) {
@@ -847,9 +851,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
- LOG_INFO(getName() << "Closing consumer for topic " << topic_);
ClientImplPtr client = client_.lock();
if (!client) {
+ state_ = Closed;
lock.unlock();
// Client was already destroyed
if (callback) {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 220a9f8..8159c8f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -492,6 +492,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
+ state_ = Closed;
lock.unlock();
if (callback) {
callback(ResultOk);
@@ -502,16 +503,19 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
// Detach the producer from the connection to avoid sending any other
// message from the producer
connection_.reset();
- lock.unlock();
ClientImplPtr client = client_.lock();
if (!client) {
+ state_ = Closed;
+ lock.unlock();
// Client was already destroyed
if (callback) {
callback(ResultOk);
}
return;
}
+
+ lock.unlock();
int requestId = client->newRequestId();
Future<Result, ResponseData> future =
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_,
requestId), requestId);