This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4e26a618d77 Harden Stackdriver handler against Cloud Logging failures
(#67513)
4e26a618d77 is described below
commit 4e26a618d77b426a3080a4ce69899d3b1a6d6ab2
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Jun 2 17:09:09 2026 +0200
Harden Stackdriver handler against Cloud Logging failures (#67513)
* google: harden Stackdriver handler against Cloud Logging failures
Three failure modes in ``StackdriverTaskHandler`` exposed internal
details or broke shutdown:
1. ``read()`` did not guard its ``self.io.read_logs()`` call. When Cloud
Logging was unavailable, gRPC errors propagated as HTTP 500 from the
log viewer instead of degrading gracefully (F-011).
2. gRPC errors from ``list_log_entries`` carry project IDs, resource
names, and service-account info in their ``__str__``, and were
forwarded into the user-visible error response (F-010).
3. ``close()`` called ``self.io.transport.flush()`` without exception
handling. A failed flush during shutdown raised through stdlib's
logging machinery, which does not handle exceptions from
``Handler.close()`` gracefully (F-013).
Wrap the ``self.io.read_logs()`` call in ``read()`` with a try/except
that surfaces a short, generic user-facing message ("Cloud Logging is
currently unavailable."), marks the read as finished
(``end_of_log: True``) so the viewer does not keep retrying, and writes
the full traceback to the module-level ``_logger``. The outer guard
catches the gRPC exceptions before they reach the user, so F-010's
leakage path is closed without adding a second swallow inside
``_read_single_logs_page``.
Wrap ``self.io.transport.flush()`` in ``close()`` with a try/except that
prints to stderr (since logging itself may be shutting down) so the
shutdown chain continues even when Cloud Logging is unreachable.
* Fix transport mock injection in test_close_swallows_transport_flush_errors
StackdriverRemoteLogIO is a slotted attrs class (@attrs.define), so the
cached_property `transport` is stored in a slot, not in the instance
__dict__. The test injected the broken transport via
`handler.io.__dict__["transport"] = ...`, which a slotted class silently
ignores — close() then built the real transport and hit
DefaultCredentialsError, so the assertion on the expected message failed.
Assign the attribute directly (`handler.io.transport = broken_transport`)
to pre-seed the slot without building a real transport.
---
.../google/cloud/log/stackdriver_task_handler.py | 33 +++++++-
.../cloud/log/test_stackdriver_task_handler.py | 91 ++++++++++++++++++++++
2 files changed, 122 insertions(+), 2 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 145c2564539..262f141fabb 100644
---
a/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++
b/providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -23,6 +23,7 @@ import copy
import logging
import os
import shutil
+import sys
import warnings
from collections.abc import Collection
from datetime import datetime
@@ -64,6 +65,11 @@ if TYPE_CHECKING:
DEFAULT_LOGGER_NAME = "airflow"
_GLOBAL_RESOURCE = Resource(type="global", labels={})
+# Dedicated logger for handler-internal failures (Cloud Logging unavailable,
gRPC errors).
+# Routed to the same
``airflow.providers.google.cloud.log.stackdriver_task_handler`` namespace
+# so operators see these alongside the rest of the handler's logs.
+_logger = logging.getLogger(__name__)
+
_DEFAULT_SCOPESS = frozenset(
["https://www.googleapis.com/auth/logging.read",
"https://www.googleapis.com/auth/logging.write"]
)
@@ -422,7 +428,19 @@ class StackdriverTaskHandler(logging.Handler):
next_page_token = metadata.get("next_page_token", None)
all_pages = "download_logs" in metadata and metadata["download_logs"]
- messages, end_of_log, next_page_token = self.io.read_logs(log_filter,
next_page_token, all_pages)
+ try:
+ messages, end_of_log, next_page_token =
self.io.read_logs(log_filter, next_page_token, all_pages)
+ except Exception:
+ # Cloud Logging unavailable / IAM glitch / gRPC error. Without a
guard, the
+ # exception used to propagate up as HTTP 500 from the log viewer.
Degrade
+ # gracefully instead: surface a short user-facing message, mark
the read
+ # complete (no spinning retry), and log the full traceback to the
handler's
+ # own logger for the operator.
+ _logger.exception("Failed to read logs from Cloud Logging for
filter %s", log_filter)
+ return (
+ [((self.task_instance_hostname, "Cloud Logging is currently
unavailable."),)],
+ [{"end_of_log": True}],
+ )
new_metadata: dict[str, str | bool] = {"end_of_log": end_of_log}
@@ -483,4 +501,15 @@ class StackdriverTaskHandler(logging.Handler):
return url
def close(self) -> None:
- self.io.transport.flush()
+ # ``flush()`` is best-effort during shutdown — if Cloud Logging is
unavailable or
+ # the transport raises, that's not a reason to break the rest of the
handler's
+ # shutdown chain (and the stdlib logging machinery does not handle
exceptions
+ # from ``Handler.close()`` gracefully). Print to stderr as last resort
since
+ # logging itself may be shutting down.
+ try:
+ self.io.transport.flush()
+ except Exception as exc:
+ print(
+ f"StackdriverTaskHandler.close: transport flush failed:
{type(exc).__name__}: {exc}",
+ file=sys.stderr,
+ )
diff --git
a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
index a55b62fa4f4..0eb6c209f8f 100644
---
a/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
+++
b/providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py
@@ -672,3 +672,94 @@ class TestStackdriverLoggingHandlerTask:
f'labels.try_number="{self.ti.try_number}"',
]
assert set(expected_filter) == set(filter_params)
+
+
+class TestStackdriverTaskHandlerExceptionHandling:
+ """Cloud Logging failures must degrade gracefully, not leak internals."""
+
+
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
+
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client")
+ def test_read_falls_back_when_cloud_logging_unavailable(
+ self, mock_client, mock_get_creds_and_project_id, caplog
+ ):
+ """``read()`` must surface a user-facing message when Cloud Logging
raises.
+
+ Without a guard, a gRPC error from ``list_log_entries`` propagates as
HTTP 500
+ on the log viewer. The fix degrades gracefully and logs the full
traceback for
+ the operator.
+ """
+ from google.api_core import exceptions as gapi_exceptions
+
+ mock_get_creds_and_project_id.return_value = ("creds", "project_id")
+ mock_client.return_value.list_log_entries.side_effect =
gapi_exceptions.ServiceUnavailable(
+ "Stackdriver returned an internal error for project
secret-project-id"
+ )
+
+ handler = StackdriverTaskHandler()
+ ti = mock.MagicMock()
+ ti.task_id = "t"
+ ti.dag_id = "d"
+ ti.try_number = 1
+ ti.logical_date = mock.MagicMock(isoformat=lambda:
"2020-01-01T00:00:00+00:00")
+ ti.execution_date = ti.logical_date
+
+ with caplog.at_level(logging.ERROR):
+ logs, metadata = handler.read(ti, try_number=1)
+
+ # The user-facing message must NOT include the project id / internal
details.
+ message = logs[0][0][1]
+ assert "Cloud Logging is currently unavailable" in message
+ assert "secret-project-id" not in message
+ assert metadata == [{"end_of_log": True}]
+
+
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id")
+
@mock.patch("airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client")
+ def test_read_does_not_leak_internals_in_user_facing_message(
+ self, mock_client, mock_get_creds_and_project_id
+ ):
+ """``read()`` must not propagate gRPC error details into user-visible
messages.
+
+ A ``PermissionDenied`` from ``list_log_entries`` typically carries the
service
+ account email + the missing IAM permission. The outer guard in
``read()`` must
+ replace it with a generic message so an authenticated user sees no
internal
+ identifiers.
+ """
+ from google.api_core import exceptions as gapi_exceptions
+
+ mock_get_creds_and_project_id.return_value = ("creds", "project_id")
+ mock_client.return_value.list_log_entries.side_effect =
gapi_exceptions.PermissionDenied(
+ "service account '[email protected]' lacks
logging.logEntries.list"
+ )
+
+ handler = StackdriverTaskHandler()
+ ti = mock.MagicMock()
+ ti.task_id = "t"
+ ti.dag_id = "d"
+ ti.try_number = 1
+ ti.logical_date = mock.MagicMock(isoformat=lambda:
"2020-01-01T00:00:00+00:00")
+ ti.execution_date = ti.logical_date
+
+ logs, _ = handler.read(ti, try_number=1)
+
+ message = logs[0][0][1]
+ assert "Cloud Logging is currently unavailable" in message
+ assert "[email protected]" not in message
+ assert "logging.logEntries.list" not in message
+
+ def test_close_swallows_transport_flush_errors(self, capsys):
+ """``close()`` must never raise — even when transport ``flush()``
fails."""
+ handler = StackdriverTaskHandler()
+ broken_transport = mock.MagicMock()
+ broken_transport.flush.side_effect = RuntimeError("flush failed during
shutdown")
+ # ``transport`` is a cached_property on the slotted attrs class
+ # ``StackdriverRemoteLogIO``; its value lives in a slot, not
``__dict__``, so
+ # assign the attribute directly to pre-seed it without building a real
transport.
+ handler.io.transport = broken_transport
+
+ # Must not raise.
+ handler.close()
+
+ # The failure is surfaced on stderr because the logging machinery may
be shutting down.
+ captured = capsys.readouterr()
+ assert "transport flush failed" in captured.err
+ assert "flush failed during shutdown" in captured.err