This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 296830a [cpp] receiver queue size config across partitions in
multi-topics-consumer (#2311)
296830a is described below
commit 296830a37d82de65d30dfd74d9a5893033056f18
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Aug 14 06:05:59 2018 +0800
[cpp] receiver queue size config across partitions in multi-topics-consumer
(#2311)
* catch up receiver queue size support in multi topics consumer
* add python config
---
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 5 ++++-
pulsar-client-cpp/python/pulsar/__init__.py | 13 ++++++++++++
pulsar-client-cpp/python/src/config.cc | 4 ++++
.../client/impl/MultiTopicsConsumerImpl.java | 23 +++++++++++++---------
4 files changed, 35 insertions(+), 10 deletions(-)
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 7be197c..6750273 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -160,9 +160,12 @@ void
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
config.setMessageListener(
boost::bind(&MultiTopicsConsumerImpl::messageReceived,
shared_from_this(), _1, _2));
- config.setReceiverQueueSize(conf_.getReceiverQueueSize());
int numPartitions = partitionMetadata->getPartitions() >= 1 ?
partitionMetadata->getPartitions() : 1;
+ // Apply total limit of receiver queue size across partitions
+ config.setReceiverQueueSize(
+ std::min(conf_.getReceiverQueueSize(),
+ (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() /
numPartitions)));
Lock lock(mutex_);
topicsPartitions_.insert(std::make_pair(topicName->toString(),
numPartitions));
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index 434fb07..f3b560b 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -315,6 +315,7 @@ class Client:
send_timeout_millis=30000,
compression_type=CompressionType.NONE,
max_pending_messages=1000,
+ max_pending_messages_across_partitions=50000,
block_if_queue_full=False,
batching_enabled=False,
batching_max_messages=1000,
@@ -352,6 +353,9 @@ class Client:
* `max_pending_messages`:
Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
+ * `max_pending_messages_across_partitions`:
+ Set the max size of the queue holding the messages pending to receive
+ an acknowledgment across partitions from the broker.
* `block_if_queue_full`: Set whether `send_async` operations should
block when the outgoing message queue is full.
* `message_routing_mode`:
@@ -364,6 +368,7 @@ class Client:
_check_type(int, send_timeout_millis, 'send_timeout_millis')
_check_type(CompressionType, compression_type, 'compression_type')
_check_type(int, max_pending_messages, 'max_pending_messages')
+ _check_type(int, max_pending_messages_across_partitions,
'max_pending_messages_across_partitions')
_check_type(bool, block_if_queue_full, 'block_if_queue_full')
_check_type(bool, batching_enabled, 'batching_enabled')
_check_type(int, batching_max_messages, 'batching_max_messages')
@@ -374,6 +379,7 @@ class Client:
conf.send_timeout_millis(send_timeout_millis)
conf.compression_type(compression_type)
conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
conf.block_if_queue_full(block_if_queue_full)
conf.batching_enabled(batching_enabled)
conf.batching_max_messages(batching_max_messages)
@@ -392,6 +398,7 @@ class Client:
consumer_type=ConsumerType.Exclusive,
message_listener=None,
receiver_queue_size=1000,
+ max_total_receiver_queue_size_across_partitions=50000,
consumer_name=None,
unacked_messages_timeout_ms=None,
broker_consumer_stats_cache_time_ms=30000,
@@ -434,6 +441,9 @@ class Client:
should not be interrupted when the consumer queue size is zero. The
default value is 1000 messages and should work well for most use
cases.
+ * `max_total_receiver_queue_size_across_partitions`
+ Set the max total receiver queue size across partitions.
+ This setting will be used to reduce the receiver queue size for
individual partitions
* `consumer_name`:
Sets the consumer name.
* `unacked_messages_timeout_ms`:
@@ -450,6 +460,8 @@ class Client:
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
+ _check_type(int, max_total_receiver_queue_size_across_partitions,
+ 'max_total_receiver_queue_size_across_partitions')
_check_type_or_none(str, consumer_name, 'consumer_name')
_check_type_or_none(int, unacked_messages_timeout_ms,
'unacked_messages_timeout_ms')
_check_type(int, broker_consumer_stats_cache_time_ms,
'broker_consumer_stats_cache_time_ms')
@@ -461,6 +473,7 @@ class Client:
if message_listener:
conf.message_listener(message_listener)
conf.receiver_queue_size(receiver_queue_size)
+
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
if consumer_name:
conf.consumer_name(consumer_name)
if unacked_messages_timeout_ms:
diff --git a/pulsar-client-cpp/python/src/config.cc
b/pulsar-client-cpp/python/src/config.cc
index 9149add..9deee9a 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -108,6 +108,8 @@ void export_config() {
.def("compression_type",
&ProducerConfiguration::setCompressionType, return_self<>())
.def("max_pending_messages",
&ProducerConfiguration::getMaxPendingMessages)
.def("max_pending_messages",
&ProducerConfiguration::setMaxPendingMessages, return_self<>())
+ .def("max_pending_messages_across_partitions",
&ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
+ .def("max_pending_messages_across_partitions",
&ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
.def("block_if_queue_full",
&ProducerConfiguration::getBlockIfQueueFull)
.def("block_if_queue_full",
&ProducerConfiguration::setBlockIfQueueFull, return_self<>())
.def("partitions_routing_mode",
&ProducerConfiguration::getPartitionsRoutingMode)
@@ -128,6 +130,8 @@ void export_config() {
.def("message_listener",
&ConsumerConfiguration_setMessageListener, return_self<>())
.def("receiver_queue_size",
&ConsumerConfiguration::getReceiverQueueSize)
.def("receiver_queue_size",
&ConsumerConfiguration::setReceiverQueueSize)
+ .def("max_total_receiver_queue_size_across_partitions",
&ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
+ .def("max_total_receiver_queue_size_across_partitions",
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
.def("consumer_name", &ConsumerConfiguration::getConsumerName,
return_value_policy<copy_const_reference>())
.def("consumer_name", &ConsumerConfiguration::setConsumerName)
.def("unacked_messages_timeout_ms",
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4a0c449..bbfb3f3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -64,7 +64,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// Map <topic+partition, consumer>, when get do ACK, consumer will by find
by topic name
private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
- // Map <topic, partitionNumber>, store partition number for each topic
+ // Map <topic, numPartitions>, store partition number for each topic
protected final ConcurrentHashMap<String, Integer> topics;
// Queue of partition consumers on which we have stopped calling
receiveAsync() because the
@@ -670,24 +670,29 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return subscribeResult;
}
- private void subscribeTopicPartitions(CompletableFuture<Void>
subscribeResult, String topicName, int partitionNumber) {
+ private void subscribeTopicPartitions(CompletableFuture<Void>
subscribeResult, String topicName, int numPartitions) {
if (log.isDebugEnabled()) {
- log.debug("Subscribe to topic {} metadata.partitions: {}",
topicName, partitionNumber);
+ log.debug("Subscribe to topic {} metadata.partitions: {}",
topicName, numPartitions);
}
List<CompletableFuture<Consumer<T>>> futureList;
- if (partitionNumber > 1) {
- this.topics.putIfAbsent(topicName, partitionNumber);
- allTopicPartitionsNumber.addAndGet(partitionNumber);
+ if (numPartitions > 1) {
+ this.topics.putIfAbsent(topicName, numPartitions);
+ allTopicPartitionsNumber.addAndGet(numPartitions);
+
+ int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
+ conf.getMaxTotalReceiverQueueSizeAcrossPartitions() /
numPartitions);
+ ConsumerConfigurationData<T> configurationData =
getInternalConsumerConfig();
+ configurationData.setReceiverQueueSize(receiverQueueSize);
futureList = IntStream
- .range(0, partitionNumber)
+ .range(0, numPartitions)
.mapToObj(
partitionIndex -> {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, internalConfig,
+ ConsumerImpl<T> newConsumer = new
ConsumerImpl<>(client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture, schema);
consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
return subFuture;
@@ -732,7 +737,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
subscribeResult.complete(null);
log.info("[{}] [{}] Success subscribe new topic {} in
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
- topic, subscription, topicName, partitionNumber,
allTopicPartitionsNumber.get());
+ topic, subscription, topicName, numPartitions,
allTopicPartitionsNumber.get());
if (this.namespaceName == null) {
this.namespaceName =
TopicName.get(topicName).getNamespaceObject();
}