This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 390d7d92734 [improve][broker][PIP-379] Don't replace a consumer when there's a collision (#23441)
390d7d92734 is described below

commit 390d7d92734538b7fa431044f25ad3c6c5574e78
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 11 20:54:52 2024 +0300

    [improve][broker][PIP-379] Don't replace a consumer when there's a collision (#23441)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java | 32 ++++++++++++++++++----
 .../service/ConsumerHashAssignmentsSnapshot.java   |  4 +--
 ...istentHashingStickyKeyConsumerSelectorTest.java | 16 ++++-------
 3 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index fde140a299c..2559a02f87d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -27,12 +27,14 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Range;
 
 /**
  * This is a consumer selector using consistent hashing to evenly split
  * the number of keys assigned to each consumer.
  */
+@Slf4j
 public class ConsistentHashingStickyKeyConsumerSelector implements 
StickyKeyConsumerSelector {
     // use NUL character as field separator for hash key calculation
     private static final String KEY_SEPARATOR = "\0";
@@ -76,18 +78,36 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
             ConsumerIdentityWrapper consumerIdentityWrapper = new 
ConsumerIdentityWrapper(consumer);
             // Insert multiple points on the hash ring for every consumer
             // The points are deterministically added based on the hash of the 
consumer name
+            int hashPointsAdded = 0;
+            int hashPointCollisions = 0;
             for (int i = 0; i < numberOfPoints; i++) {
                 int consumerNameIndex =
                         
consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
                 int hash = calculateHashForConsumerAndIndex(consumer, 
consumerNameIndex, i);
-                // When there's a collision, the new consumer will replace the 
old one.
-                // This is a rare case, and it is acceptable to replace the 
old consumer since there
-                // are multiple points for each consumer. This won't affect 
the overall distribution significantly.
-                ConsumerIdentityWrapper removed = hashRing.put(hash, 
consumerIdentityWrapper);
-                if (removed != null) {
-                    consumerNameIndexTracker.decreaseConsumerRefCount(removed);
+                // When there's a collision, the entry won't be added to the 
hash ring.
+                // This isn't a problem with the consumerNameIndexTracker 
solution since the collisions won't align
+                // for all hash ring points when using the same consumer name. 
This won't affect the overall
+                // distribution significantly when the number of hash ring 
points is sufficiently large (>100).
+                ConsumerIdentityWrapper existing = hashRing.putIfAbsent(hash, 
consumerIdentityWrapper);
+                if (existing != null) {
+                    hashPointCollisions++;
+                    // reduce the ref count which was increased before adding 
since the consumer was not added
+                    
consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
+                } else {
+                    hashPointsAdded++;
                 }
             }
+            if (hashPointsAdded == 0) {
+                log.error("Failed to add consumer '{}' to the hash ring. There 
were {} collisions. Consider increasing "
+                                + "the number of points ({}) per consumer by 
setting "
+                                + 
"subscriptionKeySharedConsistentHashingReplicaPoints={}",
+                        consumer, hashPointCollisions, numberOfPoints,
+                        Math.max((int) (numberOfPoints * 1.5d), numberOfPoints 
+ 1));
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Added consumer '{}' with {} points, {} collisions", 
consumer, hashPointsAdded,
+                        hashPointCollisions);
+            }
             if (!addOrRemoveReturnsImpactedConsumersResult) {
                 return CompletableFuture.completedFuture(Optional.empty());
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
index d2bd113e69d..b4add79294c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
@@ -71,8 +71,8 @@ public class ConsumerHashAssignmentsSnapshot {
         return new ConsumerHashAssignmentsSnapshot(Collections.emptyList());
     }
 
-    public ImpactedConsumersResult 
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot other) {
-        return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, 
other.hashRangeAssignments);
+    public ImpactedConsumersResult 
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
+        return resolveConsumerRemovedHashRanges(this.hashRangeAssignments, 
assignmentsAfter.hashRangeAssignments);
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 2b01256611b..6752cd7cfab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -475,11 +475,11 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
     }
 
     @Test
-    public void 
testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() {
+    public void 
testShouldNotContainMappingChangesWhenConsumersLeaveAndRejoinInSameOrder() {
         final ConsistentHashingStickyKeyConsumerSelector selector =
-                new ConsistentHashingStickyKeyConsumerSelector(100, true);
+                new ConsistentHashingStickyKeyConsumerSelector(200, true);
         final String consumerName = "consumer";
-        final int numOfInitialConsumers = 10;
+        final int numOfInitialConsumers = 200;
         List<Consumer> consumers = new ArrayList<>();
         for (int i = 0; i < numOfInitialConsumers; i++) {
             final Consumer consumer = createMockConsumer(consumerName, "index 
" + i, i);
@@ -498,14 +498,8 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
         selector.addConsumer(consumers.get(numOfInitialConsumers / 2));
 
         ConsumerHashAssignmentsSnapshot assignmentsAfter = 
selector.getConsumerHashAssignmentsSnapshot();
-        int removedRangesSize = 
assignmentsBefore.diffRanges(assignmentsAfter).keySet().stream()
-                .mapToInt(Range::size)
-                .sum();
-        double allowedremovedRangesPercentage = 1; // 1%
-        int hashRangeSize = selector.getKeyHashRange().size();
-        int allowedremovedRanges = (int) (hashRangeSize * 
(allowedremovedRangesPercentage / 100.0d));
-        assertThat(removedRangesSize).describedAs("Allow up to %d%% of total 
hash range size to be impacted",
-                
allowedremovedRangesPercentage).isLessThan(allowedremovedRanges);
+
+        
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()).isEmpty();
     }
 
     @Test

Reply via email to