This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3428dc98fc1 Fix flaky test_reading_from_pipes on Python 3.13 with
pytest-xdist (#59801)
3428dc98fc1 is described below
commit 3428dc98fc1958049f301ee6dea635d4833cd1e1
Author: Dev-iL <[email protected]>
AuthorDate: Thu Dec 25 15:05:55 2025 +0200
Fix flaky test_reading_from_pipes on Python 3.13 with pytest-xdist (#59801)
The test was failing intermittently because forked subprocesses could
inherit mocked secrets backends from parallel tests. When hasattr() checks
async methods on AsyncMock (Python 3.13), it creates unawaited coroutines,
triggering RuntimeWarning.
---
task-sdk/src/airflow/sdk/api/client.py | 3 +-
task-sdk/src/airflow/sdk/execution_time/context.py | 8 +++-
.../src/airflow/sdk/execution_time/supervisor.py | 55 ++++++++++++----------
.../tests/task_sdk/execution_time/test_secrets.py | 22 ++++++++-
.../task_sdk/execution_time/test_supervisor.py | 13 ++++-
5 files changed, 70 insertions(+), 31 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index b520d0ab226..2a38ef2fad7 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -910,7 +910,8 @@ class Client(httpx.Client):
kwargs.setdefault("base_url", "dry-run://server")
else:
kwargs["base_url"] = base_url
- kwargs["verify"] = self._get_ssl_context_cached(certifi.where(),
API_SSL_CERT_PATH)
+ # Call via the class to avoid binding lru_cache wires to this
instance.
+ kwargs["verify"] =
type(self)._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH)
# Set timeout if not explicitly provided
kwargs.setdefault("timeout", API_TIMEOUT)
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 8ed06333ce8..db5a75e10c1 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import collections
import contextlib
import functools
+import inspect
from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
from datetime import datetime
from functools import cache
@@ -199,8 +200,11 @@ async def _async_get_connection(conn_id: str) ->
Connection:
for secrets_backend in backends:
try:
# Use async method if available, otherwise wrap sync method
- if hasattr(secrets_backend, "aget_connection"):
- conn = await secrets_backend.aget_connection(conn_id) # type:
ignore[assignment]
+ # getattr avoids triggering AsyncMock coroutine creation under
Python 3.13
+ async_method = getattr(secrets_backend, "aget_connection", None)
+ if async_method is not None:
+ maybe_awaitable = async_method(conn_id)
+ conn = await maybe_awaitable if
inspect.isawaitable(maybe_awaitable) else maybe_awaitable
else:
conn = await
sync_to_async(secrets_backend.get_connection)(conn_id) # type:
ignore[assignment]
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 45432ac37e5..b87131aa733 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -36,14 +36,7 @@ from contextlib import contextmanager, suppress
from datetime import datetime, timezone
from http import HTTPStatus
from socket import socket, socketpair
-from typing import (
- TYPE_CHECKING,
- BinaryIO,
- ClassVar,
- NoReturn,
- TextIO,
- cast,
-)
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, NoReturn, TextIO, cast
from urllib.parse import urlparse
from uuid import UUID
@@ -911,27 +904,39 @@ def _remote_logging_conn(client: Client):
# Fetch connection details on-demand without caching the entire API client
instance
conn = _fetch_remote_logging_conn(conn_id, client)
- if conn:
- key = f"AIRFLOW_CONN_{conn_id.upper()}"
- old_conn = os.getenv(key)
- old_context = os.getenv("_AIRFLOW_PROCESS_CONTEXT")
-
- os.environ[key] = conn.get_uri()
- # Set process context to "client" so that Connection deserialization
uses SDK Connection class
- # which has from_uri() method, instead of core Connection class
- os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+ if not conn:
try:
yield
finally:
- if old_conn is None:
- del os.environ[key]
- else:
- os.environ[key] = old_conn
+ # Ensure we don't leak the caller's client when no connection was
fetched.
+ del conn
+ del client
+ return
- if old_context is None:
- del os.environ["_AIRFLOW_PROCESS_CONTEXT"]
- else:
- os.environ["_AIRFLOW_PROCESS_CONTEXT"] = old_context
+ key = f"AIRFLOW_CONN_{conn_id.upper()}"
+ old_conn = os.getenv(key)
+ old_context = os.getenv("_AIRFLOW_PROCESS_CONTEXT")
+
+ os.environ[key] = conn.get_uri()
+ # Set process context to "client" so that Connection deserialization uses
SDK Connection class
+ # which has from_uri() method, instead of core Connection class
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+ try:
+ yield
+ finally:
+ if old_conn is None:
+ del os.environ[key]
+ else:
+ os.environ[key] = old_conn
+
+ if old_context is None:
+ del os.environ["_AIRFLOW_PROCESS_CONTEXT"]
+ else:
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = old_context
+
+ # Explicitly drop local references so the caller's client can be
garbage collected.
+ del conn
+ del client
@attrs.define(kw_only=True)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
index b19ab3b1003..8f9745b0ffe 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
@@ -144,9 +144,29 @@ class TestExecutionAPISecretsBackend:
# Mock the greenback and asyncio modules that are imported inside the
exception handler
mocker.patch("greenback.has_portal", return_value=True)
- mock_greenback_await = mocker.patch("greenback.await_",
return_value=expected_conn)
mocker.patch("asyncio.current_task")
+ # Mock greenback.await_ to actually await the coroutine it receives.
+ # This prevents Python 3.13 RuntimeWarning about unawaited coroutines.
+ import asyncio
+
+ def greenback_await_side_effect(coro):
+ loop = asyncio.new_event_loop()
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+
+ mock_greenback_await = mocker.patch("greenback.await_",
side_effect=greenback_await_side_effect)
+
+ # Mock aget_connection to return the expected connection directly.
+ # We need to mock this because the real aget_connection would try to
+ # use SUPERVISOR_COMMS.asend which is not set up for this test.
+ async def mock_aget_connection(self, conn_id):
+ return expected_conn
+
+ mocker.patch.object(ExecutionAPISecretsBackend, "aget_connection",
mock_aget_connection)
+
backend = ExecutionAPISecretsBackend()
conn = backend.get_connection("databricks_default")
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 9c0eb2a79f0..63fbdfe7138 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -248,12 +248,21 @@ class TestWatchedSubprocess:
This fixture ensures test isolation when running in parallel with
pytest-xdist,
regardless of what other tests patch.
"""
- from airflow.sdk.execution_time.secrets import
ExecutionAPISecretsBackend
+ import importlib
+
+ import airflow.sdk.execution_time.secrets.execution_api as
execution_api_module
from airflow.secrets.environment_variables import
EnvironmentVariablesBackend
+ fresh_execution_backend =
importlib.reload(execution_api_module).ExecutionAPISecretsBackend
+
+ # Ensure downstream imports see the restored class instead of any
AsyncMock left by other tests
+ import airflow.sdk.execution_time.secrets as secrets_package
+
+ monkeypatch.setattr(secrets_package, "ExecutionAPISecretsBackend",
fresh_execution_backend)
+
monkeypatch.setattr(
"airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded",
- lambda: [EnvironmentVariablesBackend(),
ExecutionAPISecretsBackend()],
+ lambda: [EnvironmentVariablesBackend(), fresh_execution_backend()],
)
def test_reading_from_pipes(self, captured_logs, time_machine,
client_with_ti_start):