massakam opened a new pull request, #20931:
URL: https://github.com/apache/pulsar/pull/20931

   ### Motivation
   
   Occasionally, an ack hole appears in the cursor used for geo-replication. Below are the internal stats for the topic where the problem occurred:
   ```json
   {
     "entriesAddedCounter" : 11000,
     "numberOfEntries" : 6999,
     "totalSize" : 362285,
     "currentLedgerEntries" : 6999,
     "currentLedgerSize" : 362285,
     "lastLedgerCreatedTimestamp" : "2023-07-30T16:17:10.137+09:00",
     "waitingCursorsCount" : 1,
     "pendingAddEntriesCount" : 0,
     "lastConfirmedEntry" : "1687807:6998",
     "state" : "LedgerOpened",
     "ledgers" : [ {
       "ledgerId" : 1687807,
       "entries" : 0,
       "size" : 0,
       "offloaded" : false,
       "underReplicated" : false
     } ],
     "cursors" : {
       "pulsar.repl.cluster-a" : {
         "markDeletePosition" : "1687807:6002",
         "readPosition" : "1687807:6999",
         "waitingReadOp" : true,
         "pendingReadOps" : 0,
         "messagesConsumedCounter" : 10942,
         "cursorLedger" : 1687808,
         "cursorLedgerLastEntry" : 13,
         "individuallyDeletedMessages" : "[(1687807:6060..1687807:6998]]",
         "lastLedgerSwitchTimestamp" : "2023-07-30T16:17:10.143+09:00",
         "state" : "Open",
         "numberOfEntriesSinceFirstNotAckedMessage" : 997,
         "totalNonContiguousDeletedMessagesRange" : 1,
         "properties" : { }
       }
     },
     "schemaLedgers" : [ ],
     "compactedLedger" : {
       "ledgerId" : -1,
       "entries" : -1,
       "size" : -1,
       "offloaded" : false,
       "underReplicated" : false
     }
   }
   ```
   
   Also, the following log was printed on the broker. Note that the ack hole falls within the range over which the cursor was rewound.
   ```
   16:50:11.787 [pulsar-io-24-1] INFO  o.a.b.mledger.impl.ManagedCursorImpl - 
[massakam/test/persistent/t1-pulsar.repl.cluster-a] Rewind from 1687807:6061 to 
1687807:5999
   ```
   
   This problem occurred in the following situations:
   - There are two replication clusters, cluster-a and cluster-b
   - cluster-a usually has no producers or consumers, but once a day a producer 
connects and publishes messages
   - Only consumers are connected to cluster-b
   - Retention time is 0
   
   In the above scenario, the producer for geo-replication on the cluster-a side is eventually closed by GC after a period of inactivity.
   
https://github.com/apache/pulsar/blob/ca01447fb4df808f8e2da2dc75d44fad0b780032/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L2626
   
   However, a read operation for new entries that was already triggered at that point is not cancelled. It remains pending until new entries become available.
   
   Then, 24 hours later, the user's producer connects again and publishes messages. This completes the pending wait-read, and the replicator starts reading new entries.
   
   However, since the producer for geo-replication has not yet been restarted, 
these read entries will be dropped without being acknowledged.
   
https://github.com/apache/pulsar/blob/ca01447fb4df808f8e2da2dc75d44fad0b780032/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L138-L149
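As a rough illustration of that drop path, here is a minimal self-contained sketch (hypothetical names, not the real `GeoPersistentReplicator`): entries read while the replication producer is not started are released without ever being acknowledged.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the drop path (hypothetical names, not Pulsar's real classes):
// entries read while the replication producer is not started are released
// without being acknowledged, so the cursor never mark-deletes them.
public class DropSketch {
    static List<Long> replicateEntries(List<Long> entries, boolean producerStarted,
                                       List<Long> acked) {
        List<Long> dropped = new ArrayList<>();
        for (long entryId : entries) {
            if (producerStarted) {
                acked.add(entryId);    // send to the remote cluster, ack on completion
            } else {
                dropped.add(entryId);  // released in the real code: no ack ever happens
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        List<Long> dropped = replicateEntries(List.of(6061L, 6062L, 6063L), false, acked);
        System.out.println(acked.size());   // 0: nothing acknowledged
        System.out.println(dropped.size()); // 3: all read entries dropped
    }
}
```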
   
   Meanwhile, since the user's producer has connected, the producer for geo-replication is also restarted and the cursor is rewound. After that, the replicator's state is changed to `Started`.
   
https://github.com/apache/pulsar/blob/ca01447fb4df808f8e2da2dc75d44fad0b780032/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L138-L145
   
   At this time, one of the read operations triggered before the cursor was rewound succeeds, causing `readPosition` to move to the position immediately after the successfully read entry. Entries before this position are never read again. As a result, entries that were read once but never acknowledged are left behind as an ack hole.
   
   In short, this issue is caused by a race condition between "rewinding the cursor when the producer for geo-replication is restarted" and "a read operation triggered the last time geo-replication ran".
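That race can be modeled with a minimal self-contained sketch (hypothetical classes, not Pulsar's actual `ManagedCursorImpl`): a read issued before the rewind completes afterwards and advances `readPosition` past the rewound range, so those entries are never re-read.

```java
// Toy model of the race (hypothetical names, not Pulsar's real classes).
class CursorModel {
    long readPosition;

    CursorModel(long start) { this.readPosition = start; }

    // Restarting the replication producer rewinds the cursor so that
    // previously read but unacknowledged entries can be re-read.
    void rewind(long position) { this.readPosition = position; }

    // A completed read moves readPosition to the entry after the last one read.
    void onReadCompleted(long lastEntryRead) { this.readPosition = lastEntryRead + 1; }
}

public class RaceSketch {
    public static void main(String[] args) {
        CursorModel cursor = new CursorModel(6061);

        // Producer restart: rewind so the unacknowledged entries get re-read.
        cursor.rewind(5999);

        // A read triggered BEFORE the rewind now completes with entries up to
        // 6998 (which were dropped, not acknowledged) ...
        cursor.onReadCompleted(6998);

        // ... so readPosition jumps past the rewound range and those entries
        // are never read again: the ack hole.
        System.out.println(cursor.readPosition); // 6999
    }
}
```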
   
   ### Modifications
   
   Add a flag named `waitForCursorRewinding` to the `PersistentReplicator` class. Normally this flag is `false`. While it is `true`, the replicator does not call `cursor.asyncReadEntriesOrWait`.
   
   In addition, set `waitForCursorRewinding` to `true` at the beginning of the `readEntries` method, which is executed when the producer for geo-replication is restarted. Then wait until at least one of the following conditions is met:
   - `state` is no longer `Starting`
     - This means the replicator has been stopped
   - `havePendingRead` becomes `false`
     - This means there are no reads in progress
   - `cursor.cancelPendingReadRequest` returns true
     - This means that there was a read in progress, but it was successfully 
canceled
   
   Then change `state` to `Started`, rewind the cursor, and set `waitForCursorRewinding` back to `false`. This prevents a read triggered before the rewind from advancing the cursor again and leaving an ack hole.
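The guard described above can be sketched as follows. This is a simplified, self-contained model: the field and condition names mirror the PR description, but the class itself is hypothetical, not the real `PersistentReplicator`, and the asynchronous machinery is reduced to flags.

```java
// Simplified model of the waitForCursorRewinding guard (hypothetical class,
// not the real PersistentReplicator).
class ReplicatorModel {
    volatile boolean waitForCursorRewinding = false;
    volatile boolean havePendingRead = false;
    volatile String state = "Starting";

    // Normal read path: while a rewind is pending, do not issue a new read
    // (i.e. do not call cursor.asyncReadEntriesOrWait).
    boolean tryReadMoreEntries() {
        if (waitForCursorRewinding) {
            return false;
        }
        havePendingRead = true; // stands in for cursor.asyncReadEntriesOrWait
        return true;
    }

    // Stands in for cursor.cancelPendingReadRequest; here the cancel always
    // succeeds so the sketch terminates deterministically.
    boolean cancelPendingRead() {
        havePendingRead = false;
        return true;
    }

    // Producer-restart path (the readEntries method in the PR): block new
    // reads, wait out or cancel the in-flight one, then rewind and resume.
    void onProducerRestarted() {
        waitForCursorRewinding = true;
        // Wait until the replicator stops, no read is in flight, or the
        // pending read is successfully cancelled.
        while ("Starting".equals(state) && havePendingRead && !cancelPendingRead()) {
            // busy-wait in this sketch; the real code is asynchronous
        }
        state = "Started";
        // cursor.rewind() would happen here, with no concurrent read pending
        waitForCursorRewinding = false;
    }
}

public class GuardSketch {
    public static void main(String[] args) {
        ReplicatorModel r = new ReplicatorModel();
        r.havePendingRead = true;       // a read triggered earlier is in flight
        r.waitForCursorRewinding = true;
        System.out.println(r.tryReadMoreEntries()); // false: new reads are blocked

        r.onProducerRestarted();
        System.out.println(r.state);                 // Started
        System.out.println(r.tryReadMoreEntries());  // true: reads resume safely
    }
}
```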
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->

