This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 4ffdc2d60c1 [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053)
4ffdc2d60c1 is described below

commit 4ffdc2d60c1c72d38b4f286ed54e68e580a90e0c
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 15 11:07:10 2024 +0200

    [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java | 14 ++--
 ...istentHashingStickyKeyConsumerSelectorTest.java | 74 +++++++++++++++++-----
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |  4 +-
 3 files changed, 70 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index ea491bd40d3..b2b2b512c8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -39,7 +39,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
  * number of keys assigned to each consumer.
  */
 public class ConsistentHashingStickyKeyConsumerSelector implements 
StickyKeyConsumerSelector {
-
+    // use NUL character as field separator for hash key calculation
+    private static final String KEY_SEPARATOR = "\0";
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     // Consistent-Hash ring
@@ -59,8 +60,7 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
             // Insert multiple points on the hash ring for every consumer
             // The points are deterministically added based on the hash of the 
consumer name
             for (int i = 0; i < numberOfPoints; i++) {
-                String key = consumer.consumerName() + i;
-                int hash = 
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                int hash = calculateHashForConsumerAndIndex(consumer, i);
                 hashRing.compute(hash, (k, v) -> {
                     if (v == null) {
                         return Lists.newArrayList(consumer);
@@ -79,14 +79,18 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
         }
     }
 
+    private static int calculateHashForConsumerAndIndex(Consumer consumer, int 
index) {
+        String key = consumer.consumerName() + KEY_SEPARATOR + index;
+        return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+    }
+
     @Override
     public void removeConsumer(Consumer consumer) {
         rwLock.writeLock().lock();
         try {
             // Remove all the points that were added for this consumer
             for (int i = 0; i < numberOfPoints; i++) {
-                String key = consumer.consumerName() + i;
-                int hash = 
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                int hash = calculateHashForConsumerAndIndex(consumer, i);
                 hashRing.compute(hash, (k, v) -> {
                     if (v == null) {
                         return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index dbca31416bb..48311c57338 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -21,18 +21,18 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-
-import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.client.api.Range;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class ConsistentHashingStickyKeyConsumerSelectorTest {
@@ -154,17 +154,17 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
         }
         Map<Consumer, List<Range>> expectedResult = new HashMap<>();
         expectedResult.put(consumers.get(0), Arrays.asList(
-                Range.of(0, 330121749),
-                Range.of(330121750, 618146114),
-                Range.of(1797637922, 1976098885)));
+                Range.of(119056335, 242013991),
+                Range.of(722195657, 1656011842),
+                Range.of(1707482098, 1914695766)));
         expectedResult.put(consumers.get(1), Arrays.asList(
-                Range.of(938427576, 1094135919),
-                Range.of(1138613629, 1342907082),
-                Range.of(1342907083, 1797637921)));
+                Range.of(0, 90164503),
+                Range.of(90164504, 119056334),
+                Range.of(382436668, 722195656)));
         expectedResult.put(consumers.get(2), Arrays.asList(
-                Range.of(618146115, 772640562),
-                Range.of(772640563, 938427575),
-                Range.of(1094135920, 1138613628)));
+                Range.of(242013992, 242377547),
+                Range.of(242377548, 382436667),
+                Range.of(1656011843, 1707482097)));
         for (Map.Entry<Consumer, List<Range>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
             System.out.println(entry.getValue());
             Assert.assertEquals(entry.getValue(), 
expectedResult.get(entry.getKey()));
@@ -172,4 +172,48 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
         }
         Assert.assertEquals(expectedResult.size(), 0);
     }
+
+    // reproduces https://github.com/apache/pulsar/issues/22050
+    @Test
+    public void shouldNotCollideWithConsumerNameEndsWithNumber() {
+        ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(12);
+        List<String> consumerName = Arrays.asList("consumer1", "consumer11");
+        List<Consumer> consumers = new ArrayList<>();
+        for (String s : consumerName) {
+            Consumer consumer = mock(Consumer.class);
+            when(consumer.consumerName()).thenReturn(s);
+            selector.addConsumer(consumer);
+            consumers.add(consumer);
+        }
+        Map<Range, Consumer> rangeToConsumer = new HashMap<>();
+        for (Map.Entry<Consumer, List<Range>> entry : 
selector.getConsumerKeyHashRanges().entrySet()) {
+            for (Range range : entry.getValue()) {
+                Consumer previous = rangeToConsumer.put(range, entry.getKey());
+                if (previous != null) {
+                    Assert.fail("Ranges are colliding between " + 
previous.consumerName() + " and " + entry.getKey()
+                            .consumerName());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void shouldRemoveConsumersFromConsumerKeyHashRanges() {
+        ConsistentHashingStickyKeyConsumerSelector selector = new 
ConsistentHashingStickyKeyConsumerSelector(12);
+        List<Consumer> consumers = IntStream.range(1, 100).mapToObj(i -> 
"consumer" + i)
+                .map(consumerName -> {
+                    Consumer consumer = mock(Consumer.class);
+                    when(consumer.consumerName()).thenReturn(consumerName);
+                    return consumer;
+                }).collect(Collectors.toList());
+
+        // when consumers are added
+        consumers.forEach(selector::addConsumer);
+        // then each consumer should have a range
+        Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 
consumers.size());
+        // when consumers are removed
+        consumers.forEach(selector::removeConsumer);
+        // then there should be no mapping remaining
+        Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 9082a9caafc..361945893ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -255,7 +255,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 
1, "key1")));
         final List<Entry> readEntries = new ArrayList<>();
         readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, 
"key1")));
-        readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key2")));
+        readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key22")));
 
         try {
             Field totalAvailablePermitsField = 
PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
@@ -351,7 +351,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
 
         // Messages with key1 are routed to consumer1 and messages with key2 
are routed to consumer2
         final List<Entry> allEntries = new ArrayList<>();
-        allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, 
"key2")));
+        allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, 
"key22")));
         allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, 
"key1")));
         allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, 
"key1")));
         allEntries.forEach(entry -> ((EntryImpl) entry).retain());

Reply via email to