merlimat closed pull request #2311: [cpp] receiver queue size config across
partitions in multi-topics-consumer
URL: https://github.com/apache/incubator-pulsar/pull/2311
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 7be197c89c..6750273649 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -160,9 +160,12 @@ void
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
config.setMessageListener(
boost::bind(&MultiTopicsConsumerImpl::messageReceived,
shared_from_this(), _1, _2));
- config.setReceiverQueueSize(conf_.getReceiverQueueSize());
int numPartitions = partitionMetadata->getPartitions() >= 1 ?
partitionMetadata->getPartitions() : 1;
+ // Apply total limit of receiver queue size across partitions
+ config.setReceiverQueueSize(
+ std::min(conf_.getReceiverQueueSize(),
+ (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() /
numPartitions)));
Lock lock(mutex_);
topicsPartitions_.insert(std::make_pair(topicName->toString(),
numPartitions));
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index 434fb07a41..f3b560b747 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -315,6 +315,7 @@ def create_producer(self, topic,
send_timeout_millis=30000,
compression_type=CompressionType.NONE,
max_pending_messages=1000,
+ max_pending_messages_across_partitions=50000,
block_if_queue_full=False,
batching_enabled=False,
batching_max_messages=1000,
@@ -352,6 +353,9 @@ def create_producer(self, topic,
* `max_pending_messages`:
Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
+ * `max_pending_messages_across_partitions`:
+ Set the max size of the queue holding the messages pending to receive
+ an acknowledgment across partitions from the broker.
* `block_if_queue_full`: Set whether `send_async` operations should
block when the outgoing message queue is full.
* `message_routing_mode`:
@@ -364,6 +368,7 @@ def create_producer(self, topic,
_check_type(int, send_timeout_millis, 'send_timeout_millis')
_check_type(CompressionType, compression_type, 'compression_type')
_check_type(int, max_pending_messages, 'max_pending_messages')
+ _check_type(int, max_pending_messages_across_partitions,
'max_pending_messages_across_partitions')
_check_type(bool, block_if_queue_full, 'block_if_queue_full')
_check_type(bool, batching_enabled, 'batching_enabled')
_check_type(int, batching_max_messages, 'batching_max_messages')
@@ -374,6 +379,7 @@ def create_producer(self, topic,
conf.send_timeout_millis(send_timeout_millis)
conf.compression_type(compression_type)
conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
conf.block_if_queue_full(block_if_queue_full)
conf.batching_enabled(batching_enabled)
conf.batching_max_messages(batching_max_messages)
@@ -392,6 +398,7 @@ def subscribe(self, topic, subscription_name,
consumer_type=ConsumerType.Exclusive,
message_listener=None,
receiver_queue_size=1000,
+ max_total_receiver_queue_size_across_partitions=50000,
consumer_name=None,
unacked_messages_timeout_ms=None,
broker_consumer_stats_cache_time_ms=30000,
@@ -434,6 +441,9 @@ def my_listener(consumer, message):
should not be interrupted when the consumer queue size is zero. The
default value is 1000 messages and should work well for most use
cases.
+ * `max_total_receiver_queue_size_across_partitions`
+ Set the max total receiver queue size across partitions.
+ This setting will be used to reduce the receiver queue size for
individual partitions
* `consumer_name`:
Sets the consumer name.
* `unacked_messages_timeout_ms`:
@@ -450,6 +460,8 @@ def my_listener(consumer, message):
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
+ _check_type(int, max_total_receiver_queue_size_across_partitions,
+ 'max_total_receiver_queue_size_across_partitions')
_check_type_or_none(str, consumer_name, 'consumer_name')
_check_type_or_none(int, unacked_messages_timeout_ms,
'unacked_messages_timeout_ms')
_check_type(int, broker_consumer_stats_cache_time_ms,
'broker_consumer_stats_cache_time_ms')
@@ -461,6 +473,7 @@ def my_listener(consumer, message):
if message_listener:
conf.message_listener(message_listener)
conf.receiver_queue_size(receiver_queue_size)
+
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
if consumer_name:
conf.consumer_name(consumer_name)
if unacked_messages_timeout_ms:
diff --git a/pulsar-client-cpp/python/src/config.cc
b/pulsar-client-cpp/python/src/config.cc
index 9149addc6f..9deee9af78 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -108,6 +108,8 @@ void export_config() {
.def("compression_type",
&ProducerConfiguration::setCompressionType, return_self<>())
.def("max_pending_messages",
&ProducerConfiguration::getMaxPendingMessages)
.def("max_pending_messages",
&ProducerConfiguration::setMaxPendingMessages, return_self<>())
+ .def("max_pending_messages_across_partitions",
&ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
+ .def("max_pending_messages_across_partitions",
&ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
.def("block_if_queue_full",
&ProducerConfiguration::getBlockIfQueueFull)
.def("block_if_queue_full",
&ProducerConfiguration::setBlockIfQueueFull, return_self<>())
.def("partitions_routing_mode",
&ProducerConfiguration::getPartitionsRoutingMode)
@@ -128,6 +130,8 @@ void export_config() {
.def("message_listener",
&ConsumerConfiguration_setMessageListener, return_self<>())
.def("receiver_queue_size",
&ConsumerConfiguration::getReceiverQueueSize)
.def("receiver_queue_size",
&ConsumerConfiguration::setReceiverQueueSize)
+ .def("max_total_receiver_queue_size_across_partitions",
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
+ .def("max_total_receiver_queue_size_across_partitions",
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
.def("consumer_name", &ConsumerConfiguration::getConsumerName,
return_value_policy<copy_const_reference>())
.def("consumer_name", &ConsumerConfiguration::setConsumerName)
.def("unacked_messages_timeout_ms",
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eedc09..ab3324764f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -64,7 +64,7 @@
// Map <topic+partition, consumer>, when get do ACK, consumer will by find
by topic name
private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
- // Map <topic, partitionNumber>, store partition number for each topic
+ // Map <topic, numPartitions>, store partition number for each topic
protected final ConcurrentHashMap<String, Integer> topics;
// Queue of partition consumers on which we have stopped calling
receiveAsync() because the
@@ -665,24 +665,29 @@ private boolean topicNameValid(String topicName) {
return subscribeResult;
}
- private void subscribeTopicPartitions(CompletableFuture<Void>
subscribeResult, String topicName, int partitionNumber) {
+ private void subscribeTopicPartitions(CompletableFuture<Void>
subscribeResult, String topicName, int numPartitions) {
if (log.isDebugEnabled()) {
- log.debug("Subscribe to topic {} metadata.partitions: {}",
topicName, partitionNumber);
+ log.debug("Subscribe to topic {} metadata.partitions: {}",
topicName, numPartitions);
}
List<CompletableFuture<Consumer<T>>> futureList;
- if (partitionNumber > 1) {
- this.topics.putIfAbsent(topicName, partitionNumber);
- allTopicPartitionsNumber.addAndGet(partitionNumber);
+ if (numPartitions > 1) {
+ this.topics.putIfAbsent(topicName, numPartitions);
+ allTopicPartitionsNumber.addAndGet(numPartitions);
+
+ int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+ conf.getMaxTotalReceiverQueueSizeAcrossPartitions() /
numPartitions);
+ ConsumerConfigurationData<T> configurationData =
getInternalConsumerConfig();
+ configurationData.setReceiverQueueSize(receiverQueueSize);
futureList = IntStream
- .range(0, partitionNumber)
+ .range(0, numPartitions)
.mapToObj(
partitionIndex -> {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, internalConfig,
+ ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture, schema);
consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
return subFuture;
@@ -727,7 +732,7 @@ private void
subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S
subscribeResult.complete(null);
log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
- topic, subscription, topicName, partitionNumber,
allTopicPartitionsNumber.get());
+ topic, subscription, topicName, numPartitions,
allTopicPartitionsNumber.get());
if (this.namespaceName == null) {
this.namespaceName =
TopicName.get(topicName).getNamespaceObject();
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services