This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ad99263314 feat(google): add enable_open_telemetry_tracing to PubSubHook and PubSubPublishMessageOperator (#58766)
9ad99263314 is described below
commit 9ad99263314636dfacbe5ce8dbd1b7287975bb7d
Author: Nikhil Nadig <[email protected]>
AuthorDate: Tue Dec 2 00:49:57 2025 +0100
feat(google): add enable_open_telemetry_tracing to PubSubHook and PubSubPublishMessageOperator (#58766)
* feat(google): add enable_open_telemetry_tracing to PubSubHook and PubSubPublishMessageOperator
* chore(google): bump google-cloud-pubsub minimum to 2.24.0 for otel support
---
providers/google/README.rst | 2 +-
providers/google/docs/index.rst | 2 +-
providers/google/pyproject.toml | 2 +-
.../airflow/providers/google/cloud/hooks/pubsub.py | 3 ++
.../providers/google/cloud/operators/pubsub.py | 7 +++
.../tests/unit/google/cloud/hooks/test_pubsub.py | 50 +++++++++++++++++++++-
.../unit/google/cloud/operators/test_pubsub.py | 43 +++++++++++++++++++
7 files changed, 105 insertions(+), 4 deletions(-)
diff --git a/providers/google/README.rst b/providers/google/README.rst
index 47df431f2e4..81b6b59bbbd 100644
--- a/providers/google/README.rst
+++ b/providers/google/README.rst
@@ -101,7 +101,7 @@ PIP package Version required
``google-cloud-monitoring`` ``>=2.18.0``
``google-cloud-orchestration-airflow`` ``>=1.10.0``
``google-cloud-os-login`` ``>=2.9.1``
-``google-cloud-pubsub`` ``>=2.21.3``
+``google-cloud-pubsub`` ``>=2.24.0``
``google-cloud-redis`` ``>=2.12.0``
``google-cloud-secret-manager`` ``>=2.16.0``
``google-cloud-spanner`` ``>=3.50.0``
diff --git a/providers/google/docs/index.rst b/providers/google/docs/index.rst
index 02ffa3b9ed4..d018b141f1d 100644
--- a/providers/google/docs/index.rst
+++ b/providers/google/docs/index.rst
@@ -152,7 +152,7 @@ PIP package Version required
``google-cloud-monitoring`` ``>=2.18.0``
``google-cloud-orchestration-airflow`` ``>=1.10.0``
``google-cloud-os-login`` ``>=2.9.1``
-``google-cloud-pubsub`` ``>=2.21.3``
+``google-cloud-pubsub`` ``>=2.24.0``
``google-cloud-redis`` ``>=2.12.0``
``google-cloud-secret-manager`` ``>=2.16.0``
``google-cloud-spanner`` ``>=3.50.0``
diff --git a/providers/google/pyproject.toml b/providers/google/pyproject.toml
index f5f51ae24c1..13b0d798291 100644
--- a/providers/google/pyproject.toml
+++ b/providers/google/pyproject.toml
@@ -105,7 +105,7 @@ dependencies = [
"google-cloud-monitoring>=2.18.0",
"google-cloud-orchestration-airflow>=1.10.0",
"google-cloud-os-login>=2.9.1",
- "google-cloud-pubsub>=2.21.3",
+ "google-cloud-pubsub>=2.24.0",
"google-cloud-redis>=2.12.0",
"google-cloud-secret-manager>=2.16.0",
"google-cloud-spanner>=3.50.0",
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py b/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py
index 58237c5c2ae..b4da615f389 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/pubsub.py
@@ -82,6 +82,7 @@ class PubSubHook(GoogleBaseHook):
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
enable_message_ordering: bool = False,
+ enable_open_telemetry_tracing: bool = False,
**kwargs,
) -> None:
super().__init__(
@@ -90,6 +91,7 @@ class PubSubHook(GoogleBaseHook):
**kwargs,
)
self.enable_message_ordering = enable_message_ordering
+ self.enable_open_telemetry_tracing = enable_open_telemetry_tracing
self._client = None
def get_conn(self) -> PublisherClient:
@@ -104,6 +106,7 @@ class PubSubHook(GoogleBaseHook):
client_info=CLIENT_INFO,
publisher_options=PublisherOptions(
enable_message_ordering=self.enable_message_ordering,
+ enable_open_telemetry_tracing=self.enable_open_telemetry_tracing,
),
)
return self._client
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py b/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py
index 250bcfde4ef..d02ac90171d 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/pubsub.py
@@ -680,6 +680,9 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
ordering_key in PubsubMessage will be delivered to the subscribers in the order
in which they are received by the Pub/Sub system. Otherwise, they may be
delivered in any order. Default is False.
+ :param enable_open_telemetry_tracing: If true, enables OpenTelemetry tracing for
+ published messages. This allows distributed tracing of messages as they flow
+ through Pub/Sub topics. Default is False.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
@@ -695,6 +698,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
"topic",
"messages",
"enable_message_ordering",
+ "enable_open_telemetry_tracing",
"impersonation_chain",
)
ui_color = "#0273d4"
@@ -707,6 +711,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
project_id: str = PROVIDE_PROJECT_ID,
gcp_conn_id: str = "google_cloud_default",
enable_message_ordering: bool = False,
+ enable_open_telemetry_tracing: bool = False,
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
@@ -716,6 +721,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
self.messages = messages
self.gcp_conn_id = gcp_conn_id
self.enable_message_ordering = enable_message_ordering
+ self.enable_open_telemetry_tracing = enable_open_telemetry_tracing
self.impersonation_chain = impersonation_chain
@cached_property
@@ -724,6 +730,7 @@ class PubSubPublishMessageOperator(GoogleCloudBaseOperator):
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
enable_message_ordering=self.enable_message_ordering,
+ enable_open_telemetry_tracing=self.enable_open_telemetry_tracing,
)
def execute(self, context: Context) -> None:
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py b/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
index fbf10fe6ae8..1375b592073 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_pubsub.py
@@ -86,11 +86,59 @@ class TestPubSubHook:
mock_client.assert_called_once_with(
credentials=mock_get_creds.return_value,
client_info=CLIENT_INFO,
- publisher_options=PublisherOptions(enable_message_ordering=False),
+ publisher_options=PublisherOptions(
+ enable_message_ordering=False,
+ enable_open_telemetry_tracing=False,
+ ),
)
assert mock_client.return_value == result
assert self.pubsub_hook._client == result
+ @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
+ @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
+ def test_publisher_client_creation_with_open_telemetry_tracing(self, mock_client, mock_get_creds):
+ with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_init):
+ pubsub_hook_with_tracing = PubSubHook(
+ gcp_conn_id="test",
+ enable_open_telemetry_tracing=True,
+ )
+ assert pubsub_hook_with_tracing._client is None
+ result = pubsub_hook_with_tracing.get_conn()
+
+ mock_client.assert_called_once_with(
+ credentials=mock_get_creds.return_value,
+ client_info=CLIENT_INFO,
+ publisher_options=PublisherOptions(
+ enable_message_ordering=False,
+ enable_open_telemetry_tracing=True,
+ ),
+ )
+ assert mock_client.return_value == result
+ assert pubsub_hook_with_tracing._client == result
+
+ @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
+ @mock.patch("airflow.providers.google.cloud.hooks.pubsub.PublisherClient")
+ def test_publisher_client_creation_with_message_ordering_and_tracing(self, mock_client, mock_get_creds):
+ with mock.patch(BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_init):
+ pubsub_hook_with_both = PubSubHook(
+ gcp_conn_id="test",
+ enable_message_ordering=True,
+ enable_open_telemetry_tracing=True,
+ )
+ assert pubsub_hook_with_both._client is None
+ result = pubsub_hook_with_both.get_conn()
+
+ mock_client.assert_called_once_with(
+ credentials=mock_get_creds.return_value,
+ client_info=CLIENT_INFO,
+ publisher_options=PublisherOptions(
+ enable_message_ordering=True,
+ enable_open_telemetry_tracing=True,
+ ),
+ )
+ assert mock_client.return_value == result
+ assert pubsub_hook_with_both._client == result
+
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.PubSubHook.get_credentials")
@mock.patch("airflow.providers.google.cloud.hooks.pubsub.SubscriberClient")
def test_subscriber_client_creation(self, mock_client, mock_get_creds):
diff --git a/providers/google/tests/unit/google/cloud/operators/test_pubsub.py b/providers/google/tests/unit/google/cloud/operators/test_pubsub.py
index 289001e3bc8..c335c3baa73 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_pubsub.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_pubsub.py
@@ -352,6 +352,49 @@ class TestPubSubPublishOperator:
project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY
)
+ @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
+ def test_publish_with_open_telemetry_tracing(self, mock_hook):
+ operator = PubSubPublishMessageOperator(
+ task_id=TASK_ID,
+ project_id=TEST_PROJECT,
+ topic=TEST_TOPIC,
+ messages=TEST_MESSAGES,
+ enable_open_telemetry_tracing=True,
+ )
+
+ operator.execute(None)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id="google_cloud_default",
+ impersonation_chain=None,
+ enable_message_ordering=False,
+ enable_open_telemetry_tracing=True,
+ )
+ mock_hook.return_value.publish.assert_called_once_with(
+ project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES
+ )
+
+ @mock.patch("airflow.providers.google.cloud.operators.pubsub.PubSubHook")
+ def test_publish_with_ordering_and_tracing(self, mock_hook):
+ operator = PubSubPublishMessageOperator(
+ task_id=TASK_ID,
+ project_id=TEST_PROJECT,
+ topic=TEST_TOPIC,
+ messages=TEST_MESSAGES_ORDERING_KEY,
+ enable_message_ordering=True,
+ enable_open_telemetry_tracing=True,
+ )
+
+ operator.execute(None)
+ mock_hook.assert_called_once_with(
+ gcp_conn_id="google_cloud_default",
+ impersonation_chain=None,
+ enable_message_ordering=True,
+ enable_open_telemetry_tracing=True,
+ )
+ mock_hook.return_value.publish.assert_called_once_with(
+ project_id=TEST_PROJECT, topic=TEST_TOPIC, messages=TEST_MESSAGES_ORDERING_KEY
+ )
+
@pytest.mark.parametrize(
("project_id", "expected_dataset"),
[