This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e73532a8ae9 [fix][broker] Ensure KeyShared sticky mode consumer respects assigned ranges (#24730)
e73532a8ae9 is described below

commit e73532a8ae9f443f5942181f9c70bc796a380160
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Mon Sep 15 15:19:54 2025 +0800

    [fix][broker] Ensure KeyShared sticky mode consumer respects assigned ranges (#24730)
---
 ...ashRangeExclusiveStickyKeyConsumerSelector.java | 112 ++++++++++++---------
 ...angeExclusiveStickyKeyConsumerSelectorTest.java | 106 +++++++++++++++++--
 .../client/api/KeySharedSubscriptionTest.java      |  66 ++++++++++++
 3 files changed, 228 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 904fb702a94..7fb99838197 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.broker.service;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.IntRange;
-import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.util.FutureUtil;
 
 /**
@@ -38,7 +39,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 public class HashRangeExclusiveStickyKeyConsumerSelector implements 
StickyKeyConsumerSelector {
     private final int rangeSize;
     private final Range keyHashRange;
-    private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
+    private final ConcurrentSkipListMap<Integer, Pair<Range, Consumer>> 
rangeMap;
 
     public HashRangeExclusiveStickyKeyConsumerSelector() {
         this(DEFAULT_RANGE_SIZE);
@@ -73,47 +74,41 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
                     + conflictingConsumer);
         }
         for (IntRange intRange : 
consumer.getKeySharedMeta().getHashRangesList()) {
-            rangeMap.put(intRange.getStart(), consumer);
-            rangeMap.put(intRange.getEnd(), consumer);
+            rangeMap.put(intRange.getStart(), 
Pair.of(Range.of(intRange.getStart(), intRange.getEnd()), consumer));
         }
         return Optional.empty();
     }
 
     @Override
     public synchronized Optional<ImpactedConsumersResult> 
removeConsumer(Consumer consumer) {
-        rangeMap.entrySet().removeIf(entry -> 
entry.getValue().equals(consumer));
+        rangeMap.entrySet().removeIf(entry -> 
entry.getValue().getRight().equals(consumer));
         return Optional.empty();
     }
 
     @Override
     public synchronized ConsumerHashAssignmentsSnapshot 
getConsumerHashAssignmentsSnapshot() {
         List<HashRangeAssignment> result = new ArrayList<>();
-        Map.Entry<Integer, Consumer> prev = null;
-        for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
-            if (prev == null) {
-                prev = entry;
-            } else {
-                if (prev.getValue().equals(entry.getValue())) {
-                    result.add(new HashRangeAssignment(Range.of(prev.getKey(), 
entry.getKey()), entry.getValue()));
-                }
-                prev = null;
-            }
+        for (Map.Entry<Integer, Pair<Range, Consumer>> entry : 
rangeMap.entrySet()) {
+            Range assignedRange = entry.getValue().getLeft();
+            Consumer assignedConsumer = entry.getValue().getRight();
+            result.add(new HashRangeAssignment(assignedRange, 
assignedConsumer));
         }
         return ConsumerHashAssignmentsSnapshot.of(result);
     }
 
     @Override
     public Consumer select(int hash) {
-        if (rangeMap.size() > 0) {
-            Map.Entry<Integer, Consumer> ceilingEntry = 
rangeMap.ceilingEntry(hash);
-            Map.Entry<Integer, Consumer> floorEntry = 
rangeMap.floorEntry(hash);
-            Consumer ceilingConsumer = ceilingEntry != null ? 
ceilingEntry.getValue() : null;
-            Consumer floorConsumer = floorEntry != null ? 
floorEntry.getValue() : null;
-            if (floorConsumer != null && 
floorConsumer.equals(ceilingConsumer)) {
-                return ceilingConsumer;
-            } else {
-                return null;
-            }
+        if (rangeMap.isEmpty()) {
+            return null;
+        }
+
+        Map.Entry<Integer, Pair<Range, Consumer>> floorEntry = 
rangeMap.floorEntry(hash);
+        if (floorEntry == null) {
+            return null;
+        }
+        Pair<Range, Consumer> pair = floorEntry.getValue();
+        if (pair.getLeft().contains(hash)) {
+            return pair.getRight();
         } else {
             return null;
         }
@@ -129,10 +124,25 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
             return FutureUtil.failedFuture(new 
BrokerServiceException.ConsumerAssignException(
                     "Ranges for KeyShared policy must not be empty."));
         }
-        for (IntRange intRange : ranges) {
-            if (intRange.getStart() > intRange.getEnd()) {
+        List<IntRange> sortedRanges = new ArrayList<>(ranges);
+        sortedRanges.sort(Comparator.comparingInt(IntRange::getStart));
+        for (int i = 0; i < sortedRanges.size(); i++) {
+            IntRange currentRange = sortedRanges.get(i);
+            // 1. Validate: check if start > end for the current range
+            if (currentRange.getStart() > currentRange.getEnd()) {
                 return FutureUtil.failedFuture(
-                        new 
BrokerServiceException.ConsumerAssignException("Fixed hash range start > end"));
+                        new 
BrokerServiceException.ConsumerAssignException("Fixed hash range start > end 
for range: "
+                                + "[" + currentRange.getStart() + "," + 
currentRange.getEnd() + "]"));
+            }
+            // 2. Validate: check for overlaps with the next range in the 
sorted list
+            if (i < sortedRanges.size() - 1) {
+                IntRange nextRange = sortedRanges.get(i + 1);
+                if (areRangesOverlapping(currentRange, nextRange)) {
+                    return FutureUtil.failedFuture(
+                            new 
BrokerServiceException.ConsumerAssignException("Consumer's own ranges conflict: 
"
+                                    + "[" + currentRange.getStart() + "," + 
currentRange.getEnd() + "] "
+                                    + "overlaps with [" + nextRange.getStart() 
+ "," + nextRange.getEnd() + "]"));
+                }
             }
         }
         Consumer conflictingConsumer = findConflictingConsumer(ranges);
@@ -143,34 +153,38 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
         }
     }
 
-    private synchronized Consumer findConflictingConsumer(List<IntRange> 
ranges) {
-        for (IntRange intRange : ranges) {
-            Map.Entry<Integer, Consumer> ceilingEntry = 
rangeMap.ceilingEntry(intRange.getStart());
-            Map.Entry<Integer, Consumer> floorEntry = 
rangeMap.floorEntry(intRange.getEnd());
-
-            if (floorEntry != null && floorEntry.getKey() >= 
intRange.getStart()) {
-                return floorEntry.getValue();
-            }
-
-            if (ceilingEntry != null && ceilingEntry.getKey() <= 
intRange.getEnd()) {
-                return ceilingEntry.getValue();
+    private synchronized Consumer findConflictingConsumer(List<IntRange> 
newConsumerRanges) {
+        for (IntRange newRange : newConsumerRanges) {
+            // 1. Check for potential conflicts with existing ranges that 
start before newRange's start.
+            Map.Entry<Integer, Pair<Range, Consumer>> conflictBeforeStart = 
rangeMap.floorEntry(newRange.getStart());
+            if (conflictBeforeStart != null) {
+                Range existingRange = conflictBeforeStart.getValue().getLeft();
+                if (areRangesOverlapping(newRange, existingRange)) {
+                    return conflictBeforeStart.getValue().getRight();
+                }
             }
-
-            if (ceilingEntry != null && floorEntry != null && 
ceilingEntry.getValue().equals(floorEntry.getValue())) {
-                KeySharedMeta keySharedMeta = 
ceilingEntry.getValue().getKeySharedMeta();
-                for (IntRange range : keySharedMeta.getHashRangesList()) {
-                    int start = Math.max(intRange.getStart(), 
range.getStart());
-                    int end = Math.min(intRange.getEnd(), range.getEnd());
-                    if (end >= start) {
-                        return ceilingEntry.getValue();
-                    }
+            // 2. Check for potential conflicts with existing ranges that 
start after newRange's start.
+            Map.Entry<Integer, Pair<Range, Consumer>> conflictAfterStart = 
rangeMap.ceilingEntry(newRange.getStart());
+            if (conflictAfterStart != null) {
+                Range existingRange = conflictAfterStart.getValue().getLeft();
+                if (areRangesOverlapping(newRange, existingRange)) {
+                    return conflictAfterStart.getValue().getRight();
                 }
             }
         }
         return null;
     }
 
-    Map<Integer, Consumer> getRangeConsumer() {
+
+    private static boolean areRangesOverlapping(IntRange range1, Range range2) 
{
+        return Math.max(range1.getStart(), range2.getStart()) <= 
Math.min(range1.getEnd(), range2.getEnd());
+    }
+
+    private static boolean areRangesOverlapping(IntRange range1, IntRange 
range2) {
+        return Math.max(range1.getStart(), range2.getStart()) <= 
Math.min(range1.getEnd(), range2.getEnd());
+    }
+
+    Map<Integer, Pair<Range, Consumer>> getRangeConsumer() {
         return Collections.unmodifiableMap(rangeMap);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
index f3828981c8e..7dd3bde4489 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
@@ -50,7 +50,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
         Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
         selector.addConsumer(consumer1).get();
-        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
         Consumer selectedConsumer;
         for (int i = 0; i < 3; i++) {
             selectedConsumer = selector.select(i);
@@ -66,7 +66,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2);
         Assert.assertEquals(consumer2.getKeySharedMeta(), keySharedMeta2);
         selector.addConsumer(consumer2).get();
-        Assert.assertEquals(selector.getRangeConsumer().size(), 4);
+        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
 
         for (int i = 3; i < 10; i++) {
             selectedConsumer = selector.select(i);
@@ -79,7 +79,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         }
 
         selector.removeConsumer(consumer1);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
         selectedConsumer = selector.select(1);
         Assert.assertNull(selectedConsumer);
 
@@ -89,6 +89,38 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         Assert.assertNull(selectedConsumer);
     }
 
+
+    @Test
+    public void testConsumerSelectWithMultipleRanges() throws 
ExecutionException, InterruptedException {
+
+        HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(20);
+        Consumer consumer1 = mock(Consumer.class);
+        KeySharedMeta keySharedMeta1 = new KeySharedMeta()
+                .setKeySharedMode(KeySharedMode.STICKY);
+        keySharedMeta1.addHashRange().setStart(2).setEnd(6);
+        keySharedMeta1.addHashRange().setStart(10).setEnd(15);
+        when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
+        Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
+        selector.addConsumer(consumer1).get();
+        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+        Consumer selectedConsumer;
+        for (int i = 2; i <= 6; i++) {
+            selectedConsumer = selector.select(i);
+            Assert.assertEquals(selectedConsumer, consumer1);
+        }
+        for (int i = 10; i <= 15; i++) {
+            selectedConsumer = selector.select(i);
+            Assert.assertEquals(selectedConsumer, consumer1);
+        }
+        selectedConsumer = selector.select(1);
+        Assert.assertNull(selectedConsumer);
+        selectedConsumer = selector.select(9);
+        Assert.assertNull(selectedConsumer);
+        selectedConsumer = selector.select(18);
+        Assert.assertNull(selectedConsumer);
+
+    }
+
     @Test
     public void testEmptyRanges() {
         HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
@@ -189,6 +221,66 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         }
     }
 
+    @Test
+    public void testShouldConflictConsumerWithSelfOverlappingRanges() throws 
ExecutionException, InterruptedException {
+        HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
+
+        final List<IntRange> testRanges = new ArrayList<>();
+        testRanges.add(new IntRange().setStart(1).setEnd(3));
+        testRanges.add(new IntRange().setStart(2).setEnd(2));
+        testRanges.add(new IntRange().setStart(5).setEnd(5));
+        testRanges.add(new IntRange().setStart(4).setEnd(6));
+        KeySharedMeta keySharedMeta = new KeySharedMeta()
+                .setKeySharedMode(KeySharedMode.STICKY);
+        keySharedMeta.addAllHashRanges(testRanges);
+        Consumer consumer = mock(Consumer.class);
+        when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
+        Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
+
+        try {
+            selector.addConsumer(consumer).get();
+            Assert.fail("should be failed");
+        } catch (ExecutionException | InterruptedException e) {
+            // ignore
+        }
+
+        Assert.assertEquals(selector.getRangeConsumer().size(), 0);
+    }
+
+    @Test
+    public void testShouldConflictConsumerWithBoundaryRanges() throws 
ExecutionException, InterruptedException {
+        HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
+        TransportCnx transportCnx = mock(TransportCnx.class);
+
+        // 1. add consumer 1 with range [2, 5]
+        KeySharedMeta keySharedMeta1 = new KeySharedMeta()
+                .setKeySharedMode(KeySharedMode.STICKY);
+        keySharedMeta1.addAllHashRanges(List.of(new 
IntRange().setStart(2).setEnd(5)));
+        Consumer consumer1 = mock(Consumer.class);
+        when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
+        when(consumer1.cnx()).thenReturn(transportCnx);
+        
when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null));
+        Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
+        selector.addConsumer(consumer1).get();
+        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
+
+        // 2. add consumer 2 with range [5, 10], should be conflict with 
consumer 1
+        KeySharedMeta keySharedMeta2 = new KeySharedMeta()
+                .setKeySharedMode(KeySharedMode.STICKY);
+        keySharedMeta2.addAllHashRanges(List.of(new 
IntRange().setStart(5).setEnd(10)));
+        Consumer consumer2 = mock(Consumer.class);
+        when(consumer2.cnx()).thenReturn(transportCnx);
+        
when(transportCnx.checkConnectionLiveness()).thenReturn(CompletableFuture.completedFuture(null));
+        when(consumer2.getKeySharedMeta()).thenReturn(keySharedMeta2);
+        try {
+            selector.addConsumer(consumer2).get();
+            Assert.fail("should be failed");
+        } catch (ExecutionException | InterruptedException e) {
+            // ignore
+        }
+        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
+    }
+
     @Test
     public void testSingleRangeConflict() throws ExecutionException, 
InterruptedException {
         HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
@@ -202,7 +294,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
         Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
         selector.addConsumer(consumer1).get();
-        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
 
         final List<IntRange> testRanges = new ArrayList<>();
         testRanges.add(new IntRange().setStart(4).setEnd(6));
@@ -228,7 +320,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
             } catch (ExecutionException | InterruptedException e) {
                 // ignore
             }
-            Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+            Assert.assertEquals(selector.getRangeConsumer().size(), 1);
         }
     }
 
@@ -245,7 +337,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
         when(consumer1.getKeySharedMeta()).thenReturn(keySharedMeta1);
         Assert.assertEquals(consumer1.getKeySharedMeta(), keySharedMeta1);
         selector.addConsumer(consumer1).get();
-        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
 
         final List<List<IntRange>> testRanges = new ArrayList<>();
         testRanges.add(List.of(
@@ -271,7 +363,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest {
             } catch (ExecutionException | InterruptedException e) {
                 // ignore
             }
-            Assert.assertEquals(selector.getRangeConsumer().size(), 2);
+            Assert.assertEquals(selector.getRangeConsumer().size(), 1);
         }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 80b5070077d..7f7de33b43a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -2668,4 +2668,70 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
             logTopicStats(topic);
         }
     }
+
+    @Test
+    public void testCustomStickyRange() throws Exception {
+        int messageCount = 100;
+        final String topicName = 
"persistent://public/default/test-sticky-range-" + System.nanoTime();
+        final String subscriptionName = "sub-sticky-range";
+
+        // 0. Init topic and subscription
+        admin.topics().createPartitionedTopic(topicName, 4);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+
+        // 1. Create a producer and send messages
+        try (Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create()) {
+            for (int i = 0; i < messageCount; i++) {
+                String key = String.valueOf(i);
+                producer.newMessage()
+                        .value(String.valueOf(i).getBytes())
+                        .key(key)
+                        .send();
+            }
+        }
+
+        // 3. One by one create consumers consume message with different 
sticky hash range
+        KeySharedPolicy.KeySharedPolicySticky policy1 = 
KeySharedPolicy.stickyHashRange()
+                .ranges(Range.of(0, 9999), Range.of(20000, 29999), 
Range.of(40000, 49999));
+        KeySharedPolicy.KeySharedPolicySticky policy2 = 
KeySharedPolicy.stickyHashRange()
+                .ranges(Range.of(10000, 19999), Range.of(30000, 39999), 
Range.of(50000, 65535));
+
+        List<KeySharedPolicy.KeySharedPolicySticky> policies = 
Arrays.asList(policy1, policy2);
+        int[] receivedCounts = new int[2];
+        for (int i = 0; i < policies.size(); i++) {
+            Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                    .topic(topicName)
+                    .subscriptionName(subscriptionName)
+                    .subscriptionType(SubscriptionType.Key_Shared)
+                    .keySharedPolicy(policies.get(i))
+                    .subscribe();
+            while (true) {
+                try {
+                    Message<byte[]> msg = consumer.receive(2, 
TimeUnit.SECONDS);
+                    if (msg == null) {
+                        break;
+                    }
+                    receivedCounts[i]++;
+                    log.debug("Consumer #{} received message with key:{} 
total:{}",
+                            i + 1, msg.getKey(), receivedCounts[i]);
+                } catch (Exception e) {
+                    break;
+                }
+            }
+            // make sure consume closed before start next consumer
+            consumer.close();
+        }
+
+        int consumer1Received = receivedCounts[0];
+        int consumer2Received = receivedCounts[1];
+
+        log.info("Consumer1 total received: {}", consumer1Received);
+        log.info("Consumer2 total received: {}", consumer2Received);
+        Assert.assertEquals(consumer1Received + consumer2Received, 
messageCount,
+                "Total messages received by both consumers should be " + 
messageCount);
+    }
 }

Reply via email to