This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 99ad32b [Python] Consolidated duplicated subscribe_*() methods into a
single one (#2580)
99ad32b is described below
commit 99ad32b68565e8ee25049741336e5f58571b078a
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Sep 19 10:14:59 2018 -0700
[Python] Consolidated duplicated subscribe_*() methods into a single one
(#2580)
* [Python] Consolidated duplicated subscribe_*() methods into a single one
* Fixed missing parameter pattern_auto_discovery_period
---
pulsar-client-cpp/python/pulsar/__init__.py | 229 +++-------------------------
pulsar-client-cpp/python/pulsar_test.py | 26 ++--
2 files changed, 35 insertions(+), 220 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index 806c7e2..6849ecc 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -104,6 +104,9 @@ from pulsar.functions.function import Function
from pulsar.functions.context import Context
from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
+import re
+_retype = type(re.compile('x'))
+
class MessageId:
"""
Represents a message id
@@ -412,114 +415,19 @@ class Client:
unacked_messages_timeout_ms=None,
broker_consumer_stats_cache_time_ms=30000,
is_read_compacted=False,
- properties=None
+ properties=None,
+ pattern_auto_discovery_period=60
):
"""
Subscribe to the given topic and subscription combination.
**Args**
- * `topic`: The name of the topic.
- * `subscription`: The name of the subscription.
-
- **Options**
-
- * `consumer_type`:
- Select the subscription type to be used when subscribing to the
topic.
- * `message_listener`:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- `consumer.receive()` will not be allowed. The listener function needs
- to accept (consumer, message), for example:
-
- #!python
- def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
-
- * `receiver_queue_size`:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls `receive()`. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared
subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The `receive()` function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.
- * `max_total_receiver_queue_size_across_partitions`
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for
individual partitions
- * `consumer_name`:
- Sets the consumer name.
- * `unacked_messages_timeout_ms`:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown
if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the
unacknowledged
- messages are redelivered.
- * `broker_consumer_stats_cache_time_ms`:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.
- * `properties`:
- Sets the properties for the consumer. The properties associated with
a consumer
- can be used for identify a consumer at broker side.
- """
- _check_type(str, topic, 'topic')
- _check_type(str, subscription_name, 'subscription_name')
- _check_type(ConsumerType, consumer_type, 'consumer_type')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type(int, max_total_receiver_queue_size_across_partitions,
- 'max_total_receiver_queue_size_across_partitions')
- _check_type_or_none(str, consumer_name, 'consumer_name')
- _check_type_or_none(int, unacked_messages_timeout_ms,
'unacked_messages_timeout_ms')
- _check_type(int, broker_consumer_stats_cache_time_ms,
'broker_consumer_stats_cache_time_ms')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(dict, properties, 'properties')
-
- conf = _pulsar.ConsumerConfiguration()
- conf.consumer_type(consumer_type)
- conf.read_compacted(is_read_compacted)
- if message_listener:
- conf.message_listener(message_listener)
- conf.receiver_queue_size(receiver_queue_size)
-
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
- if consumer_name:
- conf.consumer_name(consumer_name)
- if unacked_messages_timeout_ms:
- conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
-
- c = Consumer()
- c._consumer = self._client.subscribe(topic, subscription_name, conf)
- c._client = self
- self._consumers.append(c)
- return c
-
- def subscribe_topics(self, topics, subscription_name,
- consumer_type=ConsumerType.Exclusive,
- message_listener=None,
- receiver_queue_size=1000,
- max_total_receiver_queue_size_across_partitions=50000,
- consumer_name=None,
- unacked_messages_timeout_ms=None,
- broker_consumer_stats_cache_time_ms=30000,
- is_read_compacted=False,
- properties=None
- ):
- """
- Subscribe to the given topics and subscription combination.
-
- **Args**
-
- * `topics`: The list name of the topics.
+ * `topic`: The name of the topic, list of topics or regex pattern.
+ This method will accept these forms:
+ - `topic='my-topic'`
+ - `topic=['topic-1', 'topic-2', 'topic-3']`
+ - `topic=re.compile('topic-.*')`
* `subscription`: The name of the subscription.
**Options**
@@ -568,111 +476,9 @@ class Client:
* `properties`:
Sets the properties for the consumer. The properties associated with
a consumer
can be used for identify a consumer at broker side.
- """
- _check_type(list, topics, 'topics')
- _check_type(str, subscription_name, 'subscription_name')
- _check_type(ConsumerType, consumer_type, 'consumer_type')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type(int, max_total_receiver_queue_size_across_partitions,
- 'max_total_receiver_queue_size_across_partitions')
- _check_type_or_none(str, consumer_name, 'consumer_name')
- _check_type_or_none(int, unacked_messages_timeout_ms,
'unacked_messages_timeout_ms')
- _check_type(int, broker_consumer_stats_cache_time_ms,
'broker_consumer_stats_cache_time_ms')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(dict, properties, 'properties')
-
- conf = _pulsar.ConsumerConfiguration()
- conf.consumer_type(consumer_type)
- conf.read_compacted(is_read_compacted)
- if message_listener:
- conf.message_listener(message_listener)
- conf.receiver_queue_size(receiver_queue_size)
-
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
- if consumer_name:
- conf.consumer_name(consumer_name)
- if unacked_messages_timeout_ms:
- conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
-
- c = Consumer()
- c._consumer = self._client.subscribe_topics(topics, subscription_name,
conf)
- c._client = self
- self._consumers.append(c)
- return c
-
- def subscribe_pattern(self, topics_pattern, subscription_name,
- consumer_type=ConsumerType.Exclusive,
- message_listener=None,
- receiver_queue_size=1000,
-
max_total_receiver_queue_size_across_partitions=50000,
- consumer_name=None,
- unacked_messages_timeout_ms=None,
- broker_consumer_stats_cache_time_ms=30000,
- is_read_compacted=False,
- pattern_auto_discovery_period=60,
- properties=None
- ):
- """
- Subscribe to multiple topics, which match given regexPattern, under
the same namespace.
-
- **Args**
-
- * `topics_pattern`: The regex pattern to match topics.
- * `subscription`: The name of the subscription.
-
- **Options**
-
- * `consumer_type`:
- Select the subscription type to be used when subscribing to the
topic.
- * `message_listener`:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- `consumer.receive()` will not be allowed. The listener function needs
- to accept (consumer, message), for example:
-
- #!python
- def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
-
- * `receiver_queue_size`:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls `receive()`. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared
subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The `receive()` function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.
- * `max_total_receiver_queue_size_across_partitions`
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for
individual partitions
- * `consumer_name`:
- Sets the consumer name.
- * `unacked_messages_timeout_ms`:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown
if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the
unacknowledged
- messages are redelivered.
- * `broker_consumer_stats_cache_time_ms`:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.
* `pattern_auto_discovery_period`:
Periods of seconds for consumer to auto discover match topics.
- * `properties`:
- Sets the properties for the consumer. The properties associated with
a consumer
- can be used for identify a consumer at broker side.
"""
- _check_type(str, topics_pattern, 'topics_pattern')
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
@@ -682,7 +488,6 @@ class Client:
_check_type_or_none(int, unacked_messages_timeout_ms,
'unacked_messages_timeout_ms')
_check_type(int, broker_consumer_stats_cache_time_ms,
'broker_consumer_stats_cache_time_ms')
_check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type(int, pattern_auto_discovery_period,
'pattern_auto_discovery_period')
_check_type_or_none(dict, properties, 'properties')
conf = _pulsar.ConsumerConfiguration()
@@ -697,13 +502,23 @@ class Client:
if unacked_messages_timeout_ms:
conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- conf.pattern_auto_discovery_period(pattern_auto_discovery_period)
if properties:
for k, v in properties.items():
conf.property(k, v)
c = Consumer()
- c._consumer = self._client.subscribe_pattern(topics_pattern,
subscription_name, conf)
+ if isinstance(topic, str):
+ # Single topic
+ c._consumer = self._client.subscribe(topic, subscription_name,
conf)
+ elif isinstance(topic, list):
+ # List of topics
+ c._consumer = self._client.subscribe_topics(topic,
subscription_name, conf)
+ elif isinstance(topic, _retype):
+ # Regex pattern
+ c._consumer = self._client.subscribe_pattern(topic.pattern,
subscription_name, conf)
+ else:
+ raise ValueError("Argument 'topic' is expected to be of a type
between (str, list, re.pattern)")
+
c._client = self
self._consumers.append(c)
return c
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index 7fd60e6..d5163bc 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -669,11 +669,11 @@ class PulsarTest(TestCase):
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
- consumer = client.subscribe_topics(topics,
- 'my-topics-consumer-sub',
- consumer_type=ConsumerType.Shared,
- receiver_queue_size=10
- )
+ consumer = client.subscribe(topics,
+ 'my-topics-consumer-sub',
+ consumer_type=ConsumerType.Shared,
+ receiver_queue_size=10
+ )
for i in range(100):
producer1.send('hello-1-%d' % i)
@@ -699,18 +699,11 @@ class PulsarTest(TestCase):
client.close()
def test_topics_pattern_consumer(self):
+ import re
client = Client(self.serviceUrl)
topics_pattern =
'persistent://sample/standalone/ns/my-python-pattern-consumer.*'
-
- consumer = client.subscribe_pattern(topics_pattern,
- 'my-pattern-consumer-sub',
- consumer_type=ConsumerType.Shared,
- receiver_queue_size=10,
- pattern_auto_discovery_period=1
- )
-
topic1 =
'persistent://sample/standalone/ns/my-python-pattern-consumer-1'
topic2 =
'persistent://sample/standalone/ns/my-python-pattern-consumer-2'
topic3 =
'persistent://sample/standalone/ns/my-python-pattern-consumer-3'
@@ -727,6 +720,13 @@ class PulsarTest(TestCase):
producer2 = client.create_producer(topic2)
producer3 = client.create_producer(topic3)
+ consumer = client.subscribe(re.compile(topics_pattern),
+ 'my-pattern-consumer-sub',
+ consumer_type = ConsumerType.Shared,
+ receiver_queue_size = 10,
+ pattern_auto_discovery_period = 1
+ )
+
# wait enough time to trigger auto discovery
time.sleep(2)