merlimat commented on a change in pull request #4062: Delayed message delivery 
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r282561929
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -284,42 +288,69 @@ private void incrementUnackedMessages(int ackedMessages) 
{
         }
     }
 
-    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, 
Subscription subscription, long consumerId) {
+    public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, 
Subscription subscription,
+            long consumerId) {
+        MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, 
subscription, consumerId);
+        if (msgMetadata == null) {
+            return -1;
+        } else {
+            int numMessagesInBatch = msgMetadata.getNumMessagesInBatch();
+            msgMetadata.recycle();
+            return numMessagesInBatch;
+        }
+    }
+
+    public static MessageMetadata peekMessageMetadata(ByteBuf 
metadataAndPayload, Subscription subscription,
+            long consumerId) {
         try {
             // save the reader index and restore after parsing
-            metadataAndPayload.markReaderIndex();
+            int readerIdx = metadataAndPayload.readerIndex();
             PulsarApi.MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
-            metadataAndPayload.resetReaderIndex();
-            int batchSize = metadata.getNumMessagesInBatch();
-            metadata.recycle();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] num messages in batch are {} ", 
subscription, consumerId, batchSize);
-            }
-            return batchSize;
+            metadataAndPayload.readerIndex(readerIdx);
+
+            return metadata;
         } catch (Throwable t) {
             log.error("[{}] [{}] Failed to parse message metadata", 
subscription, consumerId, t);
+            return null;
         }
-        return -1;
     }
 
-    void updatePermitsAndPendingAcks(final List<Entry> entries, 
SendMessageInfo sentMessages) throws PulsarServerException {
+    private void updatePermitsAndFilterMessages(final List<Entry> entries, 
SendMessageInfo sentMessages) throws PulsarServerException {
         int permitsToReduce = 0;
-        Iterator<Entry> iter = entries.iterator();
         boolean unsupportedVersion = false;
         long totalReadableBytes = 0;
         boolean clientSupportBatchMessages = 
cnx.isBatchMessageCompatibleVersion();
-        while (iter.hasNext()) {
-            Entry entry = iter.next();
+
+        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
+            Entry entry = entries.get(i);
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, 
subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been 
corrupted
-                iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
-                entry.release();
-                
subscription.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Individual, Collections.emptyMap());
-                continue;
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+
+            int batchSize;
+            MessageMetadata msgMetadata = 
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
 
 Review comment:
   Yes, though the only reason we'd still need the metadata in the consumer is the 
`batchSize` of each entry, so that we can adjust the permits for the consumer.
   
   That would require either: 
    1. Deserializing the metadata twice, or
    2. Deserializing the metadata in the dispatcher and associating the batch size with 
each entry.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to