This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 2cb3cfe feat: support batch index ack. (#139)
2cb3cfe is described below
commit 2cb3cfec220121c427a41536e6357410ae4f08b9
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jul 10 17:57:34 2023 +0800
feat: support batch index ack. (#139)
---
pulsar/__init__.py | 8 ++++++-
src/config.cc | 3 +++
tests/pulsar_test.py | 42 +++++++++++++++++++++++++++++++++++++
tests/test-conf/standalone-ssl.conf | 3 +++
tests/test-conf/standalone.conf | 3 +++
5 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 5d7cedb..a1b18c1 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -692,7 +692,8 @@ class Client:
auto_ack_oldest_chunked_message_on_queue_full=False,
start_message_id_inclusive=False,
batch_receive_policy=None,
- key_shared_policy=None
+ key_shared_policy=None,
+ batch_index_ack_enabled=False,
):
"""
Subscribe to the given topic and subscription combination.
@@ -779,6 +780,9 @@ class Client:
Set the batch collection policy for batch receiving.
key_shared_policy: class ConsumerKeySharedPolicy
Set the key shared policy for use when the ConsumerType is
KeyShared.
+ batch_index_ack_enabled: Enable the batch index acknowledgement.
+ It should be noted that this option can only work when the broker
side also enables the batch index
+ acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled`
config in `broker.conf`.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -800,6 +804,7 @@ class Client:
_check_type(bool, start_message_id_inclusive,
'start_message_id_inclusive')
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy,
'batch_receive_policy')
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy,
'key_shared_policy')
+ _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
@@ -834,6 +839,7 @@ class Client:
if key_shared_policy:
conf.key_shared_policy(key_shared_policy.policy())
+ conf.batch_index_ack_enabled(batch_index_ack_enabled)
c = Consumer()
if isinstance(topic, str):
diff --git a/src/config.cc b/src/config.cc
index 7e2d38d..c71d5b0 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -280,6 +280,9 @@ void export_config(py::module_& m) {
return_value_policy::reference)
.def("start_message_id_inclusive",
&ConsumerConfiguration::isStartMessageIdInclusive)
.def("start_message_id_inclusive",
&ConsumerConfiguration::setStartMessageIdInclusive,
+ return_value_policy::reference)
+ .def("batch_index_ack_enabled",
&ConsumerConfiguration::isBatchIndexAckEnabled)
+ .def("batch_index_ack_enabled",
&ConsumerConfiguration::setBatchIndexAckEnabled,
return_value_policy::reference);
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m,
"ReaderConfiguration")
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 8db4893..60801c9 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1617,6 +1617,48 @@ class PulsarTest(TestCase):
consumer.acknowledge(msg_id)
client.close()
+ def test_batch_index_ack(self):
+ topic_name = 'test-batch-index-ack-3'
+ client = pulsar.Client('pulsar://localhost:6650')
+ producer = client.create_producer(topic_name,
+ batching_enabled=True,
+ batching_max_messages=100,
+ batching_max_publish_delay_ms=10000)
+ consumer = client.subscribe(topic_name,
+ subscription_name='test-batch-index-ack',
+ batch_index_ack_enabled=True)
+
+ # Make sure send 0~5 is a batch msg.
+ for i in range(5):
+ producer.send_async(b"hello-%d" % i, callback=None)
+ producer.flush()
+
+ # Receive msgs and just ack 0, 1 msgs
+ results = []
+ for i in range(5):
+ msg = consumer.receive()
+ print("receive from {}".format(msg.message_id()))
+ results.append(msg)
+ assert len(results) == 5
+ for i in range(2):
+ consumer.acknowledge(results[i])
+ time.sleep(0.2)
+
+ # Restart consumer after, just receive 2~5 msg.
+ consumer.close()
+ consumer = client.subscribe(topic_name,
+ subscription_name='test-batch-index-ack',
+ batch_index_ack_enabled=True)
+ results2 = []
+ for i in range(2, 5):
+ msg = consumer.receive()
+ results2.append(msg)
+ assert len(results2) == 3
+ # assert no more msgs.
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(timeout_millis=1000)
+
+ client.close()
if __name__ == "__main__":
diff --git a/tests/test-conf/standalone-ssl.conf
b/tests/test-conf/standalone-ssl.conf
index 2ee4432..beed278 100644
--- a/tests/test-conf/standalone-ssl.conf
+++ b/tests/test-conf/standalone-ssl.conf
@@ -113,6 +113,9 @@ superUserRoles=localhost,superUser,admin
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
+# Enable batch index ACK
+acknowledgmentAtBatchIndexLevelEnabled=true
+
### --- BookKeeper Client --- ###
# Authentication plugin to use when connecting to bookies
diff --git a/tests/test-conf/standalone.conf b/tests/test-conf/standalone.conf
index faa1277..0225e0d 100644
--- a/tests/test-conf/standalone.conf
+++ b/tests/test-conf/standalone.conf
@@ -100,6 +100,9 @@ superUserRoles=
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
+# Enable batch index ACK
+acknowledgmentAtBatchIndexLevelEnabled=true
+
### --- BookKeeper Client --- ###