This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 3e37e44 Support start message id inclusive (#19)
3e37e44 is described below
commit 3e37e447e6d59c12b3dac56a13b7ad7364621d80
Author: Kai Wang <[email protected]>
AuthorDate: Fri Oct 21 15:48:55 2022 +0800
Support start message id inclusive (#19)
### Motivation
The Pulsar C++ client already supports the inclusive start message ID
option.
We need to add this configuration option to the Python client.
### Modifications
* Add `start_message_id_inclusive` to the consumer configuration.
---
pulsar/__init__.py | 9 ++++++++-
src/config.cc | 5 ++++-
tests/pulsar_test.py | 31 +++++++++++++++++++++++++++++++
3 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 5474c16..6fd38d1 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -697,7 +697,8 @@ class Client:
crypto_key_reader=None,
replicate_subscription_state_enabled=False,
max_pending_chunked_message=10,
- auto_ack_oldest_chunked_message_on_queue_full=False
+ auto_ack_oldest_chunked_message_on_queue_full=False,
+ start_message_id_inclusive=False
):
"""
Subscribe to the given topic and subscription combination.
@@ -791,6 +792,10 @@ class Client:
can be guarded by providing the maxPendingChunkedMessage threshold.
See setMaxPendingChunkedMessage.
Once, consumer reaches this threshold, it drops the outstanding
unchunked-messages by silently acking
if autoAckOldestChunkedMessageOnQueueFull is true else it marks them
for redelivery.
+ Default: `False`.
+ * start_message_id_inclusive:
+ Set the consumer to include the given position of any reset
operation like Consumer::seek.
+
Default: `False`.
"""
_check_type(str, subscription_name, 'subscription_name')
@@ -810,6 +815,7 @@ class Client:
_check_type_or_none(CryptoKeyReader, crypto_key_reader,
'crypto_key_reader')
_check_type(int, max_pending_chunked_message,
'max_pending_chunked_message')
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full,
'auto_ack_oldest_chunked_message_on_queue_full')
+ _check_type(bool, start_message_id_inclusive,
'start_message_id_inclusive')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
@@ -838,6 +844,7 @@ class Client:
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
conf.max_pending_chunked_message(max_pending_chunked_message)
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
+ conf.start_message_id_inclusive(start_message_id_inclusive)
c = Consumer()
if isinstance(topic, str):
diff --git a/src/config.cc b/src/config.cc
index 3a9c14b..d2ed103 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -294,7 +294,10 @@ void export_config() {
.def("auto_ack_oldest_chunked_message_on_queue_full",
&ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull)
.def("auto_ack_oldest_chunked_message_on_queue_full",
-
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull,
return_self<>());
+
&ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull,
return_self<>())
+ .def("start_message_id_inclusive",
&ConsumerConfiguration::isStartMessageIdInclusive)
+
.def("start_message_id_inclusive",&ConsumerConfiguration::setStartMessageIdInclusive,
+ return_self<>());
class_<ReaderConfiguration>("ReaderConfiguration")
.def("reader_listener", &ReaderConfiguration_setReaderListener,
return_self<>())
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 71af279..d0f1ba0 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -920,6 +920,37 @@ class PulsarTest(TestCase):
reader.close()
client.close()
+ def test_seek_inclusive(self):
+ client = Client(self.serviceUrl)
+ topic = "my-python-topic-seek-inclusive-" + str(time.time())
+ consumer = client.subscribe(topic, "my-sub",
consumer_type=ConsumerType.Shared, start_message_id_inclusive=True)
+ producer = client.create_producer(topic)
+
+ for i in range(100):
+ if i > 0:
+ time.sleep(0.02)
+ producer.send(b"hello-%d" % i)
+
+ ids = []
+ for i in range(100):
+ msg = consumer.receive(TM)
+ self.assertEqual(msg.data(), b"hello-%d" % i)
+ ids.append(msg.message_id())
+ consumer.acknowledge(msg)
+
+ # seek, and after reconnect, expected receive first message.
+ consumer.seek(MessageId.earliest)
+ time.sleep(0.5)
+ msg = consumer.receive(TM)
+ self.assertEqual(msg.data(), b"hello-0")
+
+ # seek on messageId
+ consumer.seek(ids[50])
+ time.sleep(0.5)
+ msg = consumer.receive(TM)
+ self.assertEqual(msg.data(), b"hello-50")
+ client.close()
+
def test_v2_topics(self):
self._v2_topics(self.serviceUrl)