This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9f39cc6fedaaf60f8bc3760dfcc44533b38d879f Author: lipenghui <[email protected]> AuthorDate: Tue Sep 28 19:46:13 2021 +0800 Fix returned wrong hash ranges for the consumer with same consumer name (#12212) Currently, we are using the consumer name to generate the hash ranges to the admin client. If there are consumers with the same name, we will get same hash ranges for different consumers, this will confuse when troubleshooting issue. The following is an example: ``` "consumers" : [ { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 46320, "msgOutCounter" : 1020, "msgRateRedeliver" : 0.0, "chunkedMessageRate" : 0.0, "consumerName" : "5253f", "availablePermits" : -20, "unackedMessages" : 1000, "avgMessagesPerEntry" : 56, "blockedConsumerOnUnackedMsgs" : false, "readPositionWhenJoining" : "10:11494", "lastAckedTimestamp" : 1632731049993, "lastConsumedTimestamp" : 1632731030268, "keyHashRanges" : [ "[0, 16384]" ], "metadata" : { }, "address" : "/127.0.0.1:54702", "connectedSince" : "2021-09-27T16:23:49.891+08:00", "clientVersion" : "2.8.1" }, { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 0, "msgOutCounter" : 0, "msgRateRedeliver" : 0.0, "chunkedMessageRate" : 0.0, "consumerName" : "my-name", "availablePermits" : 10, "unackedMessages" : 0, "avgMessagesPerEntry" : 1000, "blockedConsumerOnUnackedMsgs" : false, "readPositionWhenJoining" : "10:19505", "lastAckedTimestamp" : 0, "lastConsumedTimestamp" : 0, "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ], "metadata" : { }, "address" : "/127.0.0.1:54708", "connectedSince" : "2021-09-27T16:23:59.031+08:00", "clientVersion" : "2.8.1" }, { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 0, "msgOutCounter" : 0, "msgRateRedeliver" : 0.0, "chunkedMessageRate" : 0.0, "consumerName" : "my-name", "availablePermits" : 10, "unackedMessages" : 0, "avgMessagesPerEntry" : 1000, "blockedConsumerOnUnackedMsgs" : false, "readPositionWhenJoining" : "10:19514", "lastAckedTimestamp" : 0, "lastConsumedTimestamp" : 0, "keyHashRanges" : [ "[16385, 40960]", "[40961, 65536]" ], "metadata" : { }, "address" : "/127.0.0.1:54717", "connectedSince" : "2021-09-27T16:24:03.927+08:00", "clientVersion" : "2.8.1" } ], ``` The fix is to use the equals method of the consumer to generate the key hash ranges. New tests added. (cherry picked from commit 9abd6d30f39f74e255cd5dac40b77af3dce5c468) --- ...ConsistentHashingStickyKeyConsumerSelector.java | 9 ++-- ...ashRangeAutoSplitStickyKeyConsumerSelector.java | 9 ++-- ...ashRangeExclusiveStickyKeyConsumerSelector.java | 9 ++-- .../broker/service/StickyKeyConsumerSelector.java | 3 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 3 +- .../service/persistent/PersistentSubscription.java | 10 +++-- ...istentHashingStickyKeyConsumerSelectorTest.java | 29 +++++++++---- ...angeAutoSplitStickyKeyConsumerSelectorTest.java | 40 ++++++++++++++---- ...angeExclusiveStickyKeyConsumerSelectorTest.java | 48 ++++++++++++++++++---- .../java/org/apache/pulsar/client/api/Range.java | 19 +++++++++ 10 files changed, 139 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index f0f6431..7b7a830 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -30,6 +30,7 @@ import java.util.TreeMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.util.Murmur3_32Hash; /** @@ -126,15 +127,15 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons } @Override - public Map<String, List<String>> getConsumerKeyHashRanges() { - Map<String, List<String>> result = new LinkedHashMap<>(); + public Map<Consumer, List<Range>> getConsumerKeyHashRanges() { + Map<Consumer, List<Range>> result = new LinkedHashMap<>(); rwLock.readLock().lock(); try { int start = 0; for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) { for (Consumer consumer: entry.getValue()) { - result.computeIfAbsent(consumer.consumerName(), key -> new ArrayList<>()) - .add("[" + start + ", " + entry.getKey() + "]"); + result.computeIfAbsent(consumer, key -> new ArrayList<>()) + .add(Range.of(start, entry.getKey())); } start = entry.getKey() + 1; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java index 4f721c9..624108f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.client.api.Range; /** * This is a consumer selector based fixed hash range. @@ -112,12 +113,12 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon } @Override - public Map<String, List<String>> getConsumerKeyHashRanges() { - Map<String, List<String>> result = new HashMap<>(); + public Map<Consumer, List<Range>> getConsumerKeyHashRanges() { + Map<Consumer, List<Range>> result = new HashMap<>(); int start = 0; for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) { - result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>()) - .add("[" + start + ", " + entry.getKey() + "]"); + result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>()) + .add(Range.of(start, entry.getKey())); start = entry.getKey() + 1; } return result; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index f4909ae..7c49632 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; @@ -65,16 +66,16 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon } @Override - public Map<String, List<String>> getConsumerKeyHashRanges() { - Map<String, List<String>> result = new HashMap<>(); + public Map<Consumer, List<Range>> getConsumerKeyHashRanges() { + Map<Consumer, List<Range>> result = new HashMap<>(); Map.Entry<Integer, Consumer> prev = null; for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) { if (prev == null) { prev = entry; } else { if (prev.getValue().equals(entry.getValue())) { - result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList<>()) - .add("[" + prev.getKey() + ", " + entry.getKey() + "]"); + result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>()) + .add(Range.of(prev.getKey(), entry.getKey())); } prev = null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java index 83952fc..b129ef5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import java.util.List; import java.util.Map; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.util.Murmur3_32Hash; public interface StickyKeyConsumerSelector { @@ -66,5 +67,5 @@ public interface StickyKeyConsumerSelector { * Get key hash ranges handled by each consumer. * @return A map where key is a consumer name and value is list of hash range it receiving message for. */ - Map<String, List<String>> getConsumerKeyHashRanges(); + Map<Consumer, List<Range>> getConsumerKeyHashRanges(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index e62f63e..9a036f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelec import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; @@ -418,7 +419,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi return recentlyJoinedConsumers; } - public Map<String, List<String>> getConsumerKeyHashRanges() { + public Map<Consumer, List<Range>> getConsumerKeyHashRanges() { return selector.getConsumerKeyHashRanges(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1eb99e9..b127b1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -30,6 +30,7 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -62,6 +63,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; @@ -1019,7 +1021,7 @@ public class PersistentSubscription implements Subscription { subStats.msgOutCounter = msgOutFromRemovedConsumer.longValue(); Dispatcher dispatcher = this.dispatcher; if (dispatcher != null) { - Map<String, List<String>> consumerKeyHashRanges = getType() == SubType.Key_Shared + Map<Consumer, List<Range>> consumerKeyHashRanges = getType() == SubType.Key_Shared ? ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getConsumerKeyHashRanges() : null; dispatcher.getConsumers().forEach(consumer -> { ConsumerStatsImpl consumerStats = consumer.getStats(); @@ -1034,8 +1036,10 @@ public class PersistentSubscription implements Subscription { subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp); subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp); - if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer.consumerName())) { - consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer.consumerName()); + if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) { + consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream() + .map(Range::toString) + .collect(Collectors.toList()); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java index 29b40c2..e058492 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java @@ -24,9 +24,11 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.client.api.Range; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -146,22 +148,31 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException { ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3); List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3"); + List<Consumer> consumers = new ArrayList<>(); for (String s : consumerName) { Consumer consumer = mock(Consumer.class); when(consumer.consumerName()).thenReturn(s); selector.addConsumer(consumer); + consumers.add(consumer); } - - Map<String, Set<String>> expectedResult = new HashMap<>(); - expectedResult.put("consumer1", ImmutableSet.of("[0, 330121749]", "[330121750, 618146114]", "[1797637922, 1976098885]")); - expectedResult.put("consumer2", ImmutableSet.of("[938427576, 1094135919]", "[1138613629, 1342907082]", "[1342907083, 1797637921]")); - expectedResult.put("consumer3", ImmutableSet.of("[618146115, 772640562]", "[772640563, 938427575]", "[1094135920, 1138613628]")); - for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) { - Assert.assertEquals(new HashSet<>(entry.getValue()), expectedResult.get(entry.getKey())); + Map<Consumer, List<Range>> expectedResult = new HashMap<>(); + expectedResult.put(consumers.get(0), Arrays.asList( + Range.of(0, 330121749), + Range.of(330121750, 618146114), + Range.of(1797637922, 1976098885))); + expectedResult.put(consumers.get(1), Arrays.asList( + Range.of(938427576, 1094135919), + Range.of(1138613629, 1342907082), + Range.of(1342907083, 1797637921))); + expectedResult.put(consumers.get(2), Arrays.asList( + Range.of(618146115, 772640562), + Range.of(772640563, 938427575), + Range.of(1094135920, 1138613628))); + for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) { + System.out.println(entry.getValue()); + Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey())); expectedResult.remove(entry.getKey()); } Assert.assertEquals(expectedResult.size(), 0); } - - } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java index cb4b809..f17bb5a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service; -import com.google.common.collect.ImmutableList; +import org.apache.pulsar.client.api.Range; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,22 +39,46 @@ public class HashRangeAutoSplitStickyKeyConsumerSelectorTest { public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException { HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5); List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4"); + List<Consumer> consumers = new ArrayList<>(); for (String s : consumerName) { Consumer consumer = mock(Consumer.class); when(consumer.consumerName()).thenReturn(s); selector.addConsumer(consumer); + consumers.add(consumer); } - Map<String, List<String>> expectedResult = new HashMap<>(); - expectedResult.put("consumer1", ImmutableList.of("[49, 64]")); - expectedResult.put("consumer4", ImmutableList.of("[33, 48]")); - expectedResult.put("consumer2", ImmutableList.of("[17, 32]")); - expectedResult.put("consumer3", ImmutableList.of("[0, 16]")); - for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) { + Map<Consumer, List<Range>> expectedResult = new HashMap<>(); + expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(49, 64))); + expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(33, 48))); + expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(17, 32))); + expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(0, 16))); + for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) { Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey())); expectedResult.remove(entry.getKey()); } Assert.assertEquals(expectedResult.size(), 0); } + @Test + public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception { + HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5); + final String consumerName = "My-consumer"; + List<Consumer> consumers = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Consumer consumer = mock(Consumer.class); + when(consumer.consumerName()).thenReturn(consumerName); + selector.addConsumer(consumer); + consumers.add(consumer); + } + + List<Range> prev = null; + for (Consumer consumer : consumers) { + List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer); + Assert.assertEquals(ranges.size(), 1); + if (prev != null) { + Assert.assertNotEquals(prev, ranges); + } + prev = ranges; + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java index c63e57e..dde0a58 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelectorTest.java @@ -21,15 +21,16 @@ package org.apache.pulsar.broker.service; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; @@ -117,6 +118,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4"); List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}, new int[] {15, 20}); + List<Consumer> consumers = new ArrayList<>(); for (int index = 0; index < consumerName.size(); index++) { Consumer consumer = mock(Consumer.class); KeySharedMeta keySharedMeta = new KeySharedMeta() @@ -128,14 +130,15 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { when(consumer.consumerName()).thenReturn(consumerName.get(index)); Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); selector.addConsumer(consumer); + consumers.add(consumer); } - Map<String, List<String>> expectedResult = new HashMap<>(); - expectedResult.put("consumer1", ImmutableList.of("[0, 2]")); - expectedResult.put("consumer2", ImmutableList.of("[3, 7]")); - expectedResult.put("consumer3", ImmutableList.of("[9, 12]")); - expectedResult.put("consumer4", ImmutableList.of("[15, 20]")); - for (Map.Entry<String, List<String>> entry : selector.getConsumerKeyHashRanges().entrySet()) { + Map<Consumer, List<Range>> expectedResult = new HashMap<>(); + expectedResult.put(consumers.get(0), Collections.singletonList(Range.of(0, 2))); + expectedResult.put(consumers.get(1), Collections.singletonList(Range.of(3, 7))); + expectedResult.put(consumers.get(2), Collections.singletonList(Range.of(9, 12))); + expectedResult.put(consumers.get(3), Collections.singletonList(Range.of(15, 20))); + for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) { Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey())); expectedResult.remove(entry.getKey()); } @@ -143,6 +146,37 @@ public class HashRangeExclusiveStickyKeyConsumerSelectorTest { } @Test + public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception { + HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); + final String consumerName = "My-consumer"; + List<int[]> range = Arrays.asList(new int[] {0, 2}, new int[] {3, 7}, new int[] {9, 12}); + List<Consumer> consumers = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Consumer consumer = mock(Consumer.class); + KeySharedMeta keySharedMeta = new KeySharedMeta() + .setKeySharedMode(KeySharedMode.STICKY); + keySharedMeta.addHashRange() + .setStart(range.get(i)[0]) + .setEnd(range.get(i)[1]); + when(consumer.getKeySharedMeta()).thenReturn(keySharedMeta); + when(consumer.consumerName()).thenReturn(consumerName); + Assert.assertEquals(consumer.getKeySharedMeta(), keySharedMeta); + selector.addConsumer(consumer); + consumers.add(consumer); + } + + List<Range> prev = null; + for (Consumer consumer : consumers) { + List<Range> ranges = selector.getConsumerKeyHashRanges().get(consumer); + Assert.assertEquals(ranges.size(), 1); + if (prev != null) { + Assert.assertNotEquals(prev, ranges); + } + prev = ranges; + } + } + + @Test public void testSingleRangeConflict() throws BrokerServiceException.ConsumerAssignException { HashRangeExclusiveStickyKeyConsumerSelector selector = new HashRangeExclusiveStickyKeyConsumerSelector(10); Consumer consumer1 = mock(Consumer.class); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java index 57a408b..75b1cbc 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java @@ -21,6 +21,8 @@ package org.apache.pulsar.client.api; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import java.util.Objects; + /** * Int range. */ @@ -63,6 +65,23 @@ public class Range { } @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Range range = (Range) o; + return start == range.start && end == range.end; + } + + @Override + public int hashCode() { + return Objects.hash(start, end); + } + + @Override public String toString() { return "[" + start + ", " + end + "]"; }
