This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 9047170 Added missing publish option `ordering_key` (#152)
9047170 is described below
commit 904717079da07a19dba0bb06bf49f99258c96461
Author: Shane Breatnach <[email protected]>
AuthorDate: Wed Sep 27 07:17:36 2023 +0100
Added missing publish option `ordering_key` (#152)
The `ordering_key` option could not be set when publishing
messages. This commit adds it, mirroring how `partition_key` is set.
A test has been added to cover the new functionality.
---
pulsar/__init__.py | 23 +++++++++++++++++++----
pulsar/functions/context.py | 6 ++++++
src/message.cc | 2 ++
tests/pulsar_test.py | 17 +++++++++++++++++
4 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 8e0907e..eec603a 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -146,6 +146,12 @@ class Message:
"""
return self._message.partition_key()
+ def ordering_key(self):
+ """
+ Get the ordering key for the message.
+ """
+ return self._message.ordering_key()
+
def publish_timestamp(self):
"""
Get the timestamp in milliseconds with the message publish time.
@@ -1020,7 +1026,7 @@ class Client:
Symmetric encryption class implementation, configuring public key
encryption messages for the producer
and private key decryption messages for the consumer
"""
-
+
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId
object
if isinstance(start_message_id, MessageId):
start_message_id = start_message_id._msg_id
@@ -1142,6 +1148,7 @@ class Producer:
def send(self, content,
properties=None,
partition_key=None,
+ ordering_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
@@ -1164,6 +1171,8 @@ class Producer:
partition_key: optional
Sets the partition key for message routing. A hash of this key is
used
to determine the message's topic partition.
+ ordering_key: optional
+ Sets the ordering key for message routing.
sequence_id: optional
Specify a custom sequence id for the message being published.
replication_clusters: optional
@@ -1180,7 +1189,7 @@ class Producer:
deliver_after: optional
Specify a delay in timedelta for the delivery of the messages.
"""
- msg = self._build_msg(content, properties, partition_key, sequence_id,
+ msg = self._build_msg(content, properties, partition_key,
ordering_key, sequence_id,
replication_clusters, disable_replication,
event_timestamp,
deliver_at, deliver_after)
return self._producer.send(msg)
@@ -1188,6 +1197,7 @@ class Producer:
def send_async(self, content, callback,
properties=None,
partition_key=None,
+ ordering_key=None,
sequence_id=None,
replication_clusters=None,
disable_replication=False,
@@ -1239,6 +1249,8 @@ class Producer:
partition_key: optional
Sets the partition key for the message routing. A hash of this key
is
used to determine the message's topic partition.
+ ordering_key: optional
+ Sets the ordering key for the message routing.
sequence_id: optional
Specify a custom sequence id for the message being published.
replication_clusters: optional
@@ -1254,7 +1266,7 @@ class Producer:
deliver_after: optional
Specify a delay in timedelta for the delivery of the messages.
"""
- msg = self._build_msg(content, properties, partition_key, sequence_id,
+ msg = self._build_msg(content, properties, partition_key,
ordering_key, sequence_id,
replication_clusters, disable_replication,
event_timestamp,
deliver_at, deliver_after)
self._producer.send_async(msg, callback)
@@ -1274,7 +1286,7 @@ class Producer:
"""
self._producer.close()
- def _build_msg(self, content, properties, partition_key, sequence_id,
+ def _build_msg(self, content, properties, partition_key, ordering_key,
sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after):
data = self._schema.encode(content)
@@ -1282,6 +1294,7 @@ class Producer:
_check_type(bytes, data, 'data')
_check_type_or_none(dict, properties, 'properties')
_check_type_or_none(str, partition_key, 'partition_key')
+ _check_type_or_none(str, ordering_key, 'ordering_key')
_check_type_or_none(int, sequence_id, 'sequence_id')
_check_type_or_none(list, replication_clusters, 'replication_clusters')
_check_type(bool, disable_replication, 'disable_replication')
@@ -1296,6 +1309,8 @@ class Producer:
mb.property(k, v)
if partition_key:
mb.partition_key(partition_key)
+ if ordering_key:
+ mb.ordering_key(ordering_key)
if sequence_id:
mb.sequence_id(sequence_id)
if replication_clusters:
diff --git a/pulsar/functions/context.py b/pulsar/functions/context.py
index 51b86f0..30c8859 100644
--- a/pulsar/functions/context.py
+++ b/pulsar/functions/context.py
@@ -125,6 +125,11 @@ class Context(object):
"""Returns partition key of the input message is one exists"""
pass
+ @abstractmethod
+ def get_ordering_key(self):
+ """Returns ordering key of the input message, if one exists"""
+ pass
+
@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy
isinstance(metric_value, numbers.Number)"""
@@ -140,6 +145,7 @@ class Context(object):
properties,
partition_key,
+ ordering_key,
sequence_id,
replication_clusters,
disable_replication,
diff --git a/src/message.cc b/src/message.cc
index 895209f..dec6f05 100644
--- a/src/message.cc
+++ b/src/message.cc
@@ -42,6 +42,7 @@ void export_message(py::module_& m) {
.def("deliver_after", &MessageBuilder::setDeliverAfter,
return_value_policy::reference)
.def("deliver_at", &MessageBuilder::setDeliverAt,
return_value_policy::reference)
.def("partition_key", &MessageBuilder::setPartitionKey,
return_value_policy::reference)
+ .def("ordering_key", &MessageBuilder::setOrderingKey,
return_value_policy::reference)
.def("event_timestamp", &MessageBuilder::setEventTimestamp,
return_value_policy::reference)
.def("replication_clusters", &MessageBuilder::setReplicationClusters,
return_value_policy::reference)
.def("disable_replication", &MessageBuilder::disableReplication,
return_value_policy::reference)
@@ -87,6 +88,7 @@ void export_message(py::module_& m) {
.def("data", [](const Message& msg) { return
bytes(msg.getDataAsString()); })
.def("length", &Message::getLength)
.def("partition_key", &Message::getPartitionKey,
return_value_policy::copy)
+ .def("ordering_key", &Message::getOrderingKey,
return_value_policy::copy)
.def("publish_timestamp", &Message::getPublishTimestamp)
.def("event_timestamp", &Message::getEventTimestamp)
.def("message_id", &Message::getMessageId, return_value_policy::copy)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index ae6aa3b..d14d39c 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -251,6 +251,23 @@ class PulsarTest(TestCase):
consumer.unsubscribe()
client.close()
+ def test_ordering_key(self):
+ client = Client(self.serviceUrl)
+ consumer = client.subscribe(
+ "my-python-topic-ordering-key", "my-sub",
consumer_type=ConsumerType.KeyShared
+ )
+ producer = client.create_producer("my-python-topic-ordering-key")
+ producer.send(b"ordered-hello", ordering_key="random-ordering-key")
+
+ # Message should be available immediately with ordering key set
+ msg = consumer.receive(TM)
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b"ordered-hello")
+ self.assertEqual(msg.ordering_key(), "random-ordering-key")
+ consumer.unsubscribe()
+ producer.close()
+ client.close()
+
def test_redelivery_count(self):
client = Client(self.serviceUrl)
consumer = client.subscribe(