This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e7e188d453c Fix race condition in FabAuthManager when workers 
concurrently create permissions, roles, and resources (#63842)
e7e188d453c is described below

commit e7e188d453cac109a8a42340cd1d7eab3abfccc9
Author: Piyush Mudgal <[email protected]>
AuthorDate: Wed Mar 18 20:41:50 2026 +0530

    Fix race condition in FabAuthManager when workers concurrently create 
permissions, roles, and resources (#63842)
---
 .../fab/auth_manager/security_manager/override.py  | 32 ++++++++++
 .../auth_manager/security_manager/test_override.py | 71 ++++++++++++++++++++++
 2 files changed, 103 insertions(+)

diff --git 
a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
 
b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
index 80800aa25c2..0cfc2351874 100644
--- 
a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
+++ 
b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -1299,6 +1299,13 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
                 self.session.commit()
                 log.info(const.LOGMSG_INF_SEC_ADD_ROLE, name)
                 return role
+            except IntegrityError:
+                self.session.rollback()
+                role = self.find_role(name)
+                if role is not None:
+                    log.info("Role '%s' was created by a concurrent worker, 
using existing record", name)
+                    return role
+                raise
             except Exception as e:
                 log.error(const.LOGMSG_ERR_SEC_ADD_ROLE, e)
                 self.session.rollback()
@@ -1570,6 +1577,13 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
                 self.session.add(action)
                 self.session.commit()
                 return action
+            except IntegrityError:
+                self.session.rollback()
+                action = self.get_action(name)
+                if action is not None:
+                    log.info("Action '%s' was created by a concurrent worker, 
using existing record", name)
+                    return action
+                raise
             except Exception as e:
                 log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION, e)
                 self.session.rollback()
@@ -1628,6 +1642,13 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
                 self.session.add(resource)
                 self.session.commit()
                 return resource
+            except IntegrityError:
+                self.session.rollback()
+                resource = self.get_resource(name)
+                if resource is not None:
+                    log.info("Resource '%s' was created by a concurrent 
worker, using existing record", name)
+                    return resource
+                raise
             except Exception as e:
                 log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU, e)
                 self.session.rollback()
@@ -1698,6 +1719,17 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
             self.session.commit()
             log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW, perm)
             return perm
+        except IntegrityError:
+            self.session.rollback()
+            existing = self.get_permission(action_name, resource_name)
+            if existing is not None:
+                log.info(
+                    "Permission '%s'->'%s' was created by a concurrent worker, 
using existing record",
+                    action_name,
+                    resource_name,
+                )
+                return existing
+            raise
         except Exception as e:
             log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW, e)
             self.session.rollback()
diff --git 
a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py 
b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
index f8a03f54bfe..a2186c82973 100644
--- 
a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
+++ 
b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
@@ -25,7 +25,9 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session
 
 from airflow.providers.fab.auth_manager.models import (
+    Action,
     Permission,
+    Resource,
     Role,
 )
 from airflow.providers.fab.auth_manager.security_manager.override import 
FabAirflowSecurityManagerOverride
@@ -79,6 +81,75 @@ class TestFabAirflowSecurityManagerOverride:
             f"Failed to add '{permission}' permission to the '{role}' role 
Error: {mock_error}",
         )
 
+    
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+    def test_add_role_returns_existing_on_concurrent_insert(self, mock_log):
+        sm = EmptySecurityManager()
+        existing_role = Mock(spec=Role, name="Admin")
+
+        mock_session = Mock(spec=Session)
+        mock_session.commit.side_effect = IntegrityError("stmt", {}, 
Exception("Duplicate entry"))
+        sm.find_role = Mock(side_effect=[None, existing_role])
+
+        with mock.patch.object(EmptySecurityManager, "session", mock_session):
+            result = sm.add_role("Admin")
+
+        assert result is existing_role
+        assert mock_session.rollback.called
+        assert mock_log.error.call_count == 0
+
+    
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+    def test_create_action_returns_existing_on_concurrent_insert(self, 
mock_log):
+        sm = EmptySecurityManager()
+        existing_action = Mock(spec=Action, name="can_read")
+
+        mock_session = Mock(spec=Session)
+        mock_session.commit.side_effect = IntegrityError("stmt", {}, 
Exception("Duplicate entry"))
+        sm.get_action = Mock(side_effect=[None, existing_action])
+
+        with mock.patch.object(EmptySecurityManager, "session", mock_session):
+            result = sm.create_action("can_read")
+
+        assert result is existing_action
+        assert mock_session.rollback.called
+        assert mock_log.error.call_count == 0
+
+    
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+    def test_create_resource_returns_existing_on_concurrent_insert(self, 
mock_log):
+        sm = EmptySecurityManager()
+        existing_resource = Mock(spec=Resource, name="Connections")
+
+        mock_session = Mock(spec=Session)
+        mock_session.commit.side_effect = IntegrityError("stmt", {}, 
Exception("Duplicate entry"))
+        sm.get_resource = Mock(side_effect=[None, existing_resource])
+
+        with mock.patch.object(EmptySecurityManager, "session", mock_session):
+            result = sm.create_resource("Connections")
+
+        assert result is existing_resource
+        assert mock_session.rollback.called
+        assert mock_log.error.call_count == 0
+
+    
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+    def test_create_permission_returns_existing_on_concurrent_insert(self, 
mock_log):
+        sm = EmptySecurityManager()
+        existing_perm = Mock(spec=Permission)
+        existing_resource = Mock(spec=Resource, id=10)
+        existing_action = Mock(spec=Action, id=20)
+
+        mock_session = Mock(spec=Session)
+        mock_session.commit.side_effect = IntegrityError("stmt", {}, 
Exception("Duplicate entry"))
+
+        sm.get_permission = Mock(side_effect=[None, existing_perm])
+        sm.create_resource = Mock(return_value=existing_resource)
+        sm.create_action = Mock(return_value=existing_action)
+
+        with mock.patch.object(EmptySecurityManager, "session", mock_session):
+            result = sm.create_permission("can_read", "Connections")
+
+        assert result is existing_perm
+        assert mock_session.rollback.called
+        assert mock_log.error.call_count == 0
+
     def test_load_user(self):
         sm = EmptySecurityManager()
         sm.get_user_by_id = Mock()

Reply via email to