This is an automated email from the ASF dual-hosted git repository.
lordcheng10 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4f96146f13b [improve][broker] Reducing the parse of MessageMetadata in
compaction (#23285)
4f96146f13b is described below
commit 4f96146f13b136644a4eb0cf4ec36699e0431929
Author: AloysZhang <[email protected]>
AuthorDate: Sat Sep 14 19:11:26 2024 +0800
[improve][broker] Reducing the parse of MessageMetadata in compaction
(#23285)
Co-authored-by: Aloys Zhang <[email protected]>
---
.../pulsar/client/impl/RawBatchConverter.java | 28 ++++++++++++++++++----
.../compaction/AbstractTwoPhaseCompactor.java | 14 +++++------
.../pulsar/compaction/EventTimeOrderCompactor.java | 24 +++++++++----------
.../compaction/PublishingOrderCompactor.java | 10 ++++----
4 files changed, 46 insertions(+), 30 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index f41a7aedd59..d8c491dab29 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -52,12 +52,16 @@ public class RawBatchConverter {
return metadata.hasNumMessagesInBatch() &&
metadata.getEncryptionKeysCount() == 0;
}
- public static List<MessageCompactionData>
extractMessageCompactionData(RawMessage msg)
+ public static List<MessageCompactionData>
extractMessageCompactionData(RawMessage msg, MessageMetadata metadata)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
ByteBuf payload = msg.getHeadersAndPayload();
- MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+ if (metadata == null) {
+ metadata = Commands.parseMessageMetadata(payload);
+ } else {
+ Commands.skipMessageMetadata(payload);
+ }
int batchSize = metadata.getNumMessagesInBatch();
CompressionType compressionType = metadata.getCompression();
@@ -91,7 +95,16 @@ public class RawBatchConverter {
RawMessage msg)
throws IOException {
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize =
new ArrayList<>();
- for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
+ for (MessageCompactionData mcd : extractMessageCompactionData(msg,
null)) {
+ idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(),
mcd.key(), mcd.payloadSize()));
+ }
+ return idsAndKeysAndSize;
+ }
+
+ public static List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSize(
+ RawMessage msg, MessageMetadata metadata) throws IOException {
+ List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize =
new ArrayList<>();
+ for (MessageCompactionData mcd : extractMessageCompactionData(msg,
metadata)) {
idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(),
mcd.key(), mcd.payloadSize()));
}
return idsAndKeysAndSize;
@@ -99,7 +112,7 @@ public class RawBatchConverter {
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String,
MessageId> filter) throws IOException {
- return rebatchMessage(msg, filter, true);
+ return rebatchMessage(msg, null, filter, true);
}
/**
@@ -109,6 +122,7 @@ public class RawBatchConverter {
* NOTE: this message does not alter the reference count of the
RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+ MessageMetadata metadata,
BiPredicate<String,
MessageId> filter,
boolean retainNullKey)
throws IOException {
@@ -123,7 +137,11 @@ public class RawBatchConverter {
payload.readerIndex(readerIndex);
brokerMeta = payload.readSlice(brokerEntryMetadataSize +
Short.BYTES + Integer.BYTES);
}
- MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+ if (metadata == null) {
+ metadata = Commands.parseMessageMetadata(payload);
+ } else {
+ Commands.skipMessageMetadata(payload);
+ }
ByteBuf batchBuffer =
PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity());
CompressionType compressionType = metadata.getCompression();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
index 5b03f270251..ddfe8825a88 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java
@@ -77,7 +77,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
protected abstract Map<String, MessageId>
toLatestMessageIdForKey(Map<String, T> latestForKey);
protected abstract boolean compactMessage(String topic, Map<String, T>
latestForKey,
- RawMessage m, MessageId id);
+ RawMessage m, MessageMetadata metadata, MessageId id);
protected abstract boolean compactBatchMessage(String topic, Map<String, T>
latestForKey,
@@ -147,7 +147,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
} else if (RawBatchConverter.isReadableBatch(metadata)) {
deletedMessage = compactBatchMessage(reader.getTopic(),
latestForKey, m, metadata, id);
} else {
- deletedMessage = compactMessage(reader.getTopic(), latestForKey, m,
id);
+ deletedMessage = compactMessage(reader.getTopic(), latestForKey, m,
metadata, id);
}
MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
@@ -239,7 +239,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
messageToAdd = rebatchMessage(reader.getTopic(),
- m, (key, subid) -> subid.equals(latestForKey.get(key)),
+ m, metadata, (key, subid) ->
subid.equals(latestForKey.get(key)),
topicCompactionRetainNullKey);
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be
included in output",
@@ -247,7 +247,7 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
messageToAdd = Optional.of(m);
}
} else {
- Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
MessageId msg;
if (keyAndSize == null) {
messageToAdd = topicCompactionRetainNullKey ? Optional.of(m) :
Optional.empty();
@@ -392,9 +392,8 @@ public abstract class AbstractTwoPhaseCompactor<T> extends
Compactor {
return bkf;
}
- protected Pair<String, Integer> extractKeyAndSize(RawMessage m) {
+ protected Pair<String, Integer> extractKeyAndSize(RawMessage m,
MessageMetadata msgMetadata) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
- MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
if (msgMetadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
if (msgMetadata.hasUncompressedSize()) {
@@ -408,13 +407,14 @@ public abstract class AbstractTwoPhaseCompactor<T>
extends Compactor {
protected Optional<RawMessage> rebatchMessage(String topic, RawMessage msg,
+ MessageMetadata metadata,
BiPredicate<String, MessageId> filter,
boolean retainNullKey)
throws IOException {
if (log.isDebugEnabled()) {
log.debug("Rebatching message {} for topic {}", msg.getMessageId(),
topic);
}
- return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey);
+ return RawBatchConverter.rebatchMessage(msg, metadata, filter,
retainNullKey);
}
protected static class PhaseOneResult<T> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
index 2cd19ba15d6..db129b54533 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java
@@ -34,7 +34,6 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +60,10 @@ public class EventTimeOrderCompactor extends
AbstractTwoPhaseCompactor<Pair<Mess
@Override
protected boolean compactMessage(String topic, Map<String, Pair<MessageId,
Long>> latestForKey,
- RawMessage m, MessageId id) {
+ RawMessage m, MessageMetadata metadata, MessageId id) {
boolean deletedMessage = false;
boolean replaceMessage = false;
- MessageCompactionData mcd = extractMessageCompactionData(m);
+ MessageCompactionData mcd = extractMessageCompactionData(m, metadata);
if (mcd != null) {
boolean newer = Optional.ofNullable(latestForKey.get(mcd.key()))
@@ -100,7 +99,7 @@ public class EventTimeOrderCompactor extends
AbstractTwoPhaseCompactor<Pair<Mess
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
- for (MessageCompactionData mcd :
extractMessageCompactionDataFromBatch(m)) {
+ for (MessageCompactionData mcd :
extractMessageCompactionDataFromBatch(m, metadata)) {
if (mcd.key() == null) {
if (!topicCompactionRetainNullKey) {
// record delete null-key message event
@@ -139,23 +138,22 @@ public class EventTimeOrderCompactor extends
AbstractTwoPhaseCompactor<Pair<Mess
return deletedMessage;
}
- protected MessageCompactionData extractMessageCompactionData(RawMessage m) {
+ protected MessageCompactionData extractMessageCompactionData(RawMessage m,
MessageMetadata metadata) {
ByteBuf headersAndPayload = m.getHeadersAndPayload();
- MessageMetadata msgMetadata =
Commands.parseMessageMetadata(headersAndPayload);
- if (msgMetadata.hasPartitionKey()) {
+ if (metadata.hasPartitionKey()) {
int size = headersAndPayload.readableBytes();
- if (msgMetadata.hasUncompressedSize()) {
- size = msgMetadata.getUncompressedSize();
+ if (metadata.hasUncompressedSize()) {
+ size = metadata.getUncompressedSize();
}
- return new MessageCompactionData(m.getMessageId(),
msgMetadata.getPartitionKey(),
- size, msgMetadata.getEventTime());
+ return new MessageCompactionData(m.getMessageId(),
metadata.getPartitionKey(),
+ size, metadata.getEventTime());
} else {
return null;
}
}
- private List<MessageCompactionData>
extractMessageCompactionDataFromBatch(RawMessage msg)
+ private List<MessageCompactionData>
extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata)
throws IOException {
- return RawBatchConverter.extractMessageCompactionData(msg);
+ return RawBatchConverter.extractMessageCompactionData(msg, metadata);
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
index a825c0782fb..223e8c421a5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java
@@ -53,10 +53,10 @@ public class PublishingOrderCompactor extends
AbstractTwoPhaseCompactor<MessageI
@Override
protected boolean compactMessage(String topic, Map<String, MessageId>
latestForKey,
- RawMessage m, MessageId id) {
+ RawMessage m, MessageMetadata metadata, MessageId id) {
boolean deletedMessage = false;
boolean replaceMessage = false;
- Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
+ Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata);
if (keyAndSize != null) {
if (keyAndSize.getRight() > 0) {
MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
@@ -84,7 +84,7 @@ public class PublishingOrderCompactor extends
AbstractTwoPhaseCompactor<MessageI
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e :
extractIdsAndKeysAndSizeFromBatch(
- m)) {
+ m, metadata)) {
if (e != null) {
if (e.getMiddle() == null) {
if (!topicCompactionRetainNullKey) {
@@ -119,9 +119,9 @@ public class PublishingOrderCompactor extends
AbstractTwoPhaseCompactor<MessageI
}
protected List<ImmutableTriple<MessageId, String, Integer>>
extractIdsAndKeysAndSizeFromBatch(
- RawMessage msg)
+ RawMessage msg, MessageMetadata metadata)
throws IOException {
- return RawBatchConverter.extractIdsAndKeysAndSize(msg);
+ return RawBatchConverter.extractIdsAndKeysAndSize(msg, metadata);
}
}
\ No newline at end of file