This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 9047170  Added missing publish option `ordering_key` (#152)
9047170 is described below

commit 904717079da07a19dba0bb06bf49f99258c96461
Author: Shane Breatnach <[email protected]>
AuthorDate: Wed Sep 27 07:17:36 2023 +0100

    Added missing publish option `ordering_key` (#152)
    
    The `ordering_key` was not available to set when publishing
    messages. This has been added, aping how partition_key is set.
    Added test to cover new functionality.
---
 pulsar/__init__.py          | 23 +++++++++++++++++++----
 pulsar/functions/context.py |  6 ++++++
 src/message.cc              |  2 ++
 tests/pulsar_test.py        | 17 +++++++++++++++++
 4 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 8e0907e..eec603a 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -146,6 +146,12 @@ class Message:
         """
         return self._message.partition_key()
 
+    def ordering_key(self):
+        """
+        Get the ordering key for the message.
+        """
+        return self._message.ordering_key()
+
     def publish_timestamp(self):
         """
         Get the timestamp in milliseconds with the message publish time.
@@ -1020,7 +1026,7 @@ class Client:
             Symmetric encryption class implementation, configuring public key encryption messages for the producer
             and private key decryption messages for the consumer
         """
-        
+
         # If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
         if isinstance(start_message_id, MessageId):
             start_message_id = start_message_id._msg_id
@@ -1142,6 +1148,7 @@ class Producer:
     def send(self, content,
              properties=None,
              partition_key=None,
+             ordering_key=None,
              sequence_id=None,
              replication_clusters=None,
              disable_replication=False,
@@ -1164,6 +1171,8 @@ class Producer:
         partition_key: optional
             Sets the partition key for message routing. A hash of this key is used
             to determine the message's topic partition.
+        ordering_key: optional
+            Sets the ordering key for message routing.
         sequence_id:  optional
             Specify a custom sequence id for the message being published.
         replication_clusters:  optional
@@ -1180,7 +1189,7 @@ class Producer:
         deliver_after: optional
             Specify a delay in timedelta for the delivery of the messages.
         """
-        msg = self._build_msg(content, properties, partition_key, sequence_id,
+        msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
                               replication_clusters, disable_replication, event_timestamp,
                               deliver_at, deliver_after)
         return self._producer.send(msg)
@@ -1188,6 +1197,7 @@ class Producer:
     def send_async(self, content, callback,
                    properties=None,
                    partition_key=None,
+                   ordering_key=None,
                    sequence_id=None,
                    replication_clusters=None,
                    disable_replication=False,
@@ -1239,6 +1249,8 @@ class Producer:
         partition_key: optional
             Sets the partition key for the message routing. A hash of this key is
             used to determine the message's topic partition.
+        ordering_key: optional
+            Sets the ordering key for the message routing.
         sequence_id: optional
             Specify a custom sequence id for the message being published.
         replication_clusters: optional
@@ -1254,7 +1266,7 @@ class Producer:
         deliver_after: optional
             Specify a delay in timedelta for the delivery of the messages.
         """
-        msg = self._build_msg(content, properties, partition_key, sequence_id,
+        msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
                               replication_clusters, disable_replication, event_timestamp,
                               deliver_at, deliver_after)
         self._producer.send_async(msg, callback)
@@ -1274,7 +1286,7 @@ class Producer:
         """
         self._producer.close()
 
-    def _build_msg(self, content, properties, partition_key, sequence_id,
+    def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id,
                    replication_clusters, disable_replication, event_timestamp,
                    deliver_at, deliver_after):
         data = self._schema.encode(content)
@@ -1282,6 +1294,7 @@ class Producer:
         _check_type(bytes, data, 'data')
         _check_type_or_none(dict, properties, 'properties')
         _check_type_or_none(str, partition_key, 'partition_key')
+        _check_type_or_none(str, ordering_key, 'ordering_key')
         _check_type_or_none(int, sequence_id, 'sequence_id')
         _check_type_or_none(list, replication_clusters, 'replication_clusters')
         _check_type(bool, disable_replication, 'disable_replication')
@@ -1296,6 +1309,8 @@ class Producer:
                 mb.property(k, v)
         if partition_key:
             mb.partition_key(partition_key)
+        if ordering_key:
+            mb.ordering_key(ordering_key)
         if sequence_id:
             mb.sequence_id(sequence_id)
         if replication_clusters:
diff --git a/pulsar/functions/context.py b/pulsar/functions/context.py
index 51b86f0..30c8859 100644
--- a/pulsar/functions/context.py
+++ b/pulsar/functions/context.py
@@ -125,6 +125,11 @@ class Context(object):
         """Returns partition key of the input message is one exists"""
         pass
 
+    @abstractmethod
+    def get_ordering_key(self):
+        """Returns ordering key of the input message, if one exists"""
+        pass
+
     @abstractmethod
     def record_metric(self, metric_name, metric_value):
         """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
@@ -140,6 +145,7 @@ class Context(object):
 
           properties,
           partition_key,
+          ordering_key,
           sequence_id,
           replication_clusters,
           disable_replication,
diff --git a/src/message.cc b/src/message.cc
index 895209f..dec6f05 100644
--- a/src/message.cc
+++ b/src/message.cc
@@ -42,6 +42,7 @@ void export_message(py::module_& m) {
         .def("deliver_after", &MessageBuilder::setDeliverAfter, return_value_policy::reference)
         .def("deliver_at", &MessageBuilder::setDeliverAt, return_value_policy::reference)
         .def("partition_key", &MessageBuilder::setPartitionKey, return_value_policy::reference)
+        .def("ordering_key", &MessageBuilder::setOrderingKey, return_value_policy::reference)
         .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference)
         .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference)
         .def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference)
@@ -87,6 +88,7 @@ void export_message(py::module_& m) {
         .def("data", [](const Message& msg) { return bytes(msg.getDataAsString()); })
         .def("length", &Message::getLength)
         .def("partition_key", &Message::getPartitionKey, return_value_policy::copy)
+        .def("ordering_key", &Message::getOrderingKey, return_value_policy::copy)
         .def("publish_timestamp", &Message::getPublishTimestamp)
         .def("event_timestamp", &Message::getEventTimestamp)
         .def("message_id", &Message::getMessageId, return_value_policy::copy)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index ae6aa3b..d14d39c 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -251,6 +251,23 @@ class PulsarTest(TestCase):
         consumer.unsubscribe()
         client.close()
 
+    def test_ordering_key(self):
+        client = Client(self.serviceUrl)
+        consumer = client.subscribe(
+            "my-python-topic-ordering-key", "my-sub", consumer_type=ConsumerType.KeyShared
+        )
+        producer = client.create_producer("my-python-topic-ordering-key")
+        producer.send(b"ordered-hello", ordering_key="random-ordering-key")
+
+        # Message should be available immediately with ordering key set
+        msg = consumer.receive(TM)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b"ordered-hello")
+        self.assertEqual(msg.ordering_key(), "random-ordering-key")
+        consumer.unsubscribe()
+        producer.close()
+        client.close()
+
     def test_redelivery_count(self):
         client = Client(self.serviceUrl)
         consumer = client.subscribe(

Reply via email to