codelipenghui commented on code in PR #24439:
URL: https://github.com/apache/pulsar/pull/24439#discussion_r2167902325
##########
pip/pip-429.md:
##########
@@ -0,0 +1,165 @@
+# PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload
Buffer Parsing
+
+# Background knowledge
+
+A typical reader workflow looks like:
+
+```java
+while (reader.hasMessageAvailable()) {
+    final var msg = reader.readNext();
+    handleMessage(msg);
+}
+```
+
+`hasMessageAvailable` may perform the `GetLastMessageId` RPC to get the last
message ID from the broker. However, when the reader is a compacted reader, i.e.
`readCompacted(true)` is configured when creating the reader, the server side
can compute the message ID from the last entry in the compaction service.
+
+Generally, with the built-in compaction service, when the entry represents a
batch of messages, the compacted entry buffer consists of:
+
+1. Serialized `MessageMetadata`
+2. Serialized payload buffer, which can be compressed or encrypted. The
uncompressed payload buffer consists of a list of `SingleMessageMetadata` and
value buffers.
+
+As a typical example, suppose a producer configured with `LZ4` as the
compression type sends the following messages in a batch:
+
+```java
+producer.newMessage().key("k0").value("v0").sendAsync();
+producer.newMessage().key("k0").value("v1").sendAsync();
+producer.newMessage().key("k1").value("v0").sendAsync();
+producer.newMessage().key("k1").value(null).sendAsync();
+```
+
+After the compaction, the compacted entry buffer could be represented as
follows:
+
+```yaml
+metadata: # MessageMetadata
+  num_messages_in_batch: 4
+  compression: LZ4
+payload:
+  - singleMetadata: # SingleMessageMetadata
+      key: k0
+      compactedOut: true
+    value: ""
+  - singleMetadata:
+      key: k0
+      compactedOut: false
+    value: v1
+  - singleMetadata:
+      key: k1
+      compactedOut: true
+    value: ""
+  - singleMetadata:
+      key: k1
+      compactedOut: true
+      nullValue: true
+    value: ""
+```
+
+- For a given key, only the latest value will be retained, so `k0 => v0` will
be compacted out.
+- A null value means the key will be removed, so `k1 => v0` and `k1 => null`
will be compacted out.
+
+Prior to [#18877](https://github.com/apache/pulsar/pull/18877), the
`hasMessageAvailable` and `readNext` loop might encounter issues because the
`GetLastMessageId` RPC returns `{ledger, entry, batchIndex=3}` as the last
message ID, which represents `k1 => null`.
+
+The issue occurs because the batch index of the last message ID is calculated
as `num_messages_in_batch - 1` without considering certain edge cases.
[#18877](https://github.com/apache/pulsar/pull/18877) resolves this problem by
uncompressing the compacted entry buffer on the broker side and filtering out
messages where the individual metadata has `compactedOut` set to `true`. This
ensures that only valid messages are considered when determining the last
message ID.
+
+The `compacted_out` field was first introduced in the early stages of
development through [#1361](https://github.com/apache/pulsar/pull/1361).
However, as part of the overall payload buffer, parsing a
`SingleMessageMetadata` currently requires decompressing the compacted entry
buffer. This process can be resource-intensive, particularly when handling
large topics or encrypted messages, leading to potential performance
bottlenecks.
+
+# Motivation
+
+Decompressing the payload buffer solely to check whether individual messages
have the `compacted_out` field set is both inefficient and restrictive, as it
imposes constraints on the payload buffer format. Furthermore, when using a
custom topic compaction service, the entry buffer in the compacted ledger may
not include a `SingleMessageMetadata` for every single message, adding further
complexity to the process.
+
+This challenge is exacerbated when messages are encrypted, as decompression is
not possible without the public key required for decryption. This limitation
also impacts the current compaction service, as encrypted messages cannot be
compacted. Consequently, operations such as the `GetLastMessageId` RPC will
fail, resulting in an error similar to the following:
+
+```
+org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException:
The subscription sub of the topic <topic> gets the last message id was failed
+{"errorMsg":"Failed to read last entry of the compacted Ledger Invalid unknown
tag type: 3","reqId":3317275583068061944, "remote":"localhost/127.0.0.1:50818",
"local":"/127.0.0.1:50823"}
+```
+
+Instead, the expected behavior is to return the last message ID (e.g., `k1 =>
null` in the previous example).
+
+Another issue arises from the assumption made by the `GetLastMessageId` RPC
that the compacted entry's payload buffer must always contain a
`SingleMessageMetadata` list. However, this is not always the case. For
instance, a custom topic compaction service might write a payload buffer that
omits the `SingleMessageMetadata`. In such cases, the `compactedOut`
information could instead be stored in the properties of the `MessageMetadata`,
but the `GetLastMessageId` RPC will always fail.
+
+The custom topic compaction service has the flexibility to serialize and
deserialize the payload buffer in a different format. However, it still depends
on the `GetLastMessageId` RPC in the `hasMessageAvailableAsync` and
`getLastMessageIdAsync` methods of the `RawReader` to compute the last message
ID. This reliance creates a compatibility issue, as the `GetLastMessageId` RPC
will fail when working with a payload buffer in a non-standard format, breaking
the functionality of these methods.
Review Comment:
I'm thinking: how about introducing a new method, `getLastMessageID()`, to the
`TopicCompactionService` for this case?
The `MessageMetadata` change is still reasonable for encrypted data in the
Pulsar format.
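
To make the idea concrete, here is a minimal sketch of what such a method could look like. Everything in it is an assumption for illustration: `CompactionServiceSketch`, `LastId`, `lastRetainedIndex`, and `inMemory` are hypothetical names, not the actual `TopicCompactionService` API. The point is only that a service which knows its own payload format can report the last retained message itself, so the broker does not have to parse the payload buffer.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class LastMessageIdSketch {
    /** Hypothetical value type for illustration; not Pulsar's MessageId. */
    record LastId(long ledgerId, long entryId, int batchIndex) {}

    /** Hypothetical stand-in for an extension point on the compaction service. */
    interface CompactionServiceSketch {
        CompletableFuture<Optional<LastId>> getLastMessageId();
    }

    /** Returns the index of the last message that was not compacted out, or -1 if none. */
    static int lastRetainedIndex(List<Boolean> compactedOut) {
        for (int i = compactedOut.size() - 1; i >= 0; i--) {
            if (!compactedOut.get(i)) {
                return i;
            }
        }
        return -1;
    }

    /** A toy service that keeps its own per-entry bookkeeping for the last compacted entry. */
    static CompactionServiceSketch inMemory(long ledgerId, long entryId, List<Boolean> compactedOut) {
        return () -> {
            int index = lastRetainedIndex(compactedOut);
            Optional<LastId> id = index < 0
                    ? Optional.empty()
                    : Optional.of(new LastId(ledgerId, entryId, index));
            return CompletableFuture.completedFuture(id);
        };
    }

    public static void main(String[] args) {
        // Flags from the PIP's example batch: k0=>v0, k0=>v1, k1=>v0, k1=>null.
        var service = inMemory(5L, 0L, List.of(true, false, true, true));
        System.out.println(service.getLastMessageId().join());
    }
}
```

With the flags from the PIP's example, the sketch resolves to batch index 1 (`k0 => v1`) rather than `num_messages_in_batch - 1`. The ledger and entry IDs passed in `main` are arbitrary placeholders.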
##########
pip/pip-429.md:
##########
@@ -0,0 +1,165 @@
+This challenge is exacerbated when messages are encrypted, as decompression is
not possible without the public key required for decryption. This limitation
also impacts the current compaction service, as encrypted messages cannot be
compacted. Consequently, operations such as the `GetLastMessageId` RPC will
fail, resulting in an error similar to the following:
Review Comment:
```suggestion
This challenge is exacerbated when messages are encrypted, as decryption is
not possible without the public key required for decryption. This limitation
also impacts the current compaction service, as encrypted messages cannot be
compacted. Consequently, operations such as the `GetLastMessageId` RPC will
fail, resulting in an error similar to the following:
```