This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ad99263314 feat(google): add enable_open_telemetry_tracing to 
PubSubHook and PubSubPublishMessageOperator (#58766)
9ad99263314 is described below

commit 9ad99263314636dfacbe5ce8dbd1b7287975bb7d
Author: Nikhil Nadig <[email protected]>
AuthorDate: Tue Dec 2 00:49:57 2025 +0100

    feat(google): add enable_open_telemetry_tracing to PubSubHook and 
PubSubPublishMessageOperator (#58766)
    
    * feat(google): add enable_open_telemetry_tracing to PubSubHook and 
PubSubPublishMessageOperator
    
    * chore(google): bump google-cloud-pubsub minimum to 2.24.0 for otel support
---
 providers/google/README.rst                        |  2 +-
 providers/google/docs/index.rst                    |  2 +-
 providers/google/pyproject.toml                    |  2 +-
 .../airflow/providers/google/cloud/hooks/pubsub.py |  3 ++
 .../providers/google/cloud/operators/pubsub.py     |  7 +++
 .../tests/unit/google/cloud/hooks/test_pubsub.py   | 50 +++++++++++++++++++++-
 .../unit/google/cloud/operators/test_pubsub.py     | 43 +++++++++++++++++++
 7 files changed, 105 insertions(+), 4 deletions(-)

diff --git a/providers/google/README.rst b/providers/google/README.rst
index 47df431f2e4..81b6b59bbbd 100644
--- a/providers/google/README.rst
+++ b/providers/google/README.rst
@@ -101,7 +101,7 @@ PIP package                                 Version required
 ``google-cloud-monitoring``                 ``>=2.18.0``
 ``google-cloud-orchestration-airflow``      ``>=1.10.0``
 ``google-cloud-os-login``                   ``>=2.9.1``
-``google-cloud-pubsub``                     ``>=2.21.3``
+``google-cloud-pubsub``                     ``>=2.24.0``
 ``google-cloud-redis``                      ``>=2.12.0``
 ``google-cloud-secret-manager``             ``>=2.16.0``
 ``google-cloud-spanner``                    ``>=3.50.0``
diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst
index 02ffa3b9ed4..d018b141f1d 100644
--- a/providers/google/docs/index.rst
+++ b/providers/google/docs/index.rst
@@ -152,7 +152,7 @@ PIP package                                 Version required
 ``google-cloud-monitoring``                 ``>=2.18.0``
 ``google-cloud-orchestration-airflow``      ``>=1.10.0``
 ``google-cloud-os-login``                   ``>=2.9.1``
-``google-cloud-pubsub``                     ``>=2.21.3``
+``google-cloud-pubsub``                     ``>=2.24.0``
 ``google-cloud-redis``                      ``>=2.12.0``
 ``google-cloud-secret-manager``             ``>=2.16.0``
 ``google-cloud-spanner``                    ``>=3.50.0``
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index f5f51ae24c1..13b0d798291 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -105,7 +105,7 @@ dependencies = [
     "google-cloud-monitoring>=2.18.0",
     "google-cloud-orchestration-airflow>=1.10.0",
     "google-cloud-os-login>=2.9.1",
-    "google-cloud-pubsub>=2.21.3",
+    "google-cloud-pubsub>=2.24.0",
     "google-cloud-redis>=2.12.0",
     "google-cloud-secret-manager>=2.16.0",
     "google-cloud-spanner>=3.50.0",
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py
index 58237c5c2ae..b4da615f389 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py
@@ -82,6 +82,7 @@ class PubSubHook(GoogleBaseHook):
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         enable_message_ordering: bool = False,
+        enable_open_telemetry_tracing: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(
@@ -90,6 +91,7 @@ class PubSubHook(GoogleBaseHook):
             **kwargs,
         )
         self.enable_message_ordering = enable_message_ordering
+        self.enable_open_telemetry_tracing = enable_open_telemetry_tracing
         self._client = None
 
     def get_conn(self) -> PublisherClient:
@@ -104,6 +106,7 @@ class PubSubHook(GoogleBaseHook):
                 client_info=CLIENT_INFO,
                 publisher_options=PublisherOptions(
                     enable_message_ordering=self.enable_message_ordering,
+                    
enable_open_telemetry_tracing=self.enable_open_telemetry_tracing,
                 ),
             )
         return self._client
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py 
b/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py
index 250bcfde4ef..d02ac90171d 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py
@@ -680,6 +680,9 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         ordering_key in PubsubMessage will be delivered to the subscribers in 
the order
         in which they are received by the Pub/Sub system. Otherwise, they may 
be
         delivered in any order. Default is False.
+    :param enable_open_telemetry_tracing: If true, enables OpenTelemetry 
tracing for
+        published messages. This allows distributed tracing of messages as 
they flow
+        through Pub/Sub topics. Default is False.
     :param impersonation_chain: Optional service account to impersonate using 
short-term
         credentials, or chained list of accounts required to get the 
access_token
         of the last account in the list, which will be impersonated in the 
request.
@@ -695,6 +698,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         "topic",
         "messages",
         "enable_message_ordering",
+        "enable_open_telemetry_tracing",
         "impersonation_chain",
     )
     ui_color = "#0273d4"
@@ -707,6 +711,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         project_id: str = PROVIDE_PROJECT_ID,
         gcp_conn_id: str = "google_cloud_default",
         enable_message_ordering: bool = False,
+        enable_open_telemetry_tracing: bool = False,
         impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
@@ -716,6 +721,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
         self.messages = messages
         self.gcp_conn_id = gcp_conn_id
         self.enable_message_ordering = enable_message_ordering
+        self.enable_open_telemetry_tracing = enable_open_telemetry_tracing
         self.impersonation_chain = impersonation_chain
 
     @cached_property
@@ -724,6 +730,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
             enable_message_ordering=self.enable_message_ordering,
+            enable_open_telemetry_tracing=self.enable_open_telemetry_tracing,
         )
 
     def execute(self, context: Context) -> None:
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py 
b/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
index fbf10fe6ae8..1375b592073 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
@@ -86,11 +86,59 @@ class TestPubSubHook:
         mock_client.assert_called_once_with(
             credentials=mock_get_creds.return_value,
             client_info=CLIENT_INFO,
-            publisher_options=PublisherOptions(enable_message_ordering=False),
+            publisher_options=PublisherOptions(
+                enable_message_ordering=False,
+                enable_open_telemetry_tracing=False,
+            ),
         )
         assert mock_client.return_value == result
         assert self.pubsub_hook._client == result
 
