This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 49aa3080d42 [improve][broker] PIP-392: Add configuration to enable
consistent hashing to select active consumer for partitioned topic (#23584)
49aa3080d42 is described below
commit 49aa3080d422994baa036ed0b743a2fa18a6d530
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Nov 21 10:59:56 2024 +0800
[improve][broker] PIP-392: Add configuration to enable consistent hashing
to select active consumer for partitioned topic (#23584)
---
conf/broker.conf | 4 +++
conf/standalone.conf | 4 +++
deployment/terraform-ansible/templates/broker.conf | 4 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 7 +++++
.../AbstractDispatcherSingleActiveConsumer.java | 2 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 33 ++++++++++++++++++++++
6 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index e745fcb2b0a..af335c14153 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -243,6 +243,10 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000
+# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
+# For non-partitioned topics, consistent hashing is used by default.
+activeConsumerFailoverConsistentHashing=false
+
# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 535800a43f3..90cf3b57ff9 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -152,6 +152,10 @@ maxMessageSizeCheckIntervalInSeconds=60
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000
+# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
+# For non-partitioned topics, consistent hashing is used by default.
+activeConsumerFailoverConsistentHashing=false
+
# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
diff --git a/deployment/terraform-ansible/templates/broker.conf
b/deployment/terraform-ansible/templates/broker.conf
index ff367717402..bae55cb69f1 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -148,6 +148,10 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000
+# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type.
+# For non-partitioned topics, consistent hashing is used by default.
+activeConsumerFailoverConsistentHashing=false
+
# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 19e9ff625ca..8b5a4ef270b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -760,6 +760,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
)
private int activeConsumerFailoverDelayTimeMillis = 1000;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "Enable consistent hashing for selecting the active consumer in partitioned "
+ + "topics with Failover subscription type. "
+ + "For non-partitioned topics, consistent hashing is used by default."
+ )
+ private boolean activeConsumerFailoverConsistentHashing = false;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 9980b6ae97c..baca6bf078c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -126,7 +126,7 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
}
}
}
- int index = partitionIndex >= 0
+ int index = partitionIndex >= 0 &&
!serviceConfig.isActiveConsumerFailoverConsistentHashing()
? partitionIndex % consumersSize
: peekConsumerIndexFromHashRing(makeHashRing(consumersSize));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 83cb5f2a440..1d5ac759625 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1380,6 +1380,39 @@ public class TopicsConsumerImplTest extends
ProducerConsumerBase {
}
}
+ @Test
+ public void testPartitionedTopicDistribution() throws Exception {
+ this.conf.setActiveConsumerFailoverConsistentHashing(true);
+ final String topic = "partitioned-topics-distribution";
+ final int topicCount = 100;
+ final int consumers = 10;
+
+ for (int i = 0; i < topicCount; i++) {
+ admin.topics().createPartitionedTopic(topic + "-" + i, 1);
+ }
+
+ CustomizedConsumerEventListener eventListener = new
CustomizedConsumerEventListener();
+
+ List<Consumer<?>> consumerList = new ArrayList<>(consumers);
+ for (int i = 0; i < consumers; i++) {
+ consumerList.add(pulsarClient.newConsumer()
+ .topics(IntStream.range(0, topicCount).mapToObj(j -> topic + "-" + j).toList())
+ .subscriptionType(SubscriptionType.Failover)
+ .subscriptionName("my-sub")
+ .consumerName("consumer-" + i)
+ .consumerEventListener(eventListener)
+ .subscribe());
+ }
+
+ log.info("Topics are distributed to consumers as {}",
eventListener.getActiveConsumers());
+ Map<String, Integer> assigned = new HashMap<>();
+ eventListener.getActiveConsumers().forEach((k, v) ->
assigned.compute(v, (t, c) -> c == null ? 1 : ++ c));
+ assertEquals(assigned.size(), consumers);
+ for (Consumer<?> consumer : consumerList) {
+ consumer.close();
+ }
+ }
+
private static class CustomizedConsumerEventListener implements
ConsumerEventListener {
private final Map<String, String> activeConsumers = new HashMap<>();