This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3428dc98fc1 Fix flaky test_reading_from_pipes on Python 3.13 with pytest-xdist (#59801)
3428dc98fc1 is described below

commit 3428dc98fc1958049f301ee6dea635d4833cd1e1
Author: Dev-iL <[email protected]>
AuthorDate: Thu Dec 25 15:05:55 2025 +0200

    Fix flaky test_reading_from_pipes on Python 3.13 with pytest-xdist (#59801)
    
    The test was failing intermittently because forked subprocesses could
    inherit mocked secrets backends from parallel tests. When hasattr() checks
    async methods on AsyncMock (Python 3.13), it creates unawaited coroutines,
    triggering RuntimeWarning.
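
The fix replaces the hasattr() probe with a plain getattr() lookup, so nothing gets
called during the capability check. A minimal, self-contained sketch of that
lookup-then-dispatch pattern; SyncBackend, AsyncBackend and fetch_connection are
illustrative stand-ins, not Airflow APIs:

    import asyncio
    import inspect

    class SyncBackend:
        def get_connection(self, conn_id):
            return f"sync:{conn_id}"

    class AsyncBackend:
        async def aget_connection(self, conn_id):
            return f"async:{conn_id}"

    async def fetch_connection(backend, conn_id):
        # getattr only looks the attribute up; nothing is called yet, so no
        # coroutine can be created and left unawaited.
        async_method = getattr(backend, "aget_connection", None)
        if async_method is not None:
            result = async_method(conn_id)
            # A mocked backend may hand back a plain value rather than an awaitable.
            return await result if inspect.isawaitable(result) else result
        # Fall back to the sync API (the real code wraps it in sync_to_async).
        return backend.get_connection(conn_id)

    print(asyncio.run(fetch_connection(AsyncBackend(), "db")))  # async:db
    print(asyncio.run(fetch_connection(SyncBackend(), "db")))   # sync:db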
---
 task-sdk/src/airflow/sdk/api/client.py             |  3 +-
 task-sdk/src/airflow/sdk/execution_time/context.py |  8 +++-
 .../src/airflow/sdk/execution_time/supervisor.py   | 55 ++++++++++++----------
 .../tests/task_sdk/execution_time/test_secrets.py  | 22 ++++++++-
 .../task_sdk/execution_time/test_supervisor.py     | 13 ++++-
 5 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index b520d0ab226..2a38ef2fad7 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -910,7 +910,8 @@ class Client(httpx.Client):
             kwargs.setdefault("base_url", "dry-run://server")
         else:
             kwargs["base_url"] = base_url
-            kwargs["verify"] = self._get_ssl_context_cached(certifi.where(), 
API_SSL_CERT_PATH)
+            # Call via the class to avoid binding lru_cache wires to this 
instance.
+            kwargs["verify"] = 
type(self)._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH)
 
         # Set timeout if not explicitly provided
         kwargs.setdefault("timeout", API_TIMEOUT)
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 8ed06333ce8..db5a75e10c1 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import collections
 import contextlib
 import functools
+import inspect
 from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
 from datetime import datetime
 from functools import cache
@@ -199,8 +200,11 @@ async def _async_get_connection(conn_id: str) -> Connection:
     for secrets_backend in backends:
         try:
             # Use async method if available, otherwise wrap sync method
-            if hasattr(secrets_backend, "aget_connection"):
-                conn = await secrets_backend.aget_connection(conn_id)  # type: ignore[assignment]
+            # getattr avoids triggering AsyncMock coroutine creation under Python 3.13
+            async_method = getattr(secrets_backend, "aget_connection", None)
+            if async_method is not None:
+                maybe_awaitable = async_method(conn_id)
+                conn = await maybe_awaitable if inspect.isawaitable(maybe_awaitable) else maybe_awaitable
             else:
                 conn = await sync_to_async(secrets_backend.get_connection)(conn_id)  # type: ignore[assignment]
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 45432ac37e5..b87131aa733 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -36,14 +36,7 @@ from contextlib import contextmanager, suppress
 from datetime import datetime, timezone
 from http import HTTPStatus
 from socket import socket, socketpair
-from typing import (
-    TYPE_CHECKING,
-    BinaryIO,
-    ClassVar,
-    NoReturn,
-    TextIO,
-    cast,
-)
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, NoReturn, TextIO, cast
 from urllib.parse import urlparse
 from uuid import UUID
 
@@ -911,27 +904,39 @@ def _remote_logging_conn(client: Client):
     # Fetch connection details on-demand without caching the entire API client instance
     conn = _fetch_remote_logging_conn(conn_id, client)
 
-    if conn:
-        key = f"AIRFLOW_CONN_{conn_id.upper()}"
-        old_conn = os.getenv(key)
-        old_context = os.getenv("_AIRFLOW_PROCESS_CONTEXT")
-
-        os.environ[key] = conn.get_uri()
-        # Set process context to "client" so that Connection deserialization uses SDK Connection class
-        # which has from_uri() method, instead of core Connection class
-        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+    if not conn:
         try:
             yield
         finally:
-            if old_conn is None:
-                del os.environ[key]
-            else:
-                os.environ[key] = old_conn
+            # Ensure we don't leak the caller's client when no connection was fetched.
+            del conn
+            del client
+        return
 
-            if old_context is None:
-                del os.environ["_AIRFLOW_PROCESS_CONTEXT"]
-            else:
-                os.environ["_AIRFLOW_PROCESS_CONTEXT"] = old_context
+    key = f"AIRFLOW_CONN_{conn_id.upper()}"
+    old_conn = os.getenv(key)
+    old_context = os.getenv("_AIRFLOW_PROCESS_CONTEXT")
+
+    os.environ[key] = conn.get_uri()
+    # Set process context to "client" so that Connection deserialization uses SDK Connection class
+    # which has from_uri() method, instead of core Connection class
+    os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+    try:
+        yield
+    finally:
+        if old_conn is None:
+            del os.environ[key]
+        else:
+            os.environ[key] = old_conn
+
+        if old_context is None:
+            del os.environ["_AIRFLOW_PROCESS_CONTEXT"]
+        else:
+            os.environ["_AIRFLOW_PROCESS_CONTEXT"] = old_context
+
+        # Explicitly drop local references so the caller's client can be garbage collected.
+        del conn
+        del client
 
 
 @attrs.define(kw_only=True)
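
The supervisor.py hunk is a pure restructuring: the no-connection case becomes an
early return, and the set-then-restore of the environment keys moves out of the
nested if. Stripped of the Airflow specifics, the underlying pattern is a context
manager that restores prior state exactly, including "was not set at all"; temp_env
below is a made-up helper, not the supervisor API:

    import os
    from contextlib import contextmanager

    @contextmanager
    def temp_env(key, value):
        old = os.getenv(key)
        os.environ[key] = value
        try:
            yield
        finally:
            if old is None:
                del os.environ[key]   # was unset before: unset it again
            else:
                os.environ[key] = old

    with temp_env("AIRFLOW_CONN_DEMO", "postgres://example/db"):
        print(os.environ["AIRFLOW_CONN_DEMO"])  # visible inside the block
    print(os.getenv("AIRFLOW_CONN_DEMO"))       # None again afterwards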
diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
index b19ab3b1003..8f9745b0ffe 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
@@ -144,9 +144,29 @@ class TestExecutionAPISecretsBackend:
 
         # Mock the greenback and asyncio modules that are imported inside the exception handler
         mocker.patch("greenback.has_portal", return_value=True)
-        mock_greenback_await = mocker.patch("greenback.await_", return_value=expected_conn)
         mocker.patch("asyncio.current_task")
 
+        # Mock greenback.await_ to actually await the coroutine it receives.
+        # This prevents Python 3.13 RuntimeWarning about unawaited coroutines.
+        import asyncio
+
+        def greenback_await_side_effect(coro):
+            loop = asyncio.new_event_loop()
+            try:
+                return loop.run_until_complete(coro)
+            finally:
+                loop.close()
+
+        mock_greenback_await = mocker.patch("greenback.await_", side_effect=greenback_await_side_effect)
+
+        # Mock aget_connection to return the expected connection directly.
+        # We need to mock this because the real aget_connection would try to
+        # use SUPERVISOR_COMMS.asend which is not set up for this test.
+        async def mock_aget_connection(self, conn_id):
+            return expected_conn
+
+        mocker.patch.object(ExecutionAPISecretsBackend, "aget_connection", mock_aget_connection)
+
         backend = ExecutionAPISecretsBackend()
         conn = backend.get_connection("databricks_default")
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 9c0eb2a79f0..63fbdfe7138 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -248,12 +248,21 @@ class TestWatchedSubprocess:
         This fixture ensures test isolation when running in parallel with pytest-xdist,
         regardless of what other tests patch.
         """
-        from airflow.sdk.execution_time.secrets import ExecutionAPISecretsBackend
+        import importlib
+
+        import airflow.sdk.execution_time.secrets.execution_api as execution_api_module
         from airflow.secrets.environment_variables import EnvironmentVariablesBackend
 
+        fresh_execution_backend = importlib.reload(execution_api_module).ExecutionAPISecretsBackend
+
+        # Ensure downstream imports see the restored class instead of any AsyncMock left by other tests
+        import airflow.sdk.execution_time.secrets as secrets_package
+
+        monkeypatch.setattr(secrets_package, "ExecutionAPISecretsBackend", fresh_execution_backend)
+
         monkeypatch.setattr(
             "airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded",
-            lambda: [EnvironmentVariablesBackend(), ExecutionAPISecretsBackend()],
+            lambda: [EnvironmentVariablesBackend(), fresh_execution_backend()],
         )
 
     def test_reading_from_pipes(self, captured_logs, time_machine, client_with_ti_start):
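
The fixture in the final hunk reloads the secrets module so the supervisor tests get
a genuine ExecutionAPISecretsBackend even when an earlier test in the same xdist
worker left the class replaced by an AsyncMock. A standalone sketch of that recovery
trick, using the stdlib json module purely as a stand-in for the patched module:

    import importlib
    import json
    from unittest import mock

    json.dumps = mock.MagicMock(name="leaked_mock")  # simulate a patch another test never undid
    fresh = importlib.reload(json)                   # re-executes the module body
    print(fresh.dumps({"ok": True}))                 # real implementation again: {"ok": true}

A reload cannot fix modules that already did "from package import Name"; they keep
the stale object, which is why the fixture also monkeypatches the attribute on the
secrets package itself.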
