This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 4ffdc2d60c1 [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053)
4ffdc2d60c1 is described below
commit 4ffdc2d60c1c72d38b4f286ed54e68e580a90e0c
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 15 11:07:10 2024 +0200
[fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053)
---
...ConsistentHashingStickyKeyConsumerSelector.java | 14 ++--
...istentHashingStickyKeyConsumerSelectorTest.java | 74 +++++++++++++++++-----
...ntStickyKeyDispatcherMultipleConsumersTest.java | 4 +-
3 files changed, 70 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index ea491bd40d3..b2b2b512c8c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -39,7 +39,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
* number of keys assigned to each consumer.
*/
public class ConsistentHashingStickyKeyConsumerSelector implements
StickyKeyConsumerSelector {
-
+ // use NUL character as field separator for hash key calculation
+ private static final String KEY_SEPARATOR = "\0";
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// Consistent-Hash ring
@@ -59,8 +60,7 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the
consumer name
for (int i = 0; i < numberOfPoints; i++) {
- String key = consumer.consumerName() + i;
- int hash =
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+ int hash = calculateHashForConsumerAndIndex(consumer, i);
hashRing.compute(hash, (k, v) -> {
if (v == null) {
return Lists.newArrayList(consumer);
@@ -79,14 +79,18 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
}
}
+ private static int calculateHashForConsumerAndIndex(Consumer consumer, int
index) {
+ String key = consumer.consumerName() + KEY_SEPARATOR + index;
+ return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+ }
+
@Override
public void removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
// Remove all the points that were added for this consumer
for (int i = 0; i < numberOfPoints; i++) {
- String key = consumer.consumerName() + i;
- int hash =
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+ int hash = calculateHashForConsumerAndIndex(consumer, i);
hashRing.compute(hash, (k, v) -> {
if (v == null) {
return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index dbca31416bb..48311c57338 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -21,18 +21,18 @@ package org.apache.pulsar.broker.service;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-
-import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.client.api.Range;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
+import org.testng.Assert;
+import org.testng.annotations.Test;
@Test(groups = "broker")
public class ConsistentHashingStickyKeyConsumerSelectorTest {
@@ -154,17 +154,17 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
}
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Arrays.asList(
- Range.of(0, 330121749),
- Range.of(330121750, 618146114),
- Range.of(1797637922, 1976098885)));
+ Range.of(119056335, 242013991),
+ Range.of(722195657, 1656011842),
+ Range.of(1707482098, 1914695766)));
expectedResult.put(consumers.get(1), Arrays.asList(
- Range.of(938427576, 1094135919),
- Range.of(1138613629, 1342907082),
- Range.of(1342907083, 1797637921)));
+ Range.of(0, 90164503),
+ Range.of(90164504, 119056334),
+ Range.of(382436668, 722195656)));
expectedResult.put(consumers.get(2), Arrays.asList(
- Range.of(618146115, 772640562),
- Range.of(772640563, 938427575),
- Range.of(1094135920, 1138613628)));
+ Range.of(242013992, 242377547),
+ Range.of(242377548, 382436667),
+ Range.of(1656011843, 1707482097)));
for (Map.Entry<Consumer, List<Range>> entry :
selector.getConsumerKeyHashRanges().entrySet()) {
System.out.println(entry.getValue());
Assert.assertEquals(entry.getValue(),
expectedResult.get(entry.getKey()));
@@ -172,4 +172,48 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
}
Assert.assertEquals(expectedResult.size(), 0);
}
+
+ // reproduces https://github.com/apache/pulsar/issues/22050
+ @Test
+ public void shouldNotCollideWithConsumerNameEndsWithNumber() {
+ ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(12);
+ List<String> consumerName = Arrays.asList("consumer1", "consumer11");
+ List<Consumer> consumers = new ArrayList<>();
+ for (String s : consumerName) {
+ Consumer consumer = mock(Consumer.class);
+ when(consumer.consumerName()).thenReturn(s);
+ selector.addConsumer(consumer);
+ consumers.add(consumer);
+ }
+ Map<Range, Consumer> rangeToConsumer = new HashMap<>();
+ for (Map.Entry<Consumer, List<Range>> entry :
selector.getConsumerKeyHashRanges().entrySet()) {
+ for (Range range : entry.getValue()) {
+ Consumer previous = rangeToConsumer.put(range, entry.getKey());
+ if (previous != null) {
+ Assert.fail("Ranges are colliding between " +
previous.consumerName() + " and " + entry.getKey()
+ .consumerName());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void shouldRemoveConsumersFromConsumerKeyHashRanges() {
+ ConsistentHashingStickyKeyConsumerSelector selector = new
ConsistentHashingStickyKeyConsumerSelector(12);
+ List<Consumer> consumers = IntStream.range(1, 100).mapToObj(i ->
"consumer" + i)
+ .map(consumerName -> {
+ Consumer consumer = mock(Consumer.class);
+ when(consumer.consumerName()).thenReturn(consumerName);
+ return consumer;
+ }).collect(Collectors.toList());
+
+ // when consumers are added
+ consumers.forEach(selector::addConsumer);
+ // then each consumer should have a range
+ Assert.assertEquals(selector.getConsumerKeyHashRanges().size(),
consumers.size());
+ // when consumers are removed
+ consumers.forEach(selector::removeConsumer);
+ // then there should be no mapping remaining
+ Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 9082a9caafc..361945893ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -255,7 +255,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1",
1, "key1")));
final List<Entry> readEntries = new ArrayList<>();
readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2,
"key1")));
- readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3,
"key2")));
+ readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3,
"key22")));
try {
Field totalAvailablePermitsField =
PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
@@ -351,7 +351,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
// Messages with key1 are routed to consumer1 and messages with key2
are routed to consumer2
final List<Entry> allEntries = new ArrayList<>();
- allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1,
"key2")));
+ allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1,
"key22")));
allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2,
"key1")));
allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3,
"key1")));
allEntries.forEach(entry -> ((EntryImpl) entry).retain());