This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9f39cc6fedaaf60f8bc3760dfcc44533b38d879f
Author: lipenghui <[email protected]>
AuthorDate: Tue Sep 28 19:46:13 2021 +0800

    Fix returned wrong hash ranges for the consumer with same consumer name 
(#12212)
    
    Currently, we are using the consumer name to generate the hash ranges to 
the admin client.
    If there are consumers with the same name, we will get same hash ranges for 
different consumers,
    this will confuse when troubleshooting issue. The following is an example:
    
    ```
    "consumers" : [ {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "bytesOutCounter" : 46320,
            "msgOutCounter" : 1020,
            "msgRateRedeliver" : 0.0,
            "chunkedMessageRate" : 0.0,
            "consumerName" : "5253f",
            "availablePermits" : -20,
            "unackedMessages" : 1000,
            "avgMessagesPerEntry" : 56,
            "blockedConsumerOnUnackedMsgs" : false,
            "readPositionWhenJoining" : "10:11494",
            "lastAckedTimestamp" : 1632731049993,
            "lastConsumedTimestamp" : 1632731030268,
            "keyHashRanges" : [ "[0, 16384]" ],
            "metadata" : { },
            "address" : "/127.0.0.1:54702",
            "connectedSince" : "2021-09-27T16:23:49.891+08:00",
            "clientVersion" : "2.8.1"
          }, {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "bytesOutCounter" : 0,
            "msgOutCounter" : 0,
            "msgRateRedeliver" : 0.0,
            "chunkedMessageRate" : 0.0,
            "consumerName" : "my-name",
            "availablePermits" : 10,
            "unackedMessages" : 0,
            "avgMessagesPerEntry" : 1000,
            "blockedConsumerOnUnackedMsgs" : false,
            "readPositionWhenJoining" : "10:19505",
            "lastAckedTimestamp" : 0,
            "lastConsumedTimestamp" : 0,
            "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ],
            "metadata" : { },
            "address" : "/127.0.0.1:54708",
            "connectedSince" : "2021-09-27T16:23:59.031+08:00",
            "clientVersion" : "2.8.1"
          }, {
            "msgRateOut" : 0.0,
            "msgThroughputOut" : 0.0,
            "bytesOutCounter" : 0,
            "msgOutCounter" : 0,
            "msgRateRedeliver" : 0.0,
            "chunkedMessageRate" : 0.0,
            "consumerName" : "my-name",
            "availablePermits" : 10,
            "unackedMessages" : 0,
            "avgMessagesPerEntry" : 1000,
            "blockedConsumerOnUnackedMsgs" : false,
            "readPositionWhenJoining" : "10:19514",
            "lastAckedTimestamp" : 0,
            "lastConsumedTimestamp" : 0,
            "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ],
            "metadata" : { },
            "address" : "/127.0.0.1:54717",
            "connectedSince" : "2021-09-27T16:24:03.927+08:00",
            "clientVersion" : "2.8.1"
          } ],
    ```
    
    The fix is to use the equals method of the consumer to generate the key 
hash ranges.
    New tests added.
    
    (cherry picked from commit 9abd6d30f39f74e255cd5dac40b77af3dce5c468)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java |  9 ++--
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |  9 ++--
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |  9 ++--
 .../broker/service/StickyKeyConsumerSelector.java  |  3 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  3 +-
 .../service/persistent/PersistentSubscription.java | 10 +++--
 ...istentHashingStickyKeyConsumerSelectorTest.java | 29 +++++++++----
 ...angeAutoSplitStickyKeyConsumerSelectorTest.java | 40 ++++++++++++++----
 ...angeExclusiveStickyKeyConsumerSelectorTest.java | 48 ++++++++++++++++++----
 .../java/org/apache/pulsar/client/api/Range.java   | 19 +++++++++
 10 files changed, 139 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index f0f6431..7b7a830 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 /**
@@ -126,15 +127,15 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
     }
 
     @Override
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
-        Map<String, List<String>> result = new LinkedHashMap<>();
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
+        Map<Consumer, List<Range>> result = new LinkedHashMap<>();
         rwLock.readLock().lock();
         try {
             int start = 0;
             for (Map.Entry<Integer, List<Consumer>> entry: 
hashRing.entrySet()) {
                 for (Consumer consumer: entry.getValue()) {
-                    result.computeIfAbsent(consumer.consumerName(), key -> new 
ArrayList<>())
-                            .add("[" + start + ", " + entry.getKey() + "]");
+                    result.computeIfAbsent(consumer, key -> new ArrayList<>())
+                            .add(Range.of(start, entry.getKey()));
                 }
                 start = entry.getKey() + 1;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 4f721c9..624108f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 
 /**
  * This is a consumer selector based fixed hash range.
@@ -112,12 +113,12 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector 
implements StickyKeyCon
     }
 
     @Override
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
-        Map<String, List<String>> result = new HashMap<>();
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
+        Map<Consumer, List<Range>> result = new HashMap<>();
         int start = 0;
         for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
-            result.computeIfAbsent(entry.getValue().consumerName(), key -> new 
ArrayList<>())
-                    .add("[" + start + ", " + entry.getKey() + "]");
+            result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>())
+                    .add(Range.of(start, entry.getKey()));
             start = entry.getKey() + 1;
         }
         return result;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index f4909ae..7c49632 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 
@@ -65,16 +66,16 @@ public class HashRangeExclusiveStickyKeyConsumerSelector 
implements StickyKeyCon
     }
 
     @Override
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
-        Map<String, List<String>> result = new HashMap<>();
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
+        Map<Consumer, List<Range>> result = new HashMap<>();
         Map.Entry<Integer, Consumer> prev = null;
         for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
             if (prev == null) {
                 prev = entry;
             } else {
                 if (prev.getValue().equals(entry.getValue())) {
-                    result.computeIfAbsent(entry.getValue().consumerName(), 
key -> new ArrayList<>())
-                            .add("[" + prev.getKey() + ", " + entry.getKey() + 
"]");
+                    result.computeIfAbsent(entry.getValue(), key -> new 
ArrayList<>())
+                            .add(Range.of(prev.getKey(), entry.getKey()));
                 }
                 prev = null;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 83952fc..b129ef5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import java.util.List;
 import java.util.Map;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 public interface StickyKeyConsumerSelector {
@@ -66,5 +67,5 @@ public interface StickyKeyConsumerSelector {
      * Get key hash ranges handled by each consumer.
      * @return A map where key is a consumer name and value is list of hash 
range it receiving message for.
      */
-    Map<String, List<String>> getConsumerKeyHashRanges();
+    Map<Consumer, List<Range>> getConsumerKeyHashRanges();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index e62f63e..9a036f4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -46,6 +46,7 @@ import 
org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelec
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -418,7 +419,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
         return recentlyJoinedConsumers;
     }
 
