This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 19df7c3  feat: support pattern subscription non persistent topic. (#134)
19df7c3 is described below

commit 19df7c349cefbcc03a8ef9054c30deb53b03c249
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Aug 13 14:08:59 2023 +0800

    feat: support pattern subscription non persistent topic. (#134)
---
 pulsar/__init__.py   | 13 ++++++++-
 src/config.cc        |  2 ++
 src/enums.cc         |  5 ++++
 tests/pulsar_test.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 39c3cee..6a6ee6c 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -48,7 +48,7 @@ from typing import List, Tuple, Optional
 import _pulsar
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
-    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode  # noqa: F401
+    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode  # noqa: F401
 
 from pulsar.__about__ import __version__
 
@@ -707,6 +707,7 @@ class Client:
                   batch_receive_policy=None,
                   key_shared_policy=None,
                   batch_index_ack_enabled=False,
+                  regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -796,6 +797,14 @@ class Client:
         batch_index_ack_enabled: Enable the batch index acknowledgement.
             It should be noted that this option can only work when the broker side also enables the batch index
             acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.
+        regex_subscription_mode: RegexSubscriptionMode, optional
+            Set the regex subscription mode for use when the topic is a regex pattern.
+
+            Supported modes:
+
+            * PersistentOnly: By default only subscribe to persistent topics.
+            * NonPersistentOnly: Only subscribe to non-persistent topics.
+            * AllTopics: Subscribe to both persistent and non-persistent topics.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -818,9 +827,11 @@ class Client:
         _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
         _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
         _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
+        _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
+        conf.regex_subscription_mode(regex_subscription_mode)
         conf.read_compacted(is_read_compacted)
         if message_listener:
             conf.message_listener(_listener_wrapper(message_listener, schema))
diff --git a/src/config.cc b/src/config.cc
index 4b661a9..23a5b80 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -265,6 +265,8 @@ void export_config(py::module_& m) {
         .def("property", &ConsumerConfiguration::setProperty, return_value_policy::reference)
         .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
         .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
+        .def("regex_subscription_mode", &ConsumerConfiguration::setRegexSubscriptionMode)
+        .def("regex_subscription_mode", &ConsumerConfiguration::getRegexSubscriptionMode, return_value_policy::reference)
         .def("crypto_key_reader", &ConsumerConfiguration::setCryptoKeyReader, return_value_policy::reference)
         .def("replicate_subscription_state_enabled",
              &ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
diff --git a/src/enums.cc b/src/enums.cc
index 33affd0..198edfa 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -120,6 +120,11 @@ void export_enums(py::module_& m) {
         .value("Latest", InitialPositionLatest)
         .value("Earliest", InitialPositionEarliest);
 
+    enum_<RegexSubscriptionMode>(m, "RegexSubscriptionMode", "Regex subscription mode")
+        .value("PersistentOnly", PersistentOnly)
+        .value("NonPersistentOnly", NonPersistentOnly)
+        .value("AllTopics", AllTopics);
+
     enum_<ProducerConfiguration::BatchingType>(m, "BatchingType", "Supported batching types")
         .value("Default", ProducerConfiguration::DefaultBatching)
         .value("KeyBased", ProducerConfiguration::KeyBasedBatching);
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 903f143..80c98f5 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -24,6 +24,7 @@ import logging
 from unittest import TestCase, main
 import time
 import os
+import re
 import pulsar
 import uuid
 from datetime import timedelta
@@ -46,7 +47,7 @@ from pulsar import (
 )
 from pulsar.schema import JsonSchema, Record, Integer
 
-from _pulsar import ProducerConfiguration, ConsumerConfiguration
+from _pulsar import ProducerConfiguration, ConsumerConfiguration, RegexSubscriptionMode
 
 from schema_test import *
 
@@ -1100,7 +1101,6 @@ class PulsarTest(TestCase):
         client.close()
 
     def test_topics_pattern_consumer(self):
-        import re
 
         client = Client(self.serviceUrl)
 
@@ -1717,6 +1717,77 @@ class PulsarTest(TestCase):
 
         client.close()
 
+    def test_regex_subscription(self):
+        client = Client(self.serviceUrl)
+        topic1 = "persistent://public/default/test-regex-sub-1"
+        topic2 = "persistent://public/default/test-regex-sub-2"
+        topic3 = "non-persistent://public/default/test-regex-sub-3"
+        topic4 = "persistent://public/default/no-match-test-regex-sub-3"  # no match pattern rule topic.
+
+        producer1 = client.create_producer(topic1)
+        producer2 = client.create_producer(topic2)
+        producer3 = client.create_producer(topic3)
+        producer4 = client.create_producer(topic4)
+
+        consumer_all = client.subscribe(
+            re.compile('public/default/test-regex-sub-.*'), "regex-sub-all",
+            consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.AllTopics
+        )
+
+        consumer_persistent = client.subscribe(
+            re.compile('public/default/test-regex-sub-.*'), "regex-sub-persistent",
+            consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.PersistentOnly
+        )
+
+        consumer_non_persistent = client.subscribe(
+            re.compile('public/default/test-regex-sub-.*'), "regex-sub-non-persistent",
+            consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.NonPersistentOnly
+        )
+
+        num = 10
+        for i in range(num):
+            producer1.send(b"hello-1-%d" % i)
+            producer2.send(b"hello-2-%d" % i)
+            producer3.send(b"hello-3-%d" % i)
+            producer4.send(b"hello-4-%d" % i)
+
+        # Assert consumer_all.
+        received_topics = set()
+        for i in range(3 * num):
+            msg = consumer_all.receive(TM)
+            topic_name = msg.topic_name()
+            self.assertIn(topic_name, [topic1, topic2, topic3])
+            received_topics.add(topic_name)
+            consumer_all.acknowledge(msg)
+        self.assertEqual(received_topics, {topic1, topic2, topic3})
+        with self.assertRaises(pulsar.Timeout):
+            consumer_all.receive(100)
+
+        # Assert consumer_persistent.
+        received_topics.clear()
+        for i in range(2 * num):
+            msg = consumer_persistent.receive(TM)
+            topic_name = msg.topic_name()
+            self.assertIn(topic_name, [topic1, topic2])
+            received_topics.add(topic_name)
+            consumer_persistent.acknowledge(msg)
+        self.assertEqual(received_topics, {topic1, topic2})
+        with self.assertRaises(pulsar.Timeout):
+            consumer_persistent.receive(100)
+
+        # Assert consumer_non_persistent.
+        received_topics.clear()
+        for i in range(num):
+            msg = consumer_non_persistent.receive(TM)
+            topic_name = msg.topic_name()
+            self.assertIn(topic_name, [topic3])
+            received_topics.add(topic_name)
+            consumer_non_persistent.acknowledge(msg)
+        self.assertEqual(received_topics, {topic3})
+        with self.assertRaises(pulsar.Timeout):
+            consumer_non_persistent.receive(100)
+
+        client.close()
 
 if __name__ == "__main__":
     main()

Reply via email to