This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 281cdabc93e Fix race condition in auth manager initialization (#62431)
281cdabc93e is described below
commit 281cdabc93e22400004e1f954874d5bec0a5db3d
Author: Young-Ki Kim <[email protected]>
AuthorDate: Thu Mar 5 23:28:33 2026 +0900
Fix race condition in auth manager initialization (#62431)
FAB FastAPI routes call get_application_builder() on every request,
which creates a new Flask app and invokes init_app(). Concurrent calls
race on the singleton auth_manager's appbuilder and security_manager,
causing KeyError: 'AUTH_USER_REGISTRATION' and AttributeError.
Add _init_app_lock around the critical section in init_app() that
mutates the singleton auth_manager state and registers views, so
concurrent get_application_builder() calls are serialized.
---
airflow-core/src/airflow/api_fastapi/app.py | 10 +++++--
airflow-core/tests/unit/api_fastapi/test_app.py | 35 ++++++++++++++++++++++
.../fab/www/extensions/init_appbuilder.py | 33 ++++++++++++--------
3 files changed, 63 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py
index 5d1a5f46568..9b7dd0a9acc 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import logging
+import threading
from contextlib import AsyncExitStack, asynccontextmanager
from functools import cache
from typing import TYPE_CHECKING
@@ -67,6 +68,7 @@ log = logging.getLogger(__name__)
class _AuthManagerState:
instance: BaseAuthManager | None = None
+ _lock = threading.Lock()
@asynccontextmanager
@@ -146,9 +148,13 @@ def get_auth_manager_cls() -> type[BaseAuthManager]:
def create_auth_manager() -> BaseAuthManager:
- """Create the auth manager."""
+ """Create the auth manager, cached as a thread-safe singleton."""
auth_manager_cls = get_auth_manager_cls()
- _AuthManagerState.instance = auth_manager_cls()
+ if _AuthManagerState.instance is not None and isinstance(_AuthManagerState.instance, auth_manager_cls):
+ return _AuthManagerState.instance
+ with _AuthManagerState._lock:
+ if _AuthManagerState.instance is None or not isinstance(_AuthManagerState.instance, auth_manager_cls):
+ _AuthManagerState.instance = auth_manager_cls()
return _AuthManagerState.instance
diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py b/airflow-core/tests/unit/api_fastapi/test_app.py
index 1879720d100..130247f60c0 100644
--- a/airflow-core/tests/unit/api_fastapi/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/test_app.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import threading
from unittest import mock
import pytest
@@ -140,3 +141,37 @@ class TestGetCookiePath:
"""When base_url contains a nested subpath, get_cookie_path() should return it."""
with mock.patch.object(app_module, "API_ROOT_PATH", "/org/team-a/airflow/"):
assert app_module.get_cookie_path() == "/org/team-a/airflow/"
+
+
+def test_create_auth_manager_thread_safety():
+ """Concurrent calls to create_auth_manager must return the same singleton instance."""
+ call_count = 0
+ singleton = None
+
+ class FakeAuthManager:
+ def __init__(self):
+ nonlocal call_count, singleton
+ call_count += 1
+ singleton = self
+
+ app_module.purge_cached_app()
+
+ results = []
+ barrier = threading.Barrier(10)
+
+ def call_create_auth_manager():
+ barrier.wait()
+ results.append(app_module.create_auth_manager())
+
+ with mock.patch.object(app_module, "get_auth_manager_cls", return_value=FakeAuthManager):
+ threads = [threading.Thread(target=call_create_auth_manager) for _ in range(10)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+
+ assert len(results) == 10
+ assert all(r is singleton for r in results)
+ assert call_count == 1
+
+ app_module.purge_cached_app()
diff --git a/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py b/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py
index 97ed27940a0..c68b8aaea33 100644
--- a/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py
+++ b/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py
@@ -19,6 +19,7 @@
from __future__ import annotations
import logging
+import threading
from functools import reduce
from typing import TYPE_CHECKING
@@ -58,6 +59,8 @@ if TYPE_CHECKING:
# This module contains code imported from FlaskAppbuilder, so lets use _its_ logger name
log = logging.getLogger("flask_appbuilder.base")
+_init_app_lock = threading.Lock()
+
def dynamic_class_import(class_path):
"""
@@ -194,19 +197,23 @@ class AirflowAppBuilder:
self._addon_managers = app.config["ADDON_MANAGERS"]
self.session = session
auth_manager = create_auth_manager()
- auth_manager.appbuilder = self
- if hasattr(auth_manager, "init_flask_resources"):
- auth_manager.init_flask_resources()
- if hasattr(auth_manager, "security_manager"):
- self.sm = auth_manager.security_manager
- else:
- self.sm = AirflowSecurityManagerV2(self)
- self.bm = BabelManager(self)
- self._add_global_static()
- self._add_global_filters()
- app.before_request(self.sm.before_request)
- self._add_admin_views()
- self._add_addon_views()
+ with _init_app_lock:
+ auth_manager.appbuilder = self
+ # Invalidate cached security_manager so it binds to the current Flask app.
+ if "security_manager" in auth_manager.__dict__:
+ del auth_manager.__dict__["security_manager"]
+ if hasattr(auth_manager, "init_flask_resources"):
+ auth_manager.init_flask_resources()
+ if hasattr(auth_manager, "security_manager"):
+ self.sm = auth_manager.security_manager
+ else:
+ self.sm = AirflowSecurityManagerV2(self)
+ self.bm = BabelManager(self)
+ self._add_global_static()
+ self._add_global_filters()
+ app.before_request(self.sm.before_request)
+ self._add_admin_views()
+ self._add_addon_views()
self._init_extension(app)
self._swap_url_filter()