jerrypeng commented on a change in pull request #6791:
URL: https://github.com/apache/pulsar/pull/6791#discussion_r413394891
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -18,86 +18,64 @@
*/
package org.apache.pulsar.broker.service;
-import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
/**
* This is a consumer selector based fixed hash range.
*
- * 1.Each consumer serves a fixed range of hash value
- * 2.The whole range of hash value could be covered by all the consumers.
- * 3.Once a consumer is removed, the left consumers could still serve the
whole range.
- *
- * Initializing with a fixed hash range, by default 2 << 5.
- * First consumer added, hash range looks like:
- *
- * 0 -> 65536(consumer-1)
- *
- * Second consumer added, will find a biggest range to split:
- *
- * 0 -> 32768(consumer-2) -> 65536(consumer-1)
- *
- * While a consumer removed, The range for this consumer will be taken over
- * by other consumer, consumer-2 removed:
- *
- * 0 -> 65536(consumer-1)
- *
- * In this approach use skip list map to maintain the hash range and consumers.
- *
- * Select consumer will return the ceiling key of message key hashcode % range
size.
- *
+ * The implementation uses consistent hashing to evenly split, the
+ * number of keys assigned to each consumer.
*/
public class HashRangeAutoSplitStickyKeyConsumerSelector implements
StickyKeyConsumerSelector {
- private final int rangeSize;
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
- private final Map<Consumer, Integer> consumerRange;
+ // Consistent-Hash ring
+ private final NavigableMap<Integer, Consumer> hashRing;
- public HashRangeAutoSplitStickyKeyConsumerSelector() {
- this(DEFAULT_RANGE_SIZE);
- }
+ private final int numberOfPOints;
- public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
- if (rangeSize < 2) {
- throw new IllegalArgumentException("range size must greater than
2");
- }
- if (!is2Power(rangeSize)) {
- throw new IllegalArgumentException("range size must be nth power
with 2");
- }
- this.rangeMap = new ConcurrentSkipListMap<>();
- this.consumerRange = new HashMap<>();
- this.rangeSize = rangeSize;
+ public HashRangeAutoSplitStickyKeyConsumerSelector(int numberOfPOints) {
+ this.hashRing = new TreeMap<>();
+ this.numberOfPOints = numberOfPOints;
}
@Override
- public synchronized void addConsumer(Consumer consumer) throws
ConsumerAssignException {
- if (rangeMap.size() == 0) {
- rangeMap.put(rangeSize, consumer);
- consumerRange.put(consumer, rangeSize);
- } else {
- splitRange(findBiggestRange(), consumer);
+ public void addConsumer(Consumer consumer) throws ConsumerAssignException {
+ rwLock.writeLock().lock();
+ try {
+ // Insert multiple points on the hash ring for every consumer
+ // The points are deterministically added based on the hash of the
consumer name
+ for (int i = 0; i < numberOfPOints; i++) {
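Review comment:
Typo: "numberOfPOints" should be "numberOfPoints" (lowercase "o"). The same misspelling also appears in the field declaration and in the constructor parameter and assignment above.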
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]