RobertIndie commented on code in PR #24439:
URL: https://github.com/apache/pulsar/pull/24439#discussion_r2162849534


##########
pip/pip-429.md:
##########
@@ -0,0 +1,161 @@
+# PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing
+
+# Background knowledge
+
+A typical reader workflow looks like this:
+
+```java
+while (reader.hasMessageAvailable()) {
+    final var msg = reader.readNext();
+    handleMessage(msg);
+}
+```
+
+`hasMessageAvailable` may perform the `GetLastMessageId` RPC to get the last 
message ID from the broker. However, when the reader is a compacted reader, 
i.e. `readCompacted(true)` is configured when creating the reader, the server 
side can compute the message ID from the last entry in the compaction service.
+
+Generally, with the built-in compaction service, when the entry represents a 
batch of messages, the compacted entry buffer consists of:
+
+1. Serialized `MessageMetadata`
+2. Serialized payload buffer, which can be compressed or encrypted. The 
uncompressed payload buffer consists of a list of `SingleMessageMetadata` and 
value buffers.
+
+As a typical example, suppose a producer configured with `LZ4` as the 
compression type sends the following messages in a single batch:
+
+```java
+producer.newMessage().key("k0").value("v0").sendAsync();
+producer.newMessage().key("k0").value("v1").sendAsync();
+producer.newMessage().key("k1").value("v0").sendAsync();
+producer.newMessage().key("k1").value(null).sendAsync();
+```
+
+After the compaction, the compacted entry buffer could be represented as 
follows:
+
+```yaml
+metadata: # MessageMetadata
+  num_messages_in_batch: 4
+  compression: LZ4
+payload:
+  - singleMetadata: # SingleMessageMetadata
+      key: k0
+      compactedOut: true
+    value: ""
+  - singleMetadata:
+      key: k0
+      compactedOut: false
+    value: v1
+  - singleMetadata:
+      key: k1
+      compactedOut: true
+    value: ""
+  - singleMetadata:
+      key: k1
+      compactedOut: true
+      nullValue: true
+    value: ""
+```
+
+- For a given key, only the latest value will be retained, so `k0 => v0` will 
be compacted out.
+- A null value means the key will be removed, so `k1 => v0` and `k1 => null` 
will be compacted out.
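The two retention rules above can be sketched in a few lines of Java. This is a minimal illustration, not the compaction service's actual code: the class `CompactionSketch` and the method `retainedIndexes` are hypothetical names, and the batch is modelled as two parallel lists of keys and values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: models only the two retention rules, nothing else.
final class CompactionSketch {

    /** Returns the batch indexes that survive compaction, in ascending order. */
    static List<Integer> retainedIndexes(List<String> keys, List<String> values) {
        // Remember the index of the latest message seen for each key.
        Map<String, Integer> latestIndexByKey = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            latestIndexByKey.put(keys.get(i), i);
        }
        List<Integer> retained = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            boolean isLatestForKey = latestIndexByKey.get(keys.get(i)) == i;
            // A null latest value removes the key, so nothing is retained for it.
            if (isLatestForKey && values.get(i) != null) {
                retained.add(i);
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        // The batch from the example: k0=>v0, k0=>v1, k1=>v0, k1=>null.
        List<String> keys = Arrays.asList("k0", "k0", "k1", "k1");
        List<String> values = Arrays.asList("v0", "v1", "v0", null);
        System.out.println(retainedIndexes(keys, values)); // prints [1]
    }
}
```

For the example batch, only index 1 (`k0 => v1`) is retained, which matches the `compactedOut` flags in the YAML representation.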
+
+Prior to [#18877](https://github.com/apache/pulsar/pull/18877), the 
`hasMessageAvailable` and `readNext` loop might encounter issues because the 
`GetLastMessageId` RPC returns `{ledger, entry, batchIndex=3}` as the last 
message ID, which represents `k1 => null`.
+
+The issue occurs because the batch index of the last message ID is calculated 
as `num_messages_in_batch - 1` without considering certain edge cases. 
[#18877](https://github.com/apache/pulsar/pull/18877) resolves this problem by 
uncompressing the compacted entry buffer on the broker side and filtering out 
messages where the individual metadata has `compactedOut` set to `true`. This 
ensures that only valid messages are considered when determining the last 
message ID.
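The difference between the two calculations can be illustrated with a small Java sketch. It is not the broker's implementation: `LastBatchIndexSketch` and both method names are hypothetical, the per-message `compactedOut` flags are assumed to be already parsed into a list, and the `-1` sentinel for a fully compacted batch is an assumption made for this example.

```java
import java.util.List;

// Hypothetical sketch contrasting the two ways of picking the last batch index.
final class LastBatchIndexSketch {

    /** Calculation described for the old behavior: ignores compacted-out messages. */
    static int naiveLastBatchIndex(int numMessagesInBatch) {
        return numMessagesInBatch - 1;
    }

    /**
     * Filtering approach: the highest batch index whose message is not compacted out.
     * Returns -1 (an assumed sentinel) when every message was compacted out.
     */
    static int filteredLastBatchIndex(List<Boolean> compactedOut) {
        for (int i = compactedOut.size() - 1; i >= 0; i--) {
            if (!compactedOut.get(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // compactedOut flags of the example batch: k0=>v0, k0=>v1, k1=>v0, k1=>null.
        List<Boolean> flags = List.of(true, false, true, true);
        System.out.println(naiveLastBatchIndex(flags.size())); // prints 3
        System.out.println(filteredLastBatchIndex(flags));     // prints 1
    }
}
```

Note that the filtering variant needs the `compactedOut` flags, which live inside the payload buffer, so it can only run after the payload has been decompressed.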
+
+The `compacted_out` field was first introduced in the early stages of 
development through [#1361](https://github.com/apache/pulsar/pull/1361). 
However, as part of the overall payload buffer, parsing a 
`SingleMessageMetadata` currently requires decompressing the compacted entry 
buffer. This process can be resource-intensive, particularly when handling 
large topics or encrypted messages, leading to potential performance 
bottlenecks.
+
+# Motivation
+
+Decompressing the payload buffer solely to check whether individual messages 
have the `compacted_out` field set is both inefficient and restrictive, as it 
imposes constraints on the payload buffer format. Furthermore, when using a 
custom topic compaction service, the entry buffer in the compacted ledger may 
not include a `SingleMessageMetadata` for every single message, adding further 
complexity to the process.
+
+This challenge is exacerbated when messages are encrypted, as the payload 
cannot be decompressed without first decrypting it, which requires the private 
key. This limitation also impacts the current compaction service, as encrypted 
messages cannot be compacted. Consequently, operations such as the 
`GetLastMessageId` RPC will fail, resulting in an error similar to the following:
+
+```
+org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription sub of the topic <topic> gets the last message id was failed
+{"errorMsg":"Failed to read last entry of the compacted Ledger Invalid unknown tag type: 3","reqId":3317275583068061944, "remote":"localhost/127.0.0.1:50818", "local":"/127.0.0.1:50823"}
+```
+
+Instead, the expected behavior is to return the last message ID (e.g., `k1 => 
null` in the previous example).
+
+Another issue arises from the assumption made by the `GetLastMessageId` RPC 
that the compacted entry's payload buffer must always contain a 
`SingleMessageMetadata` list. However, this is not always the case. For 
instance, a custom topic compaction service might write a payload buffer that 
omits the `SingleMessageMetadata`. In such cases, the `compactedOut` 
information could instead be stored in the properties of the `MessageMetadata`, 
but the `GetLastMessageId` RPC will always fail.
+
+The custom topic compaction service has the flexibility to serialize and 
deserialize the payload buffer in a different format. However, it still depends 
on the `GetLastMessageId` RPC in the `hasMessageAvailableAsync` and 
`getLastMessageIdAsync` methods of the `RawReader` to compute the last message 
ID. This reliance creates a compatibility issue, as the `GetLastMessageId` RPC 
will fail when working with a payload buffer in a non-standard format, breaking 
the functionality of these methods.
+
+# Goals
+
+## In Scope
+
+Instead of relying on the `compacted_out` field in `SingleMessageMetadata`, 
this PIP proposes to use `MessageMetadata` to determine the last message ID in 
the compacted last entry. Since `compacted_out` is no longer used, the payload 
buffer's format could be improved as well.
+
+## Out of Scope
+
+# High Level Design
+
+To enhance efficiency and simplify the handling of compacted entries, a new 
field will be added to `MessageMetadata` to record the batch indexes of all 
retained single messages. This change allows the server to determine the last 
message ID directly from the new field in `MessageMetadata` when processing a 
`GetLastMessageId` request, rather than relying on the `compacted_out` field in 
`SingleMessageMetadata`.
+
+With this update, the `compacted_out` field will no longer be used, and only 
the retained messages will be included in the payload buffer. For example, the 
metadata of the previous example becomes:
+
+```yaml
+metadata: # MessageMetadata
+  num_messages_in_batch: 4
+  compression: LZ4
+  compacted_batch_indexes: [1] # Retained messages' batch indexes
+```
+
+The improvements are:
+1. **Reduced Payload Size**: 
+   - In the new format, only the retained messages are included in the payload 
buffer. 
+   - For the example above, the new format contains just 1 pair of 
`SingleMessageMetadata` and its corresponding value buffer, compared to 4 pairs 
in the original format.
+
+2. **Efficient Batch Index Retrieval**:
+   - The actual batch index of retained messages can now be retrieved from the 
`compacted_batch_indexes` field in `MessageMetadata`. 
+   - In this example, the retained message's batch index is 
`compacted_batch_indexes[0] = 1`.
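As a rough sketch of the retrieval described in the list above, the last retained batch index can be derived from the metadata field alone, without touching the payload. The class and method names below are hypothetical, and the field is modelled as a plain `List<Integer>` rather than the generated protobuf type. An empty list is treated as "field not present", in which case a caller would presumably fall back to the existing `compacted_out` check.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical sketch: reads the last retained batch index from metadata only.
final class CompactedBatchIndexesSketch {

    /**
     * Returns the highest retained batch index, or empty when the list is empty
     * (for example, a batch written by an older compaction service).
     * Taking the maximum avoids assuming the indexes are stored in order.
     */
    static OptionalInt lastRetainedBatchIndex(List<Integer> compactedBatchIndexes) {
        return compactedBatchIndexes.stream().mapToInt(Integer::intValue).max();
    }

    public static void main(String[] args) {
        // compacted_batch_indexes: [1] from the example above.
        System.out.println(lastRetainedBatchIndex(List.of(1)).getAsInt()); // prints 1
        // No field present: the caller must use the legacy path instead.
        System.out.println(lastRetainedBatchIndex(List.<Integer>of()).isPresent()); // prints false
    }
}
```

Because `MessageMetadata` is serialized outside the compressed or encrypted payload, this lookup needs neither decompression nor decryption.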
+
+## Public-facing Changes
+
+### Binary protocol
+
+A new field will be added to `MessageMetadata`:
+
+```protobuf
+    // Indicates the indexes of messages retained in the batch after compaction. When a batch is compacted,
+    // some messages may be removed (compacted out). For example, if the original batch contains:
+    // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will retain only `k0 => v0` and `k2 => v2`.
+    // In this case, this field will be set to `[0, 2]`, and the payload buffer will only include the retained messages.
+    //
+    // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches,
+    // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted messages
+    // (e.g., `k1 => v1` and `k1 => null` in the example above).
+    repeated int32 compacted_batch_indexes = 31;
+```
+
+# Backward & Forward Compatibility
+

Review Comment:
   This section is empty. I think we could move the content at lines 124-126 here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
