This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 385d79bfd7 Added ordering key option for PubSubPublishMessageOperator GCP Operator (#39955)
385d79bfd7 is described below

commit 385d79bfd75f236f59dab012d3e5952021e4965e
Author: Mehdi GATI <[email protected]>
AuthorDate: Sun Jun 2 07:18:11 2024 +0200

    Added ordering key option for PubSubPublishMessageOperator GCP Operator (#39955)
    
    * feature/gcp-pubsub-operator-ordering-key
    
    * fix provider checks test
    
    ---------
    
    Co-authored-by: Mehdi GATI <[email protected]>
---
 airflow/providers/google/cloud/hooks/pubsub.py        | 11 ++++++++++-
 airflow/providers/google/cloud/operators/pubsub.py    | 18 ++++++++++++++++++
 tests/providers/google/cloud/hooks/test_pubsub.py     |  9 +++++++--
 tests/providers/google/cloud/operators/test_pubsub.py | 18 ++++++++++++++++++
 4 files changed, 53 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/pubsub.py b/airflow/providers/google/cloud/hooks/pubsub.py
index ea0bb819bb..d8da265a44 100644
--- a/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/airflow/providers/google/cloud/hooks/pubsub.py
@@ -36,6 +36,7 @@ from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.cloud.exceptions import NotFound
 from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
+from google.cloud.pubsub_v1.types import PublisherOptions
 from google.pubsub_v1.services.subscriber.async_client import SubscriberAsyncClient
 from googleapiclient.errors import HttpError
 
@@ -79,6 +80,7 @@ class PubSubHook(GoogleBaseHook):
         self,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        enable_message_ordering: bool = False,
         **kwargs,
     ) -> None:
         if kwargs.get("delegate_to") is not None:
@@ -90,6 +92,7 @@ class PubSubHook(GoogleBaseHook):
             gcp_conn_id=gcp_conn_id,
             impersonation_chain=impersonation_chain,
         )
+        self.enable_message_ordering = enable_message_ordering
         self._client = None
 
     def get_conn(self) -> PublisherClient:
@@ -99,7 +102,13 @@ class PubSubHook(GoogleBaseHook):
         :return: Google Cloud Pub/Sub client object.
         """
         if not self._client:
-            self._client = PublisherClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
+            self._client = PublisherClient(
+                credentials=self.get_credentials(),
+                client_info=CLIENT_INFO,
+                publisher_options=PublisherOptions(
+                    enable_message_ordering=self.enable_message_ordering,
+                ),
+            )
         return self._client
 
     @cached_property
diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py
index 41e1a03830..91760ab428 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -604,6 +604,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}}
         m2 = {"data": b"Knock, knock"}
         m3 = {"attributes": {"foo": ""}}
+        m4 = {"data": b"Who's there?", "attributes": {"ordering_key": "knock_knock"}}
 
         t1 = PubSubPublishMessageOperator(
             project_id="my-project",
@@ -613,6 +614,15 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
             dag=dag,
         )
 
+        t2 = PubSubPublishMessageOperator(
+            project_id="my-project",
+            topic="my_topic",
+            messages=[m4],
+            create_topic=True,
+            enable_message_ordering=True,
+            dag=dag,
+        )
+
     ``project_id``, ``topic``, and ``messages`` are templated so you can use Jinja templating
     in their values.
 
@@ -632,6 +642,10 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
     :param gcp_conn_id: The connection ID to use connecting to
         Google Cloud.
+    :param enable_message_ordering: If true, messages published with the same
+        ordering_key in PubsubMessage will be delivered to the subscribers in the order
+        in which they are received by the Pub/Sub system. Otherwise, they may be
+        delivered in any order. Default is False.
     :param impersonation_chain: Optional service account to impersonate using short-term
         credentials, or chained list of accounts required to get the access_token
         of the last account in the list, which will be impersonated in the request.
@@ -646,6 +660,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         "project_id",
         "topic",
         "messages",
+        "enable_message_ordering",
         "impersonation_chain",
     )
     ui_color = "#0273d4"
@@ -657,6 +672,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         messages: list,
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
+        enable_message_ordering: bool = False,
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
@@ -665,12 +681,14 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         self.topic = topic
         self.messages = messages
         self.gcp_conn_id = gcp_conn_id
+        self.enable_message_ordering = enable_message_ordering
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: Context) -> None:
         hook = PubSubHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
+            enable_message_ordering=self.enable_message_ordering,
         )
 
         self.log.info("Publishing to topic %s", self.topic)
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py b/tests/providers/google/cloud/hooks/test_pubsub.py
index eab743b0e2..29c286b091 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -25,7 +25,7 @@ import pytest
 from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
 from google.api_core.gapic_v1.method import DEFAULT
 from google.cloud.exceptions import NotFound
-from google.cloud.pubsub_v1.types import ReceivedMessage
+from google.cloud.pubsub_v1.types import PublisherOptions, ReceivedMessage
 from googleapiclient.errors import HttpError
 
 from airflow.providers.google.cloud.hooks.pubsub import PubSubAsyncHook, PubSubException, PubSubHook
@@ -86,7 +86,12 @@ class TestPubSubHook:
     def test_publisher_client_creation(self, mock_client, mock_get_creds):
         assert self.pubsub_hook._client is None
         result = self.pubsub_hook.get_conn()
-        mock_client.assert_called_once_with(credentials=mock_get_creds.return_value, client_info=CLIENT_INFO)
+
+        mock_client.assert_called_once_with(
+            credentials=mock_get_creds.return_value,
+            client_info=CLIENT_INFO,
+            publisher_options=PublisherOptions(enable_message_ordering=False),
+        )
         assert mock_client.return_value == result
         assert self.pubsub_hook._client == result
 
diff --git a/tests/providers/google/cloud/operators/test_pubsub.py b/tests/providers/google/cloud/operators/test_pubsub.py
index 0598214f10..ed65f16577 100644
--- a/tests/providers/google/cloud/operators/test_pubsub.py
+++ b/tests/providers/google/cloud/operators/test_pubsub.py
@@ -41,6 +41,9 @@ TEST_MESSAGES = [
     {"data": b"Knock, knock"},
     {"attributes": {"foo": ""}},
 ]
+TEST_MESSAGES_ORDERING_KEY = [
+    {"data": b"Hello, World!", "attributes": {"ordering_key": "key"}},
+]
 
 
 class TestPubSubTopicCreateOperator:
@@ -235,6 +238,21 @@ class TestPubSubPublishOperator:
             project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES
         )
 
+    @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
+    def test_publish_with_ordering_key(self, mock_hook):
+        operator = PubSubPublishMessageOperator(
+            task_id=TASK_ID,
+            project_id=TEST_PROJECT,
+            topic=TEST_TOPIC,
+            messages=TEST_MESSAGES_ORDERING_KEY,
+            enable_message_ordering=True,
+        )
+
+        operator.execute(None)
+        mock_hook.return_value.publish.assert_called_once_with(
+            project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY
+        )
+
 
 class TestPubSubPullOperator:
     def _generate_messages(self, count):

Reply via email to