dao-jun opened a new pull request, #23919: URL: https://github.com/apache/pulsar/pull/23919
Fixes https://github.com/apache/pulsar/issues/23910

### Motivation

Since `AppendBrokerTimestampMetadataInterceptor` is not enabled by default, entry timestamps are not strictly increasing. Message timestamps are generated by the clients, so messages from different producers may not be globally ordered (because of network delay, backpressure, thread scheduling, etc.). Within a single ledger, the timestamps may be arranged like this:

[2, 1, 3, 5, 4, 6, 7, 9, 8, ...]

Overall they follow an increasing trend, but locally they may not.

1. If the `Position` is null when calling `PersistentMessageFinder.findMessages`, the cursor position is reset to `earliest`, see: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L804-L824
2. If the first entry read from BookKeeper does not meet the condition, `null` is returned, see: https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java#L87-L91

In https://github.com/apache/pulsar/pull/22792/commits/823a55dd659da6c1c54dca55d08f92053c91013e#diff-1d1f02c0ae1aed67e77512aebe4d7233705490b14960ee428611462e446861e5R133-R142, we optimized the case where the target entry may be in the last (currently open) ledger. This is intuitive, but the actual situation is more complicated:

Suppose we want to find the position of the message whose timestamp is 101, the second-to-last ledger's close timestamp is 100, and the entries' timestamps in the last open ledger are arranged as [102, 103, 101, 104, ...]. The first entry's timestamp is greater than 101, so it does not meet the condition at https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java#L74-L75. The return value is therefore `null`, and the cursor position is eventually set to `earliest`.

### Modifications

<!-- Describe the modifications you've done.
-->

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository:

The tests will be run in the forked repository until all PR review comments have been handled, the tests pass and the PR is approved by a reviewer.
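To make the failure mode from the Motivation section concrete, here is a minimal Java sketch. It models one ledger as a list of entry timestamps and assumes the search predicate is `timestamp <= target`. The hypothetical `findNewest` helper mimics only the behaviour described above: it gives up with `null` when the first entry does not match, and otherwise binary-searches under the assumption that timestamps are ordered. `FindNewestSketch`, `findNewest` and `findNewestLinear` are illustrative names, not Pulsar code, and the linear scan is only a reference answer, not the change made by this PR.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of a "find newest entry matching a predicate"
 * search over a single ledger. This is NOT Pulsar's OpFindNewest; it only
 * reproduces the check-first-then-binary-search behaviour described above.
 */
public class FindNewestSketch {

    /** Index of the newest entry with timestamp <= target, or null if the first entry does not match. */
    static Integer findNewest(List<Long> timestamps, long target) {
        if (timestamps.isEmpty() || timestamps.get(0) > target) {
            // First entry fails "timestamp <= target": give up, even if a later entry would match.
            return null;
        }
        // Binary search for the last matching index; only correct if timestamps are non-decreasing.
        int lo = 0;
        int hi = timestamps.size() - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // round up so the loop always makes progress
            if (timestamps.get(mid) <= target) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }

    /** Reference answer: linear scan that makes no ordering assumption. */
    static Integer findNewestLinear(List<Long> timestamps, long target) {
        Integer result = null;
        for (int i = 0; i < timestamps.size(); i++) {
            if (timestamps.get(i) <= target) {
                result = i; // remember the newest (highest-index) match seen so far
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Entry timestamps in the last open ledger, as in the example above.
        List<Long> lastLedger = List.of(102L, 103L, 101L, 104L);
        long target = 101L;

        System.out.println("check-first search: " + findNewest(lastLedger, target));
        System.out.println("linear scan:        " + findNewestLinear(lastLedger, target));
    }
}
```

With the example timestamps, the check-first search prints `null` even though the entry at index 2 has timestamp 101, which is the situation that ends with the cursor being reset to `earliest`.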
