Copilot commented on code in PR #24730: URL: https://github.com/apache/pulsar/pull/24730#discussion_r2341218462
########## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ########## @@ -2537,4 +2537,70 @@ public void testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementati logTopicStats(topic); } } + + @Test + public void testCustomStickyRange() throws Exception { + int messageCount = 100; + final String topicName = "persistent://public/default/test-sticky-range-" + System.nanoTime(); + final String subscriptionName = "sub-sticky-range"; + + // 0. Init topic and subscription + admin.topics().createPartitionedTopic(topicName, 4); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + // 1. Create a producer and send messages + try (Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create()) { + for (int i = 0; i < messageCount; i++) { + String key = String.valueOf(i); + producer.newMessage() + .value(String.valueOf(i).getBytes()) + .key(key) + .send(); + } + } + + // 3. One by one create consumers consume message with different sticky hash range Review Comment: The comment has a grammatical error. It should be 'One by one create consumers to consume messages with different sticky hash ranges' or 'Create consumers one by one to consume messages with different sticky hash ranges'. ```suggestion // 3. Create consumers one by one to consume messages with different sticky hash ranges ``` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java: ########## @@ -143,34 +153,38 @@ private synchronized CompletableFuture<Void> validateKeySharedMeta(Consumer cons } } - private synchronized Consumer findConflictingConsumer(List<IntRange> ranges) { - for (IntRange intRange : ranges) { - Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(intRange.getStart()); - Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(intRange.getEnd()); - - if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) { - return floorEntry.getValue(); - } - - if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) { - return ceilingEntry.getValue(); + private synchronized Consumer findConflictingConsumer(List<IntRange> newConsumerRanges) { + for (IntRange newRange : newConsumerRanges) { + // 1. Check for potential conflicts with existing ranges that start before newRange's start. + Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart = rangeMap.floorEntry(newRange.getStart()); + if (conflictBeforeStart != null) { + Range existingRange = conflictBeforeStart.getValue().getLeft(); + if (checkRangesOverlap(newRange, existingRange)) { + return conflictBeforeStart.getValue().getRight(); + } } - - if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) { - KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta(); - for (IntRange range : keySharedMeta.getHashRangesList()) { - int start = Math.max(intRange.getStart(), range.getStart()); - int end = Math.min(intRange.getEnd(), range.getEnd()); - if (end >= start) { - return ceilingEntry.getValue(); - } + // 2. Check for potential conflicts with existing ranges that start after newRange's start. + Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart = rangeMap.ceilingEntry(newRange.getStart()); + if (conflictAfterStart != null) { + Range existingRange = conflictAfterStart.getValue().getLeft(); + if (checkRangesOverlap(newRange, existingRange)) { + return conflictAfterStart.getValue().getRight(); } } } return null; } - Map<Integer, Consumer> getRangeConsumer() { + + private boolean checkRangesOverlap(IntRange range1, Range range2) { + return Math.max(range1.getStart(), range2.getStart()) <= Math.min(range1.getEnd(), range2.getEnd()); + } + + private boolean checkRangesOverlap(IntRange range1, IntRange range2) { + return Math.max(range1.getStart(), range2.getStart()) <= Math.min(range1.getEnd(), range2.getEnd()); Review Comment: These two methods have identical logic and only differ in parameter types. Consider consolidating them into a single generic method or having one delegate to the other to reduce code duplication. ```suggestion return checkRangesOverlap(range1, new Range(range2.getStart(), range2.getEnd())); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org