poorbarcode commented on code in PR #21027:
URL: https://github.com/apache/pulsar/pull/21027#discussion_r1303733160


##########
pip/pip-295.md:
##########
@@ -0,0 +1,128 @@
+
+# Background knowledge
+
+In [PIP 
37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar),
 Pulsar introduced chunk messages to handle the large message. It will separate 
a large message into some chunks when the producer sends the significant 
message to the broker. On the consumer side, a consumer will wait to receive 
all the chunks of a message and then assemble them into a single chunk message 
before returning it.
+In [PIP 
6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication),
 Pulsar introduced deduplication to make sure the messages sent by the producer 
are non-repeating.
+In PIP 6, each producer will have a sequence ID that starts at 0 and increase 
for each message. The message with a lower sequence ID will be dropped in the 
broker.
+
+# Motivation
+
+In the earliest design, all the chunks in a single chunk message have the same 
sequence ID which causes the chunk message can not work when enabling 
deduplication. For example, we have a  chunk message consisting of chunk-1 and 
chunk-2. When Broker receives chunk-1, it will update the last sequence ID to 
the sequence ID of chunk-1. And then, when the broker gets chunk-2, the chunk-2 
will be dropped by depublication.
+I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this 
case. It allowed the chunks of a single chunk message to use the same sequence 
ID and filter duplicated chunks in a single-chunk message on the consumer side.
+It can resolve message duplication end to end, but the message duplication 
still exists in the topic.
+
+# Goals
+
+## In Scope
+Chunk messages can be effectively filtered on the broker side. Ensure that 
chunk messages work normally after enabling deduplication and the topic has no 
duplicate chunks.
+
+## Out of Scope
+
+
+
+# High Level Design
+Introduce a mechanism similar to [PIP 
37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar)
 to check the chunk ID.
