ivankelly commented on a change in pull request #4062: Delayed message delivery 
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r282417672
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
 ##########
 @@ -284,42 +288,69 @@ private void incrementUnackedMessages(int ackedMessages) 
{
         }
     }
 
-    public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, 
Subscription subscription, long consumerId) {
+    public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, 
Subscription subscription,
+            long consumerId) {
+        MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, 
subscription, consumerId);
+        if (msgMetadata == null) {
+            return -1;
+        } else {
+            int numMessagesInBatch = msgMetadata.getNumMessagesInBatch();
+            msgMetadata.recycle();
+            return numMessagesInBatch;
+        }
+    }
+
+    public static MessageMetadata peekMessageMetadata(ByteBuf 
metadataAndPayload, Subscription subscription,
+            long consumerId) {
         try {
             // save the reader index and restore after parsing
-            metadataAndPayload.markReaderIndex();
+            int readerIdx = metadataAndPayload.readerIndex();
             PulsarApi.MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
-            metadataAndPayload.resetReaderIndex();
-            int batchSize = metadata.getNumMessagesInBatch();
-            metadata.recycle();
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] num messages in batch are {} ", 
subscription, consumerId, batchSize);
-            }
-            return batchSize;
+            metadataAndPayload.readerIndex(readerIdx);
+
+            return metadata;
         } catch (Throwable t) {
             log.error("[{}] [{}] Failed to parse message metadata", 
subscription, consumerId, t);
+            return null;
         }
-        return -1;
     }
 
-    void updatePermitsAndPendingAcks(final List<Entry> entries, 
SendMessageInfo sentMessages) throws PulsarServerException {
+    private void updatePermitsAndFilterMessages(final List<Entry> entries, 
SendMessageInfo sentMessages) throws PulsarServerException {
         int permitsToReduce = 0;
-        Iterator<Entry> iter = entries.iterator();
         boolean unsupportedVersion = false;
         long totalReadableBytes = 0;
         boolean clientSupportBatchMessages = 
cnx.isBatchMessageCompatibleVersion();
-        while (iter.hasNext()) {
-            Entry entry = iter.next();
+
+        for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
+            Entry entry = entries.get(i);
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            int batchSize = getBatchSizeforEntry(metadataAndPayload, 
subscription, consumerId);
-            if (batchSize == -1) {
-                // this would suggest that the message might have been 
corrupted
-                iter.remove();
-                PositionImpl pos = (PositionImpl) entry.getPosition();
-                entry.release();
-                
subscription.acknowledgeMessage(Collections.singletonList(pos), 
AckType.Individual, Collections.emptyMap());
-                continue;
+            PositionImpl pos = (PositionImpl) entry.getPosition();
+
+            int batchSize;
+            MessageMetadata msgMetadata = 
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
 
 Review comment:
   The code from L330 to L353 can be extracted from here and put into the dispatcher.
   
   The first check can be wrapped in a filterCorrupt() method that everyone 
uses. The subscription.acknowledgeMessage call is just telling the cursor not 
to give us that message again. The dispatcher controls the cursor, so it can 
easily do this.
   
   The trackDelayed can then be entirely encapsulated in 
PersistentDispatcherMultipleConsumers. It also avoids the jumping back and 
forth that caused the deadlock we saw internally.
   
   Replacing the entry with null is good. It will keep the list hot in the caches.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to