This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 21123cecdad Reuse a session-scoped FastAPI app across api_fastapi
tests (#68261)
21123cecdad is described below
commit 21123cecdadc3e6b51d4f638d26e6f8f9ce97331
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jun 10 13:56:30 2026 +0100
Reuse a session-scoped FastAPI app across api_fastapi tests (#68261)
The test_client / unauthenticated_test_client / unauthorized_test_client
fixtures
rebuilt the whole FastAPI app via create_app() on every test -- two mounted
apps,
every route and the OpenAPI schema, ~0.55s of per-test setup that dominated
the
api_fastapi suite. The app structure is identical for the default client,
so build
it once per session and reuse it.
Because the app is now shared, per-test mutations are reset on every client
fixture
entry: restore the auth-manager singleton (the auth endpoint tests swap it
for a
mock without restoring) and clear any leftover dependency overrides.
* Add fresh_test_client for the DagBag-singleton test
test_dagbag_used_as_singleton_in_dependency patches DBDagBag and asserts the
constructor runs exactly once during app creation. With the session-shared
app the
app is built before the per-test patch, so the patched factory was never
hit. Give
that test a freshly built app (fresh_test_client) so it observes app
construction.
* Clear the shared DagBag cache between api_fastapi tests
The session-shared app keeps app.state.dag_bag (an LRU/TTL cache keyed by
dag_version_id) warm across the whole session. An entry warmed by one test
would
let a later test skip the serialized-Dag DB read, silently breaking
query-count
assertions (e.g. the grid ti_summaries stream tests) depending on execution
order.
Empty the cache on each client fixture entry, alongside the auth-manager and
dependency-override resets.
* Snapshot/restore shared app state instead of enumerated resets
Per review: replace _reset_shared_app with an _isolated_shared_app yield
fixture
that snapshots state + dependency_overrides on the root app and every
mounted
sub-app (recursively) on entry and restores them on teardown, so isolation
is
resilient to future mutations rather than enumerating the known ones.
app.state.dag_bag is the exception: its cache is mutated in place (not a
state
rebinding), so a snapshot can't empty it -- it is still cleared explicitly.
---
.../tests/unit/api_fastapi/common/test_dagbag.py | 6 +-
airflow-core/tests/unit/api_fastapi/conftest.py | 139 ++++++++++++++++-----
2 files changed, 108 insertions(+), 37 deletions(-)
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
index cff9576beea..48c6f706ba7 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_dagbag.py
@@ -59,7 +59,7 @@ class TestDagBagSingleton:
purge_cached_app()
yield
- def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker,
test_client):
+ def test_dagbag_used_as_singleton_in_dependency(self, session, dag_maker,
fresh_test_client):
"""
Ensure DagBag is created only once and reused across multiple API
requests.
@@ -76,10 +76,10 @@ class TestDagBagSingleton:
BaseOperator(task_id="test_task")
session.commit()
- resp1 = test_client.get(f"/api/v2/dags/{dag_id}")
+ resp1 = fresh_test_client.get(f"/api/v2/dags/{dag_id}")
assert resp1.status_code == 200
- resp2 = test_client.get(f"/api/v2/dags/{dag_id}")
+ resp2 = fresh_test_client.get(f"/api/v2/dags/{dag_id}")
assert resp2.status_code == 200
assert self.dagbag_call_counter["count"] == 1
diff --git a/airflow-core/tests/unit/api_fastapi/conftest.py
b/airflow-core/tests/unit/api_fastapi/conftest.py
index 03c43a178a0..f275d48f967 100644
--- a/airflow-core/tests/unit/api_fastapi/conftest.py
+++ b/airflow-core/tests/unit/api_fastapi/conftest.py
@@ -23,6 +23,8 @@ from unittest import mock
import pytest
import time_machine
+from fastapi import FastAPI
+from fastapi.routing import Mount
from fastapi.testclient import TestClient
from airflow.api_fastapi.app import create_app
@@ -54,8 +56,16 @@ def get_api_path(request):
return API_PATHS.get(subdirectory_name, "/")
[email protected]
-def test_client(request):
[email protected](scope="session")
+def _shared_api_app():
+ """
+ Build the FastAPI app once per test session.
+
+ ``create_app()`` rebuilds two full FastAPI apps (core + execution),
registers every route and
+ builds the OpenAPI schema -- ~0.5s. The default ``test_client`` always
uses the same config
+ (SimpleAuthManager), so the app structure is identical across tests; only
per-test DB state and
+ request data differ. Building it once and reusing it removes that per-test
rebuild cost.
+ """
with conf_vars(
{
(
@@ -64,35 +74,85 @@ def test_client(request):
):
"airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager",
}
):
- app = create_app()
- auth_manager: SimpleAuthManager = app.state.auth_manager
- # set time_very_before to 2014-01-01 00:00:00 and time_very_after to
tomorrow
- # to make the JWT token always valid for all test cases with
time_machine
- time_very_before = datetime.datetime(2014, 1, 1, 0, 0, 0)
- time_after = datetime.datetime.now() + datetime.timedelta(days=1)
- with time_machine.travel(time_very_before, tick=False):
- token = auth_manager._get_token_signer(
- expiration_time_in_seconds=(time_after -
time_very_before).total_seconds()
- ).generate(
- auth_manager.serialize_user(
- SimpleAuthManagerUser(username="test", role="admin",
teams=["team1"])
- ),
- )
- with
mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked",
return_value=False):
- yield TestClient(
- app,
- headers={"Authorization": f"Bearer {token}"},
- base_url=f"{BASE_URL}{get_api_path(request)}",
- )
+ return create_app()
+
+
+def _mounted_fastapi_apps(app: FastAPI) -> list[FastAPI]:
+ """Return ``app`` and every FastAPI app mounted under it, recursively
(``/execution``, ``/auth``, ...)."""
+ apps = [app]
+ for route in app.routes:
+ if isinstance(route, Mount) and isinstance(route.app, FastAPI):
+ apps.extend(_mounted_fastapi_apps(route.app))
+ return apps
+
+
[email protected]
+def _isolated_shared_app(_shared_api_app):
+ """
+ Yield the session-shared app with its mutable state snapshotted and
restored around each test.
+
+ The app is built once per session, so a test that rebinds something on
``app.state`` (the auth
+ endpoint tests swap ``auth_manager`` for a mock) or installs a
``dependency_overrides`` entry
+ (extra-links/tasks/logs install a ``dag_bag_from_app`` override) would
leak into later tests.
+ Snapshotting ``state`` and ``dependency_overrides`` on the root app and
every mounted sub-app on
+ entry and restoring them on exit keeps the reset resilient to future
mutations without having to
+ enumerate them.
+
+ ``app.state.dag_bag`` is the exception: tests mutate the DagBag object *in
place* (its cache of
+ deserialized Dags fills as requests resolve them), which a state snapshot
can't undo, so its
+ cache is cleared explicitly. A leaked warm entry would otherwise let a
later test skip a
+ serialized-Dag DB read and break query-count assertions (e.g. the grid
``ti_summaries`` stream
+ tests) depending on execution order.
+ """
+ apps = _mounted_fastapi_apps(_shared_api_app)
+ # ``app.state._state`` is Starlette's backing dict for ``State`` -- the
only way to enumerate it.
+ saved = [(app, dict(app.state._state), dict(app.dependency_overrides)) for
app in apps]
+ _shared_api_app.state.dag_bag.clear_cache()
+ try:
+ yield _shared_api_app
+ finally:
+ for app, state, overrides in saved:
+ app.state._state.clear()
+ app.state._state.update(state)
+ app.dependency_overrides.clear()
+ app.dependency_overrides.update(overrides)
+
+
+def _authed_test_client(app: FastAPI, request):
+ auth_manager: SimpleAuthManager = app.state.auth_manager
+ # set time_very_before to 2014-01-01 00:00:00 and time_very_after to
tomorrow
+ # to make the JWT token always valid for all test cases with time_machine
+ time_very_before = datetime.datetime(2014, 1, 1, 0, 0, 0)
+ time_after = datetime.datetime.now() + datetime.timedelta(days=1)
+ with time_machine.travel(time_very_before, tick=False):
+ token = auth_manager._get_token_signer(
+ expiration_time_in_seconds=(time_after -
time_very_before).total_seconds()
+ ).generate(
+ auth_manager.serialize_user(
+ SimpleAuthManagerUser(username="test", role="admin",
teams=["team1"])
+ ),
+ )
+ with mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked",
return_value=False):
+ yield TestClient(
+ app,
+ headers={"Authorization": f"Bearer {token}"},
+ base_url=f"{BASE_URL}{get_api_path(request)}",
+ )
@pytest.fixture
-def unauthenticated_test_client(request):
- return TestClient(create_app(),
base_url=f"{BASE_URL}{get_api_path(request)}")
+def test_client(request, _isolated_shared_app):
+ yield from _authed_test_client(_isolated_shared_app, request)
@pytest.fixture
-def unauthorized_test_client(request):
+def fresh_test_client(request):
+ """
+ Like ``test_client`` but backed by a freshly built app instead of the
session-shared one.
+
+ For the rare tests that patch app construction (e.g. counting ``DBDagBag``
instantiation) and
+ so need the app built *after* their patch is applied.
+ """
with conf_vars(
{
(
@@ -102,16 +162,27 @@ def unauthorized_test_client(request):
}
):
app = create_app()
- auth_manager: SimpleAuthManager = app.state.auth_manager
- token = auth_manager._get_token_signer().generate(
-
auth_manager.serialize_user(SimpleAuthManagerUser(username="dummy", role=None))
+ yield from _authed_test_client(app, request)
+
+
[email protected]
+def unauthenticated_test_client(request, _isolated_shared_app):
+ return TestClient(_isolated_shared_app,
base_url=f"{BASE_URL}{get_api_path(request)}")
+
+
[email protected]
+def unauthorized_test_client(request, _isolated_shared_app):
+ app = _isolated_shared_app
+ auth_manager: SimpleAuthManager = app.state.auth_manager
+ token = auth_manager._get_token_signer().generate(
+ auth_manager.serialize_user(SimpleAuthManagerUser(username="dummy",
role=None))
+ )
+ with mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked",
return_value=False):
+ yield TestClient(
+ app,
+ headers={"Authorization": f"Bearer {token}"},
+ base_url=f"{BASE_URL}{get_api_path(request)}",
)
- with
mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked",
return_value=False):
- yield TestClient(
- app,
- headers={"Authorization": f"Bearer {token}"},
- base_url=f"{BASE_URL}{get_api_path(request)}",
- )
@pytest.fixture