This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e7e188d453c Fix race condition in FabAuthManager when workers
concurrently create permissions, roles, and resources (#63842)
e7e188d453c is described below
commit e7e188d453cac109a8a42340cd1d7eab3abfccc9
Author: Piyush Mudgal <[email protected]>
AuthorDate: Wed Mar 18 20:41:50 2026 +0530
Fix race condition in FabAuthManager when workers concurrently create
permissions, roles, and resources (#63842)
---
.../fab/auth_manager/security_manager/override.py | 32 ++++++++++
.../auth_manager/security_manager/test_override.py | 71 ++++++++++++++++++++++
2 files changed, 103 insertions(+)
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
index 80800aa25c2..0cfc2351874 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -1299,6 +1299,13 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
self.session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_ROLE, name)
return role
+ except IntegrityError:
+ self.session.rollback()
+ role = self.find_role(name)
+ if role is not None:
+ log.info("Role '%s' was created by a concurrent worker,
using existing record", name)
+ return role
+ raise
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_ROLE, e)
self.session.rollback()
@@ -1570,6 +1577,13 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
self.session.add(action)
self.session.commit()
return action
+ except IntegrityError:
+ self.session.rollback()
+ action = self.get_action(name)
+ if action is not None:
+ log.info("Action '%s' was created by a concurrent worker,
using existing record", name)
+ return action
+ raise
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMISSION, e)
self.session.rollback()
@@ -1628,6 +1642,13 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
self.session.add(resource)
self.session.commit()
return resource
+ except IntegrityError:
+ self.session.rollback()
+ resource = self.get_resource(name)
+ if resource is not None:
+ log.info("Resource '%s' was created by a concurrent
worker, using existing record", name)
+ return resource
+ raise
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_VIEWMENU, e)
self.session.rollback()
@@ -1698,6 +1719,17 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
self.session.commit()
log.info(const.LOGMSG_INF_SEC_ADD_PERMVIEW, perm)
return perm
+ except IntegrityError:
+ self.session.rollback()
+ existing = self.get_permission(action_name, resource_name)
+ if existing is not None:
+ log.info(
+ "Permission '%s'->'%s' was created by a concurrent worker,
using existing record",
+ action_name,
+ resource_name,
+ )
+ return existing
+ raise
except Exception as e:
log.error(const.LOGMSG_ERR_SEC_ADD_PERMVIEW, e)
self.session.rollback()
diff --git a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
index f8a03f54bfe..a2186c82973 100644
--- a/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
+++ b/providers/fab/tests/unit/fab/auth_manager/security_manager/test_override.py
@@ -25,7 +25,9 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from airflow.providers.fab.auth_manager.models import (
+ Action,
Permission,
+ Resource,
Role,
)
from airflow.providers.fab.auth_manager.security_manager.override import
FabAirflowSecurityManagerOverride
@@ -79,6 +81,75 @@ class TestFabAirflowSecurityManagerOverride:
f"Failed to add '{permission}' permission to the '{role}' role
Error: {mock_error}",
)
+
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+ def test_add_role_returns_existing_on_concurrent_insert(self, mock_log):
+ sm = EmptySecurityManager()
+ existing_role = Mock(spec=Role, name="Admin")
+
+ mock_session = Mock(spec=Session)
+ mock_session.commit.side_effect = IntegrityError("stmt", {},
Exception("Duplicate entry"))
+ sm.find_role = Mock(side_effect=[None, existing_role])
+
+ with mock.patch.object(EmptySecurityManager, "session", mock_session):
+ result = sm.add_role("Admin")
+
+ assert result is existing_role
+ assert mock_session.rollback.called
+ assert mock_log.error.call_count == 0
+
+
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+ def test_create_action_returns_existing_on_concurrent_insert(self,
mock_log):
+ sm = EmptySecurityManager()
+ existing_action = Mock(spec=Action, name="can_read")
+
+ mock_session = Mock(spec=Session)
+ mock_session.commit.side_effect = IntegrityError("stmt", {},
Exception("Duplicate entry"))
+ sm.get_action = Mock(side_effect=[None, existing_action])
+
+ with mock.patch.object(EmptySecurityManager, "session", mock_session):
+ result = sm.create_action("can_read")
+
+ assert result is existing_action
+ assert mock_session.rollback.called
+ assert mock_log.error.call_count == 0
+
+
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+ def test_create_resource_returns_existing_on_concurrent_insert(self,
mock_log):
+ sm = EmptySecurityManager()
+ existing_resource = Mock(spec=Resource, name="Connections")
+
+ mock_session = Mock(spec=Session)
+ mock_session.commit.side_effect = IntegrityError("stmt", {},
Exception("Duplicate entry"))
+ sm.get_resource = Mock(side_effect=[None, existing_resource])
+
+ with mock.patch.object(EmptySecurityManager, "session", mock_session):
+ result = sm.create_resource("Connections")
+
+ assert result is existing_resource
+ assert mock_session.rollback.called
+ assert mock_log.error.call_count == 0
+
+
@mock.patch("airflow.providers.fab.auth_manager.security_manager.override.log")
+ def test_create_permission_returns_existing_on_concurrent_insert(self,
mock_log):
+ sm = EmptySecurityManager()
+ existing_perm = Mock(spec=Permission)
+ existing_resource = Mock(spec=Resource, id=10)
+ existing_action = Mock(spec=Action, id=20)
+
+ mock_session = Mock(spec=Session)
+ mock_session.commit.side_effect = IntegrityError("stmt", {},
Exception("Duplicate entry"))
+
+ sm.get_permission = Mock(side_effect=[None, existing_perm])
+ sm.create_resource = Mock(return_value=existing_resource)
+ sm.create_action = Mock(return_value=existing_action)
+
+ with mock.patch.object(EmptySecurityManager, "session", mock_session):
+ result = sm.create_permission("can_read", "Connections")
+
+ assert result is existing_perm
+ assert mock_session.rollback.called
+ assert mock_log.error.call_count == 0
+
def test_load_user(self):
sm = EmptySecurityManager()
sm.get_user_by_id = Mock()