shibd opened a new pull request, #24730:
URL: https://github.com/apache/pulsar/pull/24730

   ### Motivation
   
   This PR fixes a critical bug where the `KeyShared` sticky mode consumer in 
Pulsar (reproduced with the Node.js client) would incorrectly consume messages 
from hash ranges not explicitly assigned to it. This deviates from the expected 
"sticky" behavior and instead mimics "auto-split" logic.
   
   Specifically, if a single `KeyShared` consumer is configured with 
non-contiguous sticky ranges, e.g.:
   - `[0, 9999]`
   - `[20000, 29999]`
   - `[40000, 49999]`
   
   The consumer would incorrectly receive messages for keys whose hashes fall 
into the gaps (e.g., `15000`, which lies between `9999` and `20000`). The root 
cause lies in `HashRangeExclusiveStickyKeyConsumerSelector`: its `rangeMap` 
previously stored only the start and end points of each range, and the `select` 
method inferred range membership from those boundaries. As a result, any hash 
between one range's `end` and the next range's `start` could be assigned to the 
consumer owning the previous range, effectively closing the gaps.
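   The failure mode can be reproduced with a small, simplified model. It assumes 
the old lookup stored each range's start and end as separate keys and returned a 
consumer whenever the `floorEntry` and `ceilingEntry` of the hash pointed to the 
same consumer; the class `BoundaryOnlySelector` and its methods are hypothetical, 
and the actual broker code may differ in detail.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical model of the pre-fix lookup (not the actual broker code):
 * only range boundaries are stored, and a hash is assigned when its floor
 * and ceiling boundaries belong to the same consumer.
 */
public class BoundaryOnlySelector {
    // boundary (start or end of some range) -> consumer name
    private final ConcurrentSkipListMap<Integer, String> rangeMap = new ConcurrentSkipListMap<>();

    void addRange(int start, int end, String consumer) {
        rangeMap.put(start, consumer);
        rangeMap.put(end, consumer);
    }

    String select(int hash) {
        Map.Entry<Integer, String> floor = rangeMap.floorEntry(hash);
        Map.Entry<Integer, String> ceiling = rangeMap.ceilingEntry(hash);
        if (floor != null && ceiling != null && floor.getValue().equals(ceiling.getValue())) {
            // Nothing checks that floor and ceiling come from the SAME range,
            // so the gap between two ranges of one consumer is "closed".
            return floor.getValue();
        }
        return null;
    }

    public static void main(String[] args) {
        BoundaryOnlySelector s = new BoundaryOnlySelector();
        s.addRange(0, 9999, "c1");
        s.addRange(20000, 29999, "c1");
        s.addRange(40000, 49999, "c1");
        System.out.println(s.select(5000));  // c1 (correct)
        System.out.println(s.select(15000)); // c1 (wrong: 15000 is in a gap)
        System.out.println(s.select(55000)); // null (no ceiling boundary)
    }
}
```

   With the ranges from the example above, hash `15000` has floor `9999` and 
ceiling `20000`, both owned by `c1`, so the gap is treated as assigned.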
   
   This issue is demonstrable with:
   - The `testConsumerSelectWithMultipRanges` unit test in this PR.
   - The `testCustomStickyRange` integration test in this PR.
   
   ### Modifications
   
   This PR refactors the `HashRangeExclusiveStickyKeyConsumerSelector` to 
ensure strict enforcement of explicitly defined sticky ranges:
   
   1.  **Refactored `rangeMap`**: Changed `rangeMap` from 
`ConcurrentSkipListMap<Integer, Consumer>` to `ConcurrentSkipListMap<Integer, 
Pair<Range, Consumer>>`. This now allows the selector to store the complete 
`Range` object for each assignment, eliminating the need to infer range 
boundaries and enabling precise range containment checks.
   2.  **Enhanced `select(int hash)` Logic**: The `select` method is updated to 
directly utilize the stored `Range` objects. It now accurately checks if a 
given `hash` is explicitly contained within an assigned `Range` using 
`pair.getLeft().contains(hash)`, rather than relying on potentially ambiguous 
`floorEntry`/`ceilingEntry` comparisons.
   3.  **Improved `validateKeySharedMeta(Consumer consumer)` for Strict 
Validation**:
       * **Internal Range Validity**: A new validation step ensures that a 
consumer's own `KeySharedMeta` ranges are valid. It verifies that `start <= 
end` for all `IntRange` objects.
       * **Internal Range Overlap**: Critically, it now detects and rejects 
`KeySharedMeta` where a consumer's own assigned `IntRange`s internally overlap. 
This prevents consumers from being registered with self-conflicting 
configurations.
   4.  **Refined `findConflictingConsumer(List<IntRange> newConsumerRanges)`**: 
This method's logic was updated to leverage the explicit `Pair<Range, 
Consumer>` storage in `rangeMap`. It now accurately detects overlaps between a 
new consumer's ranges and ranges already assigned to other active consumers, 
utilizing `floorEntry`, `tailMap`, and a `checkRangesOverlap` helper for 
efficient and precise conflict detection.
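   Items 1 and 2 can be illustrated with a minimal sketch. It assumes the map is 
keyed by each range's start and that ranges never overlap (which the conflict 
checks enforce); `HashRange` and `Assignment` are hypothetical stand-ins for 
Pulsar's `Range` and `Pair<Range, Consumer>`, and consumers are plain names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical sketch of range-aware selection: each entry keeps the whole
 * range, so select() checks containment instead of inferring boundaries.
 */
public class RangeAwareSelector {
    /** Inclusive hash range [start, end]. */
    record HashRange(int start, int end) {
        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }
    }

    /** A range together with the consumer (here just a name) that owns it. */
    record Assignment(HashRange range, String consumer) {}

    // Assumed keying: range start -> (range, consumer). Ranges must not overlap.
    private final ConcurrentSkipListMap<Integer, Assignment> rangeMap =
            new ConcurrentSkipListMap<>();

    void addRange(int start, int end, String consumer) {
        rangeMap.put(start, new Assignment(new HashRange(start, end), consumer));
    }

    String select(int hash) {
        // With non-overlapping ranges, only the range with the greatest
        // start <= hash can contain the hash.
        Map.Entry<Integer, Assignment> entry = rangeMap.floorEntry(hash);
        if (entry != null && entry.getValue().range().contains(hash)) {
            return entry.getValue().consumer();
        }
        return null; // hash lies in a gap: no consumer is assigned
    }

    public static void main(String[] args) {
        RangeAwareSelector s = new RangeAwareSelector();
        s.addRange(0, 9999, "c1");
        s.addRange(20000, 29999, "c1");
        s.addRange(40000, 49999, "c1");
        System.out.println(s.select(5000));  // c1
        System.out.println(s.select(15000)); // null (gap)
        System.out.println(s.select(29999)); // c1 (inclusive end)
    }
}
```

   A single `floorEntry` lookup plus a containment check is enough here because 
disjoint ranges sorted by start cannot "reach past" the next range's start.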
   
   These modifications collectively guarantee that 
`HashRangeExclusiveStickyKeyConsumerSelector` strictly enforces range 
exclusivity and assignment according to the `KeyShared` sticky policy, 
resolving the unintended message consumption from unassigned ranges.
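   The checks described in items 3 and 4 above can be sketched as follows, under 
the same assumptions (map keyed by range start, consumers as names). The helper 
names `checkRangesOverlap` and `findConflictingConsumer` come from this PR, but 
their signatures here, as well as `validateOwnRanges`, `HashRange` and 
`Assignment`, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical sketch of per-consumer validation and cross-consumer conflict detection. */
public class RangeValidationSketch {
    record HashRange(int start, int end) {}
    record Assignment(HashRange range, String consumer) {}

    /** Inclusive ranges overlap iff each starts no later than the other ends. */
    static boolean checkRangesOverlap(HashRange a, HashRange b) {
        return a.start() <= b.end() && b.start() <= a.end();
    }

    /** Returns an error message, or null if the ranges are valid and mutually disjoint. */
    static String validateOwnRanges(List<HashRange> ranges) {
        for (HashRange r : ranges) {
            if (r.start() > r.end()) {
                return "start > end in " + r;
            }
        }
        // After sorting by start, disjointness only needs checking between neighbors.
        List<HashRange> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingInt(HashRange::start));
        for (int i = 1; i < sorted.size(); i++) {
            if (checkRangesOverlap(sorted.get(i - 1), sorted.get(i))) {
                return "overlapping ranges " + sorted.get(i - 1) + " and " + sorted.get(i);
            }
        }
        return null;
    }

    /** Returns the consumer holding a range that overlaps any new range, or null. */
    static String findConflictingConsumer(
            ConcurrentSkipListMap<Integer, Assignment> rangeMap, List<HashRange> newRanges) {
        for (HashRange nr : newRanges) {
            // An existing range starting at or before nr.start() may extend into nr.
            Map.Entry<Integer, Assignment> floor = rangeMap.floorEntry(nr.start());
            if (floor != null && checkRangesOverlap(floor.getValue().range(), nr)) {
                return floor.getValue().consumer();
            }
            // Otherwise only the first range starting after nr.start() can overlap,
            // because existing ranges are disjoint and sorted by start.
            Map.Entry<Integer, Assignment> next = rangeMap.tailMap(nr.start(), false).firstEntry();
            if (next != null && checkRangesOverlap(next.getValue().range(), nr)) {
                return next.getValue().consumer();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(validateOwnRanges(List.of(new HashRange(10, 5))));
        System.out.println(validateOwnRanges(List.of(new HashRange(0, 100), new HashRange(50, 150))));

        ConcurrentSkipListMap<Integer, Assignment> rangeMap = new ConcurrentSkipListMap<>();
        rangeMap.put(0, new Assignment(new HashRange(0, 9999), "c1"));
        rangeMap.put(20000, new Assignment(new HashRange(20000, 29999), "c1"));
        System.out.println(findConflictingConsumer(rangeMap, List.of(new HashRange(10000, 19999)))); // null
        System.out.println(findConflictingConsumer(rangeMap, List.of(new HashRange(15000, 25000)))); // c1
    }
}
```

   Checking only the floor entry and the first entry of the tail keeps conflict 
detection logarithmic per new range, relying on the invariant that ranges already 
in `rangeMap` are disjoint.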
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change adds new unit tests and an integration test, and can be verified 
as follows:
   
   -   **`HashRangeExclusiveStickyKeyConsumerSelectorTest` (Unit Tests)**:
       * `testConsumerSelect`: Updated to verify `select(hash)` returns the 
correct consumer only when `hash` is within an explicitly defined range, and 
`null` for hashes in gaps.
       * `testConsumerSelectWithMultipRanges`: Added to confirm that, for a single 
consumer with multiple distinct sticky ranges, `select(hash)` returns that 
consumer only for hashes within those ranges and `null` for hashes in gaps.
       * `testOneConsumerRangeConflict`: Added to verify that a consumer cannot 
be added if its own `KeySharedMeta` contains internally conflicting or invalid 
(`start > end`) ranges.
       * `testSingleRangeConflict` and `testMultipleRangeConflict`: Updated to 
correctly assert expected conflicts and non-conflicts based on the new strict 
range overlap detection logic.
   -   **`KeySharedSubscriptionTest.testCustomStickyRange` (Integration Test)**:
       * A new end-to-end integration test has been added. It simulates the 
reported scenario with a partitioned topic and two consumers assigned 
non-overlapping sticky ranges. This test verifies that each consumer *only* 
receives messages for keys falling within its *explicitly assigned ranges*, 
confirming the fix for the unintended auto-split behavior.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [x] The binary protocol (Changes to `KeySharedMeta` validation might 
affect how clients sending invalid `IntRange` lists are handled by the broker, 
potentially rejecting previously accepted invalid configurations.)
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [x] Anything that affects deployment (The broker's consumer selection 
logic is fundamentally changed for `KeyShared` sticky subscriptions, enforcing 
strict range adherence. This alters message distribution behavior for affected 
subscriptions.)
   
   ### Documentation
   
   - [ ] `doc`
   - [x] `doc-required`
   - [ ] `doc-not-needed`
   - [ ] `doc-complete`
   
   ### Matching PR in forked repository
   
   PR in forked repository:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
