This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53bbd4aeff0 [improve][broker] Reduce unnecessary MessageMetadata 
parsing by caching the parsed instance in the broker cache (#24682)
53bbd4aeff0 is described below

commit 53bbd4aeff003275d9c6bafbc6ef6c5523014ae2
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Wed Sep 10 00:23:17 2025 +0300

    [improve][broker] Reduce unnecessary MessageMetadata parsing by caching the 
parsed instance in the broker cache (#24682)
---
 .../java/org/apache/bookkeeper/mledger/Entry.java  |  25 +++
 .../apache/bookkeeper/mledger/impl/EntryImpl.java  |  24 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  11 +-
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  30 +--
 .../mledger/impl/ShadowManagedLedgerImpl.java      |   5 +
 .../mledger/impl/cache/EntryCacheDisabled.java     |   3 +-
 .../bookkeeper/mledger/impl/cache/RangeCache.java  |  17 +-
 .../mledger/impl/cache/RangeCacheEntryWrapper.java |  34 +++-
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |   1 +
 .../broker/admin/impl/PersistentTopicsBase.java    |  28 ++-
 .../intercept/ManagedLedgerInterceptorImpl.java    |  12 +-
 .../broker/service/AbstractBaseDispatcher.java     |  28 ++-
 .../pulsar/broker/service/EntryAndMetadata.java    |  12 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   5 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |   3 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   3 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   4 +-
 .../PersistentDispatcherMultipleConsumers.java     |   6 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   2 +-
 .../persistent/PersistentMessageFinder.java        |   3 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   6 +-
 ...tickyKeyDispatcherMultipleConsumersClassic.java |   2 +-
 .../service/persistent/PersistentSubscription.java |  12 +-
 .../broker/service/persistent/PersistentTopic.java |  24 ++-
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    |   2 +-
 .../buffer/impl/TopicTransactionBuffer.java        |   7 +-
 .../service/PersistentMessageFinderTest.java       |   2 +-
 .../apache/pulsar/client/impl/MessageImplTest.java |   3 +-
 .../apache/pulsar/common/protocol/Commands.java    | 208 +++++++++++++++------
 29 files changed, 382 insertions(+), 140 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
index 6b8f8a95773..24ea5c17c0d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java
@@ -21,6 +21,8 @@ package org.apache.bookkeeper.mledger;
 import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
 
 /**
  * An Entry represent a ledger entry data and its associated position.
@@ -98,4 +100,27 @@ public interface Entry {
     default boolean matchesPosition(Position position) {
         return position != null && position.compareTo(getLedgerId(), 
getEntryId()) == 0;
     }
+
+    default MessageMetadata getMessageMetadata() {
+        return null;
+    }
+
+    /**
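+     * Returns the entry timestamp: the broker timestamp if present, otherwise the message publish time.
+     * @return the broker timestamp when the broker entry metadata has one, else the publish time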
+     */
+    default long getEntryTimestamp() {
+        // get broker timestamp first if BrokerEntryMetadata is enabled with 
AppendBrokerTimestampMetadataInterceptor
+        return Commands.peekBrokerEntryMetadataToLong(getDataBuffer(), 
brokerEntryMetadata -> {
+            if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp()) {
+                return brokerEntryMetadata.getBrokerTimestamp();
+            }
+            // otherwise get the publish_time
+            MessageMetadata messageMetadata = getMessageMetadata();
+            if (messageMetadata == null) {
+                messageMetadata = 
Commands.peekMessageMetadata(getDataBuffer(), null, -1);
+            }
+            return messageMetadata.getPublishTime();
+        });
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index b85a1bc45fe..070a0fc1bea 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -24,6 +24,9 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCounted;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.Entry;
@@ -33,7 +36,10 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
 
+@Slf4j
 public final class EntryImpl extends AbstractCASReferenceCounted
         implements ReferenceCountedEntry, Comparable<EntryImpl> {
 
@@ -51,6 +57,8 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
     ByteBuf data;
     private EntryReadCountHandler readCountHandler;
     private boolean decreaseReadCountOnRelease = true;
+    @Getter @Setter
+    private MessageMetadata messageMetadata;
 
     private Runnable onDeallocate;
 
@@ -161,6 +169,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
         entry.entryId = other.entryId;
         entry.data = other.data.retainedDuplicate();
         entry.readCountHandler = other.readCountHandler;
+        entry.messageMetadata = other.messageMetadata;
         entry.setRefCnt(1);
         return entry;
     }
@@ -172,6 +181,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
         entry.entryId = other.getEntryId();
         entry.data = other.getDataBuffer().retainedDuplicate();
         entry.readCountHandler = other.getReadCountHandler();
+        entry.messageMetadata = other.getMessageMetadata();
         entry.setRefCnt(1);
         return entry;
     }
@@ -277,6 +287,7 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
         position = null;
         readCountHandler = null;
         decreaseReadCountOnRelease = true;
+        messageMetadata = null;
         recyclerHandle.recycle(this);
     }
 
@@ -294,6 +305,19 @@ public final class EntryImpl extends 
AbstractCASReferenceCounted
         decreaseReadCountOnRelease = enabled;
     }
 
+    public void initializeMessageMetadataIfNeeded(String managedLedgerName) {
+        if (messageMetadata == null) {
+            try {
+                MessageMetadata msgMetadata = new MessageMetadata();
+                Commands.peekMessageMetadata(data, msgMetadata);
+                this.messageMetadata = msgMetadata;
+            } catch (Throwable t) {
+                log.warn("[{}] Failed to parse message metadata for entry 
{}:{}", managedLedgerName, ledgerId, entryId,
+                        t);
+            }
+        }
+    }
+
     @Override
     public String toString() {
         return getClass().getName() + "@" + System.identityHashCode(this)
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0be740e1bda..a4527261c8b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -31,7 +31,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -144,7 +143,6 @@ import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.LazyLoadableValue;
@@ -1315,9 +1313,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 try {
-                    long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                    long entryTimestamp = entry.getEntryTimestamp();
                     future.complete(entryTimestamp);
-                } catch (IOException e) {
+                } catch (Exception e) {
                     log.error("Error deserializing message for message 
position {}", nextPos, e);
                     future.completeExceptionally(e);
                 } finally {
@@ -4991,4 +4989,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             throw new RuntimeException(e);
         }
     }
+
+    boolean shouldCacheAddedEntry() {
+        // Avoid caching entries if no cursor has been created
+        return getActiveCursors().shouldCacheAddedEntry();
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 49bc3f309d4..3af5618064f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -257,23 +257,23 @@ public class OpAddEntry implements AddCallback, 
CloseCallback, Runnable, Managed
         ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
         ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
 
+        // ctx will contain a Position instance only in the case of 
ShadowManagedLedgerImpl
         long ledgerId = ledger != null ? ledger.getId() : ((Position) 
ctx).getLedgerId();
-        // Don't insert to the entry cache for the ShadowManagedLedger
-        if (!(ml instanceof ShadowManagedLedgerImpl)) {
-            // Avoid caching entries if no cursor has been created
-            if (ml.getActiveCursors().shouldCacheAddedEntry()) {
-                int expectedReadCount = 0;
-                // only use expectedReadCount if cache eviction is enabled by 
expected read count
-                if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
-                    expectedReadCount = ml.getActiveCursors().size();
-                }
-                EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, 
expectedReadCount);
-                entry.setDecreaseReadCountOnRelease(false);
-                // EntryCache.insert: duplicates entry by allocating new entry 
and data. so, recycle entry after calling
-                // insert
-                ml.entryCache.insert(entry);
-                entry.release();
+
+        // Handle caching for tailing reads
+        if (ml.shouldCacheAddedEntry()) {
+            int expectedReadCount = 0;
+            // only use expectedReadCount if cache eviction is enabled by 
expected read count
+            if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
+                // use the number of active cursors as the expected read count
+                expectedReadCount = ml.getActiveCursors().size();
             }
+            EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, 
expectedReadCount);
+            entry.setDecreaseReadCountOnRelease(false);
+            // EntryCache.insert: duplicates entry by allocating new entry and 
data. so, recycle entry after calling
+            // insert
+            ml.entryCache.insert(entry);
+            entry.release();
         }
 
         Position lastEntry = PositionFactory.create(ledgerId, entryId);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index 4b03cad8e0a..46cd1335a17 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -407,4 +407,9 @@ public class ShadowManagedLedgerImpl extends 
ManagedLedgerImpl {
     protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
         this.lastLedgerCreatedTimestamp = clock.millis();
     }
+
+    @Override
+    boolean shouldCacheAddedEntry() {
+        return false;
+    }
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
index 9cd63d99f4c..b5a45415a4f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
@@ -78,6 +78,7 @@ public class EntryCacheDisabled implements EntryCache {
                         for (LedgerEntry e : ledgerEntries) {
                             // Insert the entries at the end of the list (they 
will be unsorted for now)
                             EntryImpl entry = EntryImpl.create(e, interceptor, 
0);
+                            
entry.initializeMessageMetadataIfNeeded(ml.getName());
                             entries.add(entry);
                             totalSize += entry.getLength();
                         }
@@ -111,7 +112,7 @@ public class EntryCacheDisabled implements EntryCache {
                         if (iterator.hasNext()) {
                             LedgerEntry ledgerEntry = iterator.next();
                             EntryImpl returnEntry = 
EntryImpl.create(ledgerEntry, interceptor, 0);
-
+                            
returnEntry.initializeMessageMetadataIfNeeded(ml.getName());
                             ml.getMbean().recordReadEntriesOpsCacheMisses(1, 
returnEntry.getLength());
                             ml.getFactory().getMbean().recordCacheMiss(1, 
returnEntry.getLength());
                             ml.getMbean().addReadEntriesSample(1, 
returnEntry.getLength());
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
index a5592f7098e..858b7d3c07c 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java
@@ -46,13 +46,23 @@ import org.apache.commons.lang3.tuple.Pair;
 class RangeCache {
     private final ConcurrentNavigableMap<Position, RangeCacheEntryWrapper> 
entries;
     private final RangeCacheRemovalQueue removalQueue;
-    private AtomicLong size; // Total size of values stored in cache
+    private final AtomicLong size; // Total size of values stored in cache
+    private final String managedLedgerName;
 
     /**
      * Construct a new RangeCache.
      */
     public RangeCache(RangeCacheRemovalQueue removalQueue) {
+        this(removalQueue, null);
+    }
+
+    /**
+     * Construct a new RangeCache.
+     * @param managedLedgerName the name of the managed ledger this cache 
belongs to
+     */
+    public RangeCache(RangeCacheRemovalQueue removalQueue, String 
managedLedgerName) {
         this.removalQueue = removalQueue;
+        this.managedLedgerName = managedLedgerName;
         this.entries = new ConcurrentSkipListMap<>();
         this.size = new AtomicLong(0);
     }
@@ -115,7 +125,7 @@ class RangeCache {
         if (valueWrapper == null) {
             return null;
         } else {
-            ReferenceCountedEntry value = valueWrapper.getValue(key);
+            ReferenceCountedEntry value = valueWrapper.getValue(key, 
managedLedgerName);
             return getRetainedValueMatchingKey(key, value);
         }
     }
@@ -124,7 +134,8 @@ class RangeCache {
      * @apiNote the returned value must be released if it's not null
      */
     private ReferenceCountedEntry getValueMatchingEntry(Map.Entry<Position, 
RangeCacheEntryWrapper> entry) {
-        ReferenceCountedEntry valueMatchingEntry = 
RangeCacheEntryWrapper.getValueMatchingMapEntry(entry);
+        ReferenceCountedEntry valueMatchingEntry =
+                RangeCacheEntryWrapper.getValueMatchingMapEntry(entry, 
managedLedgerName);
         return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry);
     }
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
index db5ceaefd38..5c82207e1c7 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java
@@ -22,14 +22,17 @@ import io.netty.util.Recycler;
 import java.util.Map;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReferenceCountedEntry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
 
 /**
  * Wrapper around the value to store in Map. This is needed to ensure that a 
specific instance can be removed from
  * the map by calling the {@link Map#remove(Object, Object)} method. Certain 
race conditions could result in the
  * wrong value being removed from the map. The instances of this class are 
recycled to avoid creating new objects.
  */
+@Slf4j
 class RangeCacheEntryWrapper {
     private final Recycler.Handle<RangeCacheEntryWrapper> recyclerHandle;
     private static final Recycler<RangeCacheEntryWrapper> RECYCLER = new 
Recycler<RangeCacheEntryWrapper>() {
@@ -73,12 +76,13 @@ class RangeCacheEntryWrapper {
     /**
      * Get the value associated with the key. Returns null if the key does not 
match the key.
      *
-     * @param key the key to match
+     * @param key               the key to match
+     * @param managedLedgerName the managed ledger name, used in the warning logged if metadata parsing fails
      * @return the value associated with the key, or null if the value has 
already been recycled or the key does not
      * match
      */
-    ReferenceCountedEntry getValue(Position key) {
-        return getValueInternal(key, false);
+    ReferenceCountedEntry getValue(Position key, String managedLedgerName) {
+        return getValueInternal(key, false, managedLedgerName);
     }
 
     /**
@@ -88,8 +92,9 @@ class RangeCacheEntryWrapper {
      * @return the value associated with the key, or null if the value has 
already been recycled or the key does not
      * exactly match the same instance
      */
-    static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, 
RangeCacheEntryWrapper> entry) {
-        return entry.getValue().getValueInternal(entry.getKey(), true);
+    static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, 
RangeCacheEntryWrapper> entry,
+                                                          String 
managedLedgerName) {
+        return entry.getValue().getValueInternal(entry.getKey(), true, 
managedLedgerName);
     }
 
     /**
@@ -101,16 +106,20 @@ class RangeCacheEntryWrapper {
      *                               key as the one stored in the wrapper. 
This is used to avoid any races
      *                               when retrieving or removing the entries 
from the cache when the key and value
      *                               instances are available.
+     * @param managedLedgerName the managed ledger name, used in the warning logged if metadata parsing fails
      * @return the value associated with the key, or null if the key does not 
match
      */
-    private ReferenceCountedEntry getValueInternal(Position key, boolean 
requireSameKeyInstance) {
+    private ReferenceCountedEntry getValueInternal(Position key, boolean 
requireSameKeyInstance,
+                                                   String managedLedgerName) {
         long stamp = lock.tryOptimisticRead();
         Position localKey = this.key;
         ReferenceCountedEntry localValue = this.value;
+        boolean messageMetadataInitialized = localValue != null && 
localValue.getMessageMetadata() != null;
         if (!lock.validate(stamp)) {
             stamp = lock.readLock();
             localKey = this.key;
             localValue = this.value;
+            messageMetadataInitialized = localValue != null && 
localValue.getMessageMetadata() != null;
             lock.unlockRead(stamp);
         }
         // check that the given key matches the key associated with the value 
in the entry
@@ -120,6 +129,19 @@ class RangeCacheEntryWrapper {
         if (localKey != key && (requireSameKeyInstance || localKey == null || 
!localKey.equals(key))) {
             return null;
         }
+        // Initialize the metadata if it's not already initialized
+        if (localValue != null && !messageMetadataInitialized) {
+            localValue = withWriteLock(wrapper -> {
+                // ensure that the key still matches
+                if (wrapper.key != key && (requireSameKeyInstance || 
wrapper.key == null || !wrapper.key.equals(key))) {
+                    return null;
+                }
+                if (wrapper.value instanceof EntryImpl entry) {
+                    entry.initializeMessageMetadataIfNeeded(managedLedgerName);
+                }
+                return wrapper.value;
+            });
+        }
         accessed = true;
         return localValue;
     }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 04de8dde0ad..fd391ba2bf6 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -532,6 +532,7 @@ public class RangeEntryCacheImpl implements EntryCache {
                                 final List<Entry> entriesToReturn = new 
ArrayList<>(entriesToRead);
                                 for (LedgerEntry e : ledgerEntries) {
                                     EntryImpl entry = EntryImpl.create(e, 
interceptor, expectedReadCountVal);
+                                    
entry.initializeMessageMetadataIfNeeded(ml.getName());
                                     entriesToReturn.add(entry);
                                     totalSize += entry.getLength();
                                     if (expectedReadCountVal > 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a16be37bb38..ec61d58d2af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2659,8 +2659,10 @@ public class PersistentTopicsBase extends AdminResource {
                                 if (entry == null) {
                                     batchSizeFuture.complete(0);
                                 } else {
-                                    MessageMetadata metadata =
-                                            
Commands.parseMessageMetadata(entry.getDataBuffer());
+                                    MessageMetadata metadata = 
entry.getMessageMetadata();
+                                    if (metadata == null) {
+                                        metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                                    }
                                     
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
                                 }
                             } catch (Exception e) {
@@ -2839,7 +2841,7 @@ public class PersistentTopicsBase extends AdminResource {
     private CompletableFuture<MessageId> findMessageIdByPublishTime(long 
timestamp, ManagedLedger managedLedger) {
         return managedLedger.asyncFindPosition(entry -> {
             try {
-                long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                long entryTimestamp = entry.getEntryTimestamp();
                 return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, 
timestamp);
             } catch (Exception e) {
                 log.error("[{}] Error deserializing message for message 
position find",
@@ -3009,7 +3011,12 @@ public class PersistentTopicsBase extends AdminResource {
 
         long totalSize = metadataAndPayload.readableBytes();
         BrokerEntryMetadata brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
-        MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+        MessageMetadata metadata = entry.getMessageMetadata();
+        if (metadata == null) {
+            metadata = Commands.parseMessageMetadata(metadataAndPayload);
+        } else {
+            Commands.skipMessageMetadata(metadataAndPayload);
+        }
 
         ResponseBuilder responseBuilder = Response.ok();
         responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
@@ -5451,11 +5458,12 @@ public class PersistentTopicsBase extends AdminResource 
{
 
 
     private static Long getIndexFromEntry(Entry entry) {
-        final var brokerEntryMetadata = 
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
-        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
-            return brokerEntryMetadata.getIndex();
-        } else {
-            return null;
-        }
+        return Commands.peekBrokerEntryMetadataToObject(entry.getDataBuffer(), 
brokerEntryMetadata -> {
+            if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) 
{
+                return brokerEntryMetadata.getIndex();
+            } else {
+                return null;
+            }
+        });
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index db138989a8e..a86b49d627f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
@@ -117,12 +116,11 @@ public class ManagedLedgerInterceptorImpl implements 
ManagedLedgerInterceptor {
             if (lastEntryOptional.isPresent()) {
                 Entry lastEntry = lastEntryOptional.get();
                 try {
-                    BrokerEntryMetadata brokerEntryMetadata =
-                            
Commands.parseBrokerEntryMetadataIfExist(lastEntry.getDataBuffer());
-                    if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
-                        appendIndexMetadataInterceptor.recoveryIndexGenerator(
-                                brokerEntryMetadata.getIndex());
-                    }
+                    
Commands.peekBrokerEntryMetadataAndConsume(lastEntry.getDataBuffer(), 
brokerEntryMetadata -> {
+                        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
+                            
appendIndexMetadataInterceptor.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                        }
+                    });
                 } finally {
                     lastEntry.release();
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index f074f234b87..c5e001692f2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -149,6 +149,8 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
                 msgMetadata = metadataArray[metadataIndex];
             } else if (entry instanceof EntryAndMetadata) {
                 msgMetadata = ((EntryAndMetadata) entry).getMetadata();
+            } else if (entry.getMessageMetadata() != null) {
+                msgMetadata = entry.getMessageMetadata();
             } else {
                 msgMetadata = 
Commands.peekAndCopyMessageMetadata(metadataAndPayload, 
subscription.toString(), -1);
             }
@@ -454,8 +456,18 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
         return true;
     }
 
-    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
-        return Commands.peekStickyKey(metadataAndPayload, 
subscription.getTopicName(), subscription.getName());
+    protected byte[] peekStickyKey(Entry entry) {
+        if (entry instanceof EntryAndMetadata entryAndMetadata) {
+            return entryAndMetadata.getStickyKey();
+        }
+        MessageMetadata metadata = entry.getMessageMetadata();
+        if (metadata == null) {
+            metadata = Commands.peekMessageMetadata(entry.getDataBuffer(), 
subscription.toString(), -1);
+        }
+        if (metadata == null) {
+            return Commands.NONE_KEY;
+        }
+        return Commands.resolveStickyKey(metadata);
     }
 
     protected String getSubscriptionName() {
@@ -528,4 +540,16 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
     protected final void updatePendingBytesToDispatch(long size) {
         PENDING_BYTES_TO_DISPATCH.inc(size);
     }
+
+    protected int getNumberOfMessagesInBatch(Entry entry) {
+        MessageMetadata msgMetadata = entry.getMessageMetadata();
+        if (msgMetadata == null) {
+            msgMetadata = Commands.peekMessageMetadata(entry.getDataBuffer(), 
subscription.toString(), -1);
+        }
+        if (msgMetadata == null) {
+            return -1;
+        } else {
+            return msgMetadata.getNumMessagesInBatch();
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
index 33abddc300b..117f0115064 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
@@ -43,12 +43,22 @@ public class EntryAndMetadata implements Entry {
     }
 
     public static EntryAndMetadata create(final Entry entry, final 
MessageMetadata metadata) {
+        if (entry instanceof EntryAndMetadata entryAndMetadata) {
+            return entryAndMetadata;
+        }
         return new EntryAndMetadata(entry, metadata);
     }
 
     @VisibleForTesting
     public static EntryAndMetadata create(final Entry entry) {
-        return create(entry, 
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1));
+        if (entry instanceof EntryAndMetadata entryAndMetadata) {
+            return entryAndMetadata;
+        }
+        MessageMetadata msgMetadata = entry.getMessageMetadata();
+        if (msgMetadata == null) {
+            msgMetadata = 
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1);
+        }
+        return create(entry, msgMetadata);
     }
 
     public byte[] getStickyKey() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d1bc4953d11..44927c375b5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2365,7 +2365,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }, null);
 
             CompletableFuture<Integer> batchSizeFuture = 
entryFuture.thenApply(entry -> {
-                MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                MessageMetadata metadata = entry.getMessageMetadata();
+                if (metadata == null) {
+                    metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                }
                 int batchSize = metadata.getNumMessagesInBatch();
                 entry.release();
                 return metadata.hasNumMessagesInBatch() ? batchSize : -1;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5941093e71c..a4fba30cc9b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -205,7 +204,7 @@ public class NonPersistentDispatcherMultipleConsumers 
extends AbstractDispatcher
             TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, 
-sendMessageInfo.getTotalMessages());
         } else {
             entries.forEach(entry -> {
-                int totalMsgs = 
Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), 
subscription.toString(), -1);
+                int totalMsgs = getNumberOfMessagesInBatch(entry);
                 if (totalMsgs > 0) {
                     msgDrop.recordEvent(totalMsgs);
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 414e9235418..26bb5a791cf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -30,7 +30,6 @@ import 
org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.stats.Rate;
 
 @Slf4j
@@ -66,7 +65,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
                     sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
         } else {
             entries.forEach(entry -> {
-                int totalMsgs = 
Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), 
subscription.toString(), -1);
+                int totalMsgs = getNumberOfMessagesInBatch(entry);
                 if (totalMsgs > 0) {
                     msgDrop.recordEvent(totalMsgs);
                 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 915f9c0f925..2e884eb0ea1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -151,7 +151,7 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
         consumerStickyKeyHashesMap.clear();
 
         for (Entry entry : entries) {
-            byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+            byte[] stickyKey = peekStickyKey(entry);
             int stickyKeyHash = selector.makeStickyKeyHash(stickyKey);
 
             Consumer consumer = selector.select(stickyKeyHash);
@@ -182,7 +182,7 @@ public class 
NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
                 TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, 
-sendMessageInfo.getTotalMessages());
             } else {
                 entriesForConsumer.forEach(e -> {
-                    int totalMsgs = 
Commands.getNumberOfMessagesInBatch(e.getDataBuffer(), subscription.toString(), 
-1);
+                    int totalMsgs = getNumberOfMessagesInBatch(e);
                     if (totalMsgs > 0) {
                         msgDrop.recordEvent(totalMsgs);
                     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c35d802f43d..097ab9cd0fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -72,7 +72,6 @@ import 
org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -798,10 +797,11 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
             if (entry instanceof EntryAndMetadata) {
                 metadata = ((EntryAndMetadata) entry).getMetadata();
             } else {
-                metadata = 
Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), 
subscription.toString(), -1);
                 // cache the metadata in the entry with EntryAndMetadata for 
later use to avoid re-parsing the metadata
                 // and to carry the metadata and calculated stickyKeyHash with 
the entry
-                entries.set(i, EntryAndMetadata.create(entry, metadata));
+                EntryAndMetadata entryAndMetadata = 
EntryAndMetadata.create(entry);
+                metadata = entryAndMetadata.getMetadata();
+                entries.set(i, entryAndMetadata);
             }
             if (metadata != null) {
                 remainingMessages += metadata.getNumMessagesInBatch();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 99a09d3a5d7..e809d984ae3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -189,7 +189,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             Iterator<Entry> iterator = entries.iterator();
             while (iterator.hasNext()) {
                 Entry entry = iterator.next();
-                byte[] key = peekStickyKey(entry.getDataBuffer());
+                byte[] key = peekStickyKey(entry);
                 Consumer consumer = stickyKeyConsumerSelector.select(key);
                 // Skip the entry if it's not for current active consumer.
                 if (consumer == null || currentConsumer != consumer) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
index e8bc9fdc3ea..991e6a06029 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java
@@ -30,7 +30,6 @@ import org.apache.bookkeeper.mledger.PositionFactory;
 import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +72,7 @@ public class PersistentMessageFinder implements 
AsyncCallbacks.FindEntryCallback
             
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries,
 entry -> {
                 try {
                     // Find the latest entry that is earlier than the target 
timestamp.
-                    long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                    long entryTimestamp = entry.getEntryTimestamp();
                     return 
MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
                 } catch (Exception e) {
                     log.error("[{}][{}] Error deserializing message for 
message position find", topicName, subName, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 080f5acbf16..e0c74f9043c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -58,7 +58,6 @@ import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -419,8 +418,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             } else {
                 // replace the input entry with EntryAndMetadata instance. In 
addition to the entry and metadata,
                 // it will also carry the calculated sticky key hash
-                entry = EntryAndMetadata.create(inputEntry,
-                        
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), 
getSubscriptionName(), -1));
+                entry = EntryAndMetadata.create(inputEntry);
             }
             int stickyKeyHash = getStickyKeyHash(entry);
             Consumer consumer = null;
@@ -610,7 +608,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             // use the cached sticky key hash if available, otherwise 
calculate the sticky key hash and cache it
             return 
entryAndMetadata.getOrUpdateCachedStickyKeyHash(selector::makeStickyKeyHash);
         }
-        return 
selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
+        return selector.makeStickyKeyHash(peekStickyKey(entry));
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index c3b246fe9ba..1f29cb73626 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -652,7 +652,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
             // use the cached sticky key hash if available, otherwise 
calculate the sticky key hash and cache it
             return 
entryAndMetadata.getOrUpdateCachedStickyKeyHash(selector::makeStickyKeyHash);
         }
-        return 
selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
+        return selector.makeStickyKeyHash(peekStickyKey(entry));
     }
 
     private static final Logger log =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index ac54957f0c6..056e9a25fda 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -23,7 +23,6 @@ import static 
org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import io.netty.buffer.ByteBuf;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -644,7 +643,12 @@ public class PersistentSubscription extends 
AbstractSubscription {
             firstPosition.compareAndSet(null, entryPosition);
             lastPosition.set(entryPosition);
             ByteBuf metadataAndPayload = entry.getDataBuffer();
-            MessageMetadata messageMetadata = 
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+            MessageMetadata messageMetadata;
+            if (entry.getMessageMetadata() != null) {
+                messageMetadata = entry.getMessageMetadata();
+            } else {
+                messageMetadata = 
Commands.peekMessageMetadata(metadataAndPayload, "", -1);
+            }
             int numMessages = 1;
             if (messageMetadata.hasNumMessagesInBatch()) {
                 numMessages = messageMetadata.getNumMessagesInBatch();
@@ -1438,9 +1442,9 @@ public class PersistentSubscription extends 
AbstractSubscription {
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 try {
-                    long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                    long entryTimestamp = entry.getEntryTimestamp();
                     future.complete(entryTimestamp);
-                } catch (IOException e) {
+                } catch (Exception e) {
                     log.error("Error deserializing message for message 
position {}", nextPos, e);
                     future.completeExceptionally(e);
                 } finally {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 657f03d6b38..3bbd8b00777 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
-import java.io.IOException;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -3801,7 +3800,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                         @Override
                         public void readEntryComplete(Entry entry, Object ctx) 
{
                             try {
-                                long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                                long entryTimestamp = 
entry.getEntryTimestamp();
                                 updateResultIfNewer(
                                         new OldestPositionInfo(
                                                 
oldestMarkDeleteCursorInfo.getPosition(),
@@ -3979,7 +3978,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         try {
             entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
             if (entry != null) {
-                long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                long entryTimestamp = entry.getEntryTimestamp();
                 isOldestMessageExpired = MessageImpl.isEntryExpired(
                         (int) (messageTTLInSeconds * 
MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
             }
@@ -4009,10 +4008,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             public void readEntryComplete(Entry entry, Object ctx) {
                 long entryTimestamp = 0;
                 try {
-                    entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                    entryTimestamp = entry.getEntryTimestamp();
                     res.complete(MessageImpl.isEntryExpired(
                             (int) (messageTTLInSeconds * 
MESSAGE_EXPIRY_THRESHOLD), entryTimestamp));
-                } catch (IOException e) {
+                } catch (Exception e) {
                     log.warn("[{}] [{}] Error while getting the oldest 
message", topic, cursor.toString(), e);
                     res.complete(false);
                 }
@@ -4110,7 +4109,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return CompletableFuture.completedFuture(lastDispatchablePosition);
         }
         return ledger.getLastDispatchablePosition(entry -> {
-            MessageMetadata md = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+            MessageMetadata md = entry.getMessageMetadata();
+            if (md == null) {
+                md = Commands.parseMessageMetadata(entry.getDataBuffer());
+            }
             // If a messages has marker will filter by 
AbstractBaseDispatcher.filterEntriesForConsumer
             if (Markers.isServerOnlyMarker(md)) {
                 return false;
@@ -4179,7 +4181,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 try {
-                    MessageMetadata metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                    MessageMetadata metadata = entry.getMessageMetadata();
+                    if (metadata == null) {
+                        metadata = 
Commands.parseMessageMetadata(entry.getDataBuffer());
+                    }
                     if (metadata.hasNumMessagesInBatch()) {
                         completableFuture.complete(new 
BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(),
                                 partitionIndex, 
metadata.getNumMessagesInBatch() - 1));
@@ -4797,7 +4802,10 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 public void readEntryComplete(Entry entry, Object ctx) {
                     try {
                         ByteBuf metadataAndPayload = entry.getDataBuffer();
-                        MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+                        MessageMetadata msgMetadata = 
entry.getMessageMetadata();
+                        if (msgMetadata == null) {
+                            msgMetadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+                        }
                         long publishTime = msgMetadata.getPublishTime();
                         future.complete(publishTime);
                     } catch (Exception e) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 98d8a40eb36..38b902dc520 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -408,7 +408,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
         //decode snapshot from entry
         ByteBuf headersAndPayload = entry.getDataBuffer();
         //skip metadata
-        Commands.parseMessageMetadata(headersAndPayload);
+        Commands.skipMessageMetadata(headersAndPayload);
         TransactionBufferSnapshotSegment snapshotSegment = 
Schema.AVRO(TransactionBufferSnapshotSegment.class)
                 .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c43f0ed7fb9..c3961dca6d4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -184,8 +184,11 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     public void handleTxnEntry(Entry entry) {
                         ByteBuf metadataAndPayload = entry.getDataBuffer();
 
-                        MessageMetadata msgMetadata = 
Commands.peekMessageMetadata(metadataAndPayload,
-                                
TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1);
+                        MessageMetadata msgMetadata = 
entry.getMessageMetadata();
+                        if (msgMetadata == null) {
+                            msgMetadata = 
Commands.peekMessageMetadata(metadataAndPayload,
+                                    
TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1);
+                        }
                         if (msgMetadata != null && 
msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                             TxnID txnID = new 
TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
                             Position position = 
PositionFactory.create(entry.getLedgerId(), entry.getEntryId());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 4dc10671e6a..5707a6cca60 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -635,7 +635,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
     private CompletableFuture<MessageId> findMessageIdByPublishTime(long 
timestamp, ManagedLedger managedLedger) {
         return managedLedger.asyncFindPosition(entry -> {
             try {
-                long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
+                long entryTimestamp = entry.getEntryTimestamp();
                 return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, 
timestamp);
             } catch (Exception e) {
                 log.error("Error deserializing message for message position 
find", e);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index b152eda4689..f3eb7b434a9 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -28,7 +28,6 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.fail;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
@@ -480,7 +479,7 @@ public class MessageImplTest {
             entryTimestamp = Commands.getEntryTimestamp(compositeByteBuf);
             assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp));
             assertEquals(entryTimestamp, brokerEntryTimestamp);
-        } catch (IOException e) {
+        } catch (Exception e) {
             fail();
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 7aaa010c11e..cab4dc8bcab 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -22,6 +22,7 @@ import static 
com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Strings;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -36,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.ToLongFunction;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -457,7 +460,7 @@ public class Commands {
 
     public static void skipChecksumIfPresent(ByteBuf buffer) {
         if (hasChecksum(buffer)) {
-            readChecksum(buffer);
+            buffer.skipBytes(Short.BYTES + Integer.BYTES);
         }
     }
 
@@ -488,15 +491,21 @@ public class Commands {
         buffer.skipBytes(metadataSize);
     }
 
-    public static long getEntryTimestamp(ByteBuf 
headersAndPayloadWithBrokerEntryMetadata) throws IOException {
+    /**
+     * Gets the entry timestamp from either broker metadata broker timestamp 
or the message metadata publish time.
+     * Prefer using Managed Ledger's Entry's getEntryTimestamp() method over 
this method.
+     * @param headersAndPayloadWithBrokerEntryMetadata headers and payload for 
the message
+     * @return the entry timestamp
+     */
+    public static long getEntryTimestamp(ByteBuf 
headersAndPayloadWithBrokerEntryMetadata) {
         // get broker timestamp first if BrokerEntryMetadata is enabled with 
AppendBrokerTimestampMetadataInterceptor
-        BrokerEntryMetadata brokerEntryMetadata =
-                
Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
-        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp()) {
-            return brokerEntryMetadata.getBrokerTimestamp();
-        }
-        // otherwise get the publish_time
-        return 
parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
+        return 
peekBrokerEntryMetadataToLong(headersAndPayloadWithBrokerEntryMetadata, 
brokerEntryMetadata -> {
+            if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp()) {
+                return brokerEntryMetadata.getBrokerTimestamp();
+            }
+            // otherwise get the publish_time
+            return 
parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime();
+        });
     }
 
     public static BaseCommand newMessageCommand(long consumerId, long 
ledgerId, long entryId, int partition,
@@ -1761,39 +1770,126 @@ public class Commands {
         return compositeByteBuf;
     }
 
-    public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf 
headerAndPayloadWithBrokerEntryMetadata) {
-        int readerIndex = 
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
-        if (headerAndPayloadWithBrokerEntryMetadata.readShort() == 
magicBrokerEntryMetadata) {
-            int brokerEntryMetadataSize = 
headerAndPayloadWithBrokerEntryMetadata.readInt();
-            
headerAndPayloadWithBrokerEntryMetadata.readerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
-                    + brokerEntryMetadataSize);
-        } else {
-            headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
+    /**
+     * Moves the readerIndex ahead skipping possible BrokerEntryMetadata if it 
exists in the header and payload
+     * buffer.
+     * @param headerAndPayload the header and payload buffer
+     * @return the header and payload buffer passed as parameter
+     */
+    public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf 
headerAndPayload) {
+        int readerIndex = headerAndPayload.readerIndex();
+        if (headerAndPayload.getShort(readerIndex) == 
magicBrokerEntryMetadata) {
+            headerAndPayload.skipBytes(Short.BYTES);
+            int brokerEntryMetadataSize = headerAndPayload.readInt();
+            headerAndPayload.skipBytes(brokerEntryMetadataSize);
         }
-        return headerAndPayloadWithBrokerEntryMetadata;
+        return headerAndPayload;
+    }
+
+    /**
+     * Parses the broker entry metadata from the header and payload buffer and 
returns a new BrokerEntryMetadata
+     * instance if the broker entry metadata exists in the header and payload 
buffer. Null is returned if the
+     * broker entry metadata does not exist in the header and payload buffer.
+     * The readerIndex of the headerAndPayload buffer is advanced.
+     *
+     * @param headerAndPayload the header and payload buffer
+     * @return broker entry metadata or null
+     */
+    public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist(ByteBuf 
headerAndPayload) {
+        return parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, null, 
false);
+    }
+
+    /**
+     * Parses the broker entry metadata from the header and payload buffer and 
returns a new BrokerEntryMetadata
+     * instance if the broker entry metadata exists in the header and payload 
buffer. Null is returned if the
+     * broker entry metadata does not exist in the header and payload buffer.
+     * The readerIndex of the headerAndPayload buffer is not advanced.
+     *
+     * @param headerAndPayload the header and payload buffer
+     * @return broker entry metadata or null
+     */
+    public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
+            ByteBuf headerAndPayload) {
+        return parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, null, 
true);
     }
 
-    public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist(
-            ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
-        int readerIndex = 
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
-        if (headerAndPayloadWithBrokerEntryMetadata.getShort(readerIndex) == 
magicBrokerEntryMetadata) {
-            headerAndPayloadWithBrokerEntryMetadata.skipBytes(2);
-            int brokerEntryMetadataSize = 
headerAndPayloadWithBrokerEntryMetadata.readInt();
-            BrokerEntryMetadata brokerEntryMetadata = new 
BrokerEntryMetadata();
-            
brokerEntryMetadata.parseFrom(headerAndPayloadWithBrokerEntryMetadata, 
brokerEntryMetadataSize);
-            return brokerEntryMetadata;
+    /**
+     * Internal method for parsing and peeking broker entry metadata.
+     * @param headerAndPayload header and payload buffer
+     * @param brokerEntryMetadata the broker entry metadata instance to reuse, 
null if a new instance should be created
+     * @param peek when true, the readerIndex of the headerAndPayload buffer 
is reset to its original position
+     * @return the broker entry metadata instance or null
+     */
+    private static BrokerEntryMetadata parseOrPeekBrokerEntryMetadataIfExist(
+            ByteBuf headerAndPayload, BrokerEntryMetadata brokerEntryMetadata, 
boolean peek) {
+        int readerIndex = headerAndPayload.readerIndex();
+        if (headerAndPayload.getShort(readerIndex) == 
magicBrokerEntryMetadata) {
+            headerAndPayload.skipBytes(Short.BYTES);
+            try {
+                int brokerEntryMetadataSize = headerAndPayload.readInt();
+                if (brokerEntryMetadata == null) {
+                    brokerEntryMetadata = new BrokerEntryMetadata();
+                }
+                brokerEntryMetadata.parseFrom(headerAndPayload, 
brokerEntryMetadataSize);
+                return brokerEntryMetadata;
+            } finally {
+                if (peek) {
+                    headerAndPayload.readerIndex(readerIndex);
+                }
+            }
         } else {
             return null;
         }
     }
 
-    public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
-            ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
-        final int readerIndex = 
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
-        BrokerEntryMetadata entryMetadata =
-                
parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
-        headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
-        return entryMetadata;
+    /**
+     * Peeks the BrokerEntryMetadata from the given payload and applies the 
function to the result.
+     * null will be passed to the function if no BrokerEntryMetadata is found.
+     * The function shouldn't return the BrokerEntryMetadata instance or 
reference it after the function completes
+     * since it's a ThreadLocal instance that is reused.
+     *
+     * @param headerAndPayload the header and payload of the message
+     * @param function the function to apply to the BrokerEntryMetadata
+     * @param <T> the return type of the function
+     * @return the result of the function
+     */
+    public static <T> T peekBrokerEntryMetadataToObject(ByteBuf 
headerAndPayload,
+                                                        
Function<BrokerEntryMetadata, T> function) {
+        BrokerEntryMetadata brokerEntryMetadata =
+                parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, 
BROKER_ENTRY_METADATA.get(), true);
+        return function.apply(brokerEntryMetadata);
+    }
+
+    /**
+     * Peeks the BrokerEntryMetadata from the given payload and applies a 
function returning a long value to the result.
+     * null will be passed to the function if no BrokerEntryMetadata is found. 
The function shouldn't reference the
+     * BrokerEntryMetadata instance after the function completes since it's a 
ThreadLocal instance that is reused.
+     *
+     * @param headerAndPayload the header and payload of the message
+     * @param function the function to apply to the BrokerEntryMetadata
+     * @return the result of the function
+     */
+    public static long peekBrokerEntryMetadataToLong(ByteBuf headerAndPayload,
+                                                     
ToLongFunction<BrokerEntryMetadata> function) {
+        BrokerEntryMetadata brokerEntryMetadata =
+                parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, 
BROKER_ENTRY_METADATA.get(), true);
+        return function.applyAsLong(brokerEntryMetadata);
+    }
+
+    /**
+     * Peeks the BrokerEntryMetadata from the given payload and consumes the 
value using a function.
+     * null will be passed to the function if no BrokerEntryMetadata is found.
+     * The function shouldn't keep a reference to the BrokerEntryMetadata 
instance after the call completes
+     * since it's a ThreadLocal instance that is reused.
+     *
+     * @param headerAndPayload the header and payload of the message
+     * @param function the function to apply to the BrokerEntryMetadata
+     */
+    public static void peekBrokerEntryMetadataAndConsume(ByteBuf 
headerAndPayload,
+                                                         
Consumer<BrokerEntryMetadata> function) {
+        BrokerEntryMetadata brokerEntryMetadata =
+                parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, 
BROKER_ENTRY_METADATA.get(), true);
+        function.accept(brokerEntryMetadata);
     }
 
     public static ByteBuf serializeMetadataAndPayload(ChecksumType 
checksumType,
@@ -1958,28 +2054,28 @@ public class Commands {
         return ByteBufPair.get(headers, metadataAndPayload);
     }
 
-    public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, 
String subscription,
-            long consumerId) {
-        MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, 
subscription, consumerId);
-        if (msgMetadata == null) {
-            return -1;
-        } else {
-            return msgMetadata.getNumMessagesInBatch();
-        }
-    }
-
     public static MessageMetadata peekMessageMetadata(ByteBuf 
metadataAndPayload, String subscription,
             long consumerId) {
+        // save the reader index and restore after parsing
+        int readerIdx = metadataAndPayload.readerIndex();
         try {
-            // save the reader index and restore after parsing
-            int readerIdx = metadataAndPayload.readerIndex();
-            MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
-            metadataAndPayload.readerIndex(readerIdx);
-
+            MessageMetadata metadata = 
parseMessageMetadata(metadataAndPayload);
             return metadata;
         } catch (Throwable t) {
             log.error("[{}] [{}] Failed to parse message metadata", 
subscription, consumerId, t);
             return null;
+        } finally {
+            metadataAndPayload.readerIndex(readerIdx);
+        }
+    }
+
+    public static void peekMessageMetadata(ByteBuf metadataAndPayload, 
MessageMetadata msgMetadata) {
+        // save the reader index and restore after parsing
+        int readerIdx = metadataAndPayload.readerIndex();
+        try {
+            parseMessageMetadata(metadataAndPayload, msgMetadata);
+        } finally {
+            metadataAndPayload.readerIndex(readerIdx);
         }
     }
 
@@ -1992,26 +2088,28 @@ public class Commands {
      */
     public static MessageMetadata peekAndCopyMessageMetadata(
             ByteBuf metadataAndPayload, String subscription, long consumerId) {
-        final MessageMetadata localMetadata = 
peekMessageMetadata(metadataAndPayload, subscription, consumerId);
-        if (localMetadata == null) {
+        final MessageMetadata metadata = new MessageMetadata();
+        try {
+            peekMessageMetadata(metadataAndPayload, metadata);
+        } catch (Throwable t) {
+            log.error("[{}] [{}] Failed to parse message metadata", 
subscription, consumerId, t);
             return null;
         }
-        final MessageMetadata metadata = new MessageMetadata();
-        metadata.copyFrom(localMetadata);
         return metadata;
     }
 
     public static final byte[] NONE_KEY = 
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
 
     public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String 
topic, String subscription) {
+        int readerIdx = metadataAndPayload.readerIndex();
         try {
-            int readerIdx = metadataAndPayload.readerIndex();
             MessageMetadata metadata = 
parseMessageMetadata(metadataAndPayload);
-            metadataAndPayload.readerIndex(readerIdx);
             return resolveStickyKey(metadata);
         } catch (Throwable t) {
             log.error("[{}] [{}] Failed to peek sticky key from the message 
metadata", topic, subscription, t);
             return NONE_KEY;
+        } finally {
+            metadataAndPayload.readerIndex(readerIdx);
         }
     }
 

Reply via email to