This is an automated email from the ASF dual-hosted git repository.

lordcheng10 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f96146f13b [improve][broker] Reducing the parse of MessageMetadata in 
compaction (#23285)
4f96146f13b is described below

commit 4f96146f13b136644a4eb0cf4ec36699e0431929
Author: AloysZhang <[email protected]>
AuthorDate: Sat Sep 14 19:11:26 2024 +0800

    [improve][broker] Reducing the parse of MessageMetadata in compaction 
(#23285)
    
    Co-authored-by: Aloys Zhang <[email protected]>
---
 .../pulsar/client/impl/RawBatchConverter.java      | 28 ++++++++++++++++++----
 .../compaction/AbstractTwoPhaseCompactor.java      | 14 +++++------
 .../pulsar/compaction/EventTimeOrderCompactor.java | 24 +++++++++----------
 .../compaction/PublishingOrderCompactor.java       | 10 ++++----
 4 files changed, 46 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index f41a7aedd59..d8c491dab29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -52,12 +52,16 @@ public class RawBatchConverter {
         return metadata.hasNumMessagesInBatch() && 
metadata.getEncryptionKeysCount() == 0;
     }
 
-    public static List<MessageCompactionData> 
extractMessageCompactionData(RawMessage msg)
+    public static List<MessageCompactionData> 
extractMessageCompactionData(RawMessage msg, MessageMetadata metadata)
         throws IOException {
         checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
         ByteBuf payload = msg.getHeadersAndPayload();
-        MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        if (metadata == null) {
+            metadata = Commands.parseMessageMetadata(payload);
+        } else {
+            Commands.skipMessageMetadata(payload);
+        }
         int batchSize = metadata.getNumMessagesInBatch();
 
         CompressionType compressionType = metadata.getCompression();
@@ -91,7 +95,16 @@ public class RawBatchConverter {
         RawMessage msg)
         throws IOException {
         List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = 
new ArrayList<>();
-        for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
+        for (MessageCompactionData mcd : extractMessageCompactionData(msg, 
null)) {
+            idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), 
mcd.key(), mcd.payloadSize()));
+        }
+        return idsAndKeysAndSize;
+    }
+
+    public static List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSize(
+            RawMessage msg, MessageMetadata metadata) throws IOException {
+        List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = 
new ArrayList<>();
+        for (MessageCompactionData mcd : extractMessageCompactionData(msg, 
metadata)) {
             idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), 
mcd.key(), mcd.payloadSize()));
         }
         return idsAndKeysAndSize;
@@ -99,7 +112,7 @@ public class RawBatchConverter {
 
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                                                       BiPredicate<String, 
MessageId> filter) throws IOException {
-        return rebatchMessage(msg, filter, true);
+        return rebatchMessage(msg, null, filter, true);
     }
 
     /**
@@ -109,6 +122,7 @@ public class RawBatchConverter {
      *  NOTE: this message does not alter the reference count of the 
RawMessage argument.
      */
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      MessageMetadata metadata,
                                                       BiPredicate<String, 
MessageId> filter,
                                                       boolean retainNullKey)
             throws IOException {
@@ -123,7 +137,11 @@ public class RawBatchConverter {
             payload.readerIndex(readerIndex);
             brokerMeta = payload.readSlice(brokerEntryMetadataSize + 
Short.BYTES + Integer.BYTES);
         }
-        MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        if (metadata == null) {
+            metadata = Commands.parseMessageMetadata(payload);
+        } else {
+            Commands.skipMessageMetadata(payload);
+        }
         ByteBuf batchBuffer = 
PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());
 
         CompressionType compressionType = metadata.getCompression();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 5b03f270251..ddfe8825a88 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -77,7 +77,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
   protected abstract Map<String, MessageId> 
toLatestMessageIdForKey(Map<String, T> latestForKey);
 
   protected abstract boolean compactMessage(String topic, Map<String, T> 
latestForKey,
-      RawMessage m, MessageId id);
+      RawMessage m, MessageMetadata metadata, MessageId id);
 
 
   protected abstract boolean compactBatchMessage(String topic, Map<String, T> 
latestForKey,
@@ -147,7 +147,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
         } else if (RawBatchConverter.isReadableBatch(metadata)) {
           deletedMessage = compactBatchMessage(reader.getTopic(), 
latestForKey, m, metadata, id);
         } else {
-          deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, 
id);
+          deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, 
metadata, id);
         }
         MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
         MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