+    
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
+    @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
+    def test_publisher_client_creation_with_open_telemetry_tracing(self, 
mock_client, mock_get_creds):
+        with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), 
new=mock_init):
+            pubsub_hook_with_tracing = PubSubHook(
+                gcp_conn_id="test",
+                enable_open_telemetry_tracing=True,
+            )
+        assert pubsub_hook_with_tracing._client is None
+        result = pubsub_hook_with_tracing.get_conn()
+
+        mock_client.assert_called_once_with(
+            credentials=mock_get_creds.return_value,
+            client_info=CLIENT_INFO,
+            publisher_options=PublisherOptions(
+                enable_message_ordering=False,
+                enable_open_telemetry_tracing=True,
+            ),
+        )
+        assert mock_client.return_value == result
+        assert pubsub_hook_with_tracing._client == result
+
+    
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
+    @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
+    def test_publisher_client_creation_with_message_ordering_and_tracing(self, 
mock_client, mock_get_creds):
+        with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), 
new=mock_init):
+            pubsub_hook_with_both = PubSubHook(
+                gcp_conn_id="test",
+                enable_message_ordering=True,
+                enable_open_telemetry_tracing=True,
+            )
+        assert pubsub_hook_with_both._client is None
+        result = pubsub_hook_with_both.get_conn()
+
+        mock_client.assert_called_once_with(
+            credentials=mock_get_creds.return_value,
+            client_info=CLIENT_INFO,
+            publisher_options=PublisherOptions(
+                enable_message_ordering=True,
+                enable_open_telemetry_tracing=True,
+            ),
+        )
+        assert mock_client.return_value == result
+        assert pubsub_hook_with_both._client == result
+
     
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
     @mock.patch("airflow.providers.google.cloud.hooks.pubsub.SubscriberClient")
     def test_subscriber_client_creation(self, mock_client, mock_get_creds):
diff --git a/providers/google/tests/unit/google/cloud/operators/test_pubsub.py 
b/providers/google/tests/unit/google/cloud/operators/test_pubsub.py
index 289001e3bc8..c335c3baa73 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_pubsub.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_pubsub.py
@@ -352,6 +352,49 @@ class TestPubSubPublishOperator:
             project_id=TEST_PROJECT, topic=TEST_TOPIC, 
messages=TEST_MESSAGES_ORDERING_KEY
         )
 
+    @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
+    def test_publish_with_open_telemetry_tracing(self, mock_hook):
+        operator = PubSubPublishMessageOperator(
+            task_id=TASK_ID,
+            project_id=TEST_PROJECT,
+            topic=TEST_TOPIC,
+            messages=TEST_MESSAGES,
+            enable_open_telemetry_tracing=True,
+        )
+
+        operator.execute(None)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id="google_cloud_default",
+            impersonation_chain=None,
+            enable_message_ordering=False,
+            enable_open_telemetry_tracing=True,
+        )
+        mock_hook.return_value.publish.assert_called_once_with(
+            project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES
+        )
+
+    @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
+    def test_publish_with_ordering_and_tracing(self, mock_hook):
+        operator = PubSubPublishMessageOperator(
+            task_id=TASK_ID,
+            project_id=TEST_PROJECT,
+            topic=TEST_TOPIC,
+            messages=TEST_MESSAGES_ORDERING_KEY,
+            enable_message_ordering=True,
+            enable_open_telemetry_tracing=True,
+        )
+
+        operator.execute(None)
+        mock_hook.assert_called_once_with(
+            gcp_conn_id="google_cloud_default",
+            impersonation_chain=None,
+            enable_message_ordering=True,
+            enable_open_telemetry_tracing=True,
+        )
+        mock_hook.return_value.publish.assert_called_once_with(
+            project_id=TEST_PROJECT, topic=TEST_TOPIC, 
messages=TEST_MESSAGES_ORDERING_KEY
+        )
+
     @pytest.mark.parametrize(
         ("project_id", "expected_dataset"),
         [

Reply via email to