merlimat commented on a change in pull request #4062: Delayed message delivery implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r279587876
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -311,15 +307,26 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
         while (iter.hasNext()) {
             Entry entry = iter.next();
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been corrupted
+            MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+            if (msgMetadata == null) {
+                // Message metadata was corrupted
                 iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
                 entry.release();
                 subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap());
                 continue;
+            } else if (msgMetadata.hasDeliverAtTime()
 
 Review comment:
   Yes. The issue is that if we inspect the metadata only in the dispatcher, 
then we need a way to pass along the batch size for each entry; otherwise we 
have to deserialize the protobuf metadata twice. 
   
   For shared subscriptions we would always pay for that double 
deserialization, since we cannot know upfront whether a message was marked 
with a delay. 
   
   Refactoring `updatePermitsAndPendingAcks()` is a bit tricky: it is 
essentially code that belongs in the dispatcher, but it lives in the 
`Consumer` class because it is the same across all dispatchers. 
   
   That is the context for why I chose to check the delay value at that 
particular position.
   
   The refactoring of `updatePermitsAndPendingAcks()` would have to go in a 
different PR anyway.
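   
   To illustrate the trade-off, here is a minimal, self-contained sketch of 
the single-pass idea: parse the metadata once per entry and use the same 
object for the corruption check, the delay check, and the batch-size count. 
`Metadata`, `Entry`, `Result`, `process()`, and the `parseCount` counter are 
hypothetical stand-ins and not the Pulsar classes, and setting delayed 
entries aside in a list is an assumption made only for this example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SinglePassMetadataSketch {

    /** Hypothetical stand-in for the protobuf MessageMetadata. */
    static final class Metadata {
        final int numMessagesInBatch;
        final Long deliverAtTime; // null when no delay was requested

        Metadata(int numMessagesInBatch, Long deliverAtTime) {
            this.numMessagesInBatch = numMessagesInBatch;
            this.deliverAtTime = deliverAtTime;
        }

        boolean hasDeliverAtTime() {
            return deliverAtTime != null;
        }
    }

    /** Hypothetical stand-in for an entry; counts how often its metadata is parsed. */
    static final class Entry {
        private final Metadata metadata; // null simulates corrupted metadata
        int parseCount = 0;

        Entry(Metadata metadata) {
            this.metadata = metadata;
        }

        Metadata peekMetadata() {
            parseCount++; // each call models one protobuf deserialization
            return metadata;
        }
    }

    static final class Result {
        int totalMessages = 0;
        final List<Entry> delayed = new ArrayList<>();
    }

    /** One metadata parse per entry serves all three decisions. */
    static Result process(List<Entry> entries, long nowMillis) {
        Result result = new Result();
        Iterator<Entry> iter = entries.iterator();
        while (iter.hasNext()) {
            Entry entry = iter.next();
            Metadata md = entry.peekMetadata();
            if (md == null) {
                // Corrupted metadata: drop the entry from this dispatch.
                iter.remove();
                continue;
            }
            if (md.hasDeliverAtTime() && md.deliverAtTime > nowMillis) {
                // Not yet due: set aside (assumption for this sketch).
                iter.remove();
                result.delayed.add(entry);
                continue;
            }
            // Reuse the already-parsed metadata for the batch size.
            result.totalMessages += md.numMessagesInBatch;
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry(new Metadata(3, null)));
        entries.add(new Entry(null));
        entries.add(new Entry(new Metadata(1, now + 60_000)));
        Result r = process(entries, now);
        System.out.println("messages to dispatch: " + r.totalMessages);
        System.out.println("delayed entries: " + r.delayed.size());
    }
}
```

   If the delay check instead ran in a separate earlier pass, every entry 
would need a second `peekMetadata()` call unless the batch size from the 
first parse were carried along with it.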

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