-    public Map<String, List<String>> getConsumerKeyHashRanges() {
+    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
         return selector.getConsumerKeyHashRanges();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1eb99e9..b127b1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -1019,7 +1021,7 @@ public class PersistentSubscription implements 
Subscription {
         subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue();
         Dispatcher dispatcher = this.dispatcher;
         if (dispatcher != null) {
-            Map<String, List<String>> consumerKeyHashRanges = getType() == 
SubType.Key_Shared
+            Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == 
SubType.Key_Shared
                     ? ((PersistentStickyKeyDispatcherMultipleConsumers) 
dispatcher).getConsumerKeyHashRanges() : null;
             dispatcher.getConsumers().forEach(consumer -> {
                 ConsumerStatsImpl consumerStats = consumer.getStats();
@@ -1034,8 +1036,10 @@ public class PersistentSubscription implements 
Subscription {
                 subStats.lastConsumedTimestamp =
                         Math.max(subStats.lastConsumedTimestamp, 
consumerStats.lastConsumedTimestamp);
                 subStats.lastAckedTimestamp = 
Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
-                if (consumerKeyHashRanges != null && 
consumerKeyHashRanges.containsKey(consumer.consumerName())) {
-                    consumerStats.keyHashRanges = 
consumerKeyHashRanges.get(consumer.consumerName());
+                if (consumerKeyHashRanges != null && 
consumerKeyHashRanges.containsKey(consumer)) {
+                    consumerStats.keyHashRanges = 
consumerKeyHashRanges.get(consumer).stream()
+                            .map(Range::toString)
+                            .collect(Collectors.toList());
                 }
             });
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 29b40c2..e058492 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -24,9 +24,11 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableSet;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -146,22 +148,31 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
     public void testGetConsumerKeyHashRanges() throws 
BrokerServiceException.ConsumerAssignException {
         ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(3);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", 
"consumer3");
+        List<Consumer> consumers = new ArrayList<>();
         for (String s : consumerName) {
             Consumer consumer = mock(Consumer.class);
             when(consumer.consumerName()).thenReturn(s);
             selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
-
-        Map<String, Set<String>> expectedResult = new HashMap<>();
-        expectedResult.put("consumer1", ImmutableSet.of("[0, 330121749]", 
"[330121750, 618146114]", "[1797637922, 1976098885]"));
-        expectedResult.put("consumer2", ImmutableSet.of("[938427576, 
1094135919]", "[1138613629, 1342907082]", "[1342907083, 1797637921]"));
-        expectedResult.put("consumer3", ImmutableSet.of("[618146115, 
772640562]", "[772640563, 938427575]", "[1094135920, 1138613628]"));
-        for (Map.Entry<String, List<String>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
-            Assert.assertEquals(new HashSet<>(entry.getValue()), 
expectedResult.get(entry.getKey()));
+        Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        expectedResult.put(consumers.get(0), Arrays.asList(
+                Range.of(0, 330121749),
+                Range.of(330121750, 618146114),
+                Range.of(1797637922, 1976098885)));
+        expectedResult.put(consumers.get(1), Arrays.asList(
+                Range.of(938427576, 1094135919),
+                Range.of(1138613629, 1342907082),
+                Range.of(1342907083, 1797637921)));
+        expectedResult.put(consumers.get(2), Arrays.asList(
+                Range.of(618146115, 772640562),
+                Range.of(772640563, 938427575),
+                Range.of(1094135920, 1138613628)));
+        for (Map.Entry<Consumer, List<Range>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
+            System.out.println(entry.getValue());
+            Assert.assertEquals(entry.getValue(), 
expectedResult.get(entry.getKey()));
             expectedResult.remove(entry.getKey());
         }
         Assert.assertEquals(expectedResult.size(), 0);
     }
-
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
index cb4b809..f17bb5a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
-import com.google.common.collect.ImmutableList;
+import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,22 +39,46 @@ public class 
HashRangeAutoSplitStickyKeyConsumerSelectorTest {
     public void testGetConsumerKeyHashRanges() throws 
BrokerServiceException.ConsumerAssignException {
         HashRangeAutoSplitStickyKeyConsumerSelector selector = new 
HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", 
"consumer3", "consumer4");
+        List<Consumer> consumers = new ArrayList<>();
         for (String s : consumerName) {
             Consumer consumer = mock(Consumer.class);
             when(consumer.consumerName()).thenReturn(s);
             selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
 
-        Map<String, List<String>> expectedResult = new HashMap<>();
-        expectedResult.put("consumer1", ImmutableList.of("[49, 64]"));
-        expectedResult.put("consumer4", ImmutableList.of("[33, 48]"));
-        expectedResult.put("consumer2", ImmutableList.of("[17, 32]"));
-        expectedResult.put("consumer3", ImmutableList.of("[0, 16]"));
-        for (Map.Entry<String, List<String>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
+        Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        expectedResult.put(consumers.get(0), 
Collections.singletonList(Range.of(49, 64)));
+        expectedResult.put(consumers.get(3), 
Collections.singletonList(Range.of(33, 48)));
+        expectedResult.put(consumers.get(1), 
Collections.singletonList(Range.of(17, 32)));
+        expectedResult.put(consumers.get(2), 
Collections.singletonList(Range.of(0, 16)));
+        for (Map.Entry<Consumer, List<Range>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
             Assert.assertEquals(entry.getValue(), 
expectedResult.get(entry.getKey()));
             expectedResult.remove(entry.getKey());
         }
         Assert.assertEquals(expectedResult.size(), 0);
     }
 
+    @Test
+    public void testGetConsumerKeyHashRangesWithSameConsumerName() throws 
Exception {
+        HashRangeAutoSplitStickyKeyConsumerSelector selector = new 
HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
+        final String consumerName = "My-consumer";
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            Consumer consumer = mock(Consumer.class);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            selector.addConsumer(consumer);
+            consumers.add(consumer);
+        }
+
+        List<Range> prev = null;
+        for (Consumer consumer : consumers) {
+            List<Range> ranges = 
selector.getConsumerKeyHashRanges().get(consumer);
+            Assert.assertEquals(ranges.size(), 1);
+            if (prev != null) {
+                Assert.assertNotEquals(prev, ranges);
+            }
+            prev = ranges;
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
index c63e57e..dde0a58 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java
@@ -21,15 +21,16 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.common.api.proto.IntRange;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -117,6 +118,7 @@ public class 
HashRangeExclusiveStickyKeyConsumerSelectorTest {
         HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
         List<String> consumerName = Arrays.asList("consumer1", "consumer2", 
"consumer3", "consumer4");
         List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, 
new int[] {9, 12}, new int[] {15, 20});
+        List<Consumer> consumers = new ArrayList<>();
         for (int index = 0; index < consumerName.size(); index++) {
             Consumer consumer = mock(Consumer.class);
             KeySharedMeta keySharedMeta = new KeySharedMeta()
@@ -128,14 +130,15 @@ public class 
HashRangeExclusiveStickyKeyConsumerSelectorTest {
             when(consumer.consumerName()).thenReturn(consumerName.get(index));
             Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
             selector.addConsumer(consumer);
+            consumers.add(consumer);
         }
 
-        Map<String, List<String>> expectedResult = new HashMap<>();
-        expectedResult.put("consumer1", ImmutableList.of("[0, 2]"));
-        expectedResult.put("consumer2", ImmutableList.of("[3, 7]"));
-        expectedResult.put("consumer3", ImmutableList.of("[9, 12]"));
-        expectedResult.put("consumer4", ImmutableList.of("[15, 20]"));
-        for (Map.Entry<String, List<String>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
+        Map<Consumer, List<Range>> expectedResult = new HashMap<>();
+        expectedResult.put(consumers.get(0), 
Collections.singletonList(Range.of(0, 2)));
+        expectedResult.put(consumers.get(1), 
Collections.singletonList(Range.of(3, 7)));
+        expectedResult.put(consumers.get(2), 
Collections.singletonList(Range.of(9, 12)));
+        expectedResult.put(consumers.get(3), 
Collections.singletonList(Range.of(15, 20)));
+        for (Map.Entry<Consumer, List<Range>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
             Assert.assertEquals(entry.getValue(), 
expectedResult.get(entry.getKey()));
             expectedResult.remove(entry.getKey());
         }
@@ -143,6 +146,37 @@ public class 
HashRangeExclusiveStickyKeyConsumerSelectorTest {
     }
 
     @Test
+    public void testGetConsumerKeyHashRangesWithSameConsumerName() throws 
Exception {
+        HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
+        final String consumerName = "My-consumer";
+        List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, 
new int[] {9, 12});
+        List<Consumer> consumers = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            Consumer consumer = mock(Consumer.class);
+            KeySharedMeta keySharedMeta = new KeySharedMeta()
+                    .setKeySharedMode(KeySharedMode.STICKY);
+            keySharedMeta.addHashRange()
+                    .setStart(range.get(i)[0])
+                    .setEnd(range.get(i)[1]);
+            when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta);
+            selector.addConsumer(consumer);
+            consumers.add(consumer);
+        }
+
+        List<Range> prev = null;
+        for (Consumer consumer : consumers) {
+            List<Range> ranges = 
selector.getConsumerKeyHashRanges().get(consumer);
+            Assert.assertEquals(ranges.size(), 1);
+            if (prev != null) {
+                Assert.assertNotEquals(prev, ranges);
+            }
+            prev = ranges;
+        }
+    }
+
+    @Test
     public void testSingleRangeConflict() throws 
BrokerServiceException.ConsumerAssignException {
         HashRangeExclusiveStickyKeyConsumerSelector selector = new 
HashRangeExclusiveStickyKeyConsumerSelector(10);
         Consumer consumer1 = mock(Consumer.class);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
index 57a408b..75b1cbc 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.client.api;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
+import java.util.Objects;
+
 /**
  * Int range.
  */
@@ -63,6 +65,23 @@ public class Range {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Range range = (Range) o;
+        return start == range.start && end == range.end;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(start, end);
+    }
+
+    @Override
     public String toString() {
         return "[" + start + ", " + end + "]";
     }

Reply via email to