merlimat closed pull request #2311: [cpp] receiver queue size config acorss 
partitions in multi-topics-consumer
URL: https://github.com/apache/incubator-pulsar/pull/2311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 7be197c89c..6750273649 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -160,9 +160,12 @@ void 
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
     config.setMessageListener(
         boost::bind(&MultiTopicsConsumerImpl::messageReceived, 
shared_from_this(), _1, _2));
-    config.setReceiverQueueSize(conf_.getReceiverQueueSize());
 
     int numPartitions = partitionMetadata->getPartitions() >= 1 ? 
partitionMetadata->getPartitions() : 1;
+    // Apply total limit of receiver queue size across partitions
+    config.setReceiverQueueSize(
+        std::min(conf_.getReceiverQueueSize(),
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions)));
 
     Lock lock(mutex_);
     topicsPartitions_.insert(std::make_pair(topicName->toString(), 
numPartitions));
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 434fb07a41..f3b560b747 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -315,6 +315,7 @@ def create_producer(self, topic,
                         send_timeout_millis=30000,
                         compression_type=CompressionType.NONE,
                         max_pending_messages=1000,
+                        max_pending_messages_across_partitions=50000,
                         block_if_queue_full=False,
                         batching_enabled=False,
                         batching_max_messages=1000,
@@ -352,6 +353,9 @@ def create_producer(self, topic,
         * `max_pending_messages`:
           Set the max size of the queue holding the messages pending to receive
           an acknowledgment from the broker.
+        * `max_pending_messages_across_partitions`:
+          Set the max size of the queue holding the messages pending to receive
+          an acknowledgment across partitions from the broker.
         * `block_if_queue_full`: Set whether `send_async` operations should
           block when the outgoing message queue is full.
         * `message_routing_mode`:
@@ -364,6 +368,7 @@ def create_producer(self, topic,
         _check_type(int, send_timeout_millis, 'send_timeout_millis')
         _check_type(CompressionType, compression_type, 'compression_type')
         _check_type(int, max_pending_messages, 'max_pending_messages')
+        _check_type(int, max_pending_messages_across_partitions, 
'max_pending_messages_across_partitions')
         _check_type(bool, block_if_queue_full, 'block_if_queue_full')
         _check_type(bool, batching_enabled, 'batching_enabled')
         _check_type(int, batching_max_messages, 'batching_max_messages')
@@ -374,6 +379,7 @@ def create_producer(self, topic,
         conf.send_timeout_millis(send_timeout_millis)
         conf.compression_type(compression_type)
         conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
         conf.block_if_queue_full(block_if_queue_full)
         conf.batching_enabled(batching_enabled)
         conf.batching_max_messages(batching_max_messages)
@@ -392,6 +398,7 @@ def subscribe(self, topic, subscription_name,
                   consumer_type=ConsumerType.Exclusive,
                   message_listener=None,
                   receiver_queue_size=1000,
+                  max_total_receiver_queue_size_across_partitions=50000,
                   consumer_name=None,
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
@@ -434,6 +441,9 @@ def my_listener(consumer, message):
           should not be interrupted when the consumer queue size is zero. The
           default value is 1000 messages and should work well for most use
           cases.
+        * `max_total_receiver_queue_size_across_partitions`
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for 
individual partitions
         * `consumer_name`:
           Sets the consumer name.
         * `unacked_messages_timeout_ms`:
@@ -450,6 +460,8 @@ def my_listener(consumer, message):
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
         _check_type_or_none(str, consumer_name, 'consumer_name')
         _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
@@ -461,6 +473,7 @@ def my_listener(consumer, message):
         if message_listener:
             conf.message_listener(message_listener)
         conf.receiver_queue_size(receiver_queue_size)
+        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
         if consumer_name:
             conf.consumer_name(consumer_name)
         if unacked_messages_timeout_ms:
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9149addc6f..9deee9af78 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -108,6 +108,8 @@ void export_config() {
             .def("compression_type", 
&ProducerConfiguration::setCompressionType, return_self<>())
             .def("max_pending_messages", 
&ProducerConfiguration::getMaxPendingMessages)
             .def("max_pending_messages", 
&ProducerConfiguration::setMaxPendingMessages, return_self<>())
+            .def("max_pending_messages_across_partitions", 
&ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
+            .def("max_pending_messages_across_partitions", 
&ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
             .def("block_if_queue_full", 
&ProducerConfiguration::getBlockIfQueueFull)
             .def("block_if_queue_full", 
&ProducerConfiguration::setBlockIfQueueFull, return_self<>())
             .def("partitions_routing_mode", 
&ProducerConfiguration::getPartitionsRoutingMode)
@@ -128,6 +130,8 @@ void export_config() {
             .def("message_listener", 
&ConsumerConfiguration_setMessageListener, return_self<>())
             .def("receiver_queue_size", 
&ConsumerConfiguration::getReceiverQueueSize)
             .def("receiver_queue_size", 
&ConsumerConfiguration::setReceiverQueueSize)
+            .def("max_total_receiver_queue_size_across_partitions", 
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
+            .def("max_total_receiver_queue_size_across_partitions", 
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
             .def("consumer_name", &ConsumerConfiguration::getConsumerName, 
return_value_policy<copy_const_reference>())
             .def("consumer_name", &ConsumerConfiguration::setConsumerName)
             .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eedc09..ab3324764f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -64,7 +64,7 @@
     // Map <topic+partition, consumer>, when get do ACK, consumer will by find 
by topic name
     private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
 
-    // Map <topic, partitionNumber>, store partition number for each topic
+    // Map <topic, numPartitions>, store partition number for each topic
     protected final ConcurrentHashMap<String, Integer> topics;
 
     // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
@@ -665,24 +665,29 @@ private boolean topicNameValid(String topicName) {
         return subscribeResult;
     }
 
-    private void subscribeTopicPartitions(CompletableFuture<Void> 
subscribeResult, String topicName, int partitionNumber) {
+    private void subscribeTopicPartitions(CompletableFuture<Void> 
subscribeResult, String topicName, int numPartitions) {
         if (log.isDebugEnabled()) {
-            log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, partitionNumber);
+            log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, numPartitions);
         }
 
         List<CompletableFuture<Consumer<T>>> futureList;
 
-        if (partitionNumber > 1) {
-            this.topics.putIfAbsent(topicName, partitionNumber);
-            allTopicPartitionsNumber.addAndGet(partitionNumber);
+        if (numPartitions > 1) {
+            this.topics.putIfAbsent(topicName, numPartitions);
+            allTopicPartitionsNumber.addAndGet(numPartitions);
+
+            int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
+            ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
+            configurationData.setReceiverQueueSize(receiverQueueSize);
 
             futureList = IntStream
-                .range(0, partitionNumber)
+                .range(0, numPartitions)
                 .mapToObj(
                     partitionIndex -> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, internalConfig,
+                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         return subFuture;
@@ -727,7 +732,7 @@ private void 
subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S
 
                     subscribeResult.complete(null);
                     log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
-                        topic, subscription, topicName, partitionNumber, 
allTopicPartitionsNumber.get());
+                        topic, subscription, topicName, numPartitions, 
allTopicPartitionsNumber.get());
                     if (this.namespaceName == null) {
                         this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
                     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to