This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 5605d32a986 [v3-1-test] Fix race condition in auth manager initialization (#62214) (#62326)
5605d32a986 is described below

commit 5605d32a9865f1d13076c493b7f26a1bef27df0a
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Feb 24 15:12:24 2026 +0800

    [v3-1-test] Fix race condition in auth manager initialization (#62214) (#62326)
    
    * [v3-1-test] Fix race condition in auth manager initialization (#62214)
    
    * Fix race condition in auth manager initialization
    
    Make create_auth_manager() thread-safe using double-checked locking
    to prevent concurrent requests from creating multiple auth manager
    instances. This fixes intermittent 500 errors on /auth/token when
    multiple requests arrive simultaneously.
    
    Closes: #61108
    
    * Handle auth manager class change in singleton cache
    
    The singleton check now also verifies the cached instance matches
    the currently configured auth manager class. This prevents stale
    instances when the config changes (e.g. switching between
    SimpleAuthManager and FabAuthManager during db upgrade).
    
    * Avoid calling get_auth_manager_cls on every create_auth_manager call
    
    Move get_auth_manager_cls() inside the lock so the fast path is just a None check. Add purge_cached_app() in test_upgradedb to ensure clean state between parametrized cases with different auth manager configs.
    
    * Reset auth manager singleton in get_application_builder
    
    Since get_application_builder creates a fresh Flask app each time, the cached auth manager singleton from a previous app context must be cleared to avoid stale state (e.g. KeyError on AUTH_USER_REGISTRATION).
    
    * Clear auth manager singleton in test fixtures using create_app
    
    Test fixtures that call application.create_app() or get_application_builder()
    can receive a stale auth manager singleton from a previous test, causing
    AirflowSecurityManagerV2 to be used instead of FabAirflowSecurityManagerOverride.
    
    Add purge_cached_app() calls to test fixtures across fab, google, and keycloak
    providers to ensure each test gets a fresh auth manager instance.
    (cherry picked from commit 5c9171af1a56b5b60bb121537159ceb9c4de9f73)
    
    Co-authored-by: Young-Ki Kim <[email protected]>
    
    * Fix CI error
    
    ---------
    
    Co-authored-by: Young-Ki Kim <[email protected]>
---
 airflow-core/src/airflow/api_fastapi/app.py        | 34 +++++++++++----------
 airflow-core/tests/unit/api_fastapi/test_app.py    | 35 ++++++++++++++++++++++
 airflow-core/tests/unit/utils/test_db.py           |  3 ++
 .../fab/auth_manager/cli_commands/utils.py         |  3 ++
 .../unit/fab/auth_manager/api_fastapi/conftest.py  |  2 ++
 .../fab/tests/unit/fab/auth_manager/conftest.py    |  5 ++++
 .../unit/fab/auth_manager/test_fab_auth_manager.py |  6 ++++
 .../tests/unit/fab/auth_manager/test_security.py   |  3 ++
 .../fab/auth_manager/views/test_permissions.py     |  3 ++
 .../unit/fab/auth_manager/views/test_roles_list.py |  3 ++
 .../tests/unit/fab/auth_manager/views/test_user.py |  3 ++
 .../unit/fab/auth_manager/views/test_user_edit.py  |  3 ++
 .../unit/fab/auth_manager/views/test_user_stats.py |  3 ++
 providers/fab/tests/unit/fab/www/test_auth.py      |  3 ++
 .../fab/www/views/test_views_custom_user_views.py  | 28 +++++++++++++++++
 .../common/auth_backend/test_google_openid.py      |  7 +++++
 .../unit/keycloak/auth_manager/routes/conftest.py  |  3 +-
 17 files changed, 131 insertions(+), 16 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py
index 7c05295807e..02b27a9b5f1 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -17,7 +17,9 @@
 from __future__ import annotations
 
 import logging
+import threading
 from contextlib import AsyncExitStack, asynccontextmanager
+from functools import cache
 from typing import TYPE_CHECKING, cast
 from urllib.parse import urlsplit
 
@@ -54,8 +56,10 @@ RESERVED_URL_PREFIXES = ["/api/v2", "/ui", "/execution"]
 
 log = logging.getLogger(__name__)
 
-app: FastAPI | None = None
-auth_manager: BaseAuthManager | None = None
+
+class _AuthManagerState:
+    instance: BaseAuthManager | None = None
+    _lock = threading.Lock()
 
 
 @asynccontextmanager
@@ -107,19 +111,16 @@ def create_app(apps: str = "all") -> FastAPI:
     return app
 
 
+@cache
 def cached_app(config=None, testing=False, apps="all") -> FastAPI:
     """Return cached instance of Airflow API app."""
-    global app
-    if not app:
-        app = create_app(apps=apps)
-    return app
+    return create_app(apps=apps)
 
 
 def purge_cached_app() -> None:
     """Remove the cached version of the app and auth_manager in global state."""
-    global app, auth_manager
-    app = None
-    auth_manager = None
+    cached_app.cache_clear()
+    _AuthManagerState.instance = None
 
 
 def get_auth_manager_cls() -> type[BaseAuthManager]:
@@ -140,10 +141,13 @@ def get_auth_manager_cls() -> type[BaseAuthManager]:
 
 def create_auth_manager() -> BaseAuthManager:
     """Create the auth manager."""
-    global auth_manager
-    auth_manager_cls = get_auth_manager_cls()
-    auth_manager = auth_manager_cls()
-    return auth_manager
+    if _AuthManagerState.instance is not None:
+        return _AuthManagerState.instance
+    with _AuthManagerState._lock:
+        if _AuthManagerState.instance is None:
+            auth_manager_cls = get_auth_manager_cls()
+            _AuthManagerState.instance = auth_manager_cls()
+    return _AuthManagerState.instance
 
 
 def init_auth_manager(app: FastAPI | None = None) -> BaseAuthManager:
@@ -161,12 +165,12 @@ def init_auth_manager(app: FastAPI | None = None) -> BaseAuthManager:
 
 def get_auth_manager() -> BaseAuthManager:
     """Return the auth manager, provided it's been initialized before."""
-    if auth_manager is None:
+    if _AuthManagerState.instance is None:
         raise RuntimeError(
             "Auth Manager has not been initialized yet. "
             "The `init_auth_manager` method needs to be called first."
         )
-    return auth_manager
+    return _AuthManagerState.instance
 
 
 def init_plugins(app: FastAPI) -> None:
diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py b/airflow-core/tests/unit/api_fastapi/test_app.py
index 448d527ab6b..3ad7437c373 100644
--- a/airflow-core/tests/unit/api_fastapi/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/test_app.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import threading
 from unittest import mock
 
 import pytest
@@ -118,3 +119,37 @@ def test_plugin_with_invalid_url_prefix(caplog, fastapi_apps, expected_message,
 
     assert any(expected_message in rec.message for rec in caplog.records)
     assert not any(r.path == invalid_path for r in app.routes)
+
+
+def test_create_auth_manager_thread_safety():
+    """Concurrent calls to create_auth_manager must return the same singleton 
instance."""
+    call_count = 0
+    singleton = None
+
+    class FakeAuthManager:
+        def __init__(self):
+            nonlocal call_count, singleton
+            call_count += 1
+            singleton = self
+
+    app_module.purge_cached_app()
+
+    results = []
+    barrier = threading.Barrier(10)
+
+    def call_create_auth_manager():
+        barrier.wait()
+        results.append(app_module.create_auth_manager())
+
+    with mock.patch.object(app_module, "get_auth_manager_cls", return_value=FakeAuthManager):
+        threads = [threading.Thread(target=call_create_auth_manager) for _ in range(10)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+    assert len(results) == 10
+    assert all(r is singleton for r in results)
+    assert call_count == 1
+
+    app_module.purge_cached_app()
diff --git a/airflow-core/tests/unit/utils/test_db.py 
b/airflow-core/tests/unit/utils/test_db.py
index d6a8d4c1d66..1b9e3062b90 100644
--- a/airflow-core/tests/unit/utils/test_db.py
+++ b/airflow-core/tests/unit/utils/test_db.py
@@ -240,6 +240,9 @@ class TestDb:
 
         mock_upgrade = mocker.patch("alembic.command.upgrade")
 
+        from airflow.api_fastapi.app import purge_cached_app
+
+        purge_cached_app()
         with conf_vars(auth):
             upgradedb()
 
diff --git 
a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/utils.py 
b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/utils.py
index 174b3867ad0..f5e5ea9c790 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/utils.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/utils.py
@@ -28,6 +28,7 @@ from flask import Flask
 from sqlalchemy.engine import make_url
 
 import airflow
+from airflow.api_fastapi.app import purge_cached_app
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder
@@ -49,6 +50,8 @@ def _return_appbuilder(app: Flask) -> AirflowAppBuilder:
 
 @contextmanager
 def get_application_builder() -> Generator[AirflowAppBuilder, None, None]:
+    _return_appbuilder.cache_clear()
+    purge_cached_app()
     static_folder = os.path.join(os.path.dirname(airflow.__file__), "www", "static")
     flask_app = Flask(__name__, static_folder=static_folder)
     webserver_config = conf.get_mandatory_value("fab", "config_file")
diff --git a/providers/fab/tests/unit/fab/auth_manager/api_fastapi/conftest.py 
b/providers/fab/tests/unit/fab/auth_manager/api_fastapi/conftest.py
index 86273a0af74..4c7f566d395 100644
--- a/providers/fab/tests/unit/fab/auth_manager/api_fastapi/conftest.py
+++ b/providers/fab/tests/unit/fab/auth_manager/api_fastapi/conftest.py
@@ -19,11 +19,13 @@ from __future__ import annotations
 import pytest
 from fastapi.testclient import TestClient
 
+from airflow.api_fastapi.app import purge_cached_app
 from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
 
 
 @pytest.fixture(scope="module")
 def fab_auth_manager():
+    purge_cached_app()
     return FabAuthManager(None)
 
 
diff --git a/providers/fab/tests/unit/fab/auth_manager/conftest.py 
b/providers/fab/tests/unit/fab/auth_manager/conftest.py
index 2cad4b4db03..0def443a7f3 100644
--- a/providers/fab/tests/unit/fab/auth_manager/conftest.py
+++ b/providers/fab/tests/unit/fab/auth_manager/conftest.py
@@ -22,7 +22,9 @@ from pathlib import Path
 
 import pytest
 
+from airflow.api_fastapi.app import purge_cached_app
 from airflow.providers.fab.www import app
+from airflow.providers.fab.www.app import purge_cached_app as purge_fab_cached_app
 
 from tests_common.test_utils.config import conf_vars
 from unit.fab.decorators import dont_initialize_flask_app_submodules
@@ -30,6 +32,9 @@ from unit.fab.decorators import 
dont_initialize_flask_app_submodules
 
 @pytest.fixture(scope="session")
 def minimal_app_for_auth_api():
+    purge_cached_app()
+    purge_fab_cached_app()
+
     @dont_initialize_flask_app_submodules(
         skip_all_except=[
             "init_appbuilder",
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py 
b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
index 85fb85dace9..a60ed124664 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
@@ -159,6 +159,12 @@ def auth_manager():
 
 @pytest.fixture
 def flask_app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
+    from airflow.providers.fab.www.app import purge_cached_app as purge_fab_cached_app
+
+    purge_fab_cached_app()
     with conf_vars(
         {
             (
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_security.py 
b/providers/fab/tests/unit/fab/auth_manager/test_security.py
index 75dec27d8b2..218677098c1 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_security.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_security.py
@@ -203,6 +203,9 @@ def clear_db_before_test():
 
 @pytest.fixture(scope="module")
 def app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git 
a/providers/fab/tests/unit/fab/auth_manager/views/test_permissions.py 
b/providers/fab/tests/unit/fab/auth_manager/views/test_permissions.py
index 52c67f57425..c33383e856d 100644
--- a/providers/fab/tests/unit/fab/auth_manager/views/test_permissions.py
+++ b/providers/fab/tests/unit/fab/auth_manager/views/test_permissions.py
@@ -30,6 +30,9 @@ from unit.fab.utils import client_with_login
 
 @pytest.fixture(scope="module")
 def fab_app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git a/providers/fab/tests/unit/fab/auth_manager/views/test_roles_list.py 
b/providers/fab/tests/unit/fab/auth_manager/views/test_roles_list.py
index 4ef28203ef7..7f293aa85e2 100644
--- a/providers/fab/tests/unit/fab/auth_manager/views/test_roles_list.py
+++ b/providers/fab/tests/unit/fab/auth_manager/views/test_roles_list.py
@@ -30,6 +30,9 @@ from unit.fab.utils import client_with_login
 
 @pytest.fixture(scope="module")
 def fab_app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git a/providers/fab/tests/unit/fab/auth_manager/views/test_user.py 
b/providers/fab/tests/unit/fab/auth_manager/views/test_user.py
index c56d5f7aa09..a8748b14d92 100644
--- a/providers/fab/tests/unit/fab/auth_manager/views/test_user.py
+++ b/providers/fab/tests/unit/fab/auth_manager/views/test_user.py
@@ -30,6 +30,9 @@ from unit.fab.utils import client_with_login
 
 @pytest.fixture(scope="module")
 def fab_app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git a/providers/fab/tests/unit/fab/auth_manager/views/test_user_edit.py 
b/providers/fab/tests/unit/fab/auth_manager/views/test_user_edit.py
index 97e57d4fe28..9b34aab2bb0 100644
--- a/providers/fab/tests/unit/fab/auth_manager/views/test_user_edit.py
+++ b/providers/fab/tests/unit/fab/auth_manager/views/test_user_edit.py
@@ -30,6 +30,9 @@ from unit.fab.utils import client_with_login
 
 @pytest.fixture(scope="module")
 def fab_app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git a/providers/fab/tests/unit/fab/auth_manager/views/test_user_stats.py 
b/providers/fab/tests/unit/fab/auth_manager/views/test_user_stats.py
index 3671217e0fe..6ebbf614c70 100644
--- a/providers/fab/tests/unit/fab/auth_manager/views/test_user_stats.py
+++ b/providers/fab/tests/unit/fab/auth_manager/views/test_user_stats.py
@@ -30,6 +30,9 @@ from unit.fab.utils import client_with_login
 
 @pytest.fixture(scope="module")
 def fab_app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git a/providers/fab/tests/unit/fab/www/test_auth.py 
b/providers/fab/tests/unit/fab/www/test_auth.py
index b20ee27bd34..7559ecbb704 100644
--- a/providers/fab/tests/unit/fab/www/test_auth.py
+++ b/providers/fab/tests/unit/fab/www/test_auth.py
@@ -33,6 +33,9 @@ mock_call = Mock()
 
 @pytest.fixture
 def app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
     with conf_vars(
         {
             (
diff --git 
a/providers/fab/tests/unit/fab/www/views/test_views_custom_user_views.py 
b/providers/fab/tests/unit/fab/www/views/test_views_custom_user_views.py
index e63ce584e3b..9cafe982476 100644
--- a/providers/fab/tests/unit/fab/www/views/test_views_custom_user_views.py
+++ b/providers/fab/tests/unit/fab/www/views/test_views_custom_user_views.py
@@ -63,6 +63,34 @@ PERMISSIONS_TESTS_PARAMS = [
 ]
 
 
+def delete_roles(app):
+    for role_name in ["role_edit_one_dag"]:
+        delete_role(app, role_name)
+
+
+@pytest.fixture
+def app():
+    from airflow.api_fastapi.app import purge_cached_app
+
+    purge_cached_app()
+    with conf_vars(
+        {
+            (
+                "core",
+                "auth_manager",
+            ): "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager",
+        }
+    ):
+        app = application.create_app(enable_plugins=False)
+        app.config["WTF_CSRF_ENABLED"] = False
+        yield app
+
+
+@pytest.fixture
+def client(app):
+    return app.test_client()
+
+
 class TestSecurity:
     @classmethod
     def setup_class(cls):
diff --git 
a/providers/google/tests/unit/google/common/auth_backend/test_google_openid.py 
b/providers/google/tests/unit/google/common/auth_backend/test_google_openid.py
index 4cc6ad386c0..16ac6cc6ac0 100644
--- 
a/providers/google/tests/unit/google/common/auth_backend/test_google_openid.py
+++ 
b/providers/google/tests/unit/google/common/auth_backend/test_google_openid.py
@@ -34,6 +34,8 @@ if not AIRFLOW_V_3_0_PLUS:
         allow_module_level=True,
     )
 
+from airflow.api_fastapi.app import purge_cached_app
+
 from tests_common.test_utils.config import conf_vars
 
 
@@ -42,6 +44,11 @@ def google_openid_app():
     if importlib.util.find_spec("flask_session") is None:
         return None
 
+    purge_cached_app()
+    from airflow.providers.fab.www.app import purge_cached_app as purge_fab_cached_app
+
+    purge_fab_cached_app()
+
     def factory():
         with conf_vars(
             {
diff --git 
a/providers/keycloak/tests/unit/keycloak/auth_manager/routes/conftest.py 
b/providers/keycloak/tests/unit/keycloak/auth_manager/routes/conftest.py
index 1807778b86a..bd6a40eb62a 100644
--- a/providers/keycloak/tests/unit/keycloak/auth_manager/routes/conftest.py
+++ b/providers/keycloak/tests/unit/keycloak/auth_manager/routes/conftest.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 import pytest
 from fastapi.testclient import TestClient
 
-from airflow.api_fastapi.app import create_app
+from airflow.api_fastapi.app import create_app, purge_cached_app
 from airflow.providers.keycloak.auth_manager.constants import (
     CONF_CLIENT_ID_KEY,
     CONF_CLIENT_SECRET_KEY,
@@ -32,6 +32,7 @@ from tests_common.test_utils.config import conf_vars
 
 @pytest.fixture
 def client():
+    purge_cached_app()
     with conf_vars(
         {
             (

Reply via email to