This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49aa3080d42 [improve][broker] PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic (#23584)
49aa3080d42 is described below

commit 49aa3080d422994baa036ed0b743a2fa18a6d530
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Nov 21 10:59:56 2024 +0800

    [improve][broker] PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic (#23584)
---
 conf/broker.conf                                   |  4 +++
 conf/standalone.conf                               |  4 +++
 deployment/terraform-ansible/templates/broker.conf |  4 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 +++++
 .../AbstractDispatcherSingleActiveConsumer.java    |  2 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 33 ++++++++++++++++++++++
 6 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e745fcb2b0a..af335c14153 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -243,6 +243,10 @@ messageExpiryCheckIntervalInMinutes=5
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. 
+# For non-partitioned topics, consistent hashing is used by default.
+activeConsumerFailoverConsistentHashing=false
+
 # How long to delete inactive subscriptions from last consuming
 # When it is 0, inactive subscriptions are not deleted automatically
 subscriptionExpirationTimeMinutes=0
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 535800a43f3..90cf3b57ff9 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -152,6 +152,10 @@ maxMessageSizeCheckIntervalInSeconds=60
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. 
+# For non-partitioned topics, consistent hashing is used by default.
+activeConsumerFailoverConsistentHashing=false
+
 # How long to delete inactive subscriptions from last consuming
 # When it is 0, inactive subscriptions are not deleted automatically
 subscriptionExpirationTimeMinutes=0
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index ff367717402..bae55cb69f1 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -148,6 +148,10 @@ messageExpiryCheckIntervalInMinutes=5
 # How long to delay rewinding cursor and dispatching messages when active consumer is changed
 activeConsumerFailoverDelayTimeMillis=1000
 
+# Enable consistent hashing for selecting the active consumer in partitioned topics with Failover subscription type. 
+# For non-partitioned topics, consistent hashing is used by default.
+activeConsumerFailoverConsistentHashing=false
+
 # How long to delete inactive subscriptions from last consuming
 # When it is 0, inactive subscriptions are not deleted automatically
 subscriptionExpirationTimeMinutes=0
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 19e9ff625ca..8b5a4ef270b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -760,6 +760,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "How long to delay rewinding cursor and dispatching messages when active consumer is changed"
     )
     private int activeConsumerFailoverDelayTimeMillis = 1000;
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "Enable consistent hashing for selecting the active consumer in partitioned "
+                    + "topics with Failover subscription type."
+                    + "For non-partitioned topics, consistent hashing is used by default."
+    )
+    private boolean activeConsumerFailoverConsistentHashing = false;
     @FieldContext(
             category = CATEGORY_POLICIES,
             doc = "Maximum time to spend while scanning a subscription to calculate the accurate backlog"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 9980b6ae97c..baca6bf078c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -126,7 +126,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 }
             }
         }
-        int index = partitionIndex >= 0
+        int index = partitionIndex >= 0 && !serviceConfig.isActiveConsumerFailoverConsistentHashing()
                 ? partitionIndex % consumersSize
                 : peekConsumerIndexFromHashRing(makeHashRing(consumersSize));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 83cb5f2a440..1d5ac759625 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1380,6 +1380,39 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testPartitionedTopicDistribution() throws Exception {
+        this.conf.setActiveConsumerFailoverConsistentHashing(true);
+        final String topic = "partitioned-topics-distribution";
+        final int topicCount = 100;
+        final int consumers = 10;
+
+        for (int i = 0; i < topicCount; i++) {
+            admin.topics().createPartitionedTopic(topic + "-" + i, 1);
+        }
+
+        CustomizedConsumerEventListener eventListener = new CustomizedConsumerEventListener();
+
+        List<Consumer<?>> consumerList = new ArrayList<>(consumers);
+        for (int i = 0; i < consumers; i++) {
+            consumerList.add(pulsarClient.newConsumer()
+                    .topics(IntStream.range(0, topicCount).mapToObj(j -> topic + "-" + j).toList())
+                    .subscriptionType(SubscriptionType.Failover)
+                    .subscriptionName("my-sub")
+                    .consumerName("consumer-" + i)
+                    .consumerEventListener(eventListener)
+                    .subscribe());
+        }
+
+        log.info("Topics are distributed to consumers as {}", eventListener.getActiveConsumers());
+        Map<String, Integer> assigned = new HashMap<>();
+        eventListener.getActiveConsumers().forEach((k, v) -> assigned.compute(v, (t, c) -> c == null ? 1 : ++ c));
+        assertEquals(assigned.size(), consumers);
+        for (Consumer<?> consumer : consumerList) {
+            consumer.close();
+        }
+    }
+
     private static class CustomizedConsumerEventListener implements ConsumerEventListener {
 
         private final Map<String, String> activeConsumers = new HashMap<>();

Reply via email to