This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 19df7c3 feat: support pattern subscription non persistent topic.
(#134)
19df7c3 is described below
commit 19df7c349cefbcc03a8ef9054c30deb53b03c249
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Aug 13 14:08:59 2023 +0800
feat: support pattern subscription non persistent topic. (#134)
---
pulsar/__init__.py | 13 ++++++++-
src/config.cc | 2 ++
src/enums.cc | 5 ++++
tests/pulsar_test.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++--
4 files changed, 92 insertions(+), 3 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 39c3cee..6a6ee6c 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -48,7 +48,7 @@ from typing import List, Tuple, Optional
import _pulsar
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition,
PartitionsRoutingMode, BatchingType, \
- LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode,
ProducerAccessMode # noqa: F401
+ LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode,
ProducerAccessMode, RegexSubscriptionMode # noqa: F401
from pulsar.__about__ import __version__
@@ -707,6 +707,7 @@ class Client:
batch_receive_policy=None,
key_shared_policy=None,
batch_index_ack_enabled=False,
+ regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
):
"""
Subscribe to the given topic and subscription combination.
@@ -796,6 +797,14 @@ class Client:
batch_index_ack_enabled: Enable the batch index acknowledgement.
It should be noted that this option can only work when the broker
side also enables the batch index
acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled`
config in `broker.conf`.
+ regex_subscription_mode: RegexSubscriptionMode, optional
+ Set the regex subscription mode for use when the topic is a regex
pattern.
+
+ Supported modes:
+
+ * PersistentOnly: By default only subscribe to persistent topics.
+ * NonPersistentOnly: Only subscribe to non-persistent topics.
+ * AllTopics: Subscribe to both persistent and non-persistent
topics.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -818,9 +827,11 @@ class Client:
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy,
'batch_receive_policy')
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy,
'key_shared_policy')
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
+ _check_type(RegexSubscriptionMode, regex_subscription_mode,
'regex_subscription_mode')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
+ conf.regex_subscription_mode(regex_subscription_mode)
conf.read_compacted(is_read_compacted)
if message_listener:
conf.message_listener(_listener_wrapper(message_listener, schema))
diff --git a/src/config.cc b/src/config.cc
index 4b661a9..23a5b80 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -265,6 +265,8 @@ void export_config(py::module_& m) {
.def("property", &ConsumerConfiguration::setProperty,
return_value_policy::reference)
.def("subscription_initial_position",
&ConsumerConfiguration::getSubscriptionInitialPosition)
.def("subscription_initial_position",
&ConsumerConfiguration::setSubscriptionInitialPosition)
+ .def("regex_subscription_mode",
&ConsumerConfiguration::setRegexSubscriptionMode)
+ .def("regex_subscription_mode",
&ConsumerConfiguration::getRegexSubscriptionMode,
return_value_policy::reference)
.def("crypto_key_reader", &ConsumerConfiguration::setCryptoKeyReader,
return_value_policy::reference)
.def("replicate_subscription_state_enabled",
&ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
diff --git a/src/enums.cc b/src/enums.cc
index 33affd0..198edfa 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -120,6 +120,11 @@ void export_enums(py::module_& m) {
.value("Latest", InitialPositionLatest)
.value("Earliest", InitialPositionEarliest);
+ enum_<RegexSubscriptionMode>(m, "RegexSubscriptionMode", "Regex
subscription mode")
+ .value("PersistentOnly", PersistentOnly)
+ .value("NonPersistentOnly", NonPersistentOnly)
+ .value("AllTopics", AllTopics);
+
enum_<ProducerConfiguration::BatchingType>(m, "BatchingType", "Supported
batching types")
.value("Default", ProducerConfiguration::DefaultBatching)
.value("KeyBased", ProducerConfiguration::KeyBasedBatching);
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 903f143..80c98f5 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -24,6 +24,7 @@ import logging
from unittest import TestCase, main
import time
import os
+import re
import pulsar
import uuid
from datetime import timedelta
@@ -46,7 +47,7 @@ from pulsar import (
)
from pulsar.schema import JsonSchema, Record, Integer
-from _pulsar import ProducerConfiguration, ConsumerConfiguration
+from _pulsar import ProducerConfiguration, ConsumerConfiguration,
RegexSubscriptionMode
from schema_test import *
@@ -1100,7 +1101,6 @@ class PulsarTest(TestCase):
client.close()
def test_topics_pattern_consumer(self):
- import re
client = Client(self.serviceUrl)
@@ -1717,6 +1717,77 @@ class PulsarTest(TestCase):
client.close()
+ def test_regex_subscription(self):
+ client = Client(self.serviceUrl)
+ topic1 = "persistent://public/default/test-regex-sub-1"
+ topic2 = "persistent://public/default/test-regex-sub-2"
+ topic3 = "non-persistent://public/default/test-regex-sub-3"
+ topic4 = "persistent://public/default/no-match-test-regex-sub-3" # no
match pattern rule topic.
+
+ producer1 = client.create_producer(topic1)
+ producer2 = client.create_producer(topic2)
+ producer3 = client.create_producer(topic3)
+ producer4 = client.create_producer(topic4)
+
+ consumer_all = client.subscribe(
+ re.compile('public/default/test-regex-sub-.*'), "regex-sub-all",
+ consumer_type=ConsumerType.Shared,
regex_subscription_mode=RegexSubscriptionMode.AllTopics
+ )
+
+ consumer_persistent = client.subscribe(
+ re.compile('public/default/test-regex-sub-.*'),
"regex-sub-persistent",
+ consumer_type=ConsumerType.Shared,
regex_subscription_mode=RegexSubscriptionMode.PersistentOnly
+ )
+
+ consumer_non_persistent = client.subscribe(
+ re.compile('public/default/test-regex-sub-.*'),
"regex-sub-non-persistent",
+ consumer_type=ConsumerType.Shared,
regex_subscription_mode=RegexSubscriptionMode.NonPersistentOnly
+ )
+
+ num = 10
+ for i in range(num):
+ producer1.send(b"hello-1-%d" % i)
+ producer2.send(b"hello-2-%d" % i)
+ producer3.send(b"hello-3-%d" % i)
+ producer4.send(b"hello-4-%d" % i)
+
+ # Assert consumer_all.
+ received_topics = set()
+ for i in range(3 * num):
+ msg = consumer_all.receive(TM)
+ topic_name = msg.topic_name()
+ self.assertIn(topic_name, [topic1, topic2, topic3])
+ received_topics.add(topic_name)
+ consumer_all.acknowledge(msg)
+ self.assertEqual(received_topics, {topic1, topic2, topic3})
+ with self.assertRaises(pulsar.Timeout):
+ consumer_all.receive(100)
+
+ # Assert consumer_persistent.
+ received_topics.clear()
+ for i in range(2 * num):
+ msg = consumer_persistent.receive(TM)
+ topic_name = msg.topic_name()
+ self.assertIn(topic_name, [topic1, topic2])
+ received_topics.add(topic_name)
+ consumer_persistent.acknowledge(msg)
+ self.assertEqual(received_topics, {topic1, topic2})
+ with self.assertRaises(pulsar.Timeout):
+ consumer_persistent.receive(100)
+
+ # Assert consumer_non_persistent.
+ received_topics.clear()
+ for i in range(num):
+ msg = consumer_non_persistent.receive(TM)
+ topic_name = msg.topic_name()
+ self.assertIn(topic_name, [topic3])
+ received_topics.add(topic_name)
+ consumer_non_persistent.acknowledge(msg)
+ self.assertEqual(received_topics, {topic3})
+ with self.assertRaises(pulsar.Timeout):
+ consumer_non_persistent.receive(100)
+
+ client.close()
if __name__ == "__main__":
main()