+For normal messages, we still only check sequence ID, but we will check both 
sequence ID and chunk ID for chunk messages.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+Add `chunkIDPushed` and `chunkIDPersisted` to store the chunk of each 
producer`s ongoing chunk messages. It will be used to check whether the chunks 
in a single message are duplicated.
+
+```
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPushed =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPersisted =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+            .expectedItems(16)
+            .concurrencyLevel(1)
+            .build();
+```
+
+Optimize the `properties` of the `MarkDeleteEntry` from `Map<String, Long>` to 
`Map<String, String>`. In the depublication design, the ' MarkDeleteEntry' 
properties are used as a snapshot to store the sequence ID map. After 
introducing the chunk ID map, it cannot hold two long for each producer. So we 
hope to change the `MarkDeleteEntry' properties from `Map<String, Long>` to 
`Map<String, String>` to make it more flexible.
+
+
+
+## Public-facing Changes
+None
+
+### Public API
+None
+
+### Binary protocol
+Add `repeated StringProperty markDeleteProperties = 9;` to replace the 
original `repeated LongProperty properties = 5;`.
+
+### Configuration
+
+### CLI
+
+### Metrics
+
+
+# Monitoring
+
+
+# Security Considerations
+
+
+# Backward & Forward Compatibility
+
+## Revert
+When reverting to the old version of Pulsar, the `ManagedCursorInfo` will not 
contain the properties(`repeated LongProperty properties = 5;`). Because the 
new version of pulsar use markDeleteProperties (`repeated StringProperty 
markDeleteProperties = 9;`) to record mark delete properties.
+So It can only be reverted if losing many last persistent sequence ID data.
+
+## Upgrade
+
+Add an upgrade logic in `recover(final VoidCallback callback)`.
+The original logic:
+```java
+    Map<String, Long> recoveredProperties = Collections.emptyMap();
+    if (info.getPropertiesCount() > 0) {
+        // Recover properties map
+        recoveredProperties = new HashMap<>();
+        for (int i = 0; i < info.getPropertiesCount(); i++) {
+            LongProperty property = info.getProperties(i);
+            recoveredProperties.put(property.getName(), property.getValue());
+        }
+    }
+
+    recoveredCursor(recoveredPosition, recoveredProperties, 
recoveredCursorProperties, null);
+```
+Change to:
+```java
+    // Recover properties map
+    Map<String, String> recoveredProperties;
+    if (info.getPropertiesCount() == 0 && info.getmarkDeletePropertiesCount() 
== 0) {

Review Comment:
   The `info` means `ManagedCursorInfo`, right?



##########
pip/pip-295.md:
##########
@@ -0,0 +1,128 @@
+
+# Background knowledge
+
+In [PIP 
37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar),
 Pulsar introduced chunk messages to handle the large message. It will separate 
a large message into some chunks when the producer sends the significant 
message to the broker. On the consumer side, a consumer will wait to receive 
all the chunks of a message and then assemble them into a single chunk message 
before returning it.
+In [PIP 
6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication),
 Pulsar introduced deduplication to make sure the messages sent by the producer 
are non-repeating.
+In PIP 6, each producer will have a sequence ID that starts at 0 and increase 
for each message. The message with a lower sequence ID will be dropped in the 
broker.
+
+# Motivation
+
+In the earliest design, all the chunks in a single chunk message have the same 
sequence ID which causes the chunk message can not work when enabling 
deduplication. For example, we have a  chunk message consisting of chunk-1 and 
chunk-2. When Broker receives chunk-1, it will update the last sequence ID to 
the sequence ID of chunk-1. And then, when the broker gets chunk-2, the chunk-2 
will be dropped by depublication.
+I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this 
case. It allowed the chunks of a single chunk message to use the same sequence 
ID and filter duplicated chunks in a single-chunk message on the consumer side.
+It can resolve message duplication end to end, but the message duplication 
still exists in the topic.
+
+# Goals
+
+## In Scope
+Chunk messages can be effectively filtered on the broker side. Ensure that 
chunk messages work normally after enabling deduplication and the topic has no 
duplicate chunks.
+
+## Out of Scope
+
+
+
+# High Level Design
+Introduce a mechanism similar to [PIP 
37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar)
 to check the chunk ID.
+For normal messages, we still only check sequence ID, but we will check both 
sequence ID and chunk ID for chunk messages.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+Add `chunkIDPushed` and `chunkIDPersisted` to store the chunk of each 
producer`s ongoing chunk messages. It will be used to check whether the chunks 
in a single message are duplicated.
+
+```
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPushed =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+
+    @VisibleForTesting
+    final ConcurrentOpenHashMap<String, Integer> chunkIDPersisted =
+            ConcurrentOpenHashMap.<String, Integer>newBuilder()
+            .expectedItems(16)
+            .concurrencyLevel(1)
+            .build();
+```
+
+Optimize the `properties` of the `MarkDeleteEntry` from `Map<String, Long>` to 
`Map<String, String>`. In the depublication design, the ' MarkDeleteEntry' 
properties are used as a snapshot to store the sequence ID map. After 
introducing the chunk ID map, it cannot hold two long for each producer. So we 
hope to change the `MarkDeleteEntry' properties from `Map<String, Long>` to 
`Map<String, String>` to make it more flexible.

Review Comment:
   Could you add a demo to describe the structure of the attribute `properties` 
of cursor metadata that you wanted after this PIP?



##########
pip/pip-295.md:
##########
@@ -0,0 +1,128 @@
+
+# Background knowledge
+
+In [PIP 
37](https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar),
 Pulsar introduced chunk messages to handle the large message. It will separate 
a large message into some chunks when the producer sends the significant 
message to the broker. On the consumer side, a consumer will wait to receive 
all the chunks of a message and then assemble them into a single chunk message 
before returning it.
+In [PIP 
6](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication),
 Pulsar introduced deduplication to make sure the messages sent by the producer 
are non-repeating.
+In PIP 6, each producer will have a sequence ID that starts at 0 and increase 
for each message. The message with a lower sequence ID will be dropped in the 
broker.
+
+# Motivation
+
+In the earliest design, all the chunks in a single chunk message have the same 
sequence ID which causes the chunk message can not work when enabling 
deduplication. For example, we have a  chunk message consisting of chunk-1 and 
chunk-2. When Broker receives chunk-1, it will update the last sequence ID to 
the sequence ID of chunk-1. And then, when the broker gets chunk-2, the chunk-2 
will be dropped by depublication.
+I opened a [PR](https://github.com/apache/pulsar/pull/20948) to resolve this 
case. It allowed the chunks of a single chunk message to use the same sequence 
ID and filter duplicated chunks in a single-chunk message on the consumer side.
+It can resolve message duplication end to end, but the message duplication 
still exists in the topic.
+
+# Goals
+
+## In Scope
+Chunk messages can be effectively filtered on the broker side. Ensure that 
chunk messages work normally after enabling deduplication and the topic has no 
duplicate chunks.

Review Comment:
   **Background:***: There are two properties in the metadata of the cursor
   - `properties<String, Long>`: used to maintain the last sequence of 
producer-sent messages<sup>[1]</sup>.
     - PIP: 
https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication
     - PR: https://github.com/apache/pulsar/pull/744
   -  `cursorProperties<String, String>`: used to maintain the subscription 
properties. 
     - PIP: https://github.com/apache/pulsar/issues/12269 
     - PR: https://github.com/apache/pulsar/issues/15750
   
   **[1]**: a structure of `properties`:
   ```yaml
   properties: 
     - "producer_name_1" : {{last_persist_sequence_1}}
     - "producer_name_2" : {{last_persist_sequence_2}}
   ```
   
   ----
   
   In this PIP, you want to change `properties<String, Long>` to 
`properties<String, Long>`, right? Could you also explain this change here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to