void-ptr974 opened a new pull request, #25860:
URL: https://github.com/apache/pulsar/pull/25860

   
   Fixes #xyz
   
   ### Motivation
   
   This fixes a correctness issue in persistent geo-replication V2 
deduplication. The issue lies in the core replication data path and can produce 
duplicate messages on the target cluster during normal recovery and failover.
   
   Geo-replication V2 deduplication uses the source topic position as the 
target-side dedup watermark. For a replicated message, the source replicator 
adds `__MSG_PROP_REPL_SOURCE_POSITION=<source-ledger-id>:<source-entry-id>`, 
and the target broker stores the latest replicated source position as:
   
   - `<replicator-producer>_LID`
   - `<replicator-producer>_EID`
   
   This state is not normal producer sequence state. It is the target-side 
checkpoint used to identify whether a replayed source entry has already been 
persisted.
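The watermark handling described above can be sketched in Java. This is a minimal, hypothetical model, not Pulsar's broker code. The class `GeoWatermarkSketch`, its helpers, the producer name `repl-producer`, and the rule that any source position at or below the stored watermark counts as already persisted are assumptions for illustration. Only the property name and the `_LID`/`_EID` key suffixes come from this description.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the geo-replication V2 dedup watermark; not Pulsar's classes. */
public class GeoWatermarkSketch {
    static final String SOURCE_POSITION_PROP = "__MSG_PROP_REPL_SOURCE_POSITION";

    /** A source-topic position, ordered by ledger id first, then entry id. */
    record SourcePosition(long ledgerId, long entryId) implements Comparable<SourcePosition> {
        @Override
        public int compareTo(SourcePosition o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }

        /** Parses "<ledger-id>:<entry-id>" as carried in the message property. */
        static SourcePosition parse(String value) {
            int sep = value.indexOf(':');
            if (sep <= 0 || sep == value.length() - 1) {
                throw new IllegalArgumentException("Bad source position: " + value);
            }
            return new SourcePosition(Long.parseLong(value.substring(0, sep)),
                    Long.parseLong(value.substring(sep + 1)));
        }
    }

    static String lidKey(String producer) { return producer + "_LID"; }
    static String eidKey(String producer) { return producer + "_EID"; }

    /** Returns the stored watermark, or null unless BOTH halves are present. */
    static SourcePosition watermark(Map<String, Long> state, String producer) {
        Long lid = state.get(lidKey(producer));
        Long eid = state.get(eidKey(producer));
        return (lid == null || eid == null) ? null : new SourcePosition(lid, eid);
    }

    /** Assumed rule: a position at or below the watermark was already persisted. */
    static boolean isDuplicate(Map<String, Long> state, String producer, SourcePosition incoming) {
        SourcePosition wm = watermark(state, producer);
        return wm != null && incoming.compareTo(wm) <= 0;
    }

    /** Records the source position of an entry the target has just persisted. */
    static void advance(Map<String, Long> state, String producer, SourcePosition persisted) {
        state.put(lidKey(producer), persisted.ledgerId());
        state.put(eidKey(producer), persisted.entryId());
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        String producer = "repl-producer"; // hypothetical replicator producer name
        advance(state, producer, SourcePosition.parse("12:5"));
        System.out.println(isDuplicate(state, producer, SourcePosition.parse("12:3"))); // true
        System.out.println(isDuplicate(state, producer, SourcePosition.parse("13:0"))); // false
    }
}
```

Note that `watermark` returns null when either key is missing: a lone `_LID` or `_EID` does not identify a source position, so nothing can be deduplicated against it.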
   
   There are three related problems in the current handling of this watermark:
   
   1. **The geo V2 watermark is not recovered from replayed target entries**
   
      When deduplication is enabled or a target topic is loaded, the broker 
restores dedup state from the `pulsar.dedup` cursor snapshot and then replays 
entries after that snapshot.
   
      The existing replay path restores normal producer sequence ids from 
message metadata, but it does not rebuild geo V2 `_LID/_EID` state from 
replicated messages. If the target topic is unloaded after replicated messages 
are persisted but before the latest dedup snapshot includes their source 
positions, the in-memory geo watermark is lost.
   
      If the source replicator later reconnects and replays from its 
replication cursor, the target broker can fail to identify those source entries 
as duplicates and persist them again.
   
   2. **The geo V2 watermark is stored as two separate snapshot keys**
   
      A geo V2 watermark is only valid when `_LID` and `_EID` are restored 
together. Saving only one side does not represent a usable source position.
   
      The snapshot logic currently treats all dedup entries as independent 
producer states. This makes the geo watermark vulnerable to partial persistence 
or to being crowded out by normal producer sequence state when a snapshot cannot 
hold every entry. After reload, an incomplete `_LID/_EID` pair cannot safely 
deduplicate source replay.
   
   3. **The geo V2 watermark can be removed by normal producer inactivity 
cleanup**
   
      Normal producer dedup state can be purged after producer inactivity. Geo 
V2 `_LID/_EID` state has a different lifecycle: it is a source-position 
watermark for replication, not the lifecycle state of the replicator producer.
   
      The source replicator may disconnect, reconnect, or fail over and replay 
the same source entries from its replication cursor. If the target has purged 
the geo watermark because the replicator producer was inactive, those replayed 
source entries can be accepted and written again.
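The second and third problems can be illustrated with a small model that stores dedup state as a flat map. It assumes, purely for illustration, that a snapshot copies entries one by one up to a size cap and that inactivity cleanup drops every key tied to an inactive producer's name. `WatermarkLossSketch`, its methods, and the producer names are hypothetical and do not reproduce Pulsar's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of problems 2 and 3; not Pulsar's actual dedup code. */
public class WatermarkLossSketch {

    /** Problem 2 (assumed behavior): entries are copied independently until a cap is hit. */
    static Map<String, Long> independentSnapshot(Map<String, Long> state, int maxEntries) {
        Map<String, Long> snapshot = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : state.entrySet()) {
            if (snapshot.size() >= maxEntries) {
                break; // later entries, possibly one half of a geo pair, are dropped
            }
            snapshot.put(e.getKey(), e.getValue());
        }
        return snapshot;
    }

    /** A geo V2 watermark is usable only if both halves are present. */
    static boolean hasUsableWatermark(Map<String, Long> state, String replProducer) {
        return state.containsKey(replProducer + "_LID") && state.containsKey(replProducer + "_EID");
    }

    /** Problem 3 (assumed behavior): cleanup drops all keys tied to an inactive producer. */
    static void purgeInactive(Map<String, Long> state, String producer) {
        state.remove(producer);
        state.remove(producer + "_LID");
        state.remove(producer + "_EID");
    }

    public static void main(String[] args) {
        String repl = "repl-producer"; // hypothetical replicator producer name
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("producer-a", 41L);  // normal producer sequence id
        state.put(repl + "_LID", 12L); // geo V2 watermark: source ledger id
        state.put(repl + "_EID", 5L);  // geo V2 watermark: source entry id

        // A cap of 2 keeps producer-a and only the _LID half of the pair.
        System.out.println(hasUsableWatermark(independentSnapshot(state, 2), repl)); // false

        // The replicator stays disconnected long enough to be treated as inactive.
        purgeInactive(state, repl);
        System.out.println(hasUsableWatermark(state, repl)); // false
    }
}
```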
   
   A concrete failure window is:
   
   1. Source replicator sends messages to the target.
   2. Target persists the messages and updates the in-memory geo V2 dedup 
watermark.
   3. The target topic is unloaded before the latest dedup snapshot includes 
the watermark.
   4. The source replicator has not durably advanced its replication cursor yet.
   5. Source reconnects and replays the same source entries.
   6. Target reloads without the geo watermark and can write duplicates.
   
   This does not require client misuse or corrupted input. Source cursor replay 
is an expected recovery behavior, so target-side geo dedup state must survive 
topic reload and producer inactivity cleanup.
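The failure window can be simulated end to end. In this hypothetical sketch, `FailureWindowSketch`, the `Target` class, and the `source-cluster` name are illustrative and not Pulsar APIs. On reload the target restores its watermark from the last snapshot and, when `recoverFromReplay` is true, rebuilds it by scanning entries persisted after that snapshot for `replicatedFrom` and `__MSG_PROP_REPL_SOURCE_POSITION`, which mirrors the replay recovery this PR adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the failure window; not Pulsar's broker code. */
public class FailureWindowSketch {
    static final String PROP = "__MSG_PROP_REPL_SOURCE_POSITION";

    /** Source position "<ledger-id>:<entry-id>", ordered by ledger id, then entry id. */
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        static Pos parse(String v) {
            String[] parts = v.split(":");
            return new Pos(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
        }

        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    /** A persisted target entry: where it was replicated from, plus its properties. */
    record Entry(String replicatedFrom, Map<String, String> properties) {}

    /** Target topic with a persisted log, an in-memory watermark, and the last snapshot. */
    static final class Target {
        final List<Entry> log = new ArrayList<>();
        Pos watermark;         // in-memory geo V2 watermark, lost on unload
        Pos snapshotWatermark; // watermark captured by the last dedup snapshot
        int snapshotLogSize;   // log length when that snapshot was taken

        void takeSnapshot() {
            snapshotWatermark = watermark;
            snapshotLogSize = log.size();
        }

        /** Unload + reload: restore from the snapshot, optionally replay later entries. */
        void reload(boolean recoverFromReplay) {
            watermark = snapshotWatermark;
            if (!recoverFromReplay) {
                return;
            }
            for (Entry e : log.subList(snapshotLogSize, log.size())) {
                String value = e.properties().get(PROP);
                if (e.replicatedFrom() != null && value != null) {
                    Pos p = Pos.parse(value);
                    if (watermark == null || p.compareTo(watermark) > 0) {
                        watermark = p;
                    }
                }
            }
        }

        /** Persists a replicated message unless it is at or below the watermark. */
        boolean receive(String sourcePosition) {
            Pos p = Pos.parse(sourcePosition);
            if (watermark != null && p.compareTo(watermark) <= 0) {
                return false; // duplicate of an entry already persisted
            }
            log.add(new Entry("source-cluster", Map.of(PROP, sourcePosition)));
            watermark = p;
            return true;
        }
    }

    /** Runs the six steps above and returns how many entries the target persisted. */
    static int run(boolean recoverFromReplay) {
        List<String> batch = List.of("7:0", "7:1", "7:2");
        Target target = new Target();
        target.takeSnapshot();            // the last snapshot predates the batch
        batch.forEach(target::receive);   // steps 1-2
        target.reload(recoverFromReplay); // step 3: unload before the next snapshot
        batch.forEach(target::receive);   // steps 4-5: source replays the same entries
        return target.log.size();         // step 6
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // 6
        System.out.println(run(true));  // 3
    }
}
```

Without replay recovery the three replayed entries are written a second time; with it, the rebuilt watermark (`7:2`) rejects all of them.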
   
   ### Modifications
   
   - Recover geo-replication V2 dedup watermarks while replaying the dedup 
cursor by reading `replicatedFrom` and `__MSG_PROP_REPL_SOURCE_POSITION` from 
persisted replicated messages.
   - Store geo-replication watermark keys as complete `_LID/_EID` pairs in 
dedup snapshots.
   - Keep geo-replication watermark state during producer inactivity cleanup, 
since it represents source position rather than producer lifecycle state.
   - Add a regression test that unloads the target topic before the dedup 
snapshot and forces source replicator replay.
   - Add a unit test for geo-replication watermark cleanup behavior.
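The snapshot and cleanup changes can be sketched as follows. This is one possible model under stated assumptions, not the PR's implementation. `GeoWatermarkFixSketch` keeps geo watermarks in a map separate from normal producer sequence ids and writes each watermark into the snapshot only as a complete `_LID`/`_EID` pair. Placing geo pairs ahead of normal producers is an assumed priority. The sketch restores only complete pairs and lets inactivity cleanup touch normal producer state only.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the snapshot and cleanup fixes; the PR's code may differ. */
public class GeoWatermarkFixSketch {
    /** Source position: ledger id and entry id. */
    record Pos(long ledgerId, long entryId) {}

    // Normal producer dedup state (producer name -> highest sequence id).
    final Map<String, Long> producerSequenceIds = new LinkedHashMap<>();
    // Geo V2 watermarks (replicator producer name -> source position), kept apart
    // so that producer lifecycle handling never touches them.
    final Map<String, Pos> geoWatermarks = new LinkedHashMap<>();

    /** Builds a snapshot of at most maxEntries keys without ever splitting a geo pair. */
    Map<String, Long> takeSnapshot(int maxEntries) {
        Map<String, Long> snapshot = new LinkedHashMap<>();
        // Assumed priority: geo pairs first, each written whole or not at all.
        for (Map.Entry<String, Pos> e : geoWatermarks.entrySet()) {
            if (snapshot.size() + 2 > maxEntries) {
                break;
            }
            snapshot.put(e.getKey() + "_LID", e.getValue().ledgerId());
            snapshot.put(e.getKey() + "_EID", e.getValue().entryId());
        }
        // Remaining slots go to normal producer sequence ids.
        for (Map.Entry<String, Long> e : producerSequenceIds.entrySet()) {
            if (snapshot.size() >= maxEntries) {
                break;
            }
            snapshot.put(e.getKey(), e.getValue());
        }
        return snapshot;
    }

    /** Restores geo watermarks from a snapshot, accepting only complete _LID/_EID pairs. */
    static Map<String, Pos> restoreGeoWatermarks(Map<String, Long> snapshot) {
        Map<String, Pos> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : snapshot.entrySet()) {
            String key = e.getKey();
            if (!key.endsWith("_LID")) {
                continue;
            }
            String producer = key.substring(0, key.length() - "_LID".length());
            Long entryId = snapshot.get(producer + "_EID");
            if (entryId != null) { // a lone _LID is not a usable position
                restored.put(producer, new Pos(e.getValue(), entryId));
            }
        }
        return restored;
    }

    /** Inactivity cleanup removes normal producer state only; geo watermarks survive. */
    void purgeInactive(String producerName) {
        producerSequenceIds.remove(producerName);
    }

    public static void main(String[] args) {
        GeoWatermarkFixSketch dedup = new GeoWatermarkFixSketch();
        String repl = "repl-producer"; // hypothetical replicator producer name
        dedup.producerSequenceIds.put("producer-a", 41L);
        dedup.geoWatermarks.put(repl, new Pos(12, 5));

        Map<String, Long> snapshot = dedup.takeSnapshot(2);
        System.out.println(restoreGeoWatermarks(snapshot)); // {repl-producer=Pos[ledgerId=12, entryId=5]}

        dedup.purgeInactive(repl);
        System.out.println(dedup.geoWatermarks.containsKey(repl)); // true
    }
}
```

The suffix matching in `restoreGeoWatermarks` assumes no normal producer name ends in `_LID`; a real implementation would need a stricter way to tell the two kinds of keys apart.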
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   ./gradlew :pulsar-broker:test --no-configuration-cache \
     --tests org.apache.pulsar.broker.service.OneWayReplicatorDeduplicationTest.testGeoReplDedupAfterTargetUnloadBeforeSnapshot \
     --tests org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testInactiveGeoReplicationProducerKeepsDedupState
   
   Result: `BUILD SUCCESSFUL`.
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
