This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new e4a1e675eee [cleanup][broker] Simplify extract entryMetadata code in 
filterEntriesForConsumer (#18729)
e4a1e675eee is described below

commit e4a1e675eee06e6cd55622eb217cae6fa97c225a
Author: lifepuzzlefun <[email protected]>
AuthorDate: Mon Dec 12 14:18:15 2022 +0800

    [cleanup][broker] Simplify extract entryMetadata code in 
filterEntriesForConsumer (#18729)
    
    The original logic for extracting entry metadata is based on 
`Optional.map.orElseGet`, which can be simplified to a plain `if` condition 
that also performs better on this hot code path.
    
    1. Replace the Optional-based code with an `if` null check.
    2. Remove the duplicate hasChunk check logic in 
`PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers`.
    
    (cherry picked from commit a1e3b80af804c8697d24b5be51f417650ec67112)
---
 .../broker/service/AbstractBaseDispatcher.java     | 26 ++++++++++++++++------
 .../PersistentDispatcherMultipleConsumers.java     |  2 +-
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index da6be55f8e1..6b9ddcc1162 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
@@ -52,6 +51,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 @Slf4j
 public abstract class AbstractBaseDispatcher implements Dispatcher {
@@ -128,13 +128,25 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
     public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes 
batchSizes,
             SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
             ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
-        return filterEntriesForConsumer(Optional.empty(), 0, entries, 
batchSizes, sendMessageInfo, indexesAcks, cursor,
+        return filterEntriesForConsumer(null, 0, entries, batchSizes,
+                sendMessageInfo, indexesAcks, cursor,
                 isReplayRead, consumer);
     }
 
-    public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, 
int entryWrapperOffset,
-             List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo 
sendMessageInfo,
-             EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean 
isReplayRead, Consumer consumer) {
+    /**
+     * Filter entries with prefetched message metadata range so that there is 
no need to peek metadata from Entry.
+     *
+     * @param entryWrapper the optional message metadata array. need check if 
null pass.
+     * @param entryWrapperOffset the index in `optMetadataArray` of the first 
Entry's message metadata
+     *
+     * @see AbstractBaseDispatcher#filterEntriesForConsumer(List, 
EntryBatchSizes, SendMessageInfo,
+     *   EntryBatchIndexesAcks, ManagedCursor, boolean, Consumer)
+     */
+    public int filterEntriesForConsumer(@Nullable EntryWrapper[] entryWrapper, 
int entryWrapperOffset,
+                                        List<? extends Entry> entries, 
EntryBatchSizes batchSizes,
+                                        SendMessageInfo sendMessageInfo,
+                                        EntryBatchIndexesAcks indexesAcks, 
ManagedCursor cursor,
+                                        boolean isReplayRead, Consumer 
consumer) {
         int totalMessages = 0;
         long totalBytes = 0;
         int totalChunkedMessages = 0;
@@ -148,8 +160,8 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
             }
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             int entryWrapperIndex = i + entryWrapperOffset;
-            MessageMetadata msgMetadata = entryWrapper.isPresent() && 
entryWrapper.get()[entryWrapperIndex] != null
-                    ? entryWrapper.get()[entryWrapperIndex].getMetadata()
+            MessageMetadata msgMetadata = entryWrapper != null  && 
entryWrapper[entryWrapperIndex] != null
+                    ? entryWrapper[entryWrapperIndex].getMetadata()
                     : null;
             msgMetadata = msgMetadata == null
                     ? Commands.peekMessageMetadata(metadataAndPayload, 
subscription.toString(), -1)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index b4397635c41..f74ac857999 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -589,7 +589,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
                 EntryBatchSizes batchSizes = 
EntryBatchSizes.get(entriesForThisConsumer.size());
                 EntryBatchIndexesAcks batchIndexesAcks = 
EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
-                totalEntries += 
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
+                totalEntries += filterEntriesForConsumer(entryWrappers, start,
                         entriesForThisConsumer, batchSizes, sendMessageInfo, 
batchIndexesAcks, cursor,
                         readType == ReadType.Replay, c);
 

Reply via email to