@@ -239,7 +239,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
         } else if (RawBatchConverter.isReadableBatch(metadata)) {
           try {
             messageToAdd = rebatchMessage(reader.getTopic(),
-                m, (key, subid) -> subid.equals(latestForKey.get(key)),
+                m, metadata, (key, subid) -> 
subid.equals(latestForKey.get(key)),
                 topicCompactionRetainNullKey);
           } catch (IOException ioe) {
             log.info("Error decoding batch for message {}. Whole batch will be 
included in output",
@@ -247,7 +247,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
             messageToAdd = Optional.of(m);
           }
         } else {
-          Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+          Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
           MessageId msg;
           if (keyAndSize == null) {
             messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) : 
Optional.empty();
@@ -392,9 +392,8 @@ public abstract class AbstractTwoPhaseCompactor<T> extends 
Compactor {
     return bkf;
   }
 
-  protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+  protected Pair<String, Integer> extractKeyAndSize(RawMessage m, 
MessageMetadata msgMetadata) {
     ByteBuf headersAndPayload = m.getHeadersAndPayload();
-    MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
     if (msgMetadata.hasPartitionKey()) {
       int size = headersAndPayload.readableBytes();
       if (msgMetadata.hasUncompressedSize()) {
@@ -408,13 +407,14 @@ public abstract class AbstractTwoPhaseCompactor<T> 
extends Compactor {
 
 
   protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg,
+      MessageMetadata metadata,
       BiPredicate<String, MessageId> filter,
       boolean retainNullKey)
       throws IOException {
     if (log.isDebugEnabled()) {
       log.debug("Rebatching message {} for topic {}", msg.getMessageId(), 
topic);
     }
-    return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
+    return RawBatchConverter.rebatchMessage(msg, metadata, filter, 
retainNullKey);
   }
 
   protected static class PhaseOneResult<T> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
index 2cd19ba15d6..db129b54533 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
@@ -34,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.impl.RawBatchConverter;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,10 +60,10 @@ public class EventTimeOrderCompactor extends 
AbstractTwoPhaseCompactor<Pair<Mess
 
   @Override
   protected boolean compactMessage(String topic, Map<String, Pair<MessageId, 
Long>> latestForKey,
-      RawMessage m, MessageId id) {
+      RawMessage m, MessageMetadata metadata, MessageId id) {
     boolean deletedMessage = false;
     boolean replaceMessage = false;
-    MessageCompactionData mcd = extractMessageCompactionData(m);
+    MessageCompactionData mcd = extractMessageCompactionData(m, metadata);
 
     if (mcd != null) {
       boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
@@ -100,7 +99,7 @@ public class EventTimeOrderCompactor extends 
AbstractTwoPhaseCompactor<Pair<Mess
       int numMessagesInBatch = metadata.getNumMessagesInBatch();
       int deleteCnt = 0;
 
-      for (MessageCompactionData mcd : 
extractMessageCompactionDataFromBatch(m)) {
+      for (MessageCompactionData mcd : 
extractMessageCompactionDataFromBatch(m, metadata)) {
         if (mcd.key() == null) {
           if (!topicCompactionRetainNullKey) {
             // record delete null-key message event
@@ -139,23 +138,22 @@ public class EventTimeOrderCompactor extends 
AbstractTwoPhaseCompactor<Pair<Mess
     return deletedMessage;
   }
 
-  protected MessageCompactionData extractMessageCompactionData(RawMessage m) {
+  protected MessageCompactionData extractMessageCompactionData(RawMessage m, 
MessageMetadata metadata) {
     ByteBuf headersAndPayload = m.getHeadersAndPayload();
-    MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
-    if (msgMetadata.hasPartitionKey()) {
+    if (metadata.hasPartitionKey()) {
       int size = headersAndPayload.readableBytes();
-      if (msgMetadata.hasUncompressedSize()) {
-        size = msgMetadata.getUncompressedSize();
+      if (metadata.hasUncompressedSize()) {
+        size = metadata.getUncompressedSize();
       }
-      return new MessageCompactionData(m.getMessageId(), 
msgMetadata.getPartitionKey(),
-          size, msgMetadata.getEventTime());
+      return new MessageCompactionData(m.getMessageId(), 
metadata.getPartitionKey(),
+          size, metadata.getEventTime());
     } else {
       return null;
     }
   }
 
-  private List<MessageCompactionData> 
extractMessageCompactionDataFromBatch(RawMessage msg)
+  private List<MessageCompactionData> 
extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
       throws IOException {
-    return RawBatchConverter.extractMessageCompactionData(msg);
+    return RawBatchConverter.extractMessageCompactionData(msg, metadata);
   }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
index a825c0782fb..223e8c421a5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
@@ -53,10 +53,10 @@ public class PublishingOrderCompactor extends 
AbstractTwoPhaseCompactor<MessageI
 
     @Override
     protected boolean compactMessage(String topic, Map<String, MessageId> 
latestForKey,
-        RawMessage m, MessageId id) {
+        RawMessage m, MessageMetadata metadata, MessageId id) {
         boolean deletedMessage = false;
         boolean replaceMessage = false;
-        Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+        Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
         if (keyAndSize != null) {
             if (keyAndSize.getRight() > 0) {
                 MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
@@ -84,7 +84,7 @@ public class PublishingOrderCompactor extends 
AbstractTwoPhaseCompactor<MessageI
             int numMessagesInBatch = metadata.getNumMessagesInBatch();
             int deleteCnt = 0;
             for (ImmutableTriple<MessageId, String, Integer> e : 
extractIdsAndKeysAndSizeFromBatch(
-                m)) {
+                m, metadata)) {
                 if (e != null) {
                     if (e.getMiddle() == null) {
                         if (!topicCompactionRetainNullKey) {
@@ -119,9 +119,9 @@ public class PublishingOrderCompactor extends 
AbstractTwoPhaseCompactor<MessageI
     }
 
     protected List<ImmutableTriple<MessageId, String, Integer>> 
extractIdsAndKeysAndSizeFromBatch(
-        RawMessage msg)
+        RawMessage msg, MessageMetadata metadata)
         throws IOException {
-        return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+        return RawBatchConverter.extractIdsAndKeysAndSize(msg, metadata);
     }
 
 }
\ No newline at end of file

Reply via email to