merlimat commented on a change in pull request #4062: Delayed message delivery 
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r280646491
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -311,15 +307,26 @@ void updatePermitsAndPendingAcks(final List<Entry> 
entries, SendMessageInfo sent
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, 
subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been 
corrupted
+            MessageMetadata msgMetadata = 
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+            if (msgMetadata == null) {
+                // Message metadata was corrupted
                 iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
                 entry.release();
                 
subscription.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Individual, Collections.emptyMap());
                 continue;
+            } else if (msgMetadata.hasDeliverAtTime()
 
 Review comment:
   @sijie @ivankelly PTAL again, I have removed the list copy. Instead of the 
iterator.remove(), I changed it to just "null" the entries that are being 
removed. That works fine with the sublist.
   
   I also renamed the updatePermitsAndPendingAcks to be more explicit in that 
it filters out messages (either for delay or for metadata/checksum errors). We 
can think later on how to refactor the interactions between dispatcher and 
consumers.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to