jiazhai closed pull request #2451: Issue 2312: add python client multi-topics 
consumer support
URL: https://github.com/apache/incubator-pulsar/pull/2451
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 1185812597..806c7e2032 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -503,6 +503,211 @@ def my_listener(consumer, message):
         self._consumers.append(c)
         return c
 
+    def subscribe_topics(self, topics, subscription_name,
+                         consumer_type=ConsumerType.Exclusive,
+                         message_listener=None,
+                         receiver_queue_size=1000,
+                         max_total_receiver_queue_size_across_partitions=50000,
+                         consumer_name=None,
+                         unacked_messages_timeout_ms=None,
+                         broker_consumer_stats_cache_time_ms=30000,
+                         is_read_compacted=False,
+                         properties=None
+                         ):
+        """
+        Subscribe to the given topics and subscription combination.
+
+        **Args**
+
+        * `topics`: The list name of the topics.
+        * `subscription`: The name of the subscription.
+
+        **Options**
+
+        * `consumer_type`:
+          Select the subscription type to be used when subscribing to the 
topic.
+        * `message_listener`:
+          Sets a message listener for the consumer. When the listener is set,
+          the application will receive messages through it. Calls to
+          `consumer.receive()` will not be allowed. The listener function needs
+          to accept (consumer, message), for example:
+
+                #!python
+                def my_listener(consumer, message):
+                    # process message
+                    consumer.acknowledge(message)
+
+        * `receiver_queue_size`:
+          Sets the size of the consumer receive queue. The consumer receive
+          queue controls how many messages can be accumulated by the consumer
+          before the application calls `receive()`. Using a higher value could
+          potentially increase the consumer throughput at the expense of higher
+          memory utilization. Setting the consumer queue size to zero decreases
+          the throughput of the consumer by disabling pre-fetching of messages.
+          This approach improves the message distribution on shared 
subscription
+          by pushing messages only to those consumers that are ready to process
+          them. Neither receive with timeout nor partitioned topics can be used
+          if the consumer queue size is zero. The `receive()` function call
+          should not be interrupted when the consumer queue size is zero. The
+          default value is 1000 messages and should work well for most use
+          cases.
+        * `max_total_receiver_queue_size_across_partitions`
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for 
individual partitions
+        * `consumer_name`:
+          Sets the consumer name.
+        * `unacked_messages_timeout_ms`:
+          Sets the timeout in milliseconds for unacknowledged messages. The
+          timeout needs to be greater than 10 seconds. An exception is thrown 
if
+          the given value is less than 10 seconds. If a successful
+          acknowledgement is not sent within the timeout, all the 
unacknowledged
+          messages are redelivered.
+        * `broker_consumer_stats_cache_time_ms`:
+          Sets the time duration for which the broker-side consumer stats will
+          be cached in the client.
+        * `properties`:
+          Sets the properties for the consumer. The properties associated with 
a consumer
+          can be used for identify a consumer at broker side.
+        """
+        _check_type(list, topics, 'topics')
+        _check_type(str, subscription_name, 'subscription_name')
+        _check_type(ConsumerType, consumer_type, 'consumer_type')
+        _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
+        _check_type_or_none(str, consumer_name, 'consumer_name')
+        _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
+        _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
+        _check_type(bool, is_read_compacted, 'is_read_compacted')
+        _check_type_or_none(dict, properties, 'properties')
+
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.read_compacted(is_read_compacted)
+        if message_listener:
+            conf.message_listener(message_listener)
+        conf.receiver_queue_size(receiver_queue_size)
+        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        c = Consumer()
+        c._consumer = self._client.subscribe_topics(topics, subscription_name, 
conf)
+        c._client = self
+        self._consumers.append(c)
+        return c
+
+    def subscribe_pattern(self, topics_pattern, subscription_name,
+                          consumer_type=ConsumerType.Exclusive,
+                          message_listener=None,
+                          receiver_queue_size=1000,
+                          
max_total_receiver_queue_size_across_partitions=50000,
+                          consumer_name=None,
+                          unacked_messages_timeout_ms=None,
+                          broker_consumer_stats_cache_time_ms=30000,
+                          is_read_compacted=False,
+                          pattern_auto_discovery_period=60,
+                          properties=None
+                          ):
+        """
+        Subscribe to multiple topics, which match given regexPattern, under 
the same namespace.
+
+        **Args**
+
+        * `topics_pattern`: The regex pattern to match topics.
+        * `subscription`: The name of the subscription.
+
+        **Options**
+
+        * `consumer_type`:
+          Select the subscription type to be used when subscribing to the 
topic.
+        * `message_listener`:
+          Sets a message listener for the consumer. When the listener is set,
+          the application will receive messages through it. Calls to
+          `consumer.receive()` will not be allowed. The listener function needs
+          to accept (consumer, message), for example:
+
+                #!python
+                def my_listener(consumer, message):
+                    # process message
+                    consumer.acknowledge(message)
+
+        * `receiver_queue_size`:
+          Sets the size of the consumer receive queue. The consumer receive
+          queue controls how many messages can be accumulated by the consumer
+          before the application calls `receive()`. Using a higher value could
+          potentially increase the consumer throughput at the expense of higher
+          memory utilization. Setting the consumer queue size to zero decreases
+          the throughput of the consumer by disabling pre-fetching of messages.
+          This approach improves the message distribution on shared 
subscription
+          by pushing messages only to those consumers that are ready to process
+          them. Neither receive with timeout nor partitioned topics can be used
+          if the consumer queue size is zero. The `receive()` function call
+          should not be interrupted when the consumer queue size is zero. The
+          default value is 1000 messages and should work well for most use
+          cases.
+        * `max_total_receiver_queue_size_across_partitions`
+          Set the max total receiver queue size across partitions.
+          This setting will be used to reduce the receiver queue size for 
individual partitions
+        * `consumer_name`:
+          Sets the consumer name.
+        * `unacked_messages_timeout_ms`:
+          Sets the timeout in milliseconds for unacknowledged messages. The
+          timeout needs to be greater than 10 seconds. An exception is thrown 
if
+          the given value is less than 10 seconds. If a successful
+          acknowledgement is not sent within the timeout, all the 
unacknowledged
+          messages are redelivered.
+        * `broker_consumer_stats_cache_time_ms`:
+          Sets the time duration for which the broker-side consumer stats will
+          be cached in the client.
+        * `pattern_auto_discovery_period`:
+          Periods of seconds for consumer to auto discover match topics.
+        * `properties`:
+          Sets the properties for the consumer. The properties associated with 
a consumer
+          can be used for identify a consumer at broker side.
+        """
+        _check_type(str, topics_pattern, 'topics_pattern')
+        _check_type(str, subscription_name, 'subscription_name')
+        _check_type(ConsumerType, consumer_type, 'consumer_type')
+        _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type(int, max_total_receiver_queue_size_across_partitions,
+                    'max_total_receiver_queue_size_across_partitions')
+        _check_type_or_none(str, consumer_name, 'consumer_name')
+        _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
+        _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
+        _check_type(bool, is_read_compacted, 'is_read_compacted')
+        _check_type(int, pattern_auto_discovery_period, 
'pattern_auto_discovery_period')
+        _check_type_or_none(dict, properties, 'properties')
+
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.read_compacted(is_read_compacted)
+        if message_listener:
+            conf.message_listener(message_listener)
+        conf.receiver_queue_size(receiver_queue_size)
+        
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        conf.pattern_auto_discovery_period(pattern_auto_discovery_period)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        c = Consumer()
+        c._consumer = self._client.subscribe_pattern(topics_pattern, 
subscription_name, conf)
+        c._client = self
+        self._consumers.append(c)
+        return c
+
     def create_reader(self, topic, start_message_id,
                       reader_listener=None,
                       receiver_queue_size=1000,
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index 25564b7b67..7fd60e661a 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -650,6 +650,110 @@ def _v2_topics(self, url):
 
         client.close()
 
+    def test_topics_consumer(self):
+        client = Client(self.serviceUrl)
+        topic1 = 
'persistent://sample/standalone/ns/my-python-topics-consumer-1'
+        topic2 = 
'persistent://sample/standalone/ns/my-python-topics-consumer-2'
+        topic3 = 
'persistent://sample/standalone/ns/my-python-topics-consumer-3'
+        topics = [topic1, topic2, topic3]
+
+        url1 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-topics-consumer-1/partitions'
+        url2 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-topics-consumer-2/partitions'
+        url3 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-topics-consumer-3/partitions'
+
+        doHttpPut(url1, '2')
+        doHttpPut(url2, '3')
+        doHttpPut(url3, '4')
+
+        producer1 = client.create_producer(topic1)
+        producer2 = client.create_producer(topic2)
+        producer3 = client.create_producer(topic3)
+
+        consumer = client.subscribe_topics(topics,
+                                           'my-topics-consumer-sub',
+                                           consumer_type=ConsumerType.Shared,
+                                           receiver_queue_size=10
+                                           )
+
+        for i in range(100):
+            producer1.send('hello-1-%d' % i)
+
+        for i in range(100):
+            producer2.send('hello-2-%d' % i)
+
+        for i in range(100):
+            producer3.send('hello-3-%d' % i)
+
+
+        for i in range(300):
+            msg = consumer.receive()
+            consumer.acknowledge(msg)
+
+        try:
+        # No other messages should be received
+            consumer.receive(timeout_millis=500)
+            self.assertTrue(False)
+        except:
+            # Exception is expected
+            pass
+        client.close()
+
+    def test_topics_pattern_consumer(self):
+        client = Client(self.serviceUrl)
+
+        topics_pattern = 
'persistent://sample/standalone/ns/my-python-pattern-consumer.*'
+
+
+        consumer = client.subscribe_pattern(topics_pattern,
+                                            'my-pattern-consumer-sub',
+                                            consumer_type=ConsumerType.Shared,
+                                            receiver_queue_size=10,
+                                            pattern_auto_discovery_period=1
+                                            )
+
+        topic1 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-1'
+        topic2 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-2'
+        topic3 = 
'persistent://sample/standalone/ns/my-python-pattern-consumer-3'
+
+        url1 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-1/partitions'
+        url2 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-2/partitions'
+        url3 = self.adminUrl + 
'/admin/persistent/sample/standalone/ns/my-python-pattern-consumer-3/partitions'
+
+        doHttpPut(url1, '2')
+        doHttpPut(url2, '3')
+        doHttpPut(url3, '4')
+
+        producer1 = client.create_producer(topic1)
+        producer2 = client.create_producer(topic2)
+        producer3 = client.create_producer(topic3)
+
+        # wait enough time to trigger auto discovery
+        time.sleep(2)
+
+        for i in range(100):
+            producer1.send('hello-1-%d' % i)
+
+        for i in range(100):
+            producer2.send('hello-2-%d' % i)
+
+        for i in range(100):
+            producer3.send('hello-3-%d' % i)
+
+
+        for i in range(300):
+            msg = consumer.receive()
+            consumer.acknowledge(msg)
+
+        try:
+            # No other messages should be received
+            consumer.receive(timeout_millis=500)
+            self.assertTrue(False)
+        except:
+            # Exception is expected
+            pass
+        client.close()
+
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/python/src/client.cc 
b/pulsar-client-cpp/python/src/client.cc
index b5295acf26..4b6055a038 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -43,6 +43,39 @@ Consumer Client_subscribe(Client& client, const std::string& 
topic, const std::s
     return consumer;
 }
 
+Consumer Client_subscribe_topics(Client& client, boost::python::list& topics,
+                                 const std::string& subscriptionName, const 
ConsumerConfiguration& conf) {
+    Consumer consumer;
+    Result res;
+
+    std::vector<std::string> topics_vector;
+
+    for (int i = 0; i < len(topics); i ++) {
+        std::string content = boost::python::extract<std::string>(topics[i]);
+        topics_vector.push_back(content);
+    }
+
+    Py_BEGIN_ALLOW_THREADS
+        res = client.subscribe(topics_vector, subscriptionName, conf, 
consumer);
+    Py_END_ALLOW_THREADS
+
+    CHECK_RESULT(res);
+    return consumer;
+}
+
+Consumer Client_subscribe_pattern(Client& client, const std::string& 
topic_pattern, const std::string& subscriptionName,
+                                 const ConsumerConfiguration& conf) {
+    Consumer consumer;
+    Result res;
+
+    Py_BEGIN_ALLOW_THREADS
+        res = client.subscribeWithRegex(topic_pattern, subscriptionName, conf, 
consumer);
+    Py_END_ALLOW_THREADS
+
+    CHECK_RESULT(res);
+    return consumer;
+}
+
 Reader Client_createReader(Client& client, const std::string& topic,
                            const MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
@@ -73,6 +106,8 @@ void export_client() {
     class_<Client>("Client", init<const std::string&, const 
ClientConfiguration& >())
             .def("create_producer", &Client_createProducer)
             .def("subscribe", &Client_subscribe)
+            .def("subscribe_topics", &Client_subscribe_topics)
+            .def("subscribe_pattern", &Client_subscribe_pattern)
             .def("create_reader", &Client_createReader)
             .def("close", &Client_close)
             .def("shutdown", &Client::shutdown)
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9626049255..c1bfeef9c8 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -139,6 +139,8 @@ void export_config() {
             .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
             .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
             .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
+            .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
+            .def("pattern_auto_discovery_period", 
&ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
             .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
             .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
             .def("property", &ConsumerConfiguration::setProperty, 
return_self<>())


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to