This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b340e8c101c Fix remote logging connection availability in Task SDK
supervisor (#54679)
b340e8c101c is described below
commit b340e8c101c1ed12fb097f9aa5b41bb9c2599275
Author: Rahul Vats <[email protected]>
AuthorDate: Wed Aug 20 15:28:59 2025 +0530
Fix remote logging connection availability in Task SDK supervisor (#54679)
Using connections stored in the Airflow metadata database for remote logging was
partially fixed in 3.0.4, but the connection was only set/put in scope when the
logger was created, before task startup. However, for blob stores (e.g. S3 or
WASB) that is a no-op, and we also need the connection available when we try to
upload the logs.
---
.../src/airflow/sdk/execution_time/supervisor.py | 142 ++++++++++++---------
.../task_sdk/execution_time/test_supervisor.py | 31 ++++-
2 files changed, 109 insertions(+), 64 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b69a93eff4c..03e8c1ee345 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -33,6 +33,7 @@ from collections import deque
from collections.abc import Callable, Generator
from contextlib import contextmanager, suppress
from datetime import datetime, timezone
+from functools import lru_cache
from http import HTTPStatus
from socket import socket, socketpair
from typing import (
@@ -815,6 +816,82 @@ class WatchedSubprocess:
return self._exit_code
+@lru_cache
+def _get_remote_logging_conn(conn_id: str, client: Client) -> Connection |
None:
+ """
+ Fetch and cache connection for remote logging.
+
+ Args:
+ conn_id: Connection ID to fetch
+ client: API client for making requests
+
+ Returns:
+ Connection object or None if not found
+ """
+ # Since we need to use the API Client directly, we can't use
Connection.get as that would try to use
+ # SUPERVISOR_COMMS
+
+ # TODO: Store in the SecretsCache if its enabled - see #48858
+
+ backends = ensure_secrets_backend_loaded()
+ for secrets_backend in backends:
+ try:
+ conn = secrets_backend.get_connection(conn_id=conn_id)
+ if conn:
+ return conn
+ except Exception:
+ log.exception(
+ "Unable to retrieve connection from secrets backend (%s). "
+ "Checking subsequent secrets backend.",
+ type(secrets_backend).__name__,
+ )
+
+ conn = client.connections.get(conn_id)
+ if isinstance(conn, ConnectionResponse):
+ conn_result = ConnectionResult.from_conn_response(conn)
+ from airflow.sdk.definitions.connection import Connection
+
+ return Connection(**conn_result.model_dump(exclude={"type"},
by_alias=True))
+ return None
+
+
[email protected]
+def _remote_logging_conn(client: Client):
+ """
+ Pre-fetch the needed remote logging connection with caching.
+
+ If a remote logger is in use, and has the logging/remote_logging option
set, we try to fetch the
+ connection it needs, now, directly from the API client, and store it in an
env var, so that when the logging
+ hook tries to get the connection it can find it easily from the env vars.
+
+ This is needed as the BaseHook.get_connection looks for SUPERVISOR_COMMS,
but we are still in the
+ supervisor process when this is needed, so that doesn't exist yet.
+
+ This function uses @lru_cache for connection caching to avoid repeated API
calls.
+ """
+ from airflow.sdk.log import load_remote_conn_id, load_remote_log_handler
+
+ if load_remote_log_handler() is None or not (conn_id :=
load_remote_conn_id()):
+ # Nothing to do
+ yield
+ return
+
+ # Use cached connection fetcher
+ conn = _get_remote_logging_conn(conn_id, client)
+
+ if conn:
+ key = f"AIRFLOW_CONN_{conn_id.upper()}"
+ old = os.getenv(key)
+ os.environ[key] = conn.get_uri()
+ try:
+ yield
+ finally:
+ if old is None:
+ del os.environ[key]
+ else:
+ os.environ[key] = old
+
+
@attrs.define(kw_only=True)
class ActivitySubprocess(WatchedSubprocess):
client: Client
@@ -931,7 +1008,8 @@ class ActivitySubprocess(WatchedSubprocess):
"""
from airflow.sdk.log import upload_to_remote
- upload_to_remote(self.process_log, self.ti)
+ with _remote_logging_conn(self.client):
+ upload_to_remote(self.process_log, self.ti)
def _monitor_subprocess(self):
"""
@@ -1651,68 +1729,6 @@ def ensure_secrets_backend_loaded() ->
list[BaseSecretsBackend]:
return backends
[email protected]
-def _remote_logging_conn(client: Client):
- """
- Pre-fetch the needed remote logging connection.
-
- If a remote logger is in use, and has the logging/remote_logging option
set, we try to fetch the
- connection it needs, now, directly from the API client, and store it in an
env var, so that when the logging
- hook tries to get the connection it
- can find it easily from the env vars
-
- This is needed as the BaseHook.get_connection looks for SUPERVISOR_COMMS,
but we are still in the
- supervisor process when this is needed, so that doesn't exist yet.
- """
- from airflow.sdk.log import load_remote_conn_id, load_remote_log_handler
-
- if load_remote_log_handler() is None or not (conn_id :=
load_remote_conn_id()):
- # Nothing to do
- yield
- return
-
- # Since we need to use the API Client directly, we can't use
Connection.get as that would try to use
- # SUPERVISOR_COMMS
-
- # TODO: Store in the SecretsCache if its enabled - see #48858
-
- def _get_conn() -> Connection | None:
- backends = ensure_secrets_backend_loaded()
- for secrets_backend in backends:
- try:
- conn = secrets_backend.get_connection(conn_id=conn_id)
- if conn:
- return conn
- except Exception:
- log.exception(
- "Unable to retrieve connection from secrets backend (%s). "
- "Checking subsequent secrets backend.",
- type(secrets_backend).__name__,
- )
-
- conn = client.connections.get(conn_id)
- if isinstance(conn, ConnectionResponse):
- conn_result = ConnectionResult.from_conn_response(conn)
- from airflow.sdk.definitions.connection import Connection
-
- return Connection(**conn_result.model_dump(exclude={"type"},
by_alias=True))
- return None
-
- if conn := _get_conn():
- key = f"AIRFLOW_CONN_{conn_id.upper()}"
- old = os.getenv(key)
-
- os.environ[key] = conn.get_uri()
-
- try:
- yield
- finally:
- if old is None:
- del os.environ[key]
- else:
- os.environ[key] = old
-
-
def _configure_logging(log_path: str, client: Client) ->
tuple[FilteringBoundLogger, BinaryIO | TextIO]:
# If we are told to write logs to a file, redirect the task logger to it.
Make sure we append to the
# file though, otherwise when we resume we would lose the logs from the
start->deferral segment if it
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 086d545c768..8905124fc7d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2260,7 +2260,7 @@ class TestInProcessClient:
pytest.param(False, "", "", id="no-remote-logging"),
),
)
-def test_remote_logging_conn(remote_logging, remote_conn, expected_env,
monkeypatch):
+def test_remote_logging_conn(remote_logging, remote_conn, expected_env,
monkeypatch, mocker):
# This doesn't strictly need the AWS provider, but it does need something
that
# airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG
knows about
pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider
not installed")
@@ -2279,6 +2279,9 @@ def test_remote_logging_conn(remote_logging, remote_conn,
expected_env, monkeypa
},
)
+ mock_masker = mocker.Mock()
+ mocker.patch("airflow.sdk.execution_time.secrets_masker._secrets_masker",
return_value=mock_masker)
+
with conf_vars(
{
("logging", "remote_logging"): str(remote_logging),
@@ -2295,3 +2298,29 @@ def test_remote_logging_conn(remote_logging,
remote_conn, expected_env, monkeypa
assert new_keys == {expected_env}
else:
assert not new_keys
+
+ if remote_logging and expected_env:
+ connection_available = {"available": False, "conn_uri": None}
+
+ def mock_upload_to_remote(process_log, ti):
+ connection_available["available"] = expected_env in os.environ
+ connection_available["conn_uri"] = os.environ.get(expected_env)
+
+ mocker.patch("airflow.sdk.log.upload_to_remote",
side_effect=mock_upload_to_remote)
+
+ activity_subprocess = ActivitySubprocess(
+ process_log=mocker.MagicMock(),
+ id=TI_ID,
+ pid=12345,
+ stdin=mocker.MagicMock(),
+ client=client,
+ process=mocker.MagicMock(),
+ )
+ activity_subprocess.ti = mocker.MagicMock()
+
+ activity_subprocess._upload_logs()
+
+ assert connection_available["available"], (
+ f"Connection {expected_env} was not available during
upload_to_remote call"
+ )
+ assert connection_available["conn_uri"] is not None, "Connection
URI was None during upload"