jai1 commented on a change in pull request #577: CPP add receiveAsync API
URL: https://github.com/apache/incubator-pulsar/pull/577#discussion_r128685666
 
 

 ##########
 File path: pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
 ##########
 @@ -318,10 +345,44 @@ namespace pulsar {
 
     void PartitionedConsumerImpl::messageReceived(Consumer consumer, const 
Message& msg) {
         LOG_DEBUG("Received Message from one of the partition - " << 
msg.impl_->messageId.partition_);
-        messages_.push(msg);
-        if (messageListener_) {
-            
listenerExecutor_->postWork(boost::bind(&PartitionedConsumerImpl::internalListener,
 shared_from_this(), consumer));
+
+        // messages_ is a blocking queue: if queue is already full then no 
need of lock as receiveAsync already gets available-msg and no need to put 
request in pendingReceives_
+        Lock lock(pendingReceiveMutex_);
+        if(!pendingReceives_.empty()) {
+            ReceiveCallback callback = pendingReceives_.front();
+            pendingReceives_.pop();
+            lock.unlock();
+            
listenerExecutor_->postWork(boost::bind(&PartitionedConsumerImpl::notifyPendingReceivedCallback,
 shared_from_this(), ResultOk, msg, callback));
+        } else {
+            if(messages_.full()) {
+                lock.unlock();
+            }
+            messages_.push(msg);
+            if (messageListener_) {
+                
listenerExecutor_->postWork(boost::bind(&PartitionedConsumerImpl::internalListener,
 shared_from_this(), consumer));
+            }
         }
+
+    }
+
+    void PartitionedConsumerImpl::notifyPendingReceivedCallback(Result result, 
Message& msg, const ReceiveCallback& callback) {
+        if(result == ResultOk) {
+            unAckedMessageTrackerPtr_->add(msg.getMessageId());
+        }
+        callback(result, msg);
+    }
+
+    void PartitionedConsumerImpl::failPendingReceiveCallback() {
+        Message msg;
+        Lock lock(pendingReceiveMutex_);
+        while (!pendingReceives_.empty()) {
+            ReceiveCallback callback = pendingReceives_.front();
+            pendingReceives_.pop();
+            listenerExecutor_->postWork(
+                    
boost::bind(&PartitionedConsumerImpl::notifyPendingReceivedCallback, 
shared_from_this(),
 
 Review comment:
   Use an iterator and no need for notifyPendingReceivedCallback function, just 
call the callback
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to