shibd commented on code in PR #228:
URL: https://github.com/apache/pulsar-client-cpp/pull/228#discussion_r1148279075


##########
lib/ConsumerImpl.cc:
##########
@@ -649,10 +649,11 @@ void ConsumerImpl::executeNotifyCallback(Message& msg) {
 void ConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
     auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
                                                    batchReceivePolicy_.getMaxNumBytes());
-    Message peekMsg;
-    while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && messages->canAdd(peekMsg)) {
-        messageProcessed(peekMsg);
-        Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), peekMsg);
+    Message msg;
+    while (incomingMessages_.peek(msg) && messages->canAdd(msg)) {
+        incomingMessages_.pop(msg);

Review Comment:
   > The current implementation does not lose or duplicate data; it just may return batches that exceed `batchReceivePolicy.MaxNumBytes`.
   
   I mean, if we don't think it's a bug, there's no need to lock it here.
   
   Either way, we can't guarantee that the returned batch is exactly equal to `batchReceivePolicy.MaxNumBytes`:
   
   - If we lock here, it is possible to return batches smaller than `batchReceivePolicy.MaxNumBytes`.
   - If we don't lock here, it is possible to return batches larger than `batchReceivePolicy.MaxNumBytes`.
   
   We can discuss it first.


