This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4da5903b481190b6ac72df909f641f88a1020a26 Author: Vincent <[email protected]> AuthorDate: Mon Mar 9 13:02:59 2026 -0400 Fix race condition in auth manager initialization (#62431) (#62995) FAB FastAPI routes call get_application_builder() on every request, which creates a new Flask app and invokes init_app(). Concurrent calls race on the singleton auth_manager's appbuilder and security_manager, causing KeyError: 'AUTH_USER_REGISTRATION' and AttributeError. Add _init_app_lock around the critical section in init_app() that mutates the singleton auth_manager state and registers views, so concurrent get_application_builder() calls are serialized. Co-authored-by: Young-Ki Kim <[email protected]> --- airflow-core/src/airflow/api_fastapi/app.py | 10 +++++-- airflow-core/tests/unit/api_fastapi/test_app.py | 35 ++++++++++++++++++++++ .../fab/www/extensions/init_appbuilder.py | 33 ++++++++++++-------- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py index 3a61c7895c7..c13ea9ed675 100644 --- a/airflow-core/src/airflow/api_fastapi/app.py +++ b/airflow-core/src/airflow/api_fastapi/app.py @@ -17,6 +17,7 @@ from __future__ import annotations import logging +import threading from contextlib import AsyncExitStack, asynccontextmanager from functools import cache from typing import TYPE_CHECKING, cast @@ -68,6 +69,7 @@ log = logging.getLogger(__name__) class _AuthManagerState: instance: BaseAuthManager | None = None + _lock = threading.Lock() @asynccontextmanager @@ -148,9 +150,13 @@ def get_auth_manager_cls() -> type[BaseAuthManager]: def create_auth_manager() -> BaseAuthManager: - """Create the auth manager.""" + """Create the auth manager, cached as a thread-safe singleton.""" auth_manager_cls = get_auth_manager_cls() - _AuthManagerState.instance = auth_manager_cls() + if _AuthManagerState.instance is not None and isinstance(_AuthManagerState.instance, auth_manager_cls): + return _AuthManagerState.instance + with _AuthManagerState._lock: + if _AuthManagerState.instance is None or not isinstance(_AuthManagerState.instance, auth_manager_cls): + _AuthManagerState.instance = auth_manager_cls() return _AuthManagerState.instance diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py b/airflow-core/tests/unit/api_fastapi/test_app.py index cd43260211b..d4d3e8999ce 100644 --- a/airflow-core/tests/unit/api_fastapi/test_app.py +++ b/airflow-core/tests/unit/api_fastapi/test_app.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import threading from unittest import mock import pytest @@ -140,3 +141,37 @@ class TestGetCookiePath: """When base_url contains a nested subpath, get_cookie_path() should return it.""" with mock.patch.object(app_module, "API_ROOT_PATH", "/org/team-a/airflow/"): assert app_module.get_cookie_path() == "/org/team-a/airflow/" + + +def test_create_auth_manager_thread_safety(): + """Concurrent calls to create_auth_manager must return the same singleton instance.""" + call_count = 0 + singleton = None + + class FakeAuthManager: + def __init__(self): + nonlocal call_count, singleton + call_count += 1 + singleton = self + + app_module.purge_cached_app() + + results = [] + barrier = threading.Barrier(10) + + def call_create_auth_manager(): + barrier.wait() + results.append(app_module.create_auth_manager()) + + with mock.patch.object(app_module, "get_auth_manager_cls", return_value=FakeAuthManager): + threads = [threading.Thread(target=call_create_auth_manager) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(results) == 10 + assert all(r is singleton for r in results) + assert call_count == 1 + + app_module.purge_cached_app() diff --git a/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py b/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py index 5c510984480..59b60c9ff6c 100644 --- a/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py +++ b/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py @@ -19,6 +19,7 @@ from __future__ import annotations import logging +import threading from functools import reduce from typing import TYPE_CHECKING @@ -57,6 +58,8 @@ if TYPE_CHECKING: # This module contains code imported from FlaskAppbuilder, so lets use _its_ logger name log = logging.getLogger("flask_appbuilder.base") +_init_app_lock = threading.Lock() + def dynamic_class_import(class_path): """ @@ -197,19 +200,23 @@ class AirflowAppBuilder: self._addon_managers = app.config["ADDON_MANAGERS"] self.session = session auth_manager = create_auth_manager() - auth_manager.appbuilder = self - if hasattr(auth_manager, "init_flask_resources"): - auth_manager.init_flask_resources() - if hasattr(auth_manager, "security_manager"): - self.sm = auth_manager.security_manager - else: - self.sm = AirflowSecurityManagerV2(self) - self.bm = BabelManager(self) - self._add_global_static() - self._add_global_filters() - app.before_request(self.sm.before_request) - self._add_admin_views() - self._add_addon_views() + with _init_app_lock: + auth_manager.appbuilder = self + # Invalidate cached security_manager so it binds to the current Flask app. + if "security_manager" in auth_manager.__dict__: + del auth_manager.__dict__["security_manager"] + if hasattr(auth_manager, "init_flask_resources"): + auth_manager.init_flask_resources() + if hasattr(auth_manager, "security_manager"): + self.sm = auth_manager.security_manager + else: + self.sm = AirflowSecurityManagerV2(self) + self.bm = BabelManager(self) + self._add_global_static() + self._add_global_filters() + app.before_request(self.sm.before_request) + self._add_admin_views() + self._add_addon_views() self._init_extension(app) self._swap_url_filter()
