This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new c50ada7 feat: Support dead letter topic. (#135)
c50ada7 is described below
commit c50ada731241f343c38ef18272e553110ca43718
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 15 19:16:07 2023 +0800
feat: Support dead letter topic. (#135)
* feat: Support dead letter topic.
* Fix non space
* Let maxRedeliverCount param required.
---
pulsar/__init__.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++-
src/config.cc | 19 +++++++++++++-
tests/pulsar_test.py | 44 ++++++++++++++++++++++++++++++++
3 files changed, 132 insertions(+), 2 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 6a6ee6c..8e0907e 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -48,7 +48,8 @@ from typing import List, Tuple, Optional
import _pulsar
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition,
PartitionsRoutingMode, BatchingType, \
- LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode,
ProducerAccessMode, RegexSubscriptionMode # noqa: F401
+ LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode,
ProducerAccessMode, RegexSubscriptionMode, \
+ DeadLetterPolicyBuilder # noqa: F401
from pulsar.__about__ import __version__
@@ -374,6 +375,65 @@ class AuthenticationBasic(Authentication):
_check_type(str, method, 'method')
self.auth = _pulsar.AuthenticationBasic.create(username, password,
method)
+class ConsumerDeadLetterPolicy:
+ """
+ Configuration for the "dead letter queue" feature in consumer.
+ """
+ def __init__(self,
+ max_redeliver_count: int,
+ dead_letter_topic: str = None,
+ initial_subscription_name: str = None):
+ """
+ Wrapper DeadLetterPolicy.
+
+ Parameters
+ ----------
+ max_redeliver_count: Maximum number of times that a message is
redelivered before being sent to the dead letter queue.
+ - The maxRedeliverCount must be greater than 0.
+ dead_letter_topic: Name of the dead topic where the failing messages
are sent.
+ The default value is: sourceTopicName + "-" + subscriptionName +
"-DLQ"
+ initial_subscription_name: Name of the initial subscription name of
the dead letter topic.
+ If this field is not set, the initial subscription for the dead
letter topic is not created.
+ If this field is set but the broker's
`allowAutoSubscriptionCreation` is disabled, the DLQ producer
+ fails to be created.
+ """
+ builder = DeadLetterPolicyBuilder()
+ if max_redeliver_count is None or max_redeliver_count < 1:
+ raise ValueError("max_redeliver_count must be greater than 0")
+ builder.maxRedeliverCount(max_redeliver_count)
+ if dead_letter_topic is not None:
+ builder.deadLetterTopic(dead_letter_topic)
+ if initial_subscription_name is not None:
+ builder.initialSubscriptionName(initial_subscription_name)
+ self._policy = builder.build()
+
+ @property
+ def dead_letter_topic(self) -> str:
+ """
+ Return the dead letter topic for dead letter policy.
+ """
+ return self._policy.getDeadLetterTopic()
+
+ @property
+ def max_redeliver_count(self) -> int:
+ """
+ Return the max redeliver count for dead letter policy.
+ """
+ return self._policy.getMaxRedeliverCount()
+
+ @property
+ def initial_subscription_name(self) -> str:
+ """
+ Return the initial subscription name for dead letter policy.
+ """
+ return self._policy.getInitialSubscriptionName()
+
+ def policy(self):
+ """
+ Returns the actual one DeadLetterPolicy.
+ """
+ return self._policy
+
class Client:
"""
The Pulsar client. A single client instance can be used to create producers
@@ -708,6 +768,7 @@ class Client:
key_shared_policy=None,
batch_index_ack_enabled=False,
regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
+ dead_letter_policy: ConsumerDeadLetterPolicy = None,
):
"""
Subscribe to the given topic and subscription combination.
@@ -805,6 +866,12 @@ class Client:
* PersistentOnly: By default only subscribe to persistent topics.
* NonPersistentOnly: Only subscribe to non-persistent topics.
* AllTopics: Subscribe to both persistent and non-persistent
topics.
+ dead_letter_policy: class ConsumerDeadLetterPolicy
+ Set dead letter policy for consumer.
+ By default, some messages are redelivered many times, even to the
extent that they can never be
+ stopped. By using the dead letter mechanism, messages have the max
redelivery count, when they're
+ exceeding the maximum number of redeliveries. Messages are sent to
dead letter topics and acknowledged
+ automatically.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -864,6 +931,8 @@ class Client:
if key_shared_policy:
conf.key_shared_policy(key_shared_policy.policy())
conf.batch_index_ack_enabled(batch_index_ack_enabled)
+ if dead_letter_policy:
+ conf.dead_letter_policy(dead_letter_policy.policy())
c = Consumer()
if isinstance(topic, str):
diff --git a/src/config.cc b/src/config.cc
index 23a5b80..ac643b7 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -22,6 +22,7 @@
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/KeySharedPolicy.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
@@ -231,6 +232,20 @@ void export_config(py::module_& m) {
.def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
.def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
+ class_<DeadLetterPolicy>(m, "DeadLetterPolicy")
+ .def(init<>())
+ .def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic)
+ .def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount)
+ .def("getInitialSubscriptionName",
&DeadLetterPolicy::getInitialSubscriptionName);
+
+ class_<DeadLetterPolicyBuilder>(m, "DeadLetterPolicyBuilder")
+ .def(init<>())
+ .def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic,
return_value_policy::reference)
+ .def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount,
return_value_policy::reference)
+ .def("initialSubscriptionName",
&DeadLetterPolicyBuilder::initialSubscriptionName,
return_value_policy::reference)
+ .def("build", &DeadLetterPolicyBuilder::build,
return_value_policy::reference)
+ .def("build", &DeadLetterPolicyBuilder::build,
return_value_policy::reference);
+
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m,
"ConsumerConfiguration")
.def(init<>())
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
@@ -285,7 +300,9 @@ void export_config(py::module_& m) {
return_value_policy::reference)
.def("batch_index_ack_enabled",
&ConsumerConfiguration::isBatchIndexAckEnabled)
.def("batch_index_ack_enabled",
&ConsumerConfiguration::setBatchIndexAckEnabled,
- return_value_policy::reference);
+ return_value_policy::reference)
+ .def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
+ .def("dead_letter_policy",
&ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m,
"ReaderConfiguration")
.def(init<>())
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 80c98f5..ae6aa3b 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -44,6 +44,7 @@ from pulsar import (
CryptoKeyReader,
ConsumerBatchReceivePolicy,
ProducerAccessMode,
+ ConsumerDeadLetterPolicy,
)
from pulsar.schema import JsonSchema, Record, Integer
@@ -1714,6 +1715,49 @@ class PulsarTest(TestCase):
# assert no more msgs.
with self.assertRaises(pulsar.Timeout):
consumer.receive(timeout_millis=1000)
+ client.close()
+
+ def test_dead_letter_policy_config(self):
+ with self.assertRaises(ValueError):
+ ConsumerDeadLetterPolicy(-1)
+
+ policy = ConsumerDeadLetterPolicy(10)
+ self.assertEqual(10, policy.max_redeliver_count)
+ self.assertEqual("", policy.dead_letter_topic)
+ self.assertEqual("", policy.initial_subscription_name)
+
+ def test_dead_letter_policy(self):
+ client = Client(self.serviceUrl)
+ topic = "my-python-topic-test-dlq" + str(time.time())
+ dlq_topic = 'dlq-' + topic
+ max_redeliver_count = 5
+ consumer = client.subscribe(topic, "my-sub",
consumer_type=ConsumerType.Shared,
+
dead_letter_policy=ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic,
'init-sub'))
+ dlq_consumer = client.subscribe(dlq_topic, "my-sub",
consumer_type=ConsumerType.Shared)
+
+ # Sen num msgs.
+ producer = client.create_producer(topic)
+ num = 10
+ for i in range(num):
+ producer.send(b"hello-%d" % i)
+ producer.flush()
+
+ # Redelivery all messages maxRedeliverCountNum time.
+ for i in range(1, num * max_redeliver_count + num + 1):
+ msg = consumer.receive()
+ if i % num == 0:
+ consumer.redeliver_unacknowledged_messages()
+ print(f"Start redeliver msgs '{i}'")
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
+
+ for i in range(num):
+ msg = dlq_consumer.receive()
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b"hello-%d" % i)
+ dlq_consumer.acknowledge(msg)
+ with self.assertRaises(pulsar.Timeout):
+ dlq_consumer.receive(100)
client.close()