This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cb3cfe  feat: support batch index ack. (#139)
2cb3cfe is described below

commit 2cb3cfec220121c427a41536e6357410ae4f08b9
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 10 17:57:34 2023 +0800

    feat: support batch index ack. (#139)
---
 pulsar/__init__.py                  |  8 ++++++-
 src/config.cc                       |  3 +++
 tests/pulsar_test.py                | 42 +++++++++++++++++++++++++++++++++++++
 tests/test-conf/standalone-ssl.conf |  3 +++
 tests/test-conf/standalone.conf     |  3 +++
 5 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 5d7cedb..a1b18c1 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -692,7 +692,8 @@ class Client:
                   auto_ack_oldest_chunked_message_on_queue_full=False,
                   start_message_id_inclusive=False,
                   batch_receive_policy=None,
-                  key_shared_policy=None
+                  key_shared_policy=None,
+                  batch_index_ack_enabled=False,
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -779,6 +780,9 @@ class Client:
           Set the batch collection policy for batch receiving.
         key_shared_policy: class ConsumerKeySharedPolicy
             Set the key shared policy for use when the ConsumerType is 
KeyShared.
+        batch_index_ack_enabled: Enable the batch index acknowledgement.
+            It should be noted that this option can only work when the broker 
side also enables the batch index
+            acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` 
config in `broker.conf`.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -800,6 +804,7 @@ class Client:
         _check_type(bool, start_message_id_inclusive, 
'start_message_id_inclusive')
         _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 
'batch_receive_policy')
         _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 
'key_shared_policy')
+        _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -834,6 +839,7 @@ class Client:
 
         if key_shared_policy:
             conf.key_shared_policy(key_shared_policy.policy())
+        conf.batch_index_ack_enabled(batch_index_ack_enabled)
 
         c = Consumer()
         if isinstance(topic, str):
diff --git a/src/config.cc b/src/config.cc
index 7e2d38d..c71d5b0 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -280,6 +280,9 @@ void export_config(py::module_& m) {
              return_value_policy::reference)
         .def("start_message_id_inclusive", 
&ConsumerConfiguration::isStartMessageIdInclusive)
         .def("start_message_id_inclusive", 
&ConsumerConfiguration::setStartMessageIdInclusive,
+             return_value_policy::reference)
+        .def("batch_index_ack_enabled", 
&ConsumerConfiguration::isBatchIndexAckEnabled)
+        .def("batch_index_ack_enabled", 
&ConsumerConfiguration::setBatchIndexAckEnabled,
              return_value_policy::reference);
 
     class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, 
"ReaderConfiguration")
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 8db4893..60801c9 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1617,6 +1617,48 @@ class PulsarTest(TestCase):
             consumer.acknowledge(msg_id)
         client.close()
 
+    def test_batch_index_ack(self):
+        topic_name = 'test-batch-index-ack-3'
+        client = pulsar.Client('pulsar://localhost:6650')
+        producer = client.create_producer(topic_name,
+                                          batching_enabled=True,
+                                          batching_max_messages=100,
+                                          batching_max_publish_delay_ms=10000)
+        consumer = client.subscribe(topic_name,
+                                    subscription_name='test-batch-index-ack',
+                                    batch_index_ack_enabled=True)
+
+        # Make sure send 0~5 is a batch msg.
+        for i in range(5):
+            producer.send_async(b"hello-%d" % i, callback=None)
+        producer.flush()
+
+        # Receive msgs and just ack 0, 1 msgs
+        results = []
+        for i in range(5):
+            msg = consumer.receive()
+            print("receive from {}".format(msg.message_id()))
+            results.append(msg)
+        assert len(results) == 5
+        for i in range(2):
+            consumer.acknowledge(results[i])
+            time.sleep(0.2)
+
+        # Restart consumer after, just receive 2~5 msg.
+        consumer.close()
+        consumer = client.subscribe(topic_name,
+                                    subscription_name='test-batch-index-ack',
+                                    batch_index_ack_enabled=True)
+        results2 = []
+        for i in range(2, 5):
+            msg = consumer.receive()
+            results2.append(msg)
+        assert len(results2) == 3
+        # assert no more msgs.
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(timeout_millis=1000)
+
+        client.close()
 
 
 if __name__ == "__main__":
diff --git a/tests/test-conf/standalone-ssl.conf 
b/tests/test-conf/standalone-ssl.conf
index 2ee4432..beed278 100644
--- a/tests/test-conf/standalone-ssl.conf
+++ b/tests/test-conf/standalone-ssl.conf
@@ -113,6 +113,9 @@ superUserRoles=localhost,superUser,admin
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 
+# Enable batch index ACK
+acknowledgmentAtBatchIndexLevelEnabled=true
+
 ### --- BookKeeper Client --- ###
 
 # Authentication plugin to use when connecting to bookies
diff --git a/tests/test-conf/standalone.conf b/tests/test-conf/standalone.conf
index faa1277..0225e0d 100644
--- a/tests/test-conf/standalone.conf
+++ b/tests/test-conf/standalone.conf
@@ -100,6 +100,9 @@ superUserRoles=
 brokerClientAuthenticationPlugin=
 brokerClientAuthenticationParameters=
 
+# Enable batch index ACK
+acknowledgmentAtBatchIndexLevelEnabled=true
+
 
 ### --- BookKeeper Client --- ###
 

Reply via email to