massakam opened a new pull request, #20931:
URL: https://github.com/apache/pulsar/pull/20931
### Motivation
Occasionally, an ack hole appears in the geo-replication cursor. The
following are the internal stats of the topic where the problem occurred:
```json
{
"entriesAddedCounter" : 11000,
"numberOfEntries" : 6999,
"totalSize" : 362285,
"currentLedgerEntries" : 6999,
"currentLedgerSize" : 362285,
"lastLedgerCreatedTimestamp" : "2023-07-30T16:17:10.137+09:00",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "1687807:6998",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 1687807,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"pulsar.repl.cluster-a" : {
"markDeletePosition" : "1687807:6002",
"readPosition" : "1687807:6999",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 10942,
"cursorLedger" : 1687808,
"cursorLedgerLastEntry" : 13,
"individuallyDeletedMessages" : "[(1687807:6060..1687807:6998]]",
"lastLedgerSwitchTimestamp" : "2023-07-30T16:17:10.143+09:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 997,
"totalNonContiguousDeletedMessagesRange" : 1,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
```
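The following log was also printed on the broker. The ack hole (entries
1687807:6003 to 1687807:6060, i.e. between `markDeletePosition` and the start of
`individuallyDeletedMessages`) lies within the range over which the cursor was
rewound.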
```
16:50:11.787 [pulsar-io-24-1] INFO o.a.b.mledger.impl.ManagedCursorImpl - [massakam/test/persistent/t1-pulsar.repl.cluster-a] Rewind from 1687807:6061 to 1687807:5999
```
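As a quick cross-check of the numbers above, the following sketch derives the ack hole from `markDeletePosition` and `individuallyDeletedMessages` and tests whether it falls inside the range the rewind scheduled for re-reading. Only entry IDs within ledger 1687807 are used; the class and method names are illustrative.

```java
// Illustrative arithmetic only; entry IDs are within ledger 1687807 and are
// copied from the internal stats and the broker log above.
final class AckHoleCheck {
    // The first unacknowledged entry is the one right after markDeletePosition.
    static long holeStart(long markDeleteEntry) {
        return markDeleteEntry + 1;
    }

    // individuallyDeletedMessages "(lower..upper]" excludes 'lower',
    // so 'lower' is the last entry that is still unacknowledged.
    static long holeEnd(long deletedRangeLowerExclusive) {
        return deletedRangeLowerExclusive;
    }

    // "Rewind from F to T" means entries [T, F - 1] are scheduled to be read again.
    static boolean insideRewoundRange(long start, long end, long rewindFrom, long rewindTo) {
        return start >= rewindTo && end <= rewindFrom - 1;
    }

    public static void main(String[] args) {
        long start = holeStart(6002); // markDeletePosition 1687807:6002
        long end = holeEnd(6060);     // individuallyDeletedMessages (1687807:6060..1687807:6998]
        // prints "ack hole: 1687807:6003..1687807:6060 (58 entries)"
        System.out.printf("ack hole: 1687807:%d..1687807:%d (%d entries)%n",
                start, end, end - start + 1);
        // Log: "Rewind from 1687807:6061 to 1687807:5999"; prints "inside rewound range: true"
        System.out.println("inside rewound range: " + insideRewoundRange(start, end, 6061, 5999));
    }
}
```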
This problem occurred in the following situation:
- There are two replication clusters, cluster-a and cluster-b
- cluster-a usually has no producers or consumers, but once a day a producer
connects and publishes messages
- Only consumers are connected to cluster-b
- Retention time is 0
In this setup, the producer for geo-replication on the cluster-a side is
closed by GC after a certain period of time.
https://github.com/apache/pulsar/blob/ca01447fb4df808f8e2da2dc75d44fad0b780032/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L2626
However, a read operation for new entries that has already been triggered is
not cancelled at that point. It remains pending until new entries become
available.
Then, 24 hours later, the user's producer connects again and publishes
messages. This wakes up the pending read operation, and the replicator starts
reading the new entries.
However, since the producer for geo-replication has not been restarted yet,
the entries it reads are dropped without being acknowledged.
https://github.com/apache/pulsar/blob/ca01447fb4df808f8e2da2dc75d44fad0b780032/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java#L138-L149
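A minimal, single-threaded sketch of why the stale read survives, assuming a simplified "read or wait" cursor in which a read with no data available is parked until an entry is added, and closing the producer leaves the parked read untouched. `Cursor`, `Replicator`, `readOrWait` and `closeProducer` are hypothetical names, not Pulsar classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "read or wait" semantics; these are illustrative stand-ins,
// not Pulsar's ManagedCursor or GeoPersistentReplicator.
final class PendingReadDemo {
    static final class Cursor {
        private final Deque<Long> unread = new ArrayDeque<>();
        private Consumer<Long> waitingRead; // at most one parked read in this sketch

        // Deliver the next entry now if there is one; otherwise park the callback.
        void readOrWait(Consumer<Long> callback) {
            Long next = unread.poll();
            if (next != null) {
                callback.accept(next);
            } else {
                waitingRead = callback;
            }
        }

        // A new entry wakes up the parked read, no matter how long it has waited.
        void add(long entry) {
            unread.add(entry);
            Consumer<Long> read = waitingRead;
            if (read != null) {
                waitingRead = null;
                read.accept(unread.poll());
            }
        }

        boolean hasWaitingRead() {
            return waitingRead != null;
        }
    }

    static final class Replicator {
        boolean producerReady = true;
        final List<Long> replicated = new ArrayList<>();
        final List<Long> droppedWithoutAck = new ArrayList<>();

        void readMoreEntries(Cursor cursor) {
            cursor.readOrWait(entry -> {
                if (producerReady) {
                    replicated.add(entry);
                } else {
                    droppedWithoutAck.add(entry); // read, but neither sent nor acknowledged
                }
            });
        }

        // Closing the producer (e.g. by GC) does not cancel the parked read.
        void closeProducer() {
            producerReady = false;
        }
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor();
        Replicator replicator = new Replicator();
        replicator.readMoreEntries(cursor); // no entries yet: the read is parked
        replicator.closeProducer();         // producer closed; the read is still parked
        System.out.println("parked after close: " + cursor.hasWaitingRead()); // true
        cursor.add(100L);                   // a day later, a new message arrives
        System.out.println("dropped: " + replicator.droppedWithoutAck);       // [100]
    }
}
```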
Meanwhile, because the user's producer is connected, the producer for
geo-replication is also restarted and the cursor is rewound. After that, the
replicator's state changes to `Started`.
https://github.com/apache/pulsar/blob/ca01447fb4df808f8e2da2dc75d44fad0b780032/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L138-L145
At this point, a read operation that was triggered before the cursor was
rewound completes successfully, moving `readPosition` to the position right
after the entry it read. Entries before this position are not read again. As a
result, entries that were read once but never acknowledged are left as an ack
hole.
In short, this issue is caused by a race condition between "rewinding the
cursor when the producer for geo-replication is restarted" and "a read
operation that was triggered the last time geo-replication took place".
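The race can be replayed deterministically with a toy cursor. This sketch assumes entry IDs in a single ledger, a mark-delete position of 9 and new entries 10..19, and it models only one rule: a completing read moves `readPosition` past the entries it read, even if a rewind happened while it was in flight. All class names are hypothetical.

```java
import java.util.TreeSet;

// Hypothetical, single-threaded replay of the race; positions are entry IDs in one ledger.
final class RewindRace {
    static final class Cursor {
        long markDelete;                             // all entries <= markDelete are acknowledged
        long readPosition;                           // where the next read starts
        final TreeSet<Long> acked = new TreeSet<>(); // acknowledged entries above markDelete

        Cursor(long markDelete) {
            this.markDelete = markDelete;
            this.readPosition = markDelete + 1;
        }

        // Rewind: read again everything after the mark-delete position.
        void rewind() {
            readPosition = markDelete + 1;
        }

        void ack(long entry) {
            acked.add(entry);
            while (acked.remove(markDelete + 1)) {   // fold a contiguous prefix into markDelete
                markDelete++;
            }
        }
    }

    // A read issued at 'from'. On completion it moves readPosition past what it read,
    // regardless of any rewind that happened while it was in flight.
    static final class Read {
        final long from;

        Read(Cursor cursor) {
            this.from = cursor.readPosition;
        }

        long complete(Cursor cursor, int count) {
            long last = from + count - 1;
            cursor.readPosition = last + 1;
            return last;
        }
    }

    static Cursor replay() {
        Cursor cursor = new Cursor(9);
        // 1. The stale read wakes up on entries 10..14; the producer is not ready,
        //    so they are dropped without being acknowledged.
        Read stale = new Read(cursor);
        stale.complete(cursor, 5);                 // readPosition -> 15
        // 2. Another read is issued at 15 before the producer comes back.
        Read inFlight = new Read(cursor);
        // 3. The producer restarts: the cursor is rewound and the state becomes Started.
        cursor.rewind();                           // readPosition -> 10
        // 4. The in-flight read completes after the rewind and its entries are acked.
        long last = inFlight.complete(cursor, 5);  // readPosition -> 20, undoing the rewind
        for (long e = inFlight.from; e <= last; e++) {
            cursor.ack(e);
        }
        return cursor;
    }

    public static void main(String[] args) {
        Cursor c = replay();
        // Entries 10..14 are never read again, so markDelete is stuck at 9.
        // prints "markDelete=9 readPosition=20 acked=[15, 16, 17, 18, 19]"
        System.out.println("markDelete=" + c.markDelete + " readPosition=" + c.readPosition
                + " acked=" + c.acked);
    }
}
```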
### Modifications
Add a flag named `waitForCursorRewinding` to the `PersistentReplicator`
class. It is normally false; while it is true, the replicator does not call
`cursor.asyncReadEntriesOrWait`.
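A minimal sketch of the guard, assuming a `volatile` field checked at the top of the method that would otherwise issue the read. The class name, the counter and the boolean return value are illustrative, and the actual change in `PersistentReplicator` may be structured differently.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the guard; ReadGuard and issuedReads are illustrative names.
final class ReadGuard {
    private volatile boolean waitForCursorRewinding = false; // normally false
    final AtomicInteger issuedReads = new AtomicInteger();

    void setWaitForCursorRewinding(boolean value) {
        waitForCursorRewinding = value;
    }

    // Stand-in for the replicator's read loop: while the flag is set, no new read
    // (cursor.asyncReadEntriesOrWait in the real code) is issued.
    boolean readMoreEntries() {
        if (waitForCursorRewinding) {
            return false;
        }
        issuedReads.incrementAndGet(); // placeholder for issuing the asynchronous read
        return true;
    }

    public static void main(String[] args) {
        ReadGuard guard = new ReadGuard();
        guard.readMoreEntries();                // issued
        guard.setWaitForCursorRewinding(true);
        guard.readMoreEntries();                // skipped while the rewind is prepared
        guard.setWaitForCursorRewinding(false);
        guard.readMoreEntries();                // issued
        System.out.println("issued reads: " + guard.issuedReads.get()); // 2
    }
}
```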
Set `waitForCursorRewinding` to true at the beginning of the `readEntries`
method, which is executed when the producer for geo-replication is restarted.
Then wait until at least one of the following conditions is met:
- `state` is no longer `Starting`
  - This means the replicator has been stopped
- `havePendingRead` becomes `FALSE`
  - This means there are no reads in progress
- `cursor.cancelPendingReadRequest` returns true
  - This means a read was in progress but was successfully cancelled
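The wait condition can be expressed as a single predicate. This is a sketch under assumptions: the `State` enum, the `AtomicBoolean` standing in for `havePendingRead` and the `BooleanSupplier` standing in for `cursor.cancelPendingReadRequest` are hypothetical, and what the caller does while the predicate is false (for example, retrying after a short delay) is not specified by the description above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch; the State enum and method names are illustrative, not Pulsar's.
final class RewindReadiness {
    enum State { Starting, Started, Stopping, Stopped }

    // True once at least one of the three conditions holds. Because of short-circuit
    // evaluation, cancellation is attempted only while a read is still pending.
    static boolean readyToRewind(State state, AtomicBoolean havePendingRead,
                                 BooleanSupplier cancelPendingReadRequest) {
        return state != State.Starting                  // replicator was stopped meanwhile
            || !havePendingRead.get()                   // no read in progress
            || cancelPendingReadRequest.getAsBoolean(); // in-flight read was cancelled
    }

    public static void main(String[] args) {
        AtomicBoolean pending = new AtomicBoolean(true);
        // A read is in flight and could not be cancelled: keep waiting.
        System.out.println(readyToRewind(State.Starting, pending, () -> false)); // false
        // The in-flight read was cancelled: safe to rewind.
        System.out.println(readyToRewind(State.Starting, pending, () -> true));  // true
    }
}
```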
Then change `state` to `Started`, rewind the cursor, and set
`waitForCursorRewinding` back to false. This prevents a read that was triggered
before the rewind from advancing the cursor afterwards and leaving an ack hole.
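Replaying the same scenario with the guard shows the intended effect. In this hypothetical single-threaded model (mark-delete position 9, new entries 10..19), the in-flight read is cancelled before the rewind, so its late completion no longer moves `readPosition`, and entries 10..19 are all read and acknowledged after the restart. Names and numbers are illustrative, not taken from the PR's code.

```java
import java.util.TreeSet;

// Hypothetical single-threaded replay of the race with a waitForCursorRewinding
// guard; positions are entry IDs in one ledger. Not Pulsar's actual code.
final class RewindWithGuard {
    static final class Cursor {
        long markDelete;                             // entries <= markDelete are acknowledged
        long readPosition;                           // where the next read starts
        Long pendingFrom;                            // start of the in-flight read, or null
        final TreeSet<Long> acked = new TreeSet<>(); // acknowledged entries above markDelete

        Cursor(long markDelete) {
            this.markDelete = markDelete;
            this.readPosition = markDelete + 1;
        }

        void rewind() {
            readPosition = markDelete + 1;
        }

        // Loosely modeled on cancelPendingReadRequest: true if an in-flight read was dropped.
        boolean cancelPendingRead() {
            boolean cancelled = pendingFrom != null;
            pendingFrom = null;
            return cancelled;
        }

        void ack(long entry) {
            acked.add(entry);
            while (acked.remove(markDelete + 1)) {   // fold a contiguous prefix into markDelete
                markDelete++;
            }
        }
    }

    static final class Replicator {
        final Cursor cursor;
        boolean waitForCursorRewinding;
        boolean started;

        Replicator(Cursor cursor) {
            this.cursor = cursor;
        }

        // Issues a read at the current readPosition unless a rewind is being prepared.
        boolean issueRead() {
            if (waitForCursorRewinding) {
                return false;
            }
            cursor.pendingFrom = cursor.readPosition;
            return true;
        }

        // Completes the in-flight read with 'count' entries; a cancelled read is a no-op.
        void completeRead(int count) {
            if (cursor.pendingFrom == null) {
                return;
            }
            long from = cursor.pendingFrom;
            cursor.pendingFrom = null;
            cursor.readPosition = from + count;
            if (started) {                           // before Started, entries are dropped unacked
                for (long e = from; e < from + count; e++) {
                    cursor.ack(e);
                }
            }
        }

        // Producer restarted: block new reads, cancel the in-flight one, then start and rewind.
        void onProducerRestarted() {
            waitForCursorRewinding = true;
            cursor.cancelPendingRead();              // always succeeds in this single-threaded model
            started = true;
            cursor.rewind();
            waitForCursorRewinding = false;
        }
    }

    static Cursor replay() {
        Cursor cursor = new Cursor(9);
        Replicator replicator = new Replicator(cursor);
        replicator.issueRead();
        replicator.completeRead(5);       // stale read: 10..14 dropped, readPosition -> 15
        replicator.issueRead();           // in-flight read at 15
        replicator.onProducerRestarted(); // in-flight read cancelled, readPosition -> 10
        replicator.completeRead(5);       // the cancelled read no longer moves the cursor
        replicator.issueRead();
        replicator.completeRead(10);      // 10..19 read and acknowledged
        return cursor;
    }

    public static void main(String[] args) {
        Cursor c = replay();
        // prints "markDelete=19 readPosition=20 acked=[]"
        System.out.println("markDelete=" + c.markDelete + " readPosition=" + c.readPosition
                + " acked=" + c.acked);
    }
}
```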
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->