This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 99ad32b [Python] Consolidated duplicated subscribe_*() methods into a single one (#2580) 99ad32b is described below commit 99ad32b68565e8ee25049741336e5f58571b078a Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Sep 19 10:14:59 2018 -0700 [Python] Consolidated duplicated subscribe_*() methods into a single one (#2580) * [Python] Consolidated duplicated subscribe_*() methods into a single one * Fixed missing parameter pattern_auto_discovery_period --- pulsar-client-cpp/python/pulsar/__init__.py | 229 +++------------------------- pulsar-client-cpp/python/pulsar_test.py | 26 ++-- 2 files changed, 35 insertions(+), 220 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 806c7e2..6849ecc 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -104,6 +104,9 @@ from pulsar.functions.function import Function from pulsar.functions.context import Context from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe +import re +_retype = type(re.compile('x')) + class MessageId: """ Represents a message id @@ -412,114 +415,19 @@ class Client: unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, is_read_compacted=False, - properties=None + properties=None, + pattern_auto_discovery_period=60 ): """ Subscribe to the given topic and subscription combination. **Args** - * `topic`: The name of the topic. - * `subscription`: The name of the subscription. - - **Options** - - * `consumer_type`: - Select the subscription type to be used when subscribing to the topic. - * `message_listener`: - Sets a message listener for the consumer. When the listener is set, - the application will receive messages through it. Calls to - `consumer.receive()` will not be allowed. 
The listener function needs - to accept (consumer, message), for example: - - #!python - def my_listener(consumer, message): - # process message - consumer.acknowledge(message) - - * `receiver_queue_size`: - Sets the size of the consumer receive queue. The consumer receive - queue controls how many messages can be accumulated by the consumer - before the application calls `receive()`. Using a higher value could - potentially increase the consumer throughput at the expense of higher - memory utilization. Setting the consumer queue size to zero decreases - the throughput of the consumer by disabling pre-fetching of messages. - This approach improves the message distribution on shared subscription - by pushing messages only to those consumers that are ready to process - them. Neither receive with timeout nor partitioned topics can be used - if the consumer queue size is zero. The `receive()` function call - should not be interrupted when the consumer queue size is zero. The - default value is 1000 messages and should work well for most use - cases. - * `max_total_receiver_queue_size_across_partitions` - Set the max total receiver queue size across partitions. - This setting will be used to reduce the receiver queue size for individual partitions - * `consumer_name`: - Sets the consumer name. - * `unacked_messages_timeout_ms`: - Sets the timeout in milliseconds for unacknowledged messages. The - timeout needs to be greater than 10 seconds. An exception is thrown if - the given value is less than 10 seconds. If a successful - acknowledgement is not sent within the timeout, all the unacknowledged - messages are redelivered. - * `broker_consumer_stats_cache_time_ms`: - Sets the time duration for which the broker-side consumer stats will - be cached in the client. - * `properties`: - Sets the properties for the consumer. The properties associated with a consumer - can be used for identify a consumer at broker side. 
- """ - _check_type(str, topic, 'topic') - _check_type(str, subscription_name, 'subscription_name') - _check_type(ConsumerType, consumer_type, 'consumer_type') - _check_type(int, receiver_queue_size, 'receiver_queue_size') - _check_type(int, max_total_receiver_queue_size_across_partitions, - 'max_total_receiver_queue_size_across_partitions') - _check_type_or_none(str, consumer_name, 'consumer_name') - _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') - _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') - _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type_or_none(dict, properties, 'properties') - - conf = _pulsar.ConsumerConfiguration() - conf.consumer_type(consumer_type) - conf.read_compacted(is_read_compacted) - if message_listener: - conf.message_listener(message_listener) - conf.receiver_queue_size(receiver_queue_size) - conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) - if consumer_name: - conf.consumer_name(consumer_name) - if unacked_messages_timeout_ms: - conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) - conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) - if properties: - for k, v in properties.items(): - conf.property(k, v) - - c = Consumer() - c._consumer = self._client.subscribe(topic, subscription_name, conf) - c._client = self - self._consumers.append(c) - return c - - def subscribe_topics(self, topics, subscription_name, - consumer_type=ConsumerType.Exclusive, - message_listener=None, - receiver_queue_size=1000, - max_total_receiver_queue_size_across_partitions=50000, - consumer_name=None, - unacked_messages_timeout_ms=None, - broker_consumer_stats_cache_time_ms=30000, - is_read_compacted=False, - properties=None - ): - """ - Subscribe to the given topics and subscription combination. - - **Args** - - * `topics`: The list name of the topics. 
+ * `topic`: The name of the topic, list of topics or regex pattern. + This method will accept these forms: + - `topic='my-topic'` + - `topic=['topic-1', 'topic-2', 'topic-3']` + - `topic=re.compile('topic-.*')` * `subscription`: The name of the subscription. **Options** @@ -568,111 +476,9 @@ class Client: * `properties`: Sets the properties for the consumer. The properties associated with a consumer can be used for identify a consumer at broker side. - """ - _check_type(list, topics, 'topics') - _check_type(str, subscription_name, 'subscription_name') - _check_type(ConsumerType, consumer_type, 'consumer_type') - _check_type(int, receiver_queue_size, 'receiver_queue_size') - _check_type(int, max_total_receiver_queue_size_across_partitions, - 'max_total_receiver_queue_size_across_partitions') - _check_type_or_none(str, consumer_name, 'consumer_name') - _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') - _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') - _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type_or_none(dict, properties, 'properties') - - conf = _pulsar.ConsumerConfiguration() - conf.consumer_type(consumer_type) - conf.read_compacted(is_read_compacted) - if message_listener: - conf.message_listener(message_listener) - conf.receiver_queue_size(receiver_queue_size) - conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) - if consumer_name: - conf.consumer_name(consumer_name) - if unacked_messages_timeout_ms: - conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) - conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) - if properties: - for k, v in properties.items(): - conf.property(k, v) - - c = Consumer() - c._consumer = self._client.subscribe_topics(topics, subscription_name, conf) - c._client = self - self._consumers.append(c) - return c - - def subscribe_pattern(self, 
topics_pattern, subscription_name, - consumer_type=ConsumerType.Exclusive, - message_listener=None, - receiver_queue_size=1000, - max_total_receiver_queue_size_across_partitions=50000, - consumer_name=None, - unacked_messages_timeout_ms=None, - broker_consumer_stats_cache_time_ms=30000, - is_read_compacted=False, - pattern_auto_discovery_period=60, - properties=None - ): - """ - Subscribe to multiple topics, which match given regexPattern, under the same namespace. - - **Args** - - * `topics_pattern`: The regex pattern to match topics. - * `subscription`: The name of the subscription. - - **Options** - - * `consumer_type`: - Select the subscription type to be used when subscribing to the topic. - * `message_listener`: - Sets a message listener for the consumer. When the listener is set, - the application will receive messages through it. Calls to - `consumer.receive()` will not be allowed. The listener function needs - to accept (consumer, message), for example: - - #!python - def my_listener(consumer, message): - # process message - consumer.acknowledge(message) - - * `receiver_queue_size`: - Sets the size of the consumer receive queue. The consumer receive - queue controls how many messages can be accumulated by the consumer - before the application calls `receive()`. Using a higher value could - potentially increase the consumer throughput at the expense of higher - memory utilization. Setting the consumer queue size to zero decreases - the throughput of the consumer by disabling pre-fetching of messages. - This approach improves the message distribution on shared subscription - by pushing messages only to those consumers that are ready to process - them. Neither receive with timeout nor partitioned topics can be used - if the consumer queue size is zero. The `receive()` function call - should not be interrupted when the consumer queue size is zero. The - default value is 1000 messages and should work well for most use - cases. 
- * `max_total_receiver_queue_size_across_partitions` - Set the max total receiver queue size across partitions. - This setting will be used to reduce the receiver queue size for individual partitions - * `consumer_name`: - Sets the consumer name. - * `unacked_messages_timeout_ms`: - Sets the timeout in milliseconds for unacknowledged messages. The - timeout needs to be greater than 10 seconds. An exception is thrown if - the given value is less than 10 seconds. If a successful - acknowledgement is not sent within the timeout, all the unacknowledged - messages are redelivered. - * `broker_consumer_stats_cache_time_ms`: - Sets the time duration for which the broker-side consumer stats will - be cached in the client. * `pattern_auto_discovery_period`: Periods of seconds for consumer to auto discover match topics. - * `properties`: - Sets the properties for the consumer. The properties associated with a consumer - can be used for identify a consumer at broker side. """ - _check_type(str, topics_pattern, 'topics_pattern') _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') _check_type(int, receiver_queue_size, 'receiver_queue_size') @@ -682,7 +488,6 @@ class Client: _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') _check_type_or_none(dict, properties, 'properties') conf = _pulsar.ConsumerConfiguration() @@ -697,13 +502,23 @@ class Client: if unacked_messages_timeout_ms: conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) - conf.pattern_auto_discovery_period(pattern_auto_discovery_period) if properties: for k, v in properties.items(): conf.property(k, v) c 
= Consumer() - c._consumer = self._client.subscribe_pattern(topics_pattern, subscription_name, conf) + if isinstance(topic, str): + # Single topic + c._consumer = self._client.subscribe(topic, subscription_name, conf) + elif isinstance(topic, list): + # List of topics + c._consumer = self._client.subscribe_topics(topic, subscription_name, conf) + elif isinstance(topic, _retype): + # Regex pattern + c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf) + else: + raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") + c._client = self self._consumers.append(c) return c diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 7fd60e6..d5163bc 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -669,11 +669,11 @@ class PulsarTest(TestCase): producer2 = client.create_producer(topic2) producer3 = client.create_producer(topic3) - consumer = client.subscribe_topics(topics, - 'my-topics-consumer-sub', - consumer_type=ConsumerType.Shared, - receiver_queue_size=10 - ) + consumer = client.subscribe(topics, + 'my-topics-consumer-sub', + consumer_type=ConsumerType.Shared, + receiver_queue_size=10 + ) for i in range(100): producer1.send('hello-1-%d' % i) @@ -699,18 +699,11 @@ class PulsarTest(TestCase): client.close() def test_topics_pattern_consumer(self): + import re client = Client(self.serviceUrl) topics_pattern = 'persistent://sample/standalone/ns/my-python-pattern-consumer.*' - - consumer = client.subscribe_pattern(topics_pattern, - 'my-pattern-consumer-sub', - consumer_type=ConsumerType.Shared, - receiver_queue_size=10, - pattern_auto_discovery_period=1 - ) - topic1 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-1' topic2 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-2' topic3 = 'persistent://sample/standalone/ns/my-python-pattern-consumer-3' @@ -727,6 +720,13 @@ class 
PulsarTest(TestCase): producer2 = client.create_producer(topic2) producer3 = client.create_producer(topic3) + consumer = client.subscribe(re.compile(topics_pattern), + 'my-pattern-consumer-sub', + consumer_type = ConsumerType.Shared, + receiver_queue_size = 10, + pattern_auto_discovery_period = 1 + ) + # wait enough time to trigger auto discovery time.sleep(2)