merlimat commented on a change in pull request #4062: Delayed message delivery implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r279113032
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -311,15 +307,26 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
 -            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
 -            if (batchSize == -1) {
 -                // this would suggest that the message might have been corrupted
 +            MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+            if (msgMetadata == null) {
+                // Message metadata was corrupted
                 iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
                 entry.release();
                 subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap());
                 continue;
+            } else if (msgMetadata.hasDeliverAtTime()
 
 Review comment:
   > This is strange. The dispatcher gets a list of entries from the managed ledger, passes it to the consumer which then passes it back to an interface on the dispatcher which for all but one case is a noop.
   
   As mentioned above, the list can already be modified if the checksum doesn't 
match or the metadata fails to deserialize.
   
   Let me think a bit more about how to refactor this.
   
   > It's also weird that tracking happens in a method called updatePermitsAndPendingAcks. And this method doesn't look like it should mutate its argument, but it does.
   
   Agreed. The method was already like that, and it could already modify the 
list before this change. It's probably about time to rename it.
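
One possible direction for the refactoring, as a rough sketch only: a filtering step whose name says what it does and which returns its results instead of removing entries from the caller's list. Everything here is hypothetical (`EntryFilterSketch`, `FakeEntry`, `FilterResult`, `filterEntriesForDelivery`); none of it is existing Pulsar code. The `corrupted` flag stands in for the `msgMetadata == null` case in the diff, and comparing `deliverAtTime` against the current time is an assumption about how a delayed entry would be detected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; these types are illustrative stand-ins, not Pulsar classes.
final class EntryFilterSketch {

    /** Simplified stand-in for an entry read from the managed ledger. */
    static final class FakeEntry {
        final long position;
        final Long deliverAtTime; // null when no delayed delivery was requested
        final boolean corrupted;  // models metadata that failed to deserialize

        FakeEntry(long position, Long deliverAtTime, boolean corrupted) {
            this.position = position;
            this.deliverAtTime = deliverAtTime;
            this.corrupted = corrupted;
        }
    }

    /** The three outcomes are reported separately; the input list is never modified. */
    static final class FilterResult {
        final List<FakeEntry> deliverable = new ArrayList<>();
        final List<FakeEntry> corrupted = new ArrayList<>();
        final List<FakeEntry> delayed = new ArrayList<>();
    }

    static FilterResult filterEntriesForDelivery(List<FakeEntry> entries, long nowMillis) {
        FilterResult result = new FilterResult();
        for (FakeEntry entry : entries) {
            if (entry.corrupted) {
                // The caller would acknowledge and release these.
                result.corrupted.add(entry);
            } else if (entry.deliverAtTime != null && entry.deliverAtTime > nowMillis) {
                // The caller would hand these to whatever tracks delayed messages.
                result.delayed.add(entry);
            } else {
                result.deliverable.add(entry);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<FakeEntry> entries = Arrays.asList(
                new FakeEntry(1, null, false),
                new FakeEntry(2, null, true),
                new FakeEntry(3, 5000L, false));
        FilterResult result = filterEntriesForDelivery(entries, 1000L);
        System.out.println("input size: " + entries.size());
        System.out.println("deliverable: " + result.deliverable.size());
        System.out.println("corrupted: " + result.corrupted.size());
        System.out.println("delayed: " + result.delayed.size());
    }
}
```

With this shape the caller decides what to do with each group, so the side effects (acking corrupted entries, tracking delayed ones) are visible at the call site rather than hidden inside a method named after permits and pending acks.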
