Copilot commented on code in PR #24730:
URL: https://github.com/apache/pulsar/pull/24730#discussion_r2341218462


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##########
@@ -2537,4 +2537,70 @@ public void 
testDeliveryOfRemainingMessagesWithoutDeadlock(KeySharedImplementati
             logTopicStats(topic);
         }
     }
+
+    @Test
+    public void testCustomStickyRange() throws Exception {
+        int messageCount = 100;
+        final String topicName = 
"persistent://public/default/test-sticky-range-" + System.nanoTime();
+        final String subscriptionName = "sub-sticky-range";
+
+        // 0. Init topic and subscription
+        admin.topics().createPartitionedTopic(topicName, 4);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+
+        // 1. Create a producer and send messages
+        try (Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create()) {
+            for (int i = 0; i < messageCount; i++) {
+                String key = String.valueOf(i);
+                producer.newMessage()
+                        .value(String.valueOf(i).getBytes())
+                        .key(key)
+                        .send();
+            }
+        }
+
+        // 3. One by one create consumers consume message with different 
sticky hash range

Review Comment:
   The comment has a grammatical error. It should be 'One by one create 
consumers to consume messages with different sticky hash ranges' or 'Create 
consumers one by one to consume messages with different sticky hash ranges'.
   ```suggestion
           // 3. Create consumers one by one to consume messages with different 
sticky hash ranges
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java:
##########
@@ -143,34 +153,38 @@ private synchronized CompletableFuture<Void> 
validateKeySharedMeta(Consumer cons
         }
     }
 
-    private synchronized Consumer findConflictingConsumer(List<IntRange> 
ranges) {
-        for (IntRange intRange : ranges) {
-            Map.Entry<Integer, Consumer> ceilingEntry = 
rangeMap.ceilingEntry(intRange.getStart());
-            Map.Entry<Integer, Consumer> floorEntry = 
rangeMap.floorEntry(intRange.getEnd());
-
-            if (floorEntry != null && floorEntry.getKey() >= 
intRange.getStart()) {
-                return floorEntry.getValue();
-            }
-
-            if (ceilingEntry != null && ceilingEntry.getKey() <= 
intRange.getEnd()) {
-                return ceilingEntry.getValue();
+    private synchronized Consumer findConflictingConsumer(List<IntRange> 
newConsumerRanges) {
+        for (IntRange newRange : newConsumerRanges) {
+            // 1. Check for potential conflicts with existing ranges that 
start before newRange's start.
+            Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart = 
rangeMap.floorEntry(newRange.getStart());
+            if (conflictBeforeStart != null) {
+                Range existingRange = conflictBeforeStart.getValue().getLeft();
+                if (checkRangesOverlap(newRange, existingRange)) {
+                    return conflictBeforeStart.getValue().getRight();
+                }
             }
-
-            if (ceilingEntry != null && floorEntry != null && 
ceilingEntry.getValue().equals(floorEntry.getValue())) {
-                KeySharedMeta keySharedMeta = 
ceilingEntry.getValue().getKeySharedMeta();
-                for (IntRange range : keySharedMeta.getHashRangesList()) {
-                    int start = Math.max(intRange.getStart(), 
range.getStart());
-                    int end = Math.min(intRange.getEnd(), range.getEnd());
-                    if (end >= start) {
-                        return ceilingEntry.getValue();
-                    }
+            // 2. Check for potential conflicts with existing ranges that 
start after newRange's start.
+            Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart = 
rangeMap.ceilingEntry(newRange.getStart());
+            if (conflictAfterStart != null) {
+                Range existingRange = conflictAfterStart.getValue().getLeft();
+                if (checkRangesOverlap(newRange, existingRange)) {
+                    return conflictAfterStart.getValue().getRight();
                 }
             }
         }
         return null;
     }
 
-    Map<Integer, Consumer> getRangeConsumer() {
+
+    private boolean checkRangesOverlap(IntRange range1, Range range2) {
+        return Math.max(range1.getStart(), range2.getStart()) <= 
Math.min(range1.getEnd(), range2.getEnd());
+    }
+
+    private boolean checkRangesOverlap(IntRange range1, IntRange range2) {
+        return Math.max(range1.getStart(), range2.getStart()) <= 
Math.min(range1.getEnd(), range2.getEnd());

Review Comment:
   These two methods have identical logic and only differ in parameter types. 
Consider consolidating them into a single generic method or having one delegate 
to the other to reduce code duplication.
   ```suggestion
           return checkRangesOverlap(range1, new Range(range2.getStart(), 
range2.getEnd()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to