This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 390d7d92734 [improve][broker][PIP-379] Don't replace a consumer when there's a collision (#23441)
390d7d92734 is described below
commit 390d7d92734538b7fa431044f25ad3c6c5574e78
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 11 20:54:52 2024 +0300
[improve][broker][PIP-379] Don't replace a consumer when there's a collision (#23441)
---
...ConsistentHashingStickyKeyConsumerSelector.java | 32 ++++++++++++++++++----
.../service/ConsumerHashAssignmentsSnapshot.java | 4 +--
...istentHashingStickyKeyConsumerSelectorTest.java | 16 ++++-------
3 files changed, 33 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index fde140a299c..2559a02f87d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -27,12 +27,14 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Range;
/**
* This is a consumer selector using consistent hashing to evenly split
* the number of keys assigned to each consumer.
*/
+@Slf4j
public class ConsistentHashingStickyKeyConsumerSelector implements
StickyKeyConsumerSelector {
// use NUL character as field separator for hash key calculation
private static final String KEY_SEPARATOR = "\0";
@@ -76,18 +78,36 @@ public class ConsistentHashingStickyKeyConsumerSelector
implements StickyKeyCons
ConsumerIdentityWrapper consumerIdentityWrapper = new
ConsumerIdentityWrapper(consumer);
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the
consumer name
+ int hashPointsAdded = 0;
+ int hashPointCollisions = 0;
for (int i = 0; i < numberOfPoints; i++) {
int consumerNameIndex =
consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
int hash = calculateHashForConsumerAndIndex(consumer,
consumerNameIndex, i);
- // When there's a collision, the new consumer will replace the
old one.
- // This is a rare case, and it is acceptable to replace the
old consumer since there
- // are multiple points for each consumer. This won't affect
the overall distribution significantly.
- ConsumerIdentityWrapper removed = hashRing.put(hash,
consumerIdentityWrapper);
- if (removed != null) {
- consumerNameIndexTracker.decreaseConsumerRefCount(removed);
+ // When there's a collision, the entry won't be added to the
hash ring.
+ // This isn't a problem with the consumerNameIndexTracker
solution since the collisions won't align
+ // for all hash ring points when using the same consumer name.
This won't affect the overall
+ // distribution significantly when the number of hash ring
points is sufficiently large (>100).
+ ConsumerIdentityWrapper existing = hashRing.putIfAbsent(hash,
consumerIdentityWrapper);
+ if (existing != null) {
+ hashPointCollisions++;
+ // reduce the ref count which was increased before adding
since the consumer was not added
+
consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
+ } else {
+ hashPointsAdded++;
}
}
+ if (hashPointsAdded == 0) {
+ log.error("Failed to add consumer '{}' to the hash ring. There
were {} collisions. Consider increasing "
+ + "the number of points ({}) per consumer by
setting "
+ +
"subscriptionKeySharedConsistentHashingReplicaPoints={}",
+ consumer, hashPointCollisions, numberOfPoints,
+ Math.max((int) (numberOfPoints * 1.5d), numberOfPoints
+ 1));
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Added consumer '{}' with {} points, {} collisions",
consumer, hashPointsAdded,
+ hashPointCollisions);
+ }
if (!addOrRemoveReturnsImpactedConsumersResult) {
return CompletableFuture.completedFuture(Optional.empty());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
index d2bd113e69d..b4add79294c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerHashAssignmentsSnapshot.java
@@ -71,8 +71,8 @@ public class ConsumerHashAssignmentsSnapshot {
return new ConsumerHashAssignmentsSnapshot(Collections.emptyList());
}
- public ImpactedConsumersResult
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot other) {
- return resolveConsumerRemovedHashRanges(this.hashRangeAssignments,
other.hashRangeAssignments);
+ public ImpactedConsumersResult
resolveImpactedConsumers(ConsumerHashAssignmentsSnapshot assignmentsAfter) {
+ return resolveConsumerRemovedHashRanges(this.hashRangeAssignments,
assignmentsAfter.hashRangeAssignments);
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 2b01256611b..6752cd7cfab 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -475,11 +475,11 @@ public class
ConsistentHashingStickyKeyConsumerSelectorTest {
}
@Test
- public void
testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() {
+ public void
testShouldNotContainMappingChangesWhenConsumersLeaveAndRejoinInSameOrder() {
final ConsistentHashingStickyKeyConsumerSelector selector =
- new ConsistentHashingStickyKeyConsumerSelector(100, true);
+ new ConsistentHashingStickyKeyConsumerSelector(200, true);
final String consumerName = "consumer";
- final int numOfInitialConsumers = 10;
+ final int numOfInitialConsumers = 200;
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < numOfInitialConsumers; i++) {
final Consumer consumer = createMockConsumer(consumerName, "index
" + i, i);
@@ -498,14 +498,8 @@ public class
ConsistentHashingStickyKeyConsumerSelectorTest {
selector.addConsumer(consumers.get(numOfInitialConsumers / 2));
ConsumerHashAssignmentsSnapshot assignmentsAfter =
selector.getConsumerHashAssignmentsSnapshot();
- int removedRangesSize =
assignmentsBefore.diffRanges(assignmentsAfter).keySet().stream()
- .mapToInt(Range::size)
- .sum();
- double allowedremovedRangesPercentage = 1; // 1%
- int hashRangeSize = selector.getKeyHashRange().size();
- int allowedremovedRanges = (int) (hashRangeSize *
(allowedremovedRangesPercentage / 100.0d));
- assertThat(removedRangesSize).describedAs("Allow up to %d%% of total
hash range size to be impacted",
-
allowedremovedRangesPercentage).isLessThan(allowedremovedRanges);
+
+
assertThat(assignmentsBefore.resolveImpactedConsumers(assignmentsAfter).getRemovedHashRanges()).isEmpty();
}
@Test