shibd commented on code in PR #228:
URL: https://github.com/apache/pulsar-client-cpp/pull/228#discussion_r1147174340


##########
lib/ConsumerImpl.cc:
##########
@@ -649,10 +649,11 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
 void ConsumerImpl::notifyBatchPendingReceivedCallback(const 
BatchReceiveCallback& callback) {
     auto messages = 
std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
                                                    
batchReceivePolicy_.getMaxNumBytes());
-    Message peekMsg;
-    while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && 
messages->canAdd(peekMsg)) {
-        messageProcessed(peekMsg);
-        Message interceptMsg = 
interceptors_->beforeConsume(Consumer(shared_from_this()), peekMsg);
+    Message msg;
+    while (incomingMessages_.peek(msg) && messages->canAdd(msg)) {
+        incomingMessages_.pop(msg);

Review Comment:
   I noticed that Java client has the same problem. 
   
   
https://github.com/apache/pulsar/blob/9ab9d0170a0a955875656d683839dd516c335bde/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L981-L999
   
   > So you have to use a lock to synchronize these two operations.
   
   But the lock feels a bit heavy because we also need to synchronize the take 
operation of `receive`. 
   
   
https://github.com/apache/pulsar/blob/e0b50c9ec5f12d0fb8275f235d8ac00e87a9099e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L445-L446
   
   The current implementation does not lose or duplicate data, that just may 
return batches that exceed `batchReceivePolicy.MaxNumBytes`.
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to