This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 53bbd4aeff0 [improve][broker] Reduce unnecessary MessageMetadata parsing by caching the parsed instance in the broker cache (#24682) 53bbd4aeff0 is described below commit 53bbd4aeff003275d9c6bafbc6ef6c5523014ae2 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Wed Sep 10 00:23:17 2025 +0300 [improve][broker] Reduce unnecessary MessageMetadata parsing by caching the parsed instance in the broker cache (#24682) --- .../java/org/apache/bookkeeper/mledger/Entry.java | 25 +++ .../apache/bookkeeper/mledger/impl/EntryImpl.java | 24 +++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 +- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 30 +-- .../mledger/impl/ShadowManagedLedgerImpl.java | 5 + .../mledger/impl/cache/EntryCacheDisabled.java | 3 +- .../bookkeeper/mledger/impl/cache/RangeCache.java | 17 +- .../mledger/impl/cache/RangeCacheEntryWrapper.java | 34 +++- .../mledger/impl/cache/RangeEntryCacheImpl.java | 1 + .../broker/admin/impl/PersistentTopicsBase.java | 28 ++- .../intercept/ManagedLedgerInterceptorImpl.java | 12 +- .../broker/service/AbstractBaseDispatcher.java | 28 ++- .../pulsar/broker/service/EntryAndMetadata.java | 12 +- .../apache/pulsar/broker/service/ServerCnx.java | 5 +- .../NonPersistentDispatcherMultipleConsumers.java | 3 +- ...onPersistentDispatcherSingleActiveConsumer.java | 3 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 4 +- .../PersistentDispatcherMultipleConsumers.java | 6 +- .../PersistentDispatcherSingleActiveConsumer.java | 2 +- .../persistent/PersistentMessageFinder.java | 3 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 6 +- ...tickyKeyDispatcherMultipleConsumersClassic.java | 2 +- .../service/persistent/PersistentSubscription.java | 12 +- .../broker/service/persistent/PersistentTopic.java | 24 ++- .../SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +- .../buffer/impl/TopicTransactionBuffer.java | 7 +- 
.../service/PersistentMessageFinderTest.java | 2 +- .../apache/pulsar/client/impl/MessageImplTest.java | 3 +- .../apache/pulsar/common/protocol/Commands.java | 208 +++++++++++++++------ 29 files changed, 382 insertions(+), 140 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java index 6b8f8a95773..24ea5c17c0d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java @@ -21,6 +21,8 @@ package org.apache.bookkeeper.mledger; import io.netty.buffer.ByteBuf; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; /** * An Entry represent a ledger entry data and its associated position. @@ -98,4 +100,27 @@ public interface Entry { default boolean matchesPosition(Position position) { return position != null && position.compareTo(getLedgerId(), getEntryId()) == 0; } + + default MessageMetadata getMessageMetadata() { + return null; + } + + /** + * Returns the timestamp of the entry. 
+ * @return + */ + default long getEntryTimestamp() { + // get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor + return Commands.peekBrokerEntryMetadataToLong(getDataBuffer(), brokerEntryMetadata -> { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) { + return brokerEntryMetadata.getBrokerTimestamp(); + } + // otherwise get the publish_time + MessageMetadata messageMetadata = getMessageMetadata(); + if (messageMetadata == null) { + messageMetadata = Commands.peekMessageMetadata(getDataBuffer(), null, -1); + } + return messageMetadata.getPublishTime(); + }); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index b85a1bc45fe..070a0fc1bea 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -24,6 +24,9 @@ import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCounted; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.Entry; @@ -33,7 +36,10 @@ import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.ReferenceCountedEntry; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +@Slf4j public final class EntryImpl extends AbstractCASReferenceCounted implements ReferenceCountedEntry, Comparable<EntryImpl> { @@ -51,6 +57,8 @@ public final class EntryImpl 
extends AbstractCASReferenceCounted ByteBuf data; private EntryReadCountHandler readCountHandler; private boolean decreaseReadCountOnRelease = true; + @Getter @Setter + private MessageMetadata messageMetadata; private Runnable onDeallocate; @@ -161,6 +169,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted entry.entryId = other.entryId; entry.data = other.data.retainedDuplicate(); entry.readCountHandler = other.readCountHandler; + entry.messageMetadata = other.messageMetadata; entry.setRefCnt(1); return entry; } @@ -172,6 +181,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted entry.entryId = other.getEntryId(); entry.data = other.getDataBuffer().retainedDuplicate(); entry.readCountHandler = other.getReadCountHandler(); + entry.messageMetadata = other.getMessageMetadata(); entry.setRefCnt(1); return entry; } @@ -277,6 +287,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted position = null; readCountHandler = null; decreaseReadCountOnRelease = true; + messageMetadata = null; recyclerHandle.recycle(this); } @@ -294,6 +305,19 @@ public final class EntryImpl extends AbstractCASReferenceCounted decreaseReadCountOnRelease = enabled; } + public void initializeMessageMetadataIfNeeded(String managedLedgerName) { + if (messageMetadata == null) { + try { + MessageMetadata msgMetadata = new MessageMetadata(); + Commands.peekMessageMetadata(data, msgMetadata); + this.messageMetadata = msgMetadata; + } catch (Throwable t) { + log.warn("[{}] Failed to parse message metadata for entry {}:{}", managedLedgerName, ledgerId, entryId, + t); + } + } + } + @Override public String toString() { return getClass().getName() + "@" + System.identityHashCode(this) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0be740e1bda..a4527261c8b 100644 --- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -31,7 +31,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; -import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; @@ -144,7 +143,6 @@ import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.LazyLoadableValue; @@ -1315,9 +1313,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { @Override public void readEntryComplete(Entry entry, Object ctx) { try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); future.complete(entryTimestamp); - } catch (IOException e) { + } catch (Exception e) { log.error("Error deserializing message for message position {}", nextPos, e); future.completeExceptionally(e); } finally { @@ -4991,4 +4989,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { throw new RuntimeException(e); } } + + boolean shouldCacheAddedEntry() { + // Avoid caching entries if no cursor has been created + return getActiveCursors().shouldCacheAddedEntry(); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 49bc3f309d4..3af5618064f 100644 --- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -257,23 +257,23 @@ public class OpAddEntry implements AddCallback, CloseCallback, Runnable, Managed ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml); ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength); + // ctx will contain a Position instance only in the case of ShadowManagedLedgerImpl long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId(); - // Don't insert to the entry cache for the ShadowManagedLedger - if (!(ml instanceof ShadowManagedLedgerImpl)) { - // Avoid caching entries if no cursor has been created - if (ml.getActiveCursors().shouldCacheAddedEntry()) { - int expectedReadCount = 0; - // only use expectedReadCount if cache eviction is enabled by expected read count - if (ml.getConfig().isCacheEvictionByExpectedReadCount()) { - expectedReadCount = ml.getActiveCursors().size(); - } - EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount); - entry.setDecreaseReadCountOnRelease(false); - // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling - // insert - ml.entryCache.insert(entry); - entry.release(); + + // Handle caching for tailing reads + if (ml.shouldCacheAddedEntry()) { + int expectedReadCount = 0; + // only use expectedReadCount if cache eviction is enabled by expected read count + if (ml.getConfig().isCacheEvictionByExpectedReadCount()) { + // use the number of active cursors as the expected read count + expectedReadCount = ml.getActiveCursors().size(); } + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount); + entry.setDecreaseReadCountOnRelease(false); + // EntryCache.insert: duplicates entry by allocating new entry and data. 
so, recycle entry after calling + // insert + ml.entryCache.insert(entry); + entry.release(); } Position lastEntry = PositionFactory.create(ledgerId, entryId); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 4b03cad8e0a..46cd1335a17 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -407,4 +407,9 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { this.lastLedgerCreatedTimestamp = clock.millis(); } + + @Override + boolean shouldCacheAddedEntry() { + return false; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index 9cd63d99f4c..b5a45415a4f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -78,6 +78,7 @@ public class EntryCacheDisabled implements EntryCache { for (LedgerEntry e : ledgerEntries) { // Insert the entries at the end of the list (they will be unsorted for now) EntryImpl entry = EntryImpl.create(e, interceptor, 0); + entry.initializeMessageMetadataIfNeeded(ml.getName()); entries.add(entry); totalSize += entry.getLength(); } @@ -111,7 +112,7 @@ public class EntryCacheDisabled implements EntryCache { if (iterator.hasNext()) { LedgerEntry ledgerEntry = iterator.next(); EntryImpl returnEntry = EntryImpl.create(ledgerEntry, interceptor, 0); - + returnEntry.initializeMessageMetadataIfNeeded(ml.getName()); 
ml.getMbean().recordReadEntriesOpsCacheMisses(1, returnEntry.getLength()); ml.getFactory().getMbean().recordCacheMiss(1, returnEntry.getLength()); ml.getMbean().addReadEntriesSample(1, returnEntry.getLength()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java index a5592f7098e..858b7d3c07c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCache.java @@ -46,13 +46,23 @@ import org.apache.commons.lang3.tuple.Pair; class RangeCache { private final ConcurrentNavigableMap<Position, RangeCacheEntryWrapper> entries; private final RangeCacheRemovalQueue removalQueue; - private AtomicLong size; // Total size of values stored in cache + private final AtomicLong size; // Total size of values stored in cache + private final String managedLedgerName; /** * Construct a new RangeCache. */ public RangeCache(RangeCacheRemovalQueue removalQueue) { + this(removalQueue, null); + } + + /** + * Construct a new RangeCache. 
+ * @param managedLedgerName the name of the managed ledger this cache belongs to + */ + public RangeCache(RangeCacheRemovalQueue removalQueue, String managedLedgerName) { this.removalQueue = removalQueue; + this.managedLedgerName = managedLedgerName; this.entries = new ConcurrentSkipListMap<>(); this.size = new AtomicLong(0); } @@ -115,7 +125,7 @@ class RangeCache { if (valueWrapper == null) { return null; } else { - ReferenceCountedEntry value = valueWrapper.getValue(key); + ReferenceCountedEntry value = valueWrapper.getValue(key, managedLedgerName); return getRetainedValueMatchingKey(key, value); } } @@ -124,7 +134,8 @@ class RangeCache { * @apiNote the returned value must be released if it's not null */ private ReferenceCountedEntry getValueMatchingEntry(Map.Entry<Position, RangeCacheEntryWrapper> entry) { - ReferenceCountedEntry valueMatchingEntry = RangeCacheEntryWrapper.getValueMatchingMapEntry(entry); + ReferenceCountedEntry valueMatchingEntry = + RangeCacheEntryWrapper.getValueMatchingMapEntry(entry, managedLedgerName); return getRetainedValueMatchingKey(entry.getKey(), valueMatchingEntry); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java index db5ceaefd38..5c82207e1c7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeCacheEntryWrapper.java @@ -22,14 +22,17 @@ import io.netty.util.Recycler; import java.util.Map; import java.util.concurrent.locks.StampedLock; import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ReferenceCountedEntry; +import org.apache.bookkeeper.mledger.impl.EntryImpl; /** * Wrapper around the value to store in Map. 
This is needed to ensure that a specific instance can be removed from * the map by calling the {@link Map#remove(Object, Object)} method. Certain race conditions could result in the * wrong value being removed from the map. The instances of this class are recycled to avoid creating new objects. */ +@Slf4j class RangeCacheEntryWrapper { private final Recycler.Handle<RangeCacheEntryWrapper> recyclerHandle; private static final Recycler<RangeCacheEntryWrapper> RECYCLER = new Recycler<RangeCacheEntryWrapper>() { @@ -73,12 +76,13 @@ class RangeCacheEntryWrapper { /** * Get the value associated with the key. Returns null if the key does not match the key. * - * @param key the key to match + * @param key the key to match + * @param managedLedgerName * @return the value associated with the key, or null if the value has already been recycled or the key does not * match */ - ReferenceCountedEntry getValue(Position key) { - return getValueInternal(key, false); + ReferenceCountedEntry getValue(Position key, String managedLedgerName) { + return getValueInternal(key, false, managedLedgerName); } /** @@ -88,8 +92,9 @@ class RangeCacheEntryWrapper { * @return the value associated with the key, or null if the value has already been recycled or the key does not * exactly match the same instance */ - static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, RangeCacheEntryWrapper> entry) { - return entry.getValue().getValueInternal(entry.getKey(), true); + static ReferenceCountedEntry getValueMatchingMapEntry(Map.Entry<Position, RangeCacheEntryWrapper> entry, + String managedLedgerName) { + return entry.getValue().getValueInternal(entry.getKey(), true, managedLedgerName); } /** @@ -101,16 +106,20 @@ class RangeCacheEntryWrapper { * key as the one stored in the wrapper. This is used to avoid any races * when retrieving or removing the entries from the cache when the key and value * instances are available. 
+ * @param managedLedgerName * @return the value associated with the key, or null if the key does not match */ - private ReferenceCountedEntry getValueInternal(Position key, boolean requireSameKeyInstance) { + private ReferenceCountedEntry getValueInternal(Position key, boolean requireSameKeyInstance, + String managedLedgerName) { long stamp = lock.tryOptimisticRead(); Position localKey = this.key; ReferenceCountedEntry localValue = this.value; + boolean messageMetadataInitialized = localValue != null && localValue.getMessageMetadata() != null; if (!lock.validate(stamp)) { stamp = lock.readLock(); localKey = this.key; localValue = this.value; + messageMetadataInitialized = localValue != null && localValue.getMessageMetadata() != null; lock.unlockRead(stamp); } // check that the given key matches the key associated with the value in the entry @@ -120,6 +129,19 @@ class RangeCacheEntryWrapper { if (localKey != key && (requireSameKeyInstance || localKey == null || !localKey.equals(key))) { return null; } + // Initialize the metadata if it's not already initialized + if (localValue != null && !messageMetadataInitialized) { + localValue = withWriteLock(wrapper -> { + // ensure that the key still matches + if (wrapper.key != key && (requireSameKeyInstance || wrapper.key == null || !wrapper.key.equals(key))) { + return null; + } + if (wrapper.value instanceof EntryImpl entry) { + entry.initializeMessageMetadataIfNeeded(managedLedgerName); + } + return wrapper.value; + }); + } accessed = true; return localValue; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 04de8dde0ad..fd391ba2bf6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -532,6 +532,7 @@ 
public class RangeEntryCacheImpl implements EntryCache { final List<Entry> entriesToReturn = new ArrayList<>(entriesToRead); for (LedgerEntry e : ledgerEntries) { EntryImpl entry = EntryImpl.create(e, interceptor, expectedReadCountVal); + entry.initializeMessageMetadataIfNeeded(ml.getName()); entriesToReturn.add(entry); totalSize += entry.getLength(); if (expectedReadCountVal > 0) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a16be37bb38..ec61d58d2af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2659,8 +2659,10 @@ public class PersistentTopicsBase extends AdminResource { if (entry == null) { batchSizeFuture.complete(0); } else { - MessageMetadata metadata = - Commands.parseMessageMetadata(entry.getDataBuffer()); + MessageMetadata metadata = entry.getMessageMetadata(); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + } batchSizeFuture.complete(metadata.getNumMessagesInBatch()); } } catch (Exception e) { @@ -2839,7 +2841,7 @@ public class PersistentTopicsBase extends AdminResource { private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) { return managedLedger.asyncFindPosition(entry -> { try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); } catch (Exception e) { log.error("[{}] Error deserializing message for message position find", @@ -3009,7 +3011,12 @@ public class PersistentTopicsBase extends AdminResource { long totalSize = metadataAndPayload.readableBytes(); BrokerEntryMetadata brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload); - MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); + MessageMetadata metadata = entry.getMessageMetadata(); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(metadataAndPayload); + } else { + Commands.skipMessageMetadata(metadataAndPayload); + } ResponseBuilder responseBuilder = Response.ok(); responseBuilder.header("X-Pulsar-Message-ID", pos.toString()); @@ -5451,11 +5458,12 @@ public class PersistentTopicsBase extends AdminResource { private static Long getIndexFromEntry(Entry entry) { - final var brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); - if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { - return brokerEntryMetadata.getIndex(); - } else { - return null; - } + return Commands.peekBrokerEntryMetadataToObject(entry.getDataBuffer(), brokerEntryMetadata -> { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + return brokerEntryMetadata.getIndex(); + } else { + return null; + } + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java index db138989a8e..a86b49d627f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import 
org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor; @@ -117,12 +116,11 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor { if (lastEntryOptional.isPresent()) { Entry lastEntry = lastEntryOptional.get(); try { - BrokerEntryMetadata brokerEntryMetadata = - Commands.parseBrokerEntryMetadataIfExist(lastEntry.getDataBuffer()); - if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { - appendIndexMetadataInterceptor.recoveryIndexGenerator( - brokerEntryMetadata.getIndex()); - } + Commands.peekBrokerEntryMetadataAndConsume(lastEntry.getDataBuffer(), brokerEntryMetadata -> { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + appendIndexMetadataInterceptor.recoveryIndexGenerator(brokerEntryMetadata.getIndex()); + } + }); } finally { lastEntry.release(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f074f234b87..c5e001692f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -149,6 +149,8 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen msgMetadata = metadataArray[metadataIndex]; } else if (entry instanceof EntryAndMetadata) { msgMetadata = ((EntryAndMetadata) entry).getMetadata(); + } else if (entry.getMessageMetadata() != null) { + msgMetadata = entry.getMessageMetadata(); } else { msgMetadata = Commands.peekAndCopyMessageMetadata(metadataAndPayload, subscription.toString(), -1); } @@ -454,8 +456,18 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen return true; } - protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { - return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName()); 
+ protected byte[] peekStickyKey(Entry entry) { + if (entry instanceof EntryAndMetadata entryAndMetadata) { + return entryAndMetadata.getStickyKey(); + } + MessageMetadata metadata = entry.getMessageMetadata(); + if (metadata == null) { + metadata = Commands.peekMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1); + } + if (metadata == null) { + return Commands.NONE_KEY; + } + return Commands.resolveStickyKey(metadata); } protected String getSubscriptionName() { @@ -528,4 +540,16 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen protected final void updatePendingBytesToDispatch(long size) { PENDING_BYTES_TO_DISPATCH.inc(size); } + + protected int getNumberOfMessagesInBatch(Entry entry) { + MessageMetadata msgMetadata = entry.getMessageMetadata(); + if (msgMetadata == null) { + msgMetadata = Commands.peekMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1); + } + if (msgMetadata == null) { + return -1; + } else { + return msgMetadata.getNumMessagesInBatch(); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java index 33abddc300b..117f0115064 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java @@ -43,12 +43,22 @@ public class EntryAndMetadata implements Entry { } public static EntryAndMetadata create(final Entry entry, final MessageMetadata metadata) { + if (entry instanceof EntryAndMetadata entryAndMetadata) { + return entryAndMetadata; + } return new EntryAndMetadata(entry, metadata); } @VisibleForTesting public static EntryAndMetadata create(final Entry entry) { - return create(entry, Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1)); + if (entry instanceof EntryAndMetadata entryAndMetadata) { + return 
entryAndMetadata; + } + MessageMetadata msgMetadata = entry.getMessageMetadata(); + if (msgMetadata == null) { + msgMetadata = Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), "", -1); + } + return create(entry, msgMetadata); } public byte[] getStickyKey() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d1bc4953d11..44927c375b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2365,7 +2365,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { }, null); CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> { - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + MessageMetadata metadata = entry.getMessageMetadata(); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + } int batchSize = metadata.getNumMessagesInBatch(); entry.release(); return metadata.hasNumMessagesInBatch() ? 
batchSize : -1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 5941093e71c..a4fba30cc9b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -37,7 +37,6 @@ import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; @@ -205,7 +204,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); } else { entries.forEach(entry -> { - int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1); + int totalMsgs = getNumberOfMessagesInBatch(entry); if (totalMsgs > 0) { msgDrop.recordEvent(totalMsgs); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 414e9235418..26bb5a791cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -30,7 +30,6 @@ import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; @Slf4j @@ -66,7 +65,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()); } else { entries.forEach(entry -> { - int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1); + int totalMsgs = getNumberOfMessagesInBatch(entry); if (totalMsgs > 0) { msgDrop.recordEvent(totalMsgs); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 915f9c0f925..2e884eb0ea1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -151,7 +151,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis consumerStickyKeyHashesMap.clear(); for (Entry entry : entries) { - byte[] stickyKey = peekStickyKey(entry.getDataBuffer()); + byte[] stickyKey = peekStickyKey(entry); int stickyKeyHash = selector.makeStickyKeyHash(stickyKey); Consumer consumer = selector.select(stickyKeyHash); @@ -182,7 +182,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends 
NonPersis TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); } else { entriesForConsumer.forEach(e -> { - int totalMsgs = Commands.getNumberOfMessagesInBatch(e.getDataBuffer(), subscription.toString(), -1); + int totalMsgs = getNumberOfMessagesInBatch(e); if (totalMsgs > 0) { msgDrop.recordEvent(totalMsgs); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c35d802f43d..097ab9cd0fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -72,7 +72,6 @@ import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferEx import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; @@ -798,10 +797,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis if (entry instanceof EntryAndMetadata) { metadata = ((EntryAndMetadata) entry).getMetadata(); } else { - metadata = Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1); // cache the metadata in the entry with EntryAndMetadata for later use to avoid re-parsing the metadata // and to carry the metadata and calculated stickyKeyHash with the entry - entries.set(i, EntryAndMetadata.create(entry, metadata)); + EntryAndMetadata entryAndMetadata = EntryAndMetadata.create(entry); + metadata = 
entryAndMetadata.getMetadata(); + entries.set(i, entryAndMetadata); } if (metadata != null) { remainingMessages += metadata.getNumMessagesInBatch(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 99a09d3a5d7..e809d984ae3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -189,7 +189,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher Iterator<Entry> iterator = entries.iterator(); while (iterator.hasNext()) { Entry entry = iterator.next(); - byte[] key = peekStickyKey(entry.getDataBuffer()); + byte[] key = peekStickyKey(entry); Consumer consumer = stickyKeyConsumerSelector.select(key); // Skip the entry if it's not for current active consumer. 
if (consumer == null || currentConsumer != consumer) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index e8bc9fdc3ea..991e6a06029 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -30,7 +30,6 @@ import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +72,7 @@ public class PersistentMessageFinder implements AsyncCallbacks.FindEntryCallback cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> { try { // Find the latest entry that is earlier than the target timestamp. 
- long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 080f5acbf16..e0c74f9043c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -58,7 +58,6 @@ import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -419,8 +418,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } else { // replace the input entry with EntryAndMetadata instance. 
In addition to the entry and metadata, // it will also carry the calculated sticky key hash - entry = EntryAndMetadata.create(inputEntry, - Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1)); + entry = EntryAndMetadata.create(inputEntry); } int stickyKeyHash = getStickyKeyHash(entry); Consumer consumer = null; @@ -610,7 +608,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi // use the cached sticky key hash if available, otherwise calculate the sticky key hash and cache it return entryAndMetadata.getOrUpdateCachedStickyKeyHash(selector::makeStickyKeyHash); } - return selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + return selector.makeStickyKeyHash(peekStickyKey(entry)); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index c3b246fe9ba..1f29cb73626 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -652,7 +652,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic // use the cached sticky key hash if available, otherwise calculate the sticky key hash and cache it return entryAndMetadata.getOrUpdateCachedStickyKeyHash(selector::makeStickyKeyHash); } - return selector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + return selector.makeStickyKeyHash(peekStickyKey(entry)); } private static final Logger log = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index ac54957f0c6..056e9a25fda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -23,7 +23,6 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopi import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; -import java.io.IOException; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -644,7 +643,12 @@ public class PersistentSubscription extends AbstractSubscription { firstPosition.compareAndSet(null, entryPosition); lastPosition.set(entryPosition); ByteBuf metadataAndPayload = entry.getDataBuffer(); - MessageMetadata messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1); + MessageMetadata messageMetadata; + if (entry.getMessageMetadata() != null) { + messageMetadata = entry.getMessageMetadata(); + } else { + messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1); + } int numMessages = 1; if (messageMetadata.hasNumMessagesInBatch()) { numMessages = messageMetadata.getNumMessagesInBatch(); @@ -1438,9 +1442,9 @@ public class PersistentSubscription extends AbstractSubscription { @Override public void readEntryComplete(Entry entry, Object ctx) { try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); future.complete(entryTimestamp); - } catch (IOException e) { + } catch (Exception e) { log.error("Error deserializing message for message position {}", nextPos, e); future.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 657f03d6b38..3bbd8b00777 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; -import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -3801,7 +3800,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public void readEntryComplete(Entry entry, Object ctx) { try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); updateResultIfNewer( new OldestPositionInfo( oldestMarkDeleteCursorInfo.getPosition(), @@ -3979,7 +3978,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal try { entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include); if (entry != null) { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); isOldestMessageExpired = MessageImpl.isEntryExpired( (int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp); } @@ -4009,10 +4008,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public void readEntryComplete(Entry entry, Object ctx) { long entryTimestamp = 0; try { - entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + entryTimestamp = entry.getEntryTimestamp(); res.complete(MessageImpl.isEntryExpired( (int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp)); - } catch (IOException e) { + } catch (Exception e) { log.warn("[{}] [{}] Error while getting the 
oldest message", topic, cursor.toString(), e); res.complete(false); } @@ -4110,7 +4109,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return CompletableFuture.completedFuture(lastDispatchablePosition); } return ledger.getLastDispatchablePosition(entry -> { - MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); + MessageMetadata md = entry.getMessageMetadata(); + if (md == null) { + md = Commands.parseMessageMetadata(entry.getDataBuffer()); + } // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer if (Markers.isServerOnlyMarker(md)) { return false; @@ -4179,7 +4181,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public void readEntryComplete(Entry entry, Object ctx) { try { - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + MessageMetadata metadata = entry.getMessageMetadata(); + if (metadata == null) { + metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + } if (metadata.hasNumMessagesInBatch()) { completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex, metadata.getNumMessagesInBatch() - 1)); @@ -4797,7 +4802,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public void readEntryComplete(Entry entry, Object ctx) { try { ByteBuf metadataAndPayload = entry.getDataBuffer(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + MessageMetadata msgMetadata = entry.getMessageMetadata(); + if (msgMetadata == null) { + msgMetadata = Commands.parseMessageMetadata(metadataAndPayload); + } long publishTime = msgMetadata.getPublishTime(); future.complete(publishTime); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index 98d8a40eb36..38b902dc520 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -408,7 +408,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess //decode snapshot from entry ByteBuf headersAndPayload = entry.getDataBuffer(); //skip metadata - Commands.parseMessageMetadata(headersAndPayload); + Commands.skipMessageMetadata(headersAndPayload); TransactionBufferSnapshotSegment snapshotSegment = Schema.AVRO(TransactionBufferSnapshotSegment.class) .decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index c43f0ed7fb9..c3961dca6d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -184,8 +184,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen public void handleTxnEntry(Entry entry) { ByteBuf metadataAndPayload = entry.getDataBuffer(); - MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, - TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1); + MessageMetadata msgMetadata = entry.getMessageMetadata(); + if (msgMetadata == null) { + msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, + TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1); + } if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && 
msgMetadata.hasTxnidLeastBits()) { TxnID txnID = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()); Position position = PositionFactory.create(entry.getLedgerId(), entry.getEntryId()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 4dc10671e6a..5707a6cca60 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -635,7 +635,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) { return managedLedger.asyncFindPosition(entry -> { try { - long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + long entryTimestamp = entry.getEntryTimestamp(); return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp); } catch (Exception e) { log.error("Error deserializing message for message position find", e); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index b152eda4689..f3eb7b434a9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -28,7 +28,6 @@ import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.fail; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -480,7 +479,7 @@ public class MessageImplTest { entryTimestamp = 
Commands.getEntryTimestamp(compositeByteBuf); assertFalse(MessageImpl.isEntryExpired(24 * 3600, entryTimestamp)); assertEquals(entryTimestamp, brokerEntryTimestamp); - } catch (IOException e) { + } catch (Exception e) { fail(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7aaa010c11e..cab4dc8bcab 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -22,6 +22,7 @@ import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Strings; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; @@ -36,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.ToLongFunction; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -457,7 +460,7 @@ public class Commands { public static void skipChecksumIfPresent(ByteBuf buffer) { if (hasChecksum(buffer)) { - readChecksum(buffer); + buffer.skipBytes(Short.BYTES + Integer.BYTES); } } @@ -488,15 +491,21 @@ public class Commands { buffer.skipBytes(metadataSize); } - public static long getEntryTimestamp(ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException { + /** + * Gets the entry timestamp from either broker metadata broker timestamp or the message metadata publish time. + * Prefer using Managed Ledger's Entry's getEntryTimestamp() method over this method. 
+ * @param headersAndPayloadWithBrokerEntryMetadata headers and payload for the message + * @return the entry timestamp + */ + public static long getEntryTimestamp(ByteBuf headersAndPayloadWithBrokerEntryMetadata) { // get broker timestamp first if BrokerEntryMetadata is enabled with AppendBrokerTimestampMetadataInterceptor - BrokerEntryMetadata brokerEntryMetadata = - Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) { - return brokerEntryMetadata.getBrokerTimestamp(); - } - // otherwise get the publish_time - return parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime(); + return peekBrokerEntryMetadataToLong(headersAndPayloadWithBrokerEntryMetadata, brokerEntryMetadata -> { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasBrokerTimestamp()) { + return brokerEntryMetadata.getBrokerTimestamp(); + } + // otherwise get the publish_time + return parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata).getPublishTime(); + }); } public static BaseCommand newMessageCommand(long consumerId, long ledgerId, long entryId, int partition, @@ -1761,39 +1770,126 @@ public class Commands { return compositeByteBuf; } - public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf headerAndPayloadWithBrokerEntryMetadata) { - int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex(); - if (headerAndPayloadWithBrokerEntryMetadata.readShort() == magicBrokerEntryMetadata) { - int brokerEntryMetadataSize = headerAndPayloadWithBrokerEntryMetadata.readInt(); - headerAndPayloadWithBrokerEntryMetadata.readerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex() - + brokerEntryMetadataSize); - } else { - headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex); + /** + * Moves the readerIndex ahead skipping possible BrokerEntryMetadata if it exists in the header and payload + * buffer. 
+ * @param headerAndPayload the header and payload buffer + * @return the header and payload buffer passed as parameter + */ + public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf headerAndPayload) { + int readerIndex = headerAndPayload.readerIndex(); + if (headerAndPayload.getShort(readerIndex) == magicBrokerEntryMetadata) { + headerAndPayload.skipBytes(Short.BYTES); + int brokerEntryMetadataSize = headerAndPayload.readInt(); + headerAndPayload.skipBytes(brokerEntryMetadataSize); } - return headerAndPayloadWithBrokerEntryMetadata; + return headerAndPayload; + } + + /** + * Parses the broker entry metadata from the header and payload buffer and returns a new BrokerEntryMetadata + * instance if the broker entry metadata exists in the header and payload buffer. Null is returned if the + * broker entry metadata does not exist in the header and payload buffer. + * The readerIndex of the headerAndPayload buffer is advanced. + * + * @param headerAndPayload the header and payload buffer + * @return broker entry metadata or null + */ + public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist(ByteBuf headerAndPayload) { + return parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, null, false); + } + + /** + * Parses the broker entry metadata from the header and payload buffer and returns a new BrokerEntryMetadata + * instance if the broker entry metadata exists in the header and payload buffer. Null is returned if the + * broker entry metadata does not exist in the header and payload buffer. + * The readerIndex of the headerAndPayload buffer is not advanced. 
+ * + * @param headerAndPayload the header and payload buffer + * @return broker entry metadata or null + */ + public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist( + ByteBuf headerAndPayload) { + return parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, null, true); } - public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist( - ByteBuf headerAndPayloadWithBrokerEntryMetadata) { - int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex(); - if (headerAndPayloadWithBrokerEntryMetadata.getShort(readerIndex) == magicBrokerEntryMetadata) { - headerAndPayloadWithBrokerEntryMetadata.skipBytes(2); - int brokerEntryMetadataSize = headerAndPayloadWithBrokerEntryMetadata.readInt(); - BrokerEntryMetadata brokerEntryMetadata = new BrokerEntryMetadata(); - brokerEntryMetadata.parseFrom(headerAndPayloadWithBrokerEntryMetadata, brokerEntryMetadataSize); - return brokerEntryMetadata; + /** + * Internal method for parsing and peeking broker entry metadata. + * @param headerAndPayload header and payload buffer + * @param brokerEntryMetadata the broker entry metadata instance to reuse, null if a new instance should be created + * @param peek when true, the readerIndex of the headerAndPayload buffer is reset to its original position + * @return the broker entry metadata instance or null + */ + private static BrokerEntryMetadata parseOrPeekBrokerEntryMetadataIfExist( + ByteBuf headerAndPayload, BrokerEntryMetadata brokerEntryMetadata, boolean peek) { + int readerIndex = headerAndPayload.readerIndex(); + if (headerAndPayload.getShort(readerIndex) == magicBrokerEntryMetadata) { + headerAndPayload.skipBytes(Short.BYTES); + try { + int brokerEntryMetadataSize = headerAndPayload.readInt(); + if (brokerEntryMetadata == null) { + brokerEntryMetadata = new BrokerEntryMetadata(); + } + brokerEntryMetadata.parseFrom(headerAndPayload, brokerEntryMetadataSize); + return brokerEntryMetadata; + } finally { + if (peek) { + headerAndPayload.readerIndex(readerIndex); 
+ } + } } else { return null; } } - public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist( - ByteBuf headerAndPayloadWithBrokerEntryMetadata) { - final int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex(); - BrokerEntryMetadata entryMetadata = - parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata); - headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex); - return entryMetadata; + /** + * Peeks the BrokerEntryMetadata from the given payload and applies the function to the result. + * null will be passed to the function if no BrokerEntryMetadata is found. + * The function shouldn't return the BrokerEntryMetadata instance or reference it after the function completes + * since it's a ThreadLocal instance that is reused. + * + * @param headerAndPayload the header and payload of the message + * @param function the function to apply to the BrokerEntryMetadata + * @param <T> the return type of the function + * @return the result of the function + */ + public static <T> T peekBrokerEntryMetadataToObject(ByteBuf headerAndPayload, + Function<BrokerEntryMetadata, T> function) { + BrokerEntryMetadata brokerEntryMetadata = + parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, BROKER_ENTRY_METADATA.get(), true); + return function.apply(brokerEntryMetadata); + } + + /** + * Peeks the BrokerEntryMetadata from the given payload and applies a function returning a long value to the result. + * null will be passed to the function if no BrokerEntryMetadata is found. The function shouldn't reference the + * BrokerEntryMetadata instance after the function completes since it's a ThreadLocal instance that is reused. 
+ * + * @param headerAndPayload the header and payload of the message + * @param function the function to apply to the BrokerEntryMetadata + * @return the result of the function + */ + public static long peekBrokerEntryMetadataToLong(ByteBuf headerAndPayload, + ToLongFunction<BrokerEntryMetadata> function) { + BrokerEntryMetadata brokerEntryMetadata = + parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, BROKER_ENTRY_METADATA.get(), true); + return function.applyAsLong(brokerEntryMetadata); + } + + /** + * Peeks the BrokerEntryMetadata from the given payload and consumes the value using a function. + * null will be passed to the function if no BrokerEntryMetadata is found. + * The function shouldn't keep a reference to the BrokerEntryMetadata instance after the call completes + * since it's a ThreadLocal instance that is reused. + * + * @param headerAndPayload the header and payload of the message + * @param function the function to apply to the BrokerEntryMetadata + */ + public static void peekBrokerEntryMetadataAndConsume(ByteBuf headerAndPayload, + Consumer<BrokerEntryMetadata> function) { + BrokerEntryMetadata brokerEntryMetadata = + parseOrPeekBrokerEntryMetadataIfExist(headerAndPayload, BROKER_ENTRY_METADATA.get(), true); + function.accept(brokerEntryMetadata); } public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType, @@ -1958,28 +2054,28 @@ public class Commands { return ByteBufPair.get(headers, metadataAndPayload); } - public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, String subscription, - long consumerId) { - MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId); - if (msgMetadata == null) { - return -1; - } else { - return msgMetadata.getNumMessagesInBatch(); - } - } - public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, String subscription, long consumerId) { + // save the reader index and restore after parsing + int readerIdx = 
metadataAndPayload.readerIndex(); try { - // save the reader index and restore after parsing - int readerIdx = metadataAndPayload.readerIndex(); - MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); - metadataAndPayload.readerIndex(readerIdx); - + MessageMetadata metadata = parseMessageMetadata(metadataAndPayload); return metadata; } catch (Throwable t) { log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t); return null; + } finally { + metadataAndPayload.readerIndex(readerIdx); + } + } + + public static void peekMessageMetadata(ByteBuf metadataAndPayload, MessageMetadata msgMetadata) { + // save the reader index and restore after parsing + int readerIdx = metadataAndPayload.readerIndex(); + try { + parseMessageMetadata(metadataAndPayload, msgMetadata); + } finally { + metadataAndPayload.readerIndex(readerIdx); } } @@ -1992,26 +2088,28 @@ public class Commands { */ public static MessageMetadata peekAndCopyMessageMetadata( ByteBuf metadataAndPayload, String subscription, long consumerId) { - final MessageMetadata localMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId); - if (localMetadata == null) { + final MessageMetadata metadata = new MessageMetadata(); + try { + peekMessageMetadata(metadataAndPayload, metadata); + } catch (Throwable t) { + log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t); return null; } - final MessageMetadata metadata = new MessageMetadata(); - metadata.copyFrom(localMetadata); return metadata; } public static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8); public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) { + int readerIdx = metadataAndPayload.readerIndex(); try { - int readerIdx = metadataAndPayload.readerIndex(); MessageMetadata metadata = parseMessageMetadata(metadataAndPayload); - metadataAndPayload.readerIndex(readerIdx); return 
resolveStickyKey(metadata); } catch (Throwable t) { log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t); return NONE_KEY; + } finally { + metadataAndPayload.readerIndex(readerIdx); } }