This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 385d79bfd7 Added ordering key option for PubSubPublishMessageOperator GCP Operator (#39955)
385d79bfd7 is described below
commit 385d79bfd75f236f59dab012d3e5952021e4965e
Author: Mehdi GATI <[email protected]>
AuthorDate: Sun Jun 2 07:18:11 2024 +0200
Added ordering key option for PubSubPublishMessageOperator GCP Operator (#39955)
* feature/gcp-pubsub-operator-ordering-key
* fix provider checks test
---------
Co-authored-by: Mehdi GATI <[email protected]>
---
airflow/providers/google/cloud/hooks/pubsub.py | 11 ++++++++++-
airflow/providers/google/cloud/operators/pubsub.py | 18 ++++++++++++++++++
tests/providers/google/cloud/hooks/test_pubsub.py | 9 +++++++--
tests/providers/google/cloud/operators/test_pubsub.py | 18 ++++++++++++++++++
4 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/pubsub.py
b/airflow/providers/google/cloud/hooks/pubsub.py
index ea0bb819bb..d8da265a44 100644
--- a/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/airflow/providers/google/cloud/hooks/pubsub.py
@@ -36,6 +36,7 @@ from google.api_core.exceptions import AlreadyExists,
GoogleAPICallError
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.exceptions import NotFound
from google.cloud.pubsub_v1 import PublisherClient, SubscriberClient
+from google.cloud.pubsub_v1.types import PublisherOptions
from google.pubsub_v1.services.subscriber.async_client import
SubscriberAsyncClient
from googleapiclient.errors import HttpError
@@ -79,6 +80,7 @@ class PubSubHook(GoogleBaseHook):
self,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ enable_message_ordering: bool = False,
**kwargs,
) -> None:
if kwargs.get("delegate_to") is not None:
@@ -90,6 +92,7 @@ class PubSubHook(GoogleBaseHook):
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
)
+ self.enable_message_ordering = enable_message_ordering
self._client = None
def get_conn(self) -> PublisherClient:
@@ -99,7 +102,13 @@ class PubSubHook(GoogleBaseHook):
:return: Google Cloud Pub/Sub client object.
"""
if not self._client:
- self._client = PublisherClient(credentials=self.get_credentials(),
client_info=CLIENT_INFO)
+ self._client = PublisherClient(
+ credentials=self.get_credentials(),
+ client_info=CLIENT_INFO,
+ publisher_options=PublisherOptions(
+ enable_message_ordering=self.enable_message_ordering,
+ ),
+ )
return self._client
@cached_property
diff --git a/airflow/providers/google/cloud/operators/pubsub.py
b/airflow/providers/google/cloud/operators/pubsub.py
index 41e1a03830..91760ab428 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -604,6 +604,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}}
m2 = {"data": b"Knock, knock"}
m3 = {"attributes": {"foo": ""}}
+ m4 = {"data": b"Who's there?", "attributes": {"ordering_key":
"knock_knock"}}
t1 = PubSubPublishMessageOperator(
project_id="my-project",
@@ -613,6 +614,15 @@ class
PubSubPublishMessageOperator(GoogleCloudBaseOperator):
dag=dag,
)
+ t2 = PubSubPublishMessageOperator(
+ project_id="my-project",
+ topic="my_topic",
+ messages=[m4],
+ create_topic=True,
+ enable_message_ordering=True,
+ dag=dag,
+ )
+
``project_id``, ``topic``, and ``messages`` are templated so you can use
Jinja templating
in their values.
@@ -632,6 +642,10 @@ class
PubSubPublishMessageOperator(GoogleCloudBaseOperator):
https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
:param gcp_conn_id: The connection ID to use connecting to
Google Cloud.
+ :param enable_message_ordering: If true, messages published with the same
+ ordering_key in PubsubMessage will be delivered to the subscribers in
the order
+ in which they are received by the Pub/Sub system. Otherwise, they may
be
+ delivered in any order. Default is False.
:param impersonation_chain: Optional service account to impersonate using
short-term
credentials, or chained list of accounts required to get the
access_token
of the last account in the list, which will be impersonated in the
request.
@@ -646,6 +660,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
"project_id",
"topic",
"messages",
+ "enable_message_ordering",
"impersonation_chain",
)
ui_color = "#0273d4"
@@ -657,6 +672,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
messages: list,
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
+ enable_message_ordering: bool = False,
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
@@ -665,12 +681,14 @@ class
PubSubPublishMessageOperator(GoogleCloudBaseOperator):
self.topic = topic
self.messages = messages
self.gcp_conn_id = gcp_conn_id
+ self.enable_message_ordering = enable_message_ordering
self.impersonation_chain = impersonation_chain
def execute(self, context: Context) -> None:
hook = PubSubHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
+ enable_message_ordering=self.enable_message_ordering,
)
self.log.info("Publishing to topic %s", self.topic)
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py
b/tests/providers/google/cloud/hooks/test_pubsub.py
index eab743b0e2..29c286b091 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -25,7 +25,7 @@ import pytest
from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
from google.api_core.gapic_v1.method import DEFAULT
from google.cloud.exceptions import NotFound
-from google.cloud.pubsub_v1.types import ReceivedMessage
+from google.cloud.pubsub_v1.types import PublisherOptions, ReceivedMessage
from googleapiclient.errors import HttpError
from airflow.providers.google.cloud.hooks.pubsub import PubSubAsyncHook,
PubSubException, PubSubHook
@@ -86,7 +86,12 @@ class TestPubSubHook:
def test_publisher_client_creation(self, mock_client, mock_get_creds):
assert self.pubsub_hook._client is None
result = self.pubsub_hook.get_conn()
-
mock_client.assert_called_once_with(credentials=mock_get_creds.return_value,
client_info=CLIENT_INFO)
+
+ mock_client.assert_called_once_with(
+ credentials=mock_get_creds.return_value,
+ client_info=CLIENT_INFO,
+ publisher_options=PublisherOptions(enable_message_ordering=False),
+ )
assert mock_client.return_value == result
assert self.pubsub_hook._client == result
diff --git a/tests/providers/google/cloud/operators/test_pubsub.py
b/tests/providers/google/cloud/operators/test_pubsub.py
index 0598214f10..ed65f16577 100644
--- a/tests/providers/google/cloud/operators/test_pubsub.py
+++ b/tests/providers/google/cloud/operators/test_pubsub.py
@@ -41,6 +41,9 @@ TEST_MESSAGES = [
{"data": b"Knock, knock"},
{"attributes": {"foo": ""}},
]
+TEST_MESSAGES_ORDERING_KEY = [
+ {"data": b"Hello, World!", "attributes": {"ordering_key": "key"}},
+]
class TestPubSubTopicCreateOperator:
@@ -235,6 +238,21 @@ class TestPubSubPublishOperator:
project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES
)
+ @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
+ def test_publish_with_ordering_key(self, mock_hook):
+ operator = PubSubPublishMessageOperator(
+ task_id=TASK_ID,
+ project_id=TEST_PROJECT,
+ topic=TEST_TOPIC,
+ messages=TEST_MESSAGES_ORDERING_KEY,
+ enable_message_ordering=True,
+ )
+
+ operator.execute(None)
+ mock_hook.return_value.publish.assert_called_once_with(
+ project_id=TEST_PROJECT, topic=TEST_TOPIC,
messages=TEST_MESSAGES_ORDERING_KEY
+ )
+
class TestPubSubPullOperator:
def _generate_messages(self, count):