This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b340e8c101c Fix remote logging connection availability in Task SDK 
supervisor (#54679)
b340e8c101c is described below

commit b340e8c101c1ed12fb097f9aa5b41bb9c2599275
Author: Rahul Vats <[email protected]>
AuthorDate: Wed Aug 20 15:28:59 2025 +0530

    Fix remote logging connection availability in Task SDK supervisor (#54679)
    
    Using connections stored in the Airflow metadata database for remote
    logging was partially fixed in 3.0.4, but the connection was only
    set/put in scope when the logger was created before task startup.
    However, for blob stores (e.g. S3 or WASB) that is a no-op, and we
    also need the connection when we try to upload the logs.
---
 .../src/airflow/sdk/execution_time/supervisor.py   | 142 ++++++++++++---------
 .../task_sdk/execution_time/test_supervisor.py     |  31 ++++-
 2 files changed, 109 insertions(+), 64 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index b69a93eff4c..03e8c1ee345 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -33,6 +33,7 @@ from collections import deque
 from collections.abc import Callable, Generator
 from contextlib import contextmanager, suppress
 from datetime import datetime, timezone
+from functools import lru_cache
 from http import HTTPStatus
 from socket import socket, socketpair
 from typing import (
@@ -815,6 +816,82 @@ class WatchedSubprocess:
         return self._exit_code
 
 
+@lru_cache
+def _get_remote_logging_conn(conn_id: str, client: Client) -> Connection | 
None:
+    """
+    Fetch and cache connection for remote logging.
+
+    Args:
+        conn_id: Connection ID to fetch
+        client: API client for making requests
+
+    Returns:
+        Connection object or None if not found
+    """
+    # Since we need to use the API Client directly, we can't use 
Connection.get as that would try to use
+    # SUPERVISOR_COMMS
+
+    # TODO: Store in the SecretsCache if its enabled - see #48858
+
+    backends = ensure_secrets_backend_loaded()
+    for secrets_backend in backends:
+        try:
+            conn = secrets_backend.get_connection(conn_id=conn_id)
+            if conn:
+                return conn
+        except Exception:
+            log.exception(
+                "Unable to retrieve connection from secrets backend (%s). "
+                "Checking subsequent secrets backend.",
+                type(secrets_backend).__name__,
+            )
+
+    conn = client.connections.get(conn_id)
+    if isinstance(conn, ConnectionResponse):
+        conn_result = ConnectionResult.from_conn_response(conn)
+        from airflow.sdk.definitions.connection import Connection
+
+        return Connection(**conn_result.model_dump(exclude={"type"}, 
by_alias=True))
+    return None
+
+
[email protected]
+def _remote_logging_conn(client: Client):
+    """
+    Pre-fetch the needed remote logging connection with caching.
+
+    If a remote logger is in use, and has the logging/remote_logging option 
set, we try to fetch the
+    connection it needs, now, directly from the API client, and store it in an 
env var, so that when the logging
+    hook tries to get the connection it can find it easily from the env vars.
+
+    This is needed as the BaseHook.get_connection looks for SUPERVISOR_COMMS, 
but we are still in the
+    supervisor process when this is needed, so that doesn't exist yet.
+
+    This function uses @lru_cache for connection caching to avoid repeated API 
calls.
+    """
+    from airflow.sdk.log import load_remote_conn_id, load_remote_log_handler
+
+    if load_remote_log_handler() is None or not (conn_id := 
load_remote_conn_id()):
+        # Nothing to do
+        yield
+        return
+
+    # Use cached connection fetcher
+    conn = _get_remote_logging_conn(conn_id, client)
+
+    if conn:
+        key = f"AIRFLOW_CONN_{conn_id.upper()}"
+        old = os.getenv(key)
+        os.environ[key] = conn.get_uri()
+        try:
+            yield
+        finally:
+            if old is None:
+                del os.environ[key]
+            else:
+                os.environ[key] = old
+
+
 @attrs.define(kw_only=True)
 class ActivitySubprocess(WatchedSubprocess):
     client: Client
@@ -931,7 +1008,8 @@ class ActivitySubprocess(WatchedSubprocess):
         """
         from airflow.sdk.log import upload_to_remote
 
-        upload_to_remote(self.process_log, self.ti)
+        with _remote_logging_conn(self.client):
+            upload_to_remote(self.process_log, self.ti)
 
     def _monitor_subprocess(self):
         """
@@ -1651,68 +1729,6 @@ def ensure_secrets_backend_loaded() -> 
list[BaseSecretsBackend]:
     return backends
 
 
[email protected]
-def _remote_logging_conn(client: Client):
-    """
-    Pre-fetch the needed remote logging connection.
-
-    If a remote logger is in use, and has the logging/remote_logging option 
set, we try to fetch the
-    connection it needs, now, directly from the API client, and store it in an 
env var, so that when the logging
-    hook tries to get the connection it
-    can find it easily from the env vars
-
-    This is needed as the BaseHook.get_connection looks for SUPERVISOR_COMMS, 
but we are still in the
-    supervisor process when this is needed, so that doesn't exist yet.
-    """
-    from airflow.sdk.log import load_remote_conn_id, load_remote_log_handler
-
-    if load_remote_log_handler() is None or not (conn_id := 
load_remote_conn_id()):
-        # Nothing to do
-        yield
-        return
-
-    # Since we need to use the API Client directly, we can't use 
Connection.get as that would try to use
-    # SUPERVISOR_COMMS
-
-    # TODO: Store in the SecretsCache if its enabled - see #48858
-
-    def _get_conn() -> Connection | None:
-        backends = ensure_secrets_backend_loaded()
-        for secrets_backend in backends:
-            try:
-                conn = secrets_backend.get_connection(conn_id=conn_id)
-                if conn:
-                    return conn
-            except Exception:
-                log.exception(
-                    "Unable to retrieve connection from secrets backend (%s). "
-                    "Checking subsequent secrets backend.",
-                    type(secrets_backend).__name__,
-                )
-
-        conn = client.connections.get(conn_id)
-        if isinstance(conn, ConnectionResponse):
-            conn_result = ConnectionResult.from_conn_response(conn)
-            from airflow.sdk.definitions.connection import Connection
-
-            return Connection(**conn_result.model_dump(exclude={"type"}, 
by_alias=True))
-        return None
-
-    if conn := _get_conn():
-        key = f"AIRFLOW_CONN_{conn_id.upper()}"
-        old = os.getenv(key)
-
-        os.environ[key] = conn.get_uri()
-
-        try:
-            yield
-        finally:
-            if old is None:
-                del os.environ[key]
-            else:
-                os.environ[key] = old
-
-
 def _configure_logging(log_path: str, client: Client) -> 
tuple[FilteringBoundLogger, BinaryIO | TextIO]:
     # If we are told to write logs to a file, redirect the task logger to it. 
Make sure we append to the
     # file though, otherwise when we resume we would lose the logs from the 
start->deferral segment if it
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 086d545c768..8905124fc7d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2260,7 +2260,7 @@ class TestInProcessClient:
         pytest.param(False, "", "", id="no-remote-logging"),
     ),
 )
-def test_remote_logging_conn(remote_logging, remote_conn, expected_env, 
monkeypatch):
+def test_remote_logging_conn(remote_logging, remote_conn, expected_env, 
monkeypatch, mocker):
     # This doesn't strictly need the AWS provider, but it does need something 
that
     # airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG 
knows about
     pytest.importorskip("airflow.providers.amazon", reason="'amazon' provider 
not installed")
@@ -2279,6 +2279,9 @@ def test_remote_logging_conn(remote_logging, remote_conn, 
expected_env, monkeypa
             },
         )
 
+    mock_masker = mocker.Mock()
+    mocker.patch("airflow.sdk.execution_time.secrets_masker._secrets_masker", 
return_value=mock_masker)
+
     with conf_vars(
         {
             ("logging", "remote_logging"): str(remote_logging),
@@ -2295,3 +2298,29 @@ def test_remote_logging_conn(remote_logging, 
remote_conn, expected_env, monkeypa
                 assert new_keys == {expected_env}
             else:
                 assert not new_keys
+
+        if remote_logging and expected_env:
+            connection_available = {"available": False, "conn_uri": None}
+
+            def mock_upload_to_remote(process_log, ti):
+                connection_available["available"] = expected_env in os.environ
+                connection_available["conn_uri"] = os.environ.get(expected_env)
+
+            mocker.patch("airflow.sdk.log.upload_to_remote", 
side_effect=mock_upload_to_remote)
+
+            activity_subprocess = ActivitySubprocess(
+                process_log=mocker.MagicMock(),
+                id=TI_ID,
+                pid=12345,
+                stdin=mocker.MagicMock(),
+                client=client,
+                process=mocker.MagicMock(),
+            )
+            activity_subprocess.ti = mocker.MagicMock()
+
+            activity_subprocess._upload_logs()
+
+            assert connection_available["available"], (
+                f"Connection {expected_env} was not available during 
upload_to_remote call"
+            )
+            assert connection_available["conn_uri"] is not None, "Connection 
URI was None during upload"

Reply via email to