This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43aabfc720912a2773fac91e4061460b335d2a73
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Oct 2 11:13:36 2024 +0300

    [fix][broker] Fix out-of-order issues with 
ConsistentHashingStickyKeyConsumerSelector (#23327)
    
    (cherry picked from commit adb9014dbac21afdfb5fc252ac38e07ed2d6b19c)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java | 104 +++---
 .../broker/service/ConsumerIdentityWrapper.java    |  70 ++++
 .../broker/service/ConsumerNameIndexTracker.java   | 136 ++++++++
 ...istentHashingStickyKeyConsumerSelectorTest.java | 366 ++++++++++++++++++++-
 .../service/ConsumerIdentityWrapperTest.java       |  68 ++++
 .../service/ConsumerNameIndexTrackerTest.java      | 157 +++++++++
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |   9 +-
 .../java/org/apache/pulsar/client/api/Range.java   |  11 +-
 8 files changed, 853 insertions(+), 68 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index b2b2b512c8c..1ae9a6ff96b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pulsar.broker.service;
 
-import com.google.common.collect.Lists;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -44,7 +42,9 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     // Consistent-Hash ring
-    private final NavigableMap<Integer, List<Consumer>> hashRing;
+    private final NavigableMap<Integer, ConsumerIdentityWrapper> hashRing;
+    // Tracks the used consumer name indexes for each consumer name
+    private final ConsumerNameIndexTracker consumerNameIndexTracker = new 
ConsumerNameIndexTracker();
 
     private final int numberOfPoints;
 
@@ -57,21 +57,20 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
     public CompletableFuture<Void> addConsumer(Consumer consumer) {
         rwLock.writeLock().lock();
         try {
+            ConsumerIdentityWrapper consumerIdentityWrapper = new 
ConsumerIdentityWrapper(consumer);
             // Insert multiple points on the hash ring for every consumer
             // The points are deterministically added based on the hash of the 
consumer name
             for (int i = 0; i < numberOfPoints; i++) {
-                int hash = calculateHashForConsumerAndIndex(consumer, i);
-                hashRing.compute(hash, (k, v) -> {
-                    if (v == null) {
-                        return Lists.newArrayList(consumer);
-                    } else {
-                        if (!v.contains(consumer)) {
-                            v.add(consumer);
-                            
v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
-                        }
-                        return v;
-                    }
-                });
+                int consumerNameIndex =
+                        
consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
+                int hash = calculateHashForConsumerAndIndex(consumer, 
consumerNameIndex, i);
+                // When there's a collision, the new consumer will replace the 
old one.
+                // This is a rare case, and it is acceptable to replace the 
old consumer since there
+                // are multiple points for each consumer. This won't affect 
the overall distribution significantly.
+                ConsumerIdentityWrapper removed = hashRing.put(hash, 
consumerIdentityWrapper);
+                if (removed != null) {
+                    consumerNameIndexTracker.decreaseConsumerRefCount(removed);
+                }
             }
             return CompletableFuture.completedFuture(null);
         } finally {
@@ -79,8 +78,19 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
         }
     }
 
-    private static int calculateHashForConsumerAndIndex(Consumer consumer, int 
index) {
-        String key = consumer.consumerName() + KEY_SEPARATOR + index;
+    /**
+     * Calculate the hash for a consumer and hash ring point.
+     * The hash is calculated based on the consumer name, consumer name index, 
and hash ring point index.
+     * The resulting hash is used as the key to insert the consumer into the 
hash ring.
+     *
+     * @param consumer the consumer
+     * @param consumerNameIndex the index of the consumer name
+     * @param hashRingPointIndex the index of the hash ring point
+     * @return the hash value
+     */
+    private static int calculateHashForConsumerAndIndex(Consumer consumer, int 
consumerNameIndex,
+                                                        int 
hashRingPointIndex) {
+        String key = consumer.consumerName() + KEY_SEPARATOR + 
consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex;
         return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
     }
 
@@ -88,20 +98,16 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
     public void removeConsumer(Consumer consumer) {
         rwLock.writeLock().lock();
         try {
-            // Remove all the points that were added for this consumer
-            for (int i = 0; i < numberOfPoints; i++) {
-                int hash = calculateHashForConsumerAndIndex(consumer, i);
-                hashRing.compute(hash, (k, v) -> {
-                    if (v == null) {
-                        return null;
-                    } else {
-                        v.removeIf(c -> c.equals(consumer));
-                        if (v.isEmpty()) {
-                            v = null;
-                        }
-                        return v;
+            ConsumerIdentityWrapper consumerIdentityWrapper = new 
ConsumerIdentityWrapper(consumer);
+            int consumerNameIndex = 
consumerNameIndexTracker.getTrackedIndex(consumerIdentityWrapper);
+            if (consumerNameIndex > -1) {
+                // Remove all the points that were added for this consumer
+                for (int i = 0; i < numberOfPoints; i++) {
+                    int hash = calculateHashForConsumerAndIndex(consumer, 
consumerNameIndex, i);
+                    if (hashRing.remove(hash, consumerIdentityWrapper)) {
+                        
consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
                     }
-                });
+                }
             }
         } finally {
             rwLock.writeLock().unlock();
@@ -115,16 +121,13 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
             if (hashRing.isEmpty()) {
                 return null;
             }
-
-            List<Consumer> consumerList;
-            Map.Entry<Integer, List<Consumer>> ceilingEntry = 
hashRing.ceilingEntry(hash);
+            Map.Entry<Integer, ConsumerIdentityWrapper> ceilingEntry = 
hashRing.ceilingEntry(hash);
             if (ceilingEntry != null) {
-                consumerList =  ceilingEntry.getValue();
+                return ceilingEntry.getValue().consumer;
             } else {
-                consumerList = hashRing.firstEntry().getValue();
+                // Handle wrap-around in the hash ring, return the first 
consumer
+                return hashRing.firstEntry().getValue().consumer;
             }
-
-            return consumerList.get(hash % consumerList.size());
         } finally {
             rwLock.readLock().unlock();
         }
@@ -132,16 +135,27 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
 
     @Override
     public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
-        Map<Consumer, List<Range>> result = new LinkedHashMap<>();
+        Map<Consumer, List<Range>> result = new IdentityHashMap<>();
         rwLock.readLock().lock();
         try {
+            if (hashRing.isEmpty()) {
+                return result;
+            }
             int start = 0;
-            for (Map.Entry<Integer, List<Consumer>> entry: 
hashRing.entrySet()) {
-                for (Consumer consumer: entry.getValue()) {
-                    result.computeIfAbsent(consumer, key -> new ArrayList<>())
-                            .add(Range.of(start, entry.getKey()));
-                }
-                start = entry.getKey() + 1;
+            int lastKey = 0;
+            for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: 
hashRing.entrySet()) {
+                Consumer consumer = entry.getValue().consumer;
+                result.computeIfAbsent(consumer, key -> new ArrayList<>())
+                        .add(Range.of(start, entry.getKey()));
+                lastKey = entry.getKey();
+                start = lastKey + 1;
+            }
+            // Handle wrap-around in the hash ring, the first consumer will 
also contain the range from the last key
+            // to the maximum value of the hash range
+            Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
+            List<Range> ranges = result.get(firstConsumer);
+            if (lastKey != Integer.MAX_VALUE - 1) {
+                ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
             }
         } finally {
             rwLock.readLock().unlock();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java
new file mode 100644
index 00000000000..2aae1d9b062
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+/**
+ * A wrapper class for a Consumer instance that provides custom implementations
+ * of equals and hashCode methods. The equals method returns true if and only 
if
+ * the compared instance is the same instance.
+ *
+ * <p>The reason for this class is the custom implementation of {@link 
Consumer#equals(Object)}.
+ * Using this wrapper class will be useful in use cases where it's necessary 
to match a key
+ * in a map by instance or a value in a set by instance.</p>
+ */
+class ConsumerIdentityWrapper {
+    final Consumer consumer;
+
+    public ConsumerIdentityWrapper(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    /**
+     * Compares this wrapper to the specified object. The result is true if 
and only if
+     * the argument is not null and is a ConsumerIdentityWrapper object that 
wraps
+     * the same Consumer instance.
+     *
+     * @param obj the object to compare this ConsumerIdentityWrapper against
+     * @return true if the given object is a ConsumerIdentityWrapper that wraps
+     *         the same Consumer instance as this wrapper, false otherwise
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof ConsumerIdentityWrapper) {
+            ConsumerIdentityWrapper other = (ConsumerIdentityWrapper) obj;
+            return consumer == other.consumer;
+        }
+        return false;
+    }
+
+    /**
+     * Returns a hash code for this wrapper. The hash code is computed based on
+     * the wrapped Consumer instance.
+     *
+     * @return a hash code value for this object
+     */
+    @Override
+    public int hashCode() {
+        return consumer.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return consumer.toString();
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java
new file mode 100644
index 00000000000..1f93313ab1b
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Tracks the used consumer name indexes for each consumer name.
+ * This is used by {@link ConsistentHashingStickyKeyConsumerSelector} to get a 
unique "consumer name index"
+ * for each consumer name. It is useful when there are multiple consumers with 
the same name, but they are
+ * different consumers. The purpose of the index is to prevent collisions in 
the hash ring.
+ *
+ * The consumer name index serves as an additional key for the hash ring 
assignment. The logic keeps track of
+ * used "index slots" for each consumer name and assigns the first unused 
index when a new consumer is added.
+ * This approach minimizes hash collisions due to using the same consumer name.
+ *
+ * An added benefit of this tracking approach is that a consumer that leaves 
and then rejoins immediately will get the
+ * same index and therefore the same assignments in the hash ring. This 
improves stability since the hash assignment
+ * changes are minimized over time, although a better solution would be to 
avoid reusing the same consumer name
+ * in the first place.
+ *
+ * When a consumer is removed, the index is deallocated. RoaringBitmap is used 
to keep track of the used indexes.
+ * The data structure to track a consumer name is removed when the reference 
count of the consumer name is zero.
+ *
+ * This class is not thread-safe and should be used in a synchronized context 
in the caller.
+ */
+@NotThreadSafe
+class ConsumerNameIndexTracker {
+    // tracks the used index slots for each consumer name
+    private final Map<String, ConsumerNameIndexSlots> 
consumerNameIndexSlotsMap = new HashMap<>();
+    // tracks the active consumer entries
+    private final Map<ConsumerIdentityWrapper, ConsumerEntry> consumerEntries 
= new HashMap<>();
+
+    // Represents a consumer entry in the tracker, including the consumer 
name, index, and reference count.
+    record ConsumerEntry(String consumerName, int nameIndex, MutableInt 
refCount) {
+    }
+
+    /*
+     * Tracks the used indexes for a consumer name using a RoaringBitmap.
+     * A specific index slot is used when the bit is set.
+     * When all bits are cleared, the consumer name can be removed from 
tracking.
+     */
+    static class ConsumerNameIndexSlots {
+        private RoaringBitmap indexSlots = new RoaringBitmap();
+
+        public int allocateIndexSlot() {
+            // find the first index that is not set, if there is no such 
index, add a new one
+            int index = (int) indexSlots.nextAbsentValue(0);
+            if (index == -1) {
+                index = indexSlots.getCardinality();
+            }
+            indexSlots.add(index);
+            return index;
+        }
+
+        public boolean deallocateIndexSlot(int index) {
+            indexSlots.remove(index);
+            return indexSlots.isEmpty();
+        }
+    }
+
+    /*
+     * Adds a reference to the consumer and returns the index assigned to this 
consumer.
+     */
+    public int increaseConsumerRefCountAndReturnIndex(ConsumerIdentityWrapper 
wrapper) {
+        ConsumerEntry entry = consumerEntries.computeIfAbsent(wrapper, k -> {
+            String consumerName = wrapper.consumer.consumerName();
+            return new ConsumerEntry(consumerName, 
allocateConsumerNameIndex(consumerName), new MutableInt(0));
+        });
+        entry.refCount.increment();
+        return entry.nameIndex;
+    }
+
+    private int allocateConsumerNameIndex(String consumerName) {
+        return getConsumerNameIndexBitmap(consumerName).allocateIndexSlot();
+    }
+
+    private ConsumerNameIndexSlots getConsumerNameIndexBitmap(String 
consumerName) {
+        return consumerNameIndexSlotsMap.computeIfAbsent(consumerName, k -> 
new ConsumerNameIndexSlots());
+    }
+
+    /*
+     * Decreases the reference count of the consumer. When it reaches zero, 
the consumer entry and its index slot
+     * are released; the consumer name is dropped from tracking once none of its index slots remain in use.
+     */
+    public void decreaseConsumerRefCount(ConsumerIdentityWrapper removed) {
+        ConsumerEntry consumerEntry = consumerEntries.get(removed);
+        int refCount = consumerEntry.refCount.decrementAndGet();
+        if (refCount == 0) {
+            deallocateConsumerNameIndex(consumerEntry.consumerName, 
consumerEntry.nameIndex);
+            consumerEntries.remove(removed, consumerEntry);
+        }
+    }
+
+    private void deallocateConsumerNameIndex(String consumerName, int index) {
+        if 
(getConsumerNameIndexBitmap(consumerName).deallocateIndexSlot(index)) {
+            consumerNameIndexSlotsMap.remove(consumerName);
+        }
+    }
+
+    /*
+     * Returns the currently tracked index for the consumer.
+     */
+    public int getTrackedIndex(ConsumerIdentityWrapper wrapper) {
+        ConsumerEntry consumerEntry = consumerEntries.get(wrapper);
+        return consumerEntry != null ? consumerEntry.nameIndex : -1;
+    }
+
+    int getTrackedConsumerNamesCount() {
+        return consumerNameIndexSlotsMap.size();
+    }
+
+    int getTrackedConsumersCount() {
+        return consumerEntries.size();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 48311c57338..04aafc49b47 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -18,19 +18,27 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.commons.lang3.mutable.MutableInt;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
+import org.assertj.core.data.Offset;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -40,7 +48,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
     @Test
     public void testConsumerSelect() throws ConsumerAssignException {
 
-        ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(100);
+        ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(200);
         String key1 = "anyKey";
         Assert.assertNull(selector.select(key1.getBytes()));
 
@@ -146,31 +154,115 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(3);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", 
"consumer3");
         List<Consumer> consumers = new ArrayList<>();
+        long id=0;
         for (String s : consumerName) {
-            Consumer consumer = mock(Consumer.class);
-            when(consumer.consumerName()).thenReturn(s);
+            Consumer consumer = createMockConsumer(s, s, id++);
             selector.addConsumer(consumer);
             consumers.add(consumer);
         }
+
+        // check that results are the same when called multiple times
+        assertThat(selector.getConsumerKeyHashRanges())
+                .containsExactlyEntriesOf(selector.getConsumerKeyHashRanges());
+
         Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        assertThat(consumers.get(0).consumerName()).isEqualTo("consumer1");
         expectedResult.put(consumers.get(0), Arrays.asList(
-                Range.of(119056335, 242013991),
-                Range.of(722195657, 1656011842),
-                Range.of(1707482098, 1914695766)));
+                Range.of(95615213, 440020355),
+                Range.of(440020356, 455987436),
+                Range.of(1189794593, 1264144431)));
+        assertThat(consumers.get(1).consumerName()).isEqualTo("consumer2");
         expectedResult.put(consumers.get(1), Arrays.asList(
-                Range.of(0, 90164503),
-                Range.of(90164504, 119056334),
-                Range.of(382436668, 722195656)));
+                Range.of(939655188, 1189794592),
+                Range.of(1314727625, 1977451233),
+                Range.of(1977451234, 2016237253)));
+        assertThat(consumers.get(2).consumerName()).isEqualTo("consumer3");
         expectedResult.put(consumers.get(2), Arrays.asList(
-                Range.of(242013992, 242377547),
-                Range.of(242377548, 382436667),
-                Range.of(1656011843, 1707482097)));
-        for (Map.Entry<Consumer, List<Range>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
-            System.out.println(entry.getValue());
-            Assert.assertEquals(entry.getValue(), 
expectedResult.get(entry.getKey()));
-            expectedResult.remove(entry.getKey());
+                Range.of(0, 95615212),
+                Range.of(455987437, 939655187),
+                Range.of(1264144432, 1314727624),
+                Range.of(2016237254, 2147483646)));
+        Map<Consumer, List<Range>> consumerKeyHashRanges = 
selector.getConsumerKeyHashRanges();
+        
assertThat(consumerKeyHashRanges).containsExactlyInAnyOrderEntriesOf(expectedResult);
+
+        // check that ranges are continuous and cover the whole range
+        List<Range> allRanges =
+                
consumerKeyHashRanges.values().stream().flatMap(List::stream).sorted().collect(Collectors.toList());
+        Range previousRange = null;
+        for (Range range : allRanges) {
+            if (previousRange != null) {
+                assertThat(range.getStart()).isEqualTo(previousRange.getEnd() 
+ 1);
+            }
+            previousRange = range;
+        }
+        assertThat(allRanges.stream().mapToInt(r -> r.getEnd() - r.getStart() 
+ 1).sum()).isEqualTo(Integer.MAX_VALUE);
+    }
+
+    @Test
+    public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
+            throws BrokerServiceException.ConsumerAssignException {
+        ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(200);
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            // use the same name for all consumers, use toString to 
distinguish them
+            Consumer consumer = createMockConsumer("consumer", 
String.format("index %02d", i), i);
+            selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
-        Assert.assertEquals(expectedResult.size(), 0);
+        printConsumerRangesStats(selector);
+
+        int totalSelections = 10000;
+
+        Map<Consumer, MutableInt> consumerSelectionCount = new HashMap<>();
+        for (int i = 0; i < totalSelections; i++) {
+            Consumer selectedConsumer = selector.select(("key " + 
i).getBytes(StandardCharsets.UTF_8));
+            consumerSelectionCount.computeIfAbsent(selectedConsumer, c -> new 
MutableInt()).increment();
+        }
+
+        printSelectionCountStats(consumerSelectionCount);
+
+        int averageCount = totalSelections / consumers.size();
+        int allowedVariance = (int) (0.2d * averageCount);
+        System.out.println("averageCount: " + averageCount + " 
allowedVariance: " + allowedVariance);
+
+        for (Map.Entry<Consumer, MutableInt> entry : 
consumerSelectionCount.entrySet()) {
+            assertThat(entry.getValue().intValue()).describedAs("consumer: 
%s", entry.getKey())
+                    .isCloseTo(averageCount, Offset.offset(allowedVariance));
+        }
+
+        consumers.forEach(selector::removeConsumer);
+        assertThat(selector.getConsumerKeyHashRanges()).isEmpty();
+    }
+
+    private static void printSelectionCountStats(Map<Consumer, MutableInt> 
consumerSelectionCount) {
+        int totalSelections = 
consumerSelectionCount.values().stream().mapToInt(MutableInt::intValue).sum();
+        consumerSelectionCount.entrySet().stream()
+                
.sorted(Map.Entry.comparingByKey(Comparator.comparing(Consumer::toString)))
+                .forEach(entry -> System.out.println(
+                        String.format("consumer: %s got selected %d times. 
ratio: %.2f%%", entry.getKey(),
+                                entry.getValue().intValue(),
+                                ((double) entry.getValue().intValue() / 
totalSelections) * 100.0d)));
+    }
+
+    private static void 
printConsumerRangesStats(ConsistentHashingStickyKeyConsumerSelector selector) {
+        selector.getConsumerKeyHashRanges().entrySet().stream()
+                .map(entry -> Map.entry(entry.getKey(),
+                        entry.getValue().stream().mapToInt(r -> r.getEnd() - 
r.getStart() + 1).sum()))
+                
.sorted(Map.Entry.comparingByKey(Comparator.comparing(Consumer::toString)))
+                .forEach(entry -> System.out.println(
+                        String.format("consumer: %s total ranges size: %d 
ratio: %.2f%%", entry.getKey(),
+                                entry.getValue(),
+                                ((double) entry.getValue() / 
(Integer.MAX_VALUE - 1)) * 100.0d)));
+    }
+
+    private static Consumer createMockConsumer(String consumerName, String 
toString, long id) {
+        // without stubOnly, the mock will record method invocations and run 
into OOME
+        Consumer consumer =  mock(Consumer.class, 
Mockito.withSettings().stubOnly());
+        when(consumer.consumerName()).thenReturn(consumerName);
+        when(consumer.getPriorityLevel()).thenReturn(0);
+        when(consumer.toString()).thenReturn(toString);
+        when(consumer.consumerId()).thenReturn(id);
+        return consumer;
     }
 
     // reproduces https://github.com/apache/pulsar/issues/22050
@@ -215,5 +307,243 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
         consumers.forEach(selector::removeConsumer);
         // then there should be no mapping remaining
         Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
+        // when consumers are removed again, should not fail
+        consumers.forEach(selector::removeConsumer);
+    }
+
+    @Test
+    public void testShouldNotChangeSelectedConsumerWhenConsumerIsRemoved() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(100);
+        final String consumerName = "consumer";
+        final int numOfInitialConsumers = 100;
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < numOfInitialConsumers; i++) {
+            final Consumer consumer = createMockConsumer(consumerName, "index 
" + i, i);
+            consumers.add(consumer);
+            selector.addConsumer(consumer);
+        }
+
+        int hashRangeSize = Integer.MAX_VALUE;
+        int validationPointCount = 200;
+        int increment = hashRangeSize / (validationPointCount + 1);
+        List<Consumer> selectedConsumerBeforeRemoval = new ArrayList<>();
+
+        for (int i = 0; i < validationPointCount; i++) {
+            selectedConsumerBeforeRemoval.add(selector.select(i * increment));
+        }
+
+        for (int i = 0; i < validationPointCount; i++) {
+            Consumer selected = selector.select(i * increment);
+            Consumer expected = selectedConsumerBeforeRemoval.get(i);
+            assertThat(selected.consumerId()).as("validationPoint %d", 
i).isEqualTo(expected.consumerId());
+        }
+
+        Set<Consumer> removedConsumers = new HashSet<>();
+        for (Consumer removedConsumer : consumers) {
+            selector.removeConsumer(removedConsumer);
+            removedConsumers.add(removedConsumer);
+            for (int i = 0; i < validationPointCount; i++) {
+                int hash = i * increment;
+                Consumer selected = selector.select(hash);
+                Consumer expected = selectedConsumerBeforeRemoval.get(i);
+                if (!removedConsumers.contains(expected)) {
+                    assertThat(selected.consumerId()).as("validationPoint %d, 
removed %s, hash %d ranges %s", i,
+                            removedConsumer.toString(), hash, 
selector.getConsumerKeyHashRanges()).isEqualTo(expected.consumerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes the consumers one by one and checks after every removal that the start and the end of each
+     * range reported before that removal still select the range's owner (the removed consumer's own
+     * ranges are skipped).
+     */
+    @Test
+    public void testShouldNotChangeSelectedConsumerWhenConsumerIsRemovedCheckHashRanges() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final List<Consumer> consumers = new ArrayList<>();
+        for (int index = 0; index < 25; index++) {
+            Consumer consumer = createMockConsumer("consumer", "index " + index, index);
+            consumers.add(consumer);
+            selector.addConsumer(consumer);
+        }
+
+        Map<Consumer, List<Range>> rangesBeforeRemoval = selector.getConsumerKeyHashRanges();
+        assertThat(selector.getConsumerKeyHashRanges()).as("sanity check")
+                .containsExactlyInAnyOrderEntriesOf(rangesBeforeRemoval);
+        System.out.println(rangesBeforeRemoval);
+
+        for (Consumer removedConsumer : consumers) {
+            selector.removeConsumer(removedConsumer);
+            for (Map.Entry<Consumer, List<Range>> assignment : rangesBeforeRemoval.entrySet()) {
+                final Consumer owner = assignment.getKey();
+                if (owner != removedConsumer) {
+                    for (Range range : assignment.getValue()) {
+                        final Consumer atStart = selector.select(range.getStart());
+                        assertThat(atStart).as("removed %s, range %s", removedConsumer, range).isEqualTo(owner);
+                        final Consumer atEnd = selector.select(range.getEnd());
+                        assertThat(atEnd).as("removed %s, range %s", removedConsumer, range).isEqualTo(owner);
+                        assertThat(atStart).isSameAs(atEnd);
+                    }
+                }
+            }
+            // the next removal is validated against the ranges reported after this one
+            rangesBeforeRemoval = selector.getConsumerKeyHashRanges();
+        }
+    }
+
+    /**
+     * Adds consumers one by one and checks after every addition that the start and the end of each range
+     * reported before that addition select either the range's previous owner or the consumer that was
+     * just added.
+     */
+    @Test
+    public void testShouldNotChangeSelectedConsumerUnnecessarilyWhenConsumerIsAddedCheckHashRanges() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final String consumerName = "consumer";
+        final int numOfInitialConsumers = 25;
+        for (int i = 0; i < numOfInitialConsumers; i++) {
+            selector.addConsumer(createMockConsumer(consumerName, "index " + i, i));
+        }
+
+        Map<Consumer, List<Range>> expected = selector.getConsumerKeyHashRanges();
+        assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected);
+
+        for (int i = numOfInitialConsumers; i < numOfInitialConsumers * 2; i++) {
+            final Consumer addedConsumer = createMockConsumer(consumerName, "index " + i, i);
+            selector.addConsumer(addedConsumer);
+            // "expected" holds the ranges that were reported before addedConsumer was added
+            for (Map.Entry<Consumer, List<Range>> entry : expected.entrySet()) {
+                if (entry.getKey() == addedConsumer) {
+                    continue;
+                }
+                for (Range range : entry.getValue()) {
+                    Consumer rangeStartConsumer = selector.select(range.getStart());
+                    if (rangeStartConsumer != addedConsumer) {
+                        assertThat(rangeStartConsumer).as("added %s, range start %s", addedConsumer, range)
+                                .isEqualTo(entry.getKey());
+                    }
+                    // probe the END of the range here; selecting the start again would leave it unchecked
+                    Consumer rangeEndConsumer = selector.select(range.getEnd());
+                    if (rangeEndConsumer != addedConsumer) {
+                        assertThat(rangeEndConsumer).as("added %s, range end %s", addedConsumer, range)
+                                .isEqualTo(entry.getKey());
+                    }
+                }
+            }
+            // the next addition is validated against the ranges reported after this one
+            expected = selector.getConsumerKeyHashRanges();
+        }
+    }
+
+    /**
+     * Adds consumers one by one and checks after every addition that each probed hash still selects the
+     * consumer it selected initially, unless the hash is now served by one of the added consumers.
+     */
+    @Test
+    public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final String consumerName = "consumer";
+        final int numOfInitialConsumers = 50;
+        for (int i = 0; i < numOfInitialConsumers; i++) {
+            selector.addConsumer(createMockConsumer(consumerName, "index " + i, i));
+        }
+
+        int hashRangeSize = Integer.MAX_VALUE;
+        int validationPointCount = 200;
+        int increment = hashRangeSize / (validationPointCount + 1);
+        List<Consumer> selectedConsumerBeforeAddition = new ArrayList<>();
+
+        for (int i = 0; i < validationPointCount; i++) {
+            selectedConsumerBeforeAddition.add(selector.select(i * increment));
+        }
+
+        // selecting again without any change must give the same consumers
+        for (int i = 0; i < validationPointCount; i++) {
+            Consumer selected = selector.select(i * increment);
+            Consumer expected = selectedConsumerBeforeAddition.get(i);
+            assertThat(selected.consumerId()).as("validationPoint %d", i).isEqualTo(expected.consumerId());
+        }
+
+        Set<Consumer> addedConsumers = new HashSet<>();
+        for (int i = numOfInitialConsumers; i < numOfInitialConsumers * 2; i++) {
+            final Consumer addedConsumer = createMockConsumer(consumerName, "index " + i, i);
+            selector.addConsumer(addedConsumer);
+            addedConsumers.add(addedConsumer);
+            for (int j = 0; j < validationPointCount; j++) {
+                int hash = j * increment;
+                Consumer selected = selector.select(hash);
+                Consumer expected = selectedConsumerBeforeAddition.get(j);
+                // The guard has to look at the selected consumer. Checking addedConsumer, which was put into
+                // the set just above, is never true and would skip the assertion for every validation point.
+                if (!addedConsumers.contains(selected)) {
+                    assertThat(selected.consumerId()).as("validationPoint %d, hash %d", j, hash)
+                            .isEqualTo(expected.consumerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes two consumers and adds them back in the same order, then checks that the reported hash
+     * ranges are the same as before.
+     */
+    @Test
+    public void testShouldNotChangeMappingWhenConsumerLeavesAndRejoins() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final int consumerCount = 25;
+        final List<Consumer> consumers = new ArrayList<>();
+        for (int index = 0; index < consumerCount; index++) {
+            Consumer consumer = createMockConsumer("consumer", "index " + index, index);
+            consumers.add(consumer);
+            selector.addConsumer(consumer);
+        }
+
+        final Map<Consumer, List<Range>> rangesBefore = selector.getConsumerKeyHashRanges();
+        assertThat(selector.getConsumerKeyHashRanges()).as("sanity check")
+                .containsExactlyInAnyOrderEntriesOf(rangesBefore);
+
+        final Consumer firstConsumer = consumers.get(0);
+        final Consumer middleConsumer = consumers.get(consumerCount / 2);
+        // both consumers leave ...
+        selector.removeConsumer(firstConsumer);
+        selector.removeConsumer(middleConsumer);
+        // ... and rejoin in the same relative order
+        selector.addConsumer(firstConsumer);
+        selector.addConsumer(middleConsumer);
+
+        assertThat(selector.getConsumerKeyHashRanges()).as("ranges shouldn't change")
+                .containsExactlyInAnyOrderEntriesOf(rangesBefore);
+    }
+
+    /**
+     * Removes all consumers in random order, adds them back in their original order and checks that every
+     * probed hash selects the same consumer as before.
+     */
+    @Test
+    public void testConsumersReconnect() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final int consumerCount = 50;
+        final int validationPointCount = 200;
+        final List<Integer> hashesToProbe = pointsToTest(validationPointCount);
+        final List<Consumer> consumers = new ArrayList<>();
+        for (int index = 0; index < consumerCount; index++) {
+            Consumer consumer = createMockConsumer("consumer", "index " + index, index);
+            consumers.add(consumer);
+            selector.addConsumer(consumer);
+        }
+
+        // remember which consumer is selected for each probed hash
+        final List<Consumer> selectedBeforeReconnect = new ArrayList<>();
+        for (int index = 0; index < validationPointCount; index++) {
+            int hash = hashesToProbe.get(index);
+            selectedBeforeReconnect.add(selector.select(hash));
+        }
+
+        // every consumer leaves; the order of leaving is random
+        final List<Consumer> leavingOrder = new ArrayList<>(consumers);
+        Collections.shuffle(leavingOrder);
+        leavingOrder.forEach(selector::removeConsumer);
+
+        // every consumer reconnects, in the same order as in the initial setup
+        for (Consumer reconnecting : consumers) {
+            selector.addConsumer(reconnecting);
+        }
+
+        // the selection for each probed hash must be the same as before
+        for (int index = 0; index < validationPointCount; index++) {
+            int hash = hashesToProbe.get(index);
+            Consumer selectedNow = selector.select(hash);
+            Consumer selectedBefore = selectedBeforeReconnect.get(index);
+            assertThat(selectedNow.consumerId()).as("validationPoint %d, hash %d", index, hash)
+                    .isEqualTo(selectedBefore.consumerId());
+        }
+    }
+
+    /**
+     * Builds the list of hashes {@code 0, step, 2 * step, ...} with
+     * {@code step = Integer.MAX_VALUE / (validationPointCount + 1)}.
+     *
+     * @param validationPointCount number of hashes to generate
+     * @return the generated hashes in ascending multiples of the step
+     */
+    private List<Integer> pointsToTest(int validationPointCount) {
+        final int step = Integer.MAX_VALUE / (validationPointCount + 1);
+        final List<Integer> points = new ArrayList<>();
+        int index = 0;
+        while (index < validationPointCount) {
+            points.add(index * step);
+            index++;
+        }
+        return points;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java
new file mode 100644
index 00000000000..75c8e6db5d2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@code equals} and {@code hashCode} of {@link ConsumerIdentityWrapper} using mocked consumers.
+ */
+@Test(groups = "broker")
+public class ConsumerIdentityWrapperTest {
+    /** Creates a consumer mock whose {@code consumerName()} returns the given name. */
+    private static Consumer mockConsumer(String consumerName) {
+        final Consumer mockedConsumer = mock(Consumer.class);
+        when(mockedConsumer.consumerName()).thenReturn(consumerName);
+        return mockedConsumer;
+    }
+
+    /** Creates a consumer mock named "consumer". */
+    private static Consumer mockConsumer() {
+        return mockConsumer("consumer");
+    }
+
+    /** Two wrappers created for the same consumer instance are equal. */
+    @Test
+    public void testEquals() {
+        final Consumer sharedConsumer = mockConsumer();
+        final ConsumerIdentityWrapper first = new ConsumerIdentityWrapper(sharedConsumer);
+        final ConsumerIdentityWrapper second = new ConsumerIdentityWrapper(sharedConsumer);
+        assertEquals(first, second);
+    }
+
+    /** Two wrappers created for the same consumer instance have the same hash code. */
+    @Test
+    public void testHashCode() {
+        final Consumer sharedConsumer = mockConsumer();
+        final int firstHashCode = new ConsumerIdentityWrapper(sharedConsumer).hashCode();
+        final int secondHashCode = new ConsumerIdentityWrapper(sharedConsumer).hashCode();
+        assertEquals(firstHashCode, secondHashCode);
+    }
+
+    /**
+     * Wrappers for the same consumer instance are equal with equal hash codes; a wrapper for another
+     * consumer instance with the same name is expected to differ in both.
+     */
+    @Test
+    public void testEqualsAndHashCode() {
+        final Consumer consumer = mockConsumer();
+        final Consumer otherConsumer = mockConsumer();
+        final ConsumerIdentityWrapper wrapper = new ConsumerIdentityWrapper(consumer);
+        final ConsumerIdentityWrapper sameConsumerWrapper = new ConsumerIdentityWrapper(consumer);
+        final ConsumerIdentityWrapper otherConsumerWrapper = new ConsumerIdentityWrapper(otherConsumer);
+
+        // same consumer instance
+        assertEquals(wrapper, sameConsumerWrapper);
+        // different consumer instance, same consumer name
+        assertNotEquals(wrapper, otherConsumerWrapper);
+
+        // hash codes follow the same pattern
+        assertEquals(wrapper.hashCode(), sameConsumerWrapper.hashCode());
+        assertNotEquals(wrapper.hashCode(), otherConsumerWrapper.hashCode());
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java
new file mode 100644
index 00000000000..0f18ecce2ff
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests the index and reference count bookkeeping of {@link ConsumerNameIndexTracker}.
+ */
+@Test(groups = "broker")
+public class ConsumerNameIndexTrackerTest {
+    private ConsumerNameIndexTracker tracker;
+
+    /** Gives every test method a fresh tracker. */
+    @BeforeMethod
+    public void setUp() {
+        tracker = new ConsumerNameIndexTracker();
+    }
+
+    /** Creates a consumer mock whose {@code consumerName()} returns the given name. */
+    private static Consumer mockConsumer(String consumerName) {
+        final Consumer mockedConsumer = mock(Consumer.class);
+        when(mockedConsumer.consumerName()).thenReturn(consumerName);
+        return mockedConsumer;
+    }
+
+    /** Creates a consumer mock named "consumer". */
+    private static Consumer mockConsumer() {
+        return mockConsumer("consumer");
+    }
+
+    /** Creates {@code count} wrappers, each around its own consumer mock named "consumer". */
+    private static List<ConsumerIdentityWrapper> createWrappersSharingName(int count) {
+        return IntStream.range(0, count).mapToObj(i -> mockConsumer()).map(ConsumerIdentityWrapper::new).toList();
+    }
+
+    /** Increases the ref count of every wrapper, in list order, and maps each wrapper to the returned index. */
+    private Map<ConsumerIdentityWrapper, Integer> trackAll(List<ConsumerIdentityWrapper> wrappers) {
+        return wrappers.stream().collect(Collectors.toMap(
+                wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper)));
+    }
+
+    /** Two consumers with the same name get different indexes, and those indexes can be looked up. */
+    @Test
+    public void testIncreaseConsumerRefCountAndReturnIndex() {
+        final ConsumerIdentityWrapper firstWrapper = new ConsumerIdentityWrapper(mockConsumer());
+        final ConsumerIdentityWrapper secondWrapper = new ConsumerIdentityWrapper(mockConsumer());
+        final int firstIndex = tracker.increaseConsumerRefCountAndReturnIndex(firstWrapper);
+        final int secondIndex = tracker.increaseConsumerRefCountAndReturnIndex(secondWrapper);
+        assertNotEquals(firstIndex, secondIndex);
+        assertEquals(firstIndex, tracker.getTrackedIndex(firstWrapper));
+        assertEquals(secondIndex, tracker.getTrackedIndex(secondWrapper));
+    }
+
+    /**
+     * Stops tracking every other consumer and tracks them again in the same order; every consumer is
+     * expected to end up with the index it had originally.
+     */
+    @Test
+    public void testTrackingReturnsStableIndexWhenRemovedAndAddedInSameOrder() {
+        final List<ConsumerIdentityWrapper> wrappers = createWrappersSharingName(100);
+        final Map<ConsumerIdentityWrapper, Integer> originalIndexes = trackAll(wrappers);
+        final int wrapperCount = wrappers.size();
+
+        // stop tracking the consumers at even positions
+        for (int position = 0; position < wrapperCount; position += 2) {
+            tracker.decreaseConsumerRefCount(wrappers.get(position));
+        }
+
+        // even positions are untracked (-1), odd positions keep their original index
+        for (int position = 0; position < wrapperCount; position++) {
+            final ConsumerIdentityWrapper wrapper = wrappers.get(position);
+            final int trackedIndex = tracker.getTrackedIndex(wrapper);
+            final int expectedIndex = position % 2 == 0 ? -1 : originalIndexes.get(wrapper);
+            assertEquals(trackedIndex, expectedIndex);
+        }
+
+        // tracking the even positions again returns the original indexes
+        for (int position = 0; position < wrapperCount; position += 2) {
+            final ConsumerIdentityWrapper wrapper = wrappers.get(position);
+            final int trackedIndex = tracker.increaseConsumerRefCountAndReturnIndex(wrapper);
+            assertEquals(trackedIndex, originalIndexes.get(wrapper));
+        }
+
+        // finally all consumers are tracked with their original indexes
+        for (ConsumerIdentityWrapper wrapper : wrappers) {
+            final int trackedIndex = tracker.getTrackedIndex(wrapper);
+            assertEquals(trackedIndex, originalIndexes.get(wrapper));
+        }
+    }
+
+    /**
+     * Tracks the same consumers twice: the indexes are the same both times, they survive the first
+     * decrease, and nothing remains tracked after the second decrease.
+     */
+    @Test
+    public void testTrackingMultipleTimes() {
+        final List<ConsumerIdentityWrapper> wrappers = createWrappersSharingName(100);
+        final Map<ConsumerIdentityWrapper, Integer> firstIndexes = trackAll(wrappers);
+        final Map<ConsumerIdentityWrapper, Integer> secondIndexes = trackAll(wrappers);
+        assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(1);
+        assertThat(firstIndexes).containsExactlyInAnyOrderEntriesOf(secondIndexes);
+
+        // first decrease: each consumer was tracked twice, so the index is still expected to be there
+        for (ConsumerIdentityWrapper wrapper : wrappers) {
+            tracker.decreaseConsumerRefCount(wrapper);
+        }
+        for (ConsumerIdentityWrapper wrapper : wrappers) {
+            final int trackedIndex = tracker.getTrackedIndex(wrapper);
+            assertEquals(trackedIndex, firstIndexes.get(wrapper));
+        }
+
+        // second decrease: nothing is expected to remain
+        for (ConsumerIdentityWrapper wrapper : wrappers) {
+            tracker.decreaseConsumerRefCount(wrapper);
+        }
+        assertThat(tracker.getTrackedConsumersCount()).isEqualTo(0);
+        assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(0);
+    }
+
+    /** After a single increase followed by a decrease the consumer has no tracked index (-1). */
+    @Test
+    public void testDecreaseConsumerRefCount() {
+        final ConsumerIdentityWrapper wrapper = new ConsumerIdentityWrapper(mockConsumer());
+        final int index = tracker.increaseConsumerRefCountAndReturnIndex(wrapper);
+        assertNotEquals(index, -1);
+        tracker.decreaseConsumerRefCount(wrapper);
+        assertEquals(tracker.getTrackedIndex(wrapper), -1);
+    }
+
+    /** The tracked index matches the index that was returned when the ref count was increased. */
+    @Test
+    public void testGetTrackedIndex() {
+        final ConsumerIdentityWrapper firstWrapper = new ConsumerIdentityWrapper(mockConsumer());
+        final ConsumerIdentityWrapper secondWrapper = new ConsumerIdentityWrapper(mockConsumer());
+        final int firstIndex = tracker.increaseConsumerRefCountAndReturnIndex(firstWrapper);
+        final int secondIndex = tracker.increaseConsumerRefCountAndReturnIndex(secondWrapper);
+        assertEquals(firstIndex, tracker.getTrackedIndex(firstWrapper));
+        assertEquals(secondIndex, tracker.getTrackedIndex(secondWrapper));
+    }
+
+    /** 100 consumers with 100 distinct names are counted as 100 names and 100 consumers. */
+    @Test
+    public void testTrackingMultipleNames() {
+        final List<ConsumerIdentityWrapper> wrappers =
+                IntStream.range(0, 100).mapToObj(i -> mockConsumer("consumer" + i)).map(ConsumerIdentityWrapper::new)
+                        .toList();
+        for (ConsumerIdentityWrapper wrapper : wrappers) {
+            tracker.increaseConsumerRefCountAndReturnIndex(wrapper);
+        }
+        assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(100);
+        assertThat(tracker.getTrackedConsumersCount()).isEqualTo(100);
+        for (ConsumerIdentityWrapper wrapper : wrappers) {
+            tracker.decreaseConsumerRefCount(wrapper);
+        }
+        assertThat(tracker.getTrackedConsumersCount()).isEqualTo(0);
+        assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(0);
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 03eb01e958a..3fe3fef3299 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
@@ -262,7 +263,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
         redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 
1, "key1")));
         final List<Entry> readEntries = new ArrayList<>();
         readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, 
"key1")));
-        readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key22")));
+        readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key2")));
 
         try {
             Field totalAvailablePermitsField = 
PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
@@ -358,7 +359,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
 
         // Messages with key1 are routed to consumer1 and messages with key2 
are routed to consumer2
         final List<Entry> allEntries = new ArrayList<>();
-        allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, 
"key22")));
+        allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, 
"key2")));
         allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, 
"key1")));
         allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key1")));
         allEntries.forEach(entry -> ((EntryImpl) entry).retain());
@@ -459,8 +460,8 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersTest {
             persistentDispatcher.readMoreEntries();
         }
 
-        assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1);
-        assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2);
+        
assertThat(actualEntriesToConsumer1).containsExactlyElementsOf(expectedEntriesToConsumer1);
+        
assertThat(actualEntriesToConsumer2).containsExactlyElementsOf(expectedEntriesToConsumer2);
 
         allEntries.forEach(entry -> entry.release());
     }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
index 4437ffc4ac6..488083f484b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
@@ -27,7 +27,7 @@ import 
org.apache.pulsar.common.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class Range {
+public class Range implements Comparable<Range> {
 
     private final int start;
     private final int end;
@@ -84,4 +84,13 @@ public class Range {
     public String toString() {
         return "[" + start + ", " + end + "]";
     }
+
+    /**
+     * Orders ranges by their start value and, when the start values are equal, by their end value.
+     *
+     * @param o the range to compare with
+     * @return a negative value, zero or a positive value if this range sorts before, equal to or after
+     *         {@code o}
+     */
+    @Override
+    public int compareTo(Range o) {
+        final int startOrder = Integer.compare(start, o.start);
+        return startOrder != 0 ? startOrder : Integer.compare(end, o.end);
+    }
 }

Reply via email to