This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e37e44  Support start message id inclusive (#19)
3e37e44 is described below

commit 3e37e447e6d59c12b3dac56a13b7ad7364621d80
Author: Kai Wang <[email protected]>
AuthorDate: Fri Oct 21 15:48:55 2022 +0800

    Support start message id inclusive (#19)
    
    ### Motivation
    
    The Pulsar C++ client already supports an inclusive start message ID.
    We need to expose the same configuration option in the Python client.
    
    
    ### Modifications
    
    * Add `start_message_id_inclusive` to the consumer configuration
---
 pulsar/__init__.py   |  9 ++++++++-
 src/config.cc        |  5 ++++-
 tests/pulsar_test.py | 31 +++++++++++++++++++++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 5474c16..6fd38d1 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -697,7 +697,8 @@ class Client:
                   crypto_key_reader=None,
                   replicate_subscription_state_enabled=False,
                   max_pending_chunked_message=10,
-                  auto_ack_oldest_chunked_message_on_queue_full=False
+                  auto_ack_oldest_chunked_message_on_queue_full=False,
+                  start_message_id_inclusive=False
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -791,6 +792,10 @@ class Client:
           can be guarded by providing the maxPendingChunkedMessage threshold. 
See setMaxPendingChunkedMessage.
           Once, consumer reaches this threshold, it drops the outstanding 
unchunked-messages by silently acking
           if autoAckOldestChunkedMessageOnQueueFull is true else it marks them 
for redelivery.
+          Default: `False`.
+        * start_message_id_inclusive:
+          Set the consumer to include the given position of any reset 
operation like Consumer::seek.
+
           Default: `False`.
         """
         _check_type(str, subscription_name, 'subscription_name')
@@ -810,6 +815,7 @@ class Client:
         _check_type_or_none(CryptoKeyReader, crypto_key_reader, 
'crypto_key_reader')
         _check_type(int, max_pending_chunked_message, 
'max_pending_chunked_message')
         _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 
'auto_ack_oldest_chunked_message_on_queue_full')
+        _check_type(bool, start_message_id_inclusive, 
'start_message_id_inclusive')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -838,6 +844,7 @@ class Client:
         
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
         conf.max_pending_chunked_message(max_pending_chunked_message)
         
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
+        conf.start_message_id_inclusive(start_message_id_inclusive)
 
         c = Consumer()
         if isinstance(topic, str):
diff --git a/src/config.cc b/src/config.cc
index 3a9c14b..d2ed103 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -294,7 +294,10 @@ void export_config() {
         .def("auto_ack_oldest_chunked_message_on_queue_full",
              &ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull)
         .def("auto_ack_oldest_chunked_message_on_queue_full",
-             
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, 
return_self<>());
+             
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, 
return_self<>())
+        .def("start_message_id_inclusive", 
&ConsumerConfiguration::isStartMessageIdInclusive)
+        
.def("start_message_id_inclusive",&ConsumerConfiguration::setStartMessageIdInclusive,
+             return_self<>());
 
     class_<ReaderConfiguration>("ReaderConfiguration")
         .def("reader_listener", &ReaderConfiguration_setReaderListener, 
return_self<>())
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 71af279..d0f1ba0 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -920,6 +920,37 @@ class PulsarTest(TestCase):
         reader.close()
         client.close()
 
+    def test_seek_inclusive(self):
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-seek-inclusive-" + str(time.time())
+        consumer = client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.Shared, start_message_id_inclusive=True)
+        producer = client.create_producer(topic)
+
+        for i in range(100):
+            if i > 0:
+                time.sleep(0.02)
+            producer.send(b"hello-%d" % i)
+
+        ids = []
+        for i in range(100):
+            msg = consumer.receive(TM)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
+            ids.append(msg.message_id())
+            consumer.acknowledge(msg)
+
+        # seek, and after reconnect, expected receive first message.
+        consumer.seek(MessageId.earliest)
+        time.sleep(0.5)
+        msg = consumer.receive(TM)
+        self.assertEqual(msg.data(), b"hello-0")
+
+        # seek on messageId
+        consumer.seek(ids[50])
+        time.sleep(0.5)
+        msg = consumer.receive(TM)
+        self.assertEqual(msg.data(), b"hello-50")
+        client.close()
+
     def test_v2_topics(self):
         self._v2_topics(self.serviceUrl)
 

Reply via email to