jerrypeng commented on a change in pull request #6791:
URL: https://github.com/apache/pulsar/pull/6791#discussion_r413395480



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -18,86 +18,64 @@
  */
 package org.apache.pulsar.broker.service;
 
-import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 /**
  * This is a consumer selector based fixed hash range.
  *
- * 1.Each consumer serves a fixed range of hash value
- * 2.The whole range of hash value could be covered by all the consumers.
- * 3.Once a consumer is removed, the left consumers could still serve the 
whole range.
- *
- * Initializing with a fixed hash range, by default 2 << 5.
- * First consumer added, hash range looks like:
- *
- * 0 -> 65536(consumer-1)
- *
- * Second consumer added, will find a biggest range to split:
- *
- * 0 -> 32768(consumer-2) -> 65536(consumer-1)
- *
- * While a consumer removed, The range for this consumer will be taken over
- * by other consumer, consumer-2 removed:
- *
- * 0 -> 65536(consumer-1)
- *
- * In this approach use skip list map to maintain the hash range and consumers.
- *
- * Select consumer will return the ceiling key of message key hashcode % range 
size.
- *
+ * The implementation uses consistent hashing to evenly split, the
+ * number of keys assigned to each consumer.
  */
 public class HashRangeAutoSplitStickyKeyConsumerSelector implements 
StickyKeyConsumerSelector {
 
-    private final int rangeSize;
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
-    private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
-    private final Map<Consumer, Integer> consumerRange;
+    // Consistent-Hash ring
+    private final NavigableMap<Integer, Consumer> hashRing;
 
-    public HashRangeAutoSplitStickyKeyConsumerSelector() {
-        this(DEFAULT_RANGE_SIZE);
-    }
+    private final int numberOfPOints;
 
-    public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
-        if (rangeSize < 2) {
-            throw new IllegalArgumentException("range size must greater than 
2");
-        }
-        if (!is2Power(rangeSize)) {
-            throw new IllegalArgumentException("range size must be nth power 
with 2");
-        }
-        this.rangeMap = new ConcurrentSkipListMap<>();
-        this.consumerRange = new HashMap<>();
-        this.rangeSize = rangeSize;
+    public HashRangeAutoSplitStickyKeyConsumerSelector(int numberOfPOints) {
+        this.hashRing = new TreeMap<>();
+        this.numberOfPOints = numberOfPOints;
     }
 
     @Override
-    public synchronized void addConsumer(Consumer consumer) throws 
ConsumerAssignException {
-        if (rangeMap.size() == 0) {
-            rangeMap.put(rangeSize, consumer);
-            consumerRange.put(consumer, rangeSize);
-        } else {
-            splitRange(findBiggestRange(), consumer);
+    public void addConsumer(Consumer consumer) throws ConsumerAssignException {
+        rwLock.writeLock().lock();
+        try {
+            // Insert multiple points on the hash ring for every consumer
+            // The points are deterministically added based on the hash of the 
consumer name
+            for (int i = 0; i < numberOfPOints; i++) {
+                String key = consumer.consumerName() + i;
+                int hash = 
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                hashRing.put(hash, consumer);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
-    public synchronized void removeConsumer(Consumer consumer) {
-        Integer removeRange = consumerRange.remove(consumer);
-        if (removeRange != null) {
-            if (removeRange == rangeSize && rangeMap.size() > 1) {
-                Map.Entry<Integer, Consumer> lowerEntry = 
rangeMap.lowerEntry(removeRange);
-                rangeMap.put(removeRange, lowerEntry.getValue());
-                rangeMap.remove(lowerEntry.getKey());
-                consumerRange.put(lowerEntry.getValue(), removeRange);
-            } else {
-                rangeMap.remove(removeRange);
+    public void removeConsumer(Consumer consumer) {
+        rwLock.writeLock().lock();
+        try {
+            // Remove all the points that were added for this consumer
+            for (int i = 0; i < numberOfPOints; i++) {
+                String key = consumer.consumerName() + i;
+                int hash = 
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                hashRing.remove(hash, consumer);

Review comment:
       When removing consumers from the hash ring, is it necessary to check 
that the value stored under the key we are removing also equals the consumer? 
In what case would that not be true?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to