This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new c5c177a [feat] Support consumer batch receive. (#33)
c5c177a is described below
commit c5c177af647997d1182f3d4a69c74824f39af38f
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Nov 16 20:00:21 2022 +0800
[feat] Support consumer batch receive. (#33)
---
pulsar/__init__.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++++--
src/config.cc | 7 +++++++
src/consumer.cc | 10 ++++++++++
tests/pulsar_test.py | 22 ++++++++++++++++++++++
4 files changed, 87 insertions(+), 2 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index c1195de..0007e61 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -46,7 +46,7 @@ import logging
import _pulsar
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition,
PartitionsRoutingMode, BatchingType, \
- LoggerLevel # noqa: F401
+ LoggerLevel, BatchReceivePolicy # noqa: F401
from pulsar.exceptions import *
@@ -657,7 +657,8 @@ class Client:
replicate_subscription_state_enabled=False,
max_pending_chunked_message=10,
auto_ack_oldest_chunked_message_on_queue_full=False,
- start_message_id_inclusive=False
+ start_message_id_inclusive=False,
+ batch_receive_policy=None
):
"""
Subscribe to the given topic and subscription combination.
@@ -740,6 +741,8 @@ class Client:
if autoAckOldestChunkedMessageOnQueueFull is true else it marks them
for redelivery.
start_message_id_inclusive: bool, default=False
Set the consumer to include the given position of any reset
operation like Consumer::seek.
+ batch_receive_policy: class ConsumerBatchReceivePolicy
+ Set the batch collection policy for batch receiving.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -759,6 +762,7 @@ class Client:
_check_type(int, max_pending_chunked_message,
'max_pending_chunked_message')
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full,
'auto_ack_oldest_chunked_message_on_queue_full')
_check_type(bool, start_message_id_inclusive,
'start_message_id_inclusive')
+ _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy,
'batch_receive_policy')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
@@ -788,6 +792,8 @@ class Client:
conf.max_pending_chunked_message(max_pending_chunked_message)
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
conf.start_message_id_inclusive(start_message_id_inclusive)
+ if batch_receive_policy:
+ conf.batch_receive_policy(batch_receive_policy.policy())
c = Consumer()
if isinstance(topic, str):
@@ -1237,6 +1243,20 @@ class Consumer:
m._schema = self._schema
return m
+    def batch_receive(self):
+        """
+        Receive a batch of messages.
+
+        This call blocks until enough messages have been collected or the wait
+        times out, according to the `batch_receive_policy` that was passed to
+        `Client.subscribe()` (see `ConsumerBatchReceivePolicy`).
+
+        Returns
+        -------
+
+        list of Message
+            The messages of the batch, each carrying this consumer's schema.
+        """
+        messages = []
+        for msg in self._consumer.batch_receive():
+            m = Message()
+            m._message = msg
+            # Attach the consumer's schema, matching how a single received
+            # message is wrapped elsewhere in this class; without it the
+            # wrapper has no schema to decode its payload with.
+            m._schema = self._schema
+            messages.append(m)
+        return messages
+
def acknowledge(self, message):
"""
Acknowledge the reception of a single message.
@@ -1354,6 +1374,32 @@ class Consumer:
"""
return self._consumer.get_last_message_id()
+class ConsumerBatchReceivePolicy:
+    """
+    Batch receive policy that limits the number and the total bytes of messages
+    in a single batch, and can specify a timeout for waiting for enough messages.
+
+    A batch receive action completes as soon as any one of the conditions is
+    met: the batch holds enough messages, the batch holds enough bytes, or the
+    waiting timeout has passed.
+    """
+    def __init__(self, max_num_message=-1, max_num_bytes=10 * 1024 * 1024, timeout_ms=100):
+        """
+        Wrap a native BatchReceivePolicy.
+
+        All arguments are optional; the defaults are the values this
+        docstring has always advertised.
+
+        Parameters
+        ----------
+
+        max_num_message: int, default=-1
+            Max number of messages in a batch. If less than 0, it means no limit.
+        max_num_bytes: int, default=10 * 1024 * 1024
+            Max total bytes of a batch. If less than 0, it means no limit.
+        timeout_ms: int, default=100
+            Max time to wait for the batch, in milliseconds. If less than 0,
+            it means no limit.
+        """
+        self._policy = BatchReceivePolicy(max_num_message, max_num_bytes, timeout_ms)
+
+    def policy(self):
+        """
+        Return the wrapped native BatchReceivePolicy.
+        """
+        return self._policy
class Reader:
"""
diff --git a/src/config.cc b/src/config.cc
index d2ed103..f2b7187 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -253,6 +253,11 @@ void export_config() {
.def("encryption_key", &ProducerConfiguration::addEncryptionKey,
return_self<>())
.def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader,
return_self<>());
+ class_<BatchReceivePolicy>("BatchReceivePolicy", init<int, int, long>())
+ .def("getTimeoutMs", &BatchReceivePolicy::getTimeoutMs)
+ .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
+ .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
+
class_<ConsumerConfiguration>("ConsumerConfiguration")
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
.def("consumer_type", &ConsumerConfiguration::setConsumerType,
return_self<>())
@@ -267,6 +272,8 @@ void export_config() {
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
.def("consumer_name", &ConsumerConfiguration::getConsumerName,
return_value_policy<copy_const_reference>())
+ .def("batch_receive_policy",
&ConsumerConfiguration::getBatchReceivePolicy,
return_value_policy<copy_const_reference>())
+ .def("batch_receive_policy",
&ConsumerConfiguration::setBatchReceivePolicy)
.def("consumer_name", &ConsumerConfiguration::setConsumerName)
.def("unacked_messages_timeout_ms",
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
.def("unacked_messages_timeout_ms",
&ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
diff --git a/src/consumer.cc b/src/consumer.cc
index 811ceb3..5298fae 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -42,6 +42,15 @@ Message Consumer_receive_timeout(Consumer& consumer, int
timeoutMs) {
return msg;
}
+// Python binding for Consumer::batchReceive: returns the received batch.
+Messages Consumer_batch_receive(Consumer& consumer) {
+    Messages batch;
+    Result result;
+
+    // Release the GIL for the duration of the (potentially blocking) native call.
+    Py_BEGIN_ALLOW_THREADS
+    result = consumer.batchReceive(batch);
+    Py_END_ALLOW_THREADS
+
+    // NOTE(review): CHECK_RESULT is defined elsewhere; presumably it raises a
+    // Python exception when the result is not OK -- confirm.
+    CHECK_RESULT(result);
+    return batch;
+}
+
void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
consumer.acknowledgeAsync(msg, nullptr); }
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId&
msgId) {
@@ -103,6 +112,7 @@ void export_consumer() {
.def("unsubscribe", &Consumer_unsubscribe)
.def("receive", &Consumer_receive)
.def("receive", &Consumer_receive_timeout)
+ .def("batch_receive", &Consumer_batch_receive)
.def("acknowledge", &Consumer_acknowledge)
.def("acknowledge", &Consumer_acknowledge_message_id)
.def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index d0f1ba0..30f451d 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -39,6 +39,7 @@ from pulsar import (
AuthenticationToken,
InitialPosition,
CryptoKeyReader,
+ ConsumerBatchReceivePolicy,
)
from pulsar.schema import JsonSchema, Record, Integer
@@ -1064,6 +1065,27 @@ class PulsarTest(TestCase):
consumer.receive(100)
client.close()
+    def test_batch_receive(self):
+        """
+        Messages sent to a topic are returned, in order, by one batch_receive().
+        """
+        num_messages = 10
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-batch-receive-" + str(time.time())
+        # Only the message-count limit is set, so the batch is expected to
+        # complete once `num_messages` messages have been collected.
+        consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
+                                    start_message_id_inclusive=True,
+                                    batch_receive_policy=ConsumerBatchReceivePolicy(num_messages, -1, -1))
+        producer = client.create_producer(topic)
+
+        for i in range(num_messages):
+            if i > 0:
+                time.sleep(0.02)
+            producer.send(b"hello-%d" % i)
+
+        msgs = consumer.batch_receive()
+        # Without this check an empty or short batch would pass vacuously.
+        self.assertEqual(len(msgs), num_messages)
+        for i, msg in enumerate(msgs):
+            self.assertEqual(msg.data(), b"hello-%d" % i)
+
+        client.close()
+
def test_message_id(self):
s = MessageId.earliest.serialize()
self.assertEqual(MessageId.deserialize(s), MessageId.earliest)