This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new c50ada7  feat: Support dead letter topic. (#135)
c50ada7 is described below

commit c50ada731241f343c38ef18272e553110ca43718
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 15 19:16:07 2023 +0800

    feat: Support dead letter topic. (#135)
    
    * feat: Support dead letter topic.
    
    * Fix non space
    
    * Let maxRedeliverCount param required.
---
 pulsar/__init__.py   | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/config.cc        | 19 +++++++++++++-
 tests/pulsar_test.py | 44 ++++++++++++++++++++++++++++++++
 3 files changed, 132 insertions(+), 2 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 6a6ee6c..8e0907e 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -48,7 +48,8 @@ from typing import List, Tuple, Optional
 import _pulsar
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, 
PartitionsRoutingMode, BatchingType, \
-    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, 
ProducerAccessMode, RegexSubscriptionMode  # noqa: F401
+    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, 
ProducerAccessMode, RegexSubscriptionMode, \
+    DeadLetterPolicyBuilder  # noqa: F401
 
 from pulsar.__about__ import __version__
 
@@ -374,6 +375,65 @@ class AuthenticationBasic(Authentication):
             _check_type(str, method, 'method')
             self.auth = _pulsar.AuthenticationBasic.create(username, password, 
method)
 
+class ConsumerDeadLetterPolicy:
+    """
+    Configuration for the "dead letter queue" feature in consumer.
+    """
+    def __init__(self,
+                 max_redeliver_count: int,
+                 dead_letter_topic: str = None,
+                 initial_subscription_name: str = None):
+        """
+        Wrapper DeadLetterPolicy.
+
+        Parameters
+        ----------
+        max_redeliver_count: Maximum number of times that a message is 
redelivered before being sent to the dead letter queue.
+            - The maxRedeliverCount must be greater than 0.
+        dead_letter_topic: Name of the dead topic where the failing messages 
are sent.
+            The default value is: sourceTopicName + "-" + subscriptionName + 
"-DLQ"
+        initial_subscription_name: Name of the initial subscription name of 
the dead letter topic.
+            If this field is not set, the initial subscription for the dead 
letter topic is not created.
+            If this field is set but the broker's 
`allowAutoSubscriptionCreation` is disabled, the DLQ producer
+            fails to be created.
+        """
+        builder = DeadLetterPolicyBuilder()
+        if max_redeliver_count is None or max_redeliver_count < 1:
+            raise ValueError("max_redeliver_count must be greater than 0")
+        builder.maxRedeliverCount(max_redeliver_count)
+        if dead_letter_topic is not None:
+            builder.deadLetterTopic(dead_letter_topic)
+        if initial_subscription_name is not None:
+            builder.initialSubscriptionName(initial_subscription_name)
+        self._policy = builder.build()
+
+    @property
+    def dead_letter_topic(self) -> str:
+        """
+        Return the dead letter topic for dead letter policy.
+        """
+        return self._policy.getDeadLetterTopic()
+
+    @property
+    def max_redeliver_count(self) -> int:
+        """
+        Return the max redeliver count for dead letter policy.
+        """
+        return self._policy.getMaxRedeliverCount()
+
+    @property
+    def initial_subscription_name(self) -> str:
+        """
+        Return the initial subscription name for dead letter policy.
+        """
+        return self._policy.getInitialSubscriptionName()
+
+    def policy(self):
+        """
+        Returns the actual one DeadLetterPolicy.
+        """
+        return self._policy
+
 class Client:
     """
     The Pulsar client. A single client instance can be used to create producers
@@ -708,6 +768,7 @@ class Client:
                   key_shared_policy=None,
                   batch_index_ack_enabled=False,
                   regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
+                  dead_letter_policy: ConsumerDeadLetterPolicy = None,
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -805,6 +866,12 @@ class Client:
             * PersistentOnly: By default only subscribe to persistent topics.
             * NonPersistentOnly: Only subscribe to non-persistent topics.
             * AllTopics: Subscribe to both persistent and non-persistent 
topics.
+        dead_letter_policy: class ConsumerDeadLetterPolicy
+          Set dead letter policy for consumer.
+          By default, some messages are redelivered many times, even to the 
extent that they can never be
+          stopped. By using the dead letter mechanism, messages have the max 
redelivery count, when they're
+          exceeding the maximum number of redeliveries. Messages are sent to 
dead letter topics and acknowledged
+          automatically.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -864,6 +931,8 @@ class Client:
         if key_shared_policy:
             conf.key_shared_policy(key_shared_policy.policy())
         conf.batch_index_ack_enabled(batch_index_ack_enabled)
+        if dead_letter_policy:
+            conf.dead_letter_policy(dead_letter_policy.policy())
 
         c = Consumer()
         if isinstance(topic, str):
diff --git a/src/config.cc b/src/config.cc
index 23a5b80..ac643b7 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -22,6 +22,7 @@
 #include <pulsar/ConsumerConfiguration.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <pulsar/KeySharedPolicy.h>
+#include <pulsar/DeadLetterPolicyBuilder.h>
 #include <pybind11/functional.h>
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
@@ -231,6 +232,20 @@ void export_config(py::module_& m) {
         .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
         .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
 
+    class_<DeadLetterPolicy>(m, "DeadLetterPolicy")
+        .def(init<>())
+        .def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic)
+        .def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount)
+        .def("getInitialSubscriptionName", 
&DeadLetterPolicy::getInitialSubscriptionName);
+
+    class_<DeadLetterPolicyBuilder>(m, "DeadLetterPolicyBuilder")
+        .def(init<>())
+        .def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, 
return_value_policy::reference)
+        .def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, 
return_value_policy::reference)
+        .def("initialSubscriptionName", 
&DeadLetterPolicyBuilder::initialSubscriptionName, 
return_value_policy::reference)
+        .def("build", &DeadLetterPolicyBuilder::build, 
return_value_policy::reference)
+        .def("build", &DeadLetterPolicyBuilder::build, 
return_value_policy::reference);
+
     class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, 
"ConsumerConfiguration")
         .def(init<>())
         .def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
@@ -285,7 +300,9 @@ void export_config(py::module_& m) {
              return_value_policy::reference)
         .def("batch_index_ack_enabled", 
&ConsumerConfiguration::isBatchIndexAckEnabled)
         .def("batch_index_ack_enabled", 
&ConsumerConfiguration::setBatchIndexAckEnabled,
-             return_value_policy::reference);
+             return_value_policy::reference)
+        .def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
+        .def("dead_letter_policy", 
&ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
 
     class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, 
"ReaderConfiguration")
         .def(init<>())
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 80c98f5..ae6aa3b 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -44,6 +44,7 @@ from pulsar import (
     CryptoKeyReader,
     ConsumerBatchReceivePolicy,
     ProducerAccessMode,
+    ConsumerDeadLetterPolicy,
 )
 from pulsar.schema import JsonSchema, Record, Integer
 
@@ -1714,6 +1715,49 @@ class PulsarTest(TestCase):
         # assert no more msgs.
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(timeout_millis=1000)
+        client.close()
+
+    def test_dead_letter_policy_config(self):
+        with self.assertRaises(ValueError):
+            ConsumerDeadLetterPolicy(-1)
+
+        policy = ConsumerDeadLetterPolicy(10)
+        self.assertEqual(10, policy.max_redeliver_count)
+        self.assertEqual("", policy.dead_letter_topic)
+        self.assertEqual("", policy.initial_subscription_name)
+
+    def test_dead_letter_policy(self):
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-test-dlq" + str(time.time())
+        dlq_topic = 'dlq-' + topic
+        max_redeliver_count = 5
+        consumer = client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.Shared,
+                                    
dead_letter_policy=ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 
'init-sub'))
+        dlq_consumer = client.subscribe(dlq_topic, "my-sub", 
consumer_type=ConsumerType.Shared)
+
+        # Sen num msgs.
+        producer = client.create_producer(topic)
+        num = 10
+        for i in range(num):
+            producer.send(b"hello-%d" % i)
+        producer.flush()
+
+        # Redelivery all messages maxRedeliverCountNum time.
+        for i in range(1, num * max_redeliver_count + num + 1):
+            msg = consumer.receive()
+            if i % num == 0:
+                consumer.redeliver_unacknowledged_messages()
+                print(f"Start redeliver msgs '{i}'")
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
+
+        for i in range(num):
+            msg = dlq_consumer.receive()
+            self.assertTrue(msg)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
+            dlq_consumer.acknowledge(msg)
+        with self.assertRaises(pulsar.Timeout):
+            dlq_consumer.receive(100)
 
         client.close()
 

Reply via email to