RobertIndie commented on code in PR #54:
URL: https://github.com/apache/pulsar-client-cpp/pull/54#discussion_r1000363980


##########
lib/PartitionedProducerImpl.h:
##########
@@ -17,16 +17,16 @@
  * under the License.
  */
 #include "ProducerImpl.h"
-#include "ClientImpl.h"
 #include <vector>
-
 #include <mutex>
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/TopicMetadata.h>
 #include <lib/TopicName.h>
 
 namespace pulsar {
 
+class ClientImpl;
+
 class PartitionedProducerImpl : public ProducerImplBase,

Review Comment:
   Seems we need to implement the `beforeConnectionChange` for the 
PartitionedProducerImpl.



##########
lib/ConsumerImpl.cc:
##########
@@ -1018,66 +1035,31 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
-        state_ = Closed;
         // If connection is gone, also the consumer is closed on the broker 
side
-        if (callback) {
-            callback(ResultOk);
-        }
+        callback(ResultOk);
         return;
     }
 
-    ClientImplPtr client = client_.lock();
-    if (!client) {
-        state_ = Closed;
-        // Client was already destroyed
-        if (callback) {
-            callback(ResultOk);
-        }
-        return;
-    }
+    int requestId = client_.newRequestId();
 
-    int requestId = client->newRequestId();
-    Future<Result, ResponseData> future =
-        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId);
-    if (callback) {
-        // Pass the shared pointer "ptr" to the handler to prevent the object 
from being destroyed
-        future.addListener(std::bind(&ConsumerImpl::handleClose, 
get_shared_this_ptr(), std::placeholders::_1,
-                                     callback, ptr));
-    }
-
-    // fail pendingReceive callback
-    failPendingReceiveCallback();
-    failPendingBatchReceiveCallback();
+    auto self = get_shared_this_ptr();
+    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId)
+        .addListener([this, self, callback](Result result, const 
ResponseData&) { callback(result); });

Review Comment:
   `this` here seems not used.



##########
lib/ConsumerImpl.cc:
##########
@@ -990,20 +1002,25 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& 
messageId) {
 
 void ConsumerImpl::disconnectConsumer() {
     LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
-    Lock lock(mutex_);
-    connection_.reset();
-    lock.unlock();
+    resetCnx();
     scheduleReconnection(get_shared_this_ptr());
 }
 
-void ConsumerImpl::closeAsync(ResultCallback callback) {
-    // Keep a reference to ensure object is kept alive
-    ConsumerImplPtr ptr = get_shared_this_ptr();
+void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
+    auto callback = [this, originalCallback](Result result) {
+        shutdown();

Review Comment:
   I think the key point of the unsubscribe operation is to delete the 
subscription in the broker. 



##########
lib/AckGroupingTrackerDisabled.cc:
##########
@@ -22,6 +22,7 @@
 #include "HandlerBase.h"
 #include "PulsarApi.pb.h"
 #include <pulsar/MessageId.h>
+#include "LogUtils.h"

Review Comment:
   Is this change related to this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to