This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 281cdabc93e Fix race condition in auth manager initialization (#62431)
281cdabc93e is described below

commit 281cdabc93e22400004e1f954874d5bec0a5db3d
Author: Young-Ki Kim <[email protected]>
AuthorDate: Thu Mar 5 23:28:33 2026 +0900

    Fix race condition in auth manager initialization (#62431)
    
    FAB FastAPI routes call get_application_builder() on every request,
    which creates a new Flask app and invokes init_app(). Concurrent calls
    race on the singleton auth_manager's appbuilder and security_manager,
    causing KeyError: 'AUTH_USER_REGISTRATION' and AttributeError.
    
    Add _init_app_lock around the critical section in init_app() that
    mutates the singleton auth_manager state and registers views, so
    concurrent get_application_builder() calls are serialized.
---
 airflow-core/src/airflow/api_fastapi/app.py        | 10 +++++--
 airflow-core/tests/unit/api_fastapi/test_app.py    | 35 ++++++++++++++++++++++
 .../fab/www/extensions/init_appbuilder.py          | 33 ++++++++++++--------
 3 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/app.py 
b/airflow-core/src/airflow/api_fastapi/app.py
index 5d1a5f46568..9b7dd0a9acc 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import logging
+import threading
 from contextlib import AsyncExitStack, asynccontextmanager
 from functools import cache
 from typing import TYPE_CHECKING
@@ -67,6 +68,7 @@ log = logging.getLogger(__name__)
 
 class _AuthManagerState:
     instance: BaseAuthManager | None = None
+    _lock = threading.Lock()
 
 
 @asynccontextmanager
@@ -146,9 +148,13 @@ def get_auth_manager_cls() -> type[BaseAuthManager]:
 
 
 def create_auth_manager() -> BaseAuthManager:
-    """Create the auth manager."""
+    """Create the auth manager, cached as a thread-safe singleton."""
     auth_manager_cls = get_auth_manager_cls()
-    _AuthManagerState.instance = auth_manager_cls()
+    if _AuthManagerState.instance is not None and 
isinstance(_AuthManagerState.instance, auth_manager_cls):
+        return _AuthManagerState.instance
+    with _AuthManagerState._lock:
+        if _AuthManagerState.instance is None or not 
isinstance(_AuthManagerState.instance, auth_manager_cls):
+            _AuthManagerState.instance = auth_manager_cls()
     return _AuthManagerState.instance
 
 
diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py 
b/airflow-core/tests/unit/api_fastapi/test_app.py
index 1879720d100..130247f60c0 100644
--- a/airflow-core/tests/unit/api_fastapi/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/test_app.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import threading
 from unittest import mock
 
 import pytest
@@ -140,3 +141,37 @@ class TestGetCookiePath:
         """When base_url contains a nested subpath, get_cookie_path() should 
return it."""
         with mock.patch.object(app_module, "API_ROOT_PATH", 
"/org/team-a/airflow/"):
             assert app_module.get_cookie_path() == "/org/team-a/airflow/"
+
+
+def test_create_auth_manager_thread_safety():
+    """Concurrent calls to create_auth_manager must return the same singleton 
instance."""
+    call_count = 0
+    singleton = None
+
+    class FakeAuthManager:
+        def __init__(self):
+            nonlocal call_count, singleton
+            call_count += 1
+            singleton = self
+
+    app_module.purge_cached_app()
+
+    results = []
+    barrier = threading.Barrier(10)
+
+    def call_create_auth_manager():
+        barrier.wait()
+        results.append(app_module.create_auth_manager())
+
+    with mock.patch.object(app_module, "get_auth_manager_cls", 
return_value=FakeAuthManager):
+        threads = [threading.Thread(target=call_create_auth_manager) for _ in 
range(10)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+    assert len(results) == 10
+    assert all(r is singleton for r in results)
+    assert call_count == 1
+
+    app_module.purge_cached_app()
diff --git 
a/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py 
b/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py
index 97ed27940a0..c68b8aaea33 100644
--- a/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py
+++ b/providers/fab/src/airflow/providers/fab/www/extensions/init_appbuilder.py
@@ -19,6 +19,7 @@
 from __future__ import annotations
 
 import logging
+import threading
 from functools import reduce
 from typing import TYPE_CHECKING
 
@@ -58,6 +59,8 @@ if TYPE_CHECKING:
 # This module contains code imported from FlaskAppbuilder, so lets use _its_ 
logger name
 log = logging.getLogger("flask_appbuilder.base")
 
+_init_app_lock = threading.Lock()
+
 
 def dynamic_class_import(class_path):
     """
@@ -194,19 +197,23 @@ class AirflowAppBuilder:
         self._addon_managers = app.config["ADDON_MANAGERS"]
         self.session = session
         auth_manager = create_auth_manager()
-        auth_manager.appbuilder = self
-        if hasattr(auth_manager, "init_flask_resources"):
-            auth_manager.init_flask_resources()
-        if hasattr(auth_manager, "security_manager"):
-            self.sm = auth_manager.security_manager
-        else:
-            self.sm = AirflowSecurityManagerV2(self)
-        self.bm = BabelManager(self)
-        self._add_global_static()
-        self._add_global_filters()
-        app.before_request(self.sm.before_request)
-        self._add_admin_views()
-        self._add_addon_views()
+        with _init_app_lock:
+            auth_manager.appbuilder = self
+            # Invalidate cached security_manager so it binds to the current 
Flask app.
+            if "security_manager" in auth_manager.__dict__:
+                del auth_manager.__dict__["security_manager"]
+            if hasattr(auth_manager, "init_flask_resources"):
+                auth_manager.init_flask_resources()
+            if hasattr(auth_manager, "security_manager"):
+                self.sm = auth_manager.security_manager
+            else:
+                self.sm = AirflowSecurityManagerV2(self)
+            self.bm = BabelManager(self)
+            self._add_global_static()
+            self._add_global_filters()
+            app.before_request(self.sm.before_request)
+            self._add_admin_views()
+            self._add_addon_views()
         self._init_extension(app)
         self._swap_url_filter()
 

Reply via email to