This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 60e23cc6241783d926644831c1a15fc45f1c761e Author: Dev-iL <[email protected]> AuthorDate: Fri Mar 6 09:38:51 2026 +0200 fix: Unhandled Exception in remote logging if connection doesn't exist(#59801) (#62979) Cherry-picked from 3428dc9 with conflict resolution: - context.py: Added `import inspect` (skip `import functools` as `from functools import cache` already exists) - supervisor.py: Adopted early-return pattern and explicit `del` for GC, kept simpler env var handling (no `_AIRFLOW_PROCESS_CONTEXT` which doesn't exist in v3-1-test) - test_supervisor.py: Replaced `@pytest.mark.xfail` workaround with proper `use_real_secrets_backends` fixture Co-authored-by: Claude Opus 4.6 <[email protected]> --- task-sdk/src/airflow/sdk/api/client.py | 3 +- task-sdk/src/airflow/sdk/execution_time/context.py | 8 +++-- .../src/airflow/sdk/execution_time/supervisor.py | 37 ++++++++++++---------- .../tests/task_sdk/execution_time/test_secrets.py | 22 ++++++++++++- .../task_sdk/execution_time/test_supervisor.py | 31 ++++++++++++++++-- 5 files changed, 79 insertions(+), 22 deletions(-) diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 2ef50a72ff7..8d9e48d6129 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -848,7 +848,8 @@ class Client(httpx.Client): kwargs.setdefault("base_url", "dry-run://server") else: kwargs["base_url"] = base_url - kwargs["verify"] = self._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH) + # Call via the class to avoid binding lru_cache wires to this instance. 
+ kwargs["verify"] = type(self)._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH) # Set timeout if not explicitly provided kwargs.setdefault("timeout", API_TIMEOUT) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 379a13a930e..512468bb486 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -18,6 +18,7 @@ from __future__ import annotations import collections import contextlib +import inspect from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence from functools import cache from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload @@ -197,8 +198,11 @@ async def _async_get_connection(conn_id: str) -> Connection: for secrets_backend in backends: try: # Use async method if available, otherwise wrap sync method - if hasattr(secrets_backend, "aget_connection"): - conn = await secrets_backend.aget_connection(conn_id) # type: ignore[assignment] + # getattr avoids triggering AsyncMock coroutine creation under Python 3.13 + async_method = getattr(secrets_backend, "aget_connection", None) + if async_method is not None: + maybe_awaitable = async_method(conn_id) + conn = await maybe_awaitable if inspect.isawaitable(maybe_awaitable) else maybe_awaitable else: conn = await sync_to_async(secrets_backend.get_connection)(conn_id) # type: ignore[assignment] diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 206bf3b6878..0b02d215f27 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -36,14 +36,7 @@ from contextlib import contextmanager, suppress from datetime import datetime, timezone from http import HTTPStatus from socket import socket, socketpair -from typing import ( - TYPE_CHECKING, - BinaryIO, - ClassVar, - NoReturn, - TextIO, - cast, -) +from 
typing import TYPE_CHECKING, BinaryIO, ClassVar, NoReturn, TextIO, cast from urllib.parse import urlparse from uuid import UUID @@ -898,17 +891,29 @@ def _remote_logging_conn(client: Client): # Fetch connection details on-demand without caching the entire API client instance conn = _fetch_remote_logging_conn(conn_id, client) - if conn: - key = f"AIRFLOW_CONN_{conn_id.upper()}" - old = os.getenv(key) - os.environ[key] = conn.get_uri() + if not conn: try: yield finally: - if old is None: - del os.environ[key] - else: - os.environ[key] = old + # Ensure we don't leak the caller's client when no connection was fetched. + del conn + del client + return + + key = f"AIRFLOW_CONN_{conn_id.upper()}" + old = os.getenv(key) + os.environ[key] = conn.get_uri() + try: + yield + finally: + if old is None: + del os.environ[key] + else: + os.environ[key] = old + + # Explicitly drop local references so the caller's client can be garbage collected. + del conn + del client @attrs.define(kw_only=True) diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py b/task-sdk/tests/task_sdk/execution_time/test_secrets.py index b19ab3b1003..8f9745b0ffe 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py +++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py @@ -144,9 +144,29 @@ class TestExecutionAPISecretsBackend: # Mock the greenback and asyncio modules that are imported inside the exception handler mocker.patch("greenback.has_portal", return_value=True) - mock_greenback_await = mocker.patch("greenback.await_", return_value=expected_conn) mocker.patch("asyncio.current_task") + # Mock greenback.await_ to actually await the coroutine it receives. + # This prevents Python 3.13 RuntimeWarning about unawaited coroutines. 
+ import asyncio + + def greenback_await_side_effect(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + mock_greenback_await = mocker.patch("greenback.await_", side_effect=greenback_await_side_effect) + + # Mock aget_connection to return the expected connection directly. + # We need to mock this because the real aget_connection would try to + # use SUPERVISOR_COMMS.asend which is not set up for this test. + async def mock_aget_connection(self, conn_id): + return expected_conn + + mocker.patch.object(ExecutionAPISecretsBackend, "aget_connection", mock_aget_connection) + backend = ExecutionAPISecretsBackend() conn = backend.get_connection("databricks_default") diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index ba3c5166931..c7b54666a1e 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -225,8 +225,35 @@ class TestWatchedSubprocess: def disable_log_upload(self, spy_agency): spy_agency.spy_on(ActivitySubprocess._upload_logs, call_original=False) - # TODO: Investigate and fix it after 3.1.0 - @pytest.mark.xfail(reason="Fails on Py 3.12 with multi-threading error only in tests.") + @pytest.fixture(autouse=True) + def use_real_secrets_backends(self, monkeypatch): + """ + Ensure that real secrets backend instances are used instead of mocks. + + This prevents Python 3.13 RuntimeWarning when hasattr checks async methods + on mocked backends. The warning occurs because hasattr on AsyncMock creates + unawaited coroutines. + + This fixture ensures test isolation when running in parallel with pytest-xdist, + regardless of what other tests patch. 
+ """ + import importlib + + import airflow.sdk.execution_time.secrets.execution_api as execution_api_module + from airflow.secrets.environment_variables import EnvironmentVariablesBackend + + fresh_execution_backend = importlib.reload(execution_api_module).ExecutionAPISecretsBackend + + # Ensure downstream imports see the restored class instead of any AsyncMock left by other tests + import airflow.sdk.execution_time.secrets as secrets_package + + monkeypatch.setattr(secrets_package, "ExecutionAPISecretsBackend", fresh_execution_backend) + + monkeypatch.setattr( + "airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded", + lambda: [EnvironmentVariablesBackend(), fresh_execution_backend()], + ) + def test_reading_from_pipes(self, captured_logs, time_machine, client_with_ti_start): def subprocess_main(): # This is run in the subprocess!
