This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 99ad32b  [Python] Consolidated duplicated subscribe_*() methods into a single one (#2580)
99ad32b is described below

commit 99ad32b68565e8ee25049741336e5f58571b078a
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Sep 19 10:14:59 2018 -0700

    [Python] Consolidated duplicated subscribe_*() methods into a single one (#2580)
    
    * [Python] Consolidated duplicated subscribe_*() methods into a single one
    
    * Fixed missing parameter pattern_auto_discovery_period
---
 pulsar-client-cpp/python/pulsar/__init__.py | 229 +++-------------------------
 pulsar-client-cpp/python/pulsar_test.py     |  26 ++--
 2 files changed, 35 insertions(+), 220 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 806c7e2..6849ecc 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -104,6 +104,9 @@ from pulsar.functions.function import Function
 from pulsar.functions.context import Context
 from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
 
+import re
+_retype = type(re.compile('x'))
+
 class MessageId:
     """
     Represents a message id
@@ -412,114 +415,19 @@ class Client:
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
                   is_read_compacted=False,
-                  properties=None
+                  properties=None,
+                  pattern_auto_discovery_period=60
                   ):
         """
         Subscribe to the given topic and subscription combination.
 
         **Args**
 
-        * `topic`: The name of the topic.
-        * `subscription`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the 
topic.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
-                def my_listener(consumer, message):
-                    # process message
-                    consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared 
subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for 
individual partitions
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown 
if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the 
unacknowledged
-          messages are redelivered.
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with 
a consumer
-          can be used for identify a consumer at broker side.
-        """
-        _check_type(str, topic, 'topic')
-        _check_type(str, subscription_name, 'subscription_name')
-        _check_type(ConsumerType, consumer_type, 'consumer_type')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type(int, max_total_receiver_queue_size_across_partitions,
-                    'max_total_receiver_queue_size_across_partitions')
-        _check_type_or_none(str, consumer_name, 'consumer_name')
-        _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
-        _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type_or_none(dict, properties, 'properties')
-
-        conf = _pulsar.ConsumerConfiguration()
-        conf.consumer_type(consumer_type)
-        conf.read_compacted(is_read_compacted)
-        if message_listener:
-            conf.message_listener(message_listener)
-        conf.receiver_queue_size(receiver_queue_size)
-        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-        if consumer_name:
-            conf.consumer_name(consumer_name)
-        if unacked_messages_timeout_ms:
-            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-
-        c = Consumer()
-        c._consumer = self._client.subscribe(topic, subscription_name, conf)
-        c._client = self
-        self._consumers.append(c)
-        return c
-
-    def subscribe_topics(self, topics, subscription_name,
-                         consumer_type=ConsumerType.Exclusive,
-                         message_listener=None,
-                         receiver_queue_size=1000,
-                         max_total_receiver_queue_size_across_partitions=50000,
-                         consumer_name=None,
-                         unacked_messages_timeout_ms=None,
-                         broker_consumer_stats_cache_time_ms=30000,
-                         is_read_compacted=False,
-                         properties=None
-                         ):
-        """
-        Subscribe to the given topics and subscription combination.
-
-        **Args**
-
-        * `topics`: The list name of the topics.
+        * `topic`: The name of the topic, list of topics or regex pattern.
+                  This method will accept these forms:
+                    - `topic='my-topic'`
+                    - `topic=['topic-1', 'topic-2', 'topic-3']`
+                    - `topic=re.compile('topic-.*')`
         * `subscription`: The name of the subscription.
 
         **Options**
@@ -568,111 +476,9 @@ class Client:
         * `properties`:
           Sets the properties for the consumer. The properties associated with 
a consumer
           can be used for identify a consumer at broker side.
-        """
-        _check_type(list, topics, 'topics')
-        _check_type(str, subscription_name, 'subscription_name')
-        _check_type(ConsumerType, consumer_type, 'consumer_type')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type(int, max_total_receiver_queue_size_across_partitions,
-                    'max_total_receiver_queue_size_across_partitions')
-        _check_type_or_none(str, consumer_name, 'consumer_name')
-        _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
-        _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type_or_none(dict, properties, 'properties')
-
-        conf = _pulsar.ConsumerConfiguration()
-        conf.consumer_type(consumer_type)
-        conf.read_compacted(is_read_compacted)
-        if message_listener:
-            conf.message_listener(message_listener)
-        conf.receiver_queue_size(receiver_queue_size)
-        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-        if consumer_name:
-            conf.consumer_name(consumer_name)
-        if unacked_messages_timeout_ms:
-            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-
-        c = Consumer()
-        c._consumer = self._client.subscribe_topics(topics, subscription_name, 
conf)
-        c._client = self
-        self._consumers.append(c)
-        return c
-
-    def subscribe_pattern(self, topics_pattern, subscription_name,
-                          consumer_type=ConsumerType.Exclusive,
-                          message_listener=None,
-                          receiver_queue_size=1000,
-                          
max_total_receiver_queue_size_across_partitions=50000,
-                          consumer_name=None,
-                          unacked_messages_timeout_ms=None,
-                          broker_consumer_stats_cache_time_ms=30000,
-                          is_read_compacted=False,
-                          pattern_auto_discovery_period=60,
-                          properties=None
-                          ):
-        """
-        Subscribe to multiple topics, which match given regexPattern, under 
the same namespace.
-
-        **Args**
-
-        * `topics_pattern`: The regex pattern to match topics.
-        * `subscription`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the 
topic.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
-                def my_listener(consumer, message):
-                    # process message
-                    consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared 
subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for 
individual partitions
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown 
if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the 
unacknowledged
-          messages are redelivered.
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
         * `pattern_auto_discovery_period`:
           Periods of seconds for consumer to auto discover match topics.
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with 
a consumer
-          can be used for identify a consumer at broker side.
         """
-        _check_type(str, topics_pattern, 'topics_pattern')
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
@@ -682,7 +488,6 @@ class Client:
         _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
         _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type(int, pattern_auto_discovery_period, 
'pattern_auto_discovery_period')
         _check_type_or_none(dict, properties, 'properties')
 
         conf = _pulsar.ConsumerConfiguration()
@@ -697,13 +502,23 @@ class Client:
         if unacked_messages_timeout_ms:
             conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
         
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        conf.pattern_auto_discovery_period(pattern_auto_discovery_period)
         if properties:
             for k, v in properties.items():
                 conf.property(k, v)
 
         c = Consumer()
-        c._consumer = self._client.subscribe_pattern(topics_pattern, 
subscription_name, conf)
+        if isinstance(topic, str):
+            # Single topic
+            c._consumer = self._client.subscribe(topic, subscription_name, 
conf)
+        elif isinstance(topic, list):
+            # List of topics
+            c._consumer = self._client.subscribe_topics(topic, 
subscription_name, conf)
+        elif isinstance(topic, _retype):
+            # Regex pattern
+            c._consumer = self._client.subscribe_pattern(topic.pattern, 
subscription_name, conf)
+        else:
+            raise ValueError("Argument 'topic' is expected to be of a type 
between (str, list, re.pattern)")
+
         c._client = self
         self._consumers.append(c)
         return c
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 7fd60e6..d5163bc 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -669,11 +669,11 @@ class PulsarTest(TestCase):
         producer2 = client.create_producer(topic2)
         producer3 = client.create_producer(topic3)
 
-        consumer = client.subscribe_topics(topics,
-                                           'my-topics-consumer-sub',
-                                           consumer_type=ConsumerType.Shared,
-                                           receiver_queue_size=10
-                                           )
+        consumer = client.subscribe(topics,
+                                    'my-topics-consumer-sub',
+                                    consumer_type=ConsumerType.Shared,
+                                    receiver_queue_size=10
+                                    )
 
         for i in range(100):
             producer1.send('hello-1-%d' % i)
@@ -699,18 +699,11 @@ class PulsarTest(TestCase):
         client.close()
 
     def test_topics_pattern_consumer(self):
+        import re
         client = Client(self.serviceUrl)
 
         topics_pattern = 
'persistent://sample/standalone/ns/my-python-pattern-consumer.*'
 
-
-        consumer = client.subscribe_pattern(topics_pattern,
-                                            'my-pattern-consumer-sub',
-                                            consumer_type=ConsumerType.Shared,
-                                            receiver_queue_size=10,
-                                            pattern_auto_discovery_period=1
-                                            )
-
         topic1 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-1'
         topic2 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-2'
         topic3 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-3'
@@ -727,6 +720,13 @@ class PulsarTest(TestCase):
         producer2 = client.create_producer(topic2)
         producer3 = client.create_producer(topic3)
 
+        consumer = client.subscribe(re.compile(topics_pattern),
+                                    'my-pattern-consumer-sub',
+                                    consumer_type = ConsumerType.Shared,
+                                    receiver_queue_size = 10,
+                                    pattern_auto_discovery_period = 1
+                                   )
+
         # wait enough time to trigger auto discovery
         time.sleep(2)
 

Reply via email to