This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aec3744e15f fix(fab): recover from first idle MySQL disconnect in
token auth (#62919)
aec3744e15f is described below
commit aec3744e15fff587b9be623b6b35f0840139d632
Author: Kushal Bohra <[email protected]>
AuthorDate: Tue Mar 10 10:37:59 2026 -0700
fix(fab): recover from first idle MySQL disconnect in token auth (#62919)
* fix(fab): recover from first idle MySQL disconnect in token auth
Retry user deserialization once after clearing the poisoned scoped session,
so that the first request after a server-side idle timeout does not return an
HTTP 500. Add regression coverage for transient disconnect recovery, and factor
the deserialization lookup into a helper to avoid duplicating it.
* test(fab): rename retry mock for clarity
---
.../providers/fab/auth_manager/fab_auth_manager.py | 21 ++++++++++++++++----
.../unit/fab/auth_manager/test_fab_auth_manager.py | 23 ++++++++++++++++++++--
2 files changed, 38 insertions(+), 6 deletions(-)
diff --git
a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
index 99fcbeff134..2ad86559a74 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
@@ -267,16 +267,29 @@ class FabAuthManager(BaseAuthManager[User]):
@cachedmethod(lambda self: self.cache, key=lambda _, token:
int(token["sub"]))
def deserialize_user(self, token: dict[str, Any]) -> User:
+ user_id = int(token["sub"])
+
+ def _fetch_user() -> User:
+ try:
+ return self.session.scalars(select(User).where(User.id ==
user_id)).one()
+ except NoResultFound:
+ raise ValueError(f"User with id {token['sub']} not found")
+
try:
- return self.session.scalars(select(User).where(User.id ==
int(token["sub"]))).one()
- except NoResultFound:
- raise ValueError(f"User with id {token['sub']} not found")
+ return _fetch_user()
except SQLAlchemyError:
# Discard the poisoned scoped session so the next request gets a
# fresh connection from the pool instead of a PendingRollbackError.
with suppress(Exception):
self.session.remove()
- raise
+ try:
+ return _fetch_user()
+ except SQLAlchemyError:
+ # If retry also fails, remove the scoped session again to keep
+ # future requests from reusing a broken transaction state.
+ with suppress(Exception):
+ self.session.remove()
+ raise
def serialize_user(self, user: User) -> dict[str, Any]:
return {"sub": str(user.id)}
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
index 3b3ebd4bbbe..99f7dc24edc 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
@@ -996,7 +996,7 @@ class TestDeserializeUserSessionCleanup:
ids=["operational_error", "pending_rollback_error"],
)
def test_db_error_calls_session_remove(self, auth_manager_with_appbuilder,
raised_exc):
- """session.remove() is called on SQLAlchemy errors so the next request
recovers."""
+ """session.remove() is called on SQLAlchemy errors before and after
retry."""
mock_session = MagicMock(spec=["scalars", "remove"])
mock_session.scalars.side_effect = raised_exc
auth_manager_with_appbuilder.cache.pop(99997, None)
@@ -1005,7 +1005,7 @@ class TestDeserializeUserSessionCleanup:
with pytest.raises(type(raised_exc)):
auth_manager_with_appbuilder.deserialize_user({"sub": "99997"})
- mock_session.remove.assert_called_once()
+ assert mock_session.remove.call_count == 2
def test_db_error_propagates_when_session_remove_raises(self,
auth_manager_with_appbuilder):
"""The original SQLAlchemyError propagates even if session.remove()
itself raises."""
@@ -1021,6 +1021,25 @@ class TestDeserializeUserSessionCleanup:
with pytest.raises(OperationalError):
auth_manager_with_appbuilder.deserialize_user({"sub": "99997"})
+ assert mock_session.remove.call_count == 2
+
+ def test_db_error_retries_once_and_recovers(self,
auth_manager_with_appbuilder):
+ """A transient DB disconnect is recovered by removing session and
retrying once."""
+ user = Mock()
+ user.id = 99996
+ original_exc = OperationalError("connection dropped", None,
Exception())
+ retry_query_result = Mock()
+ retry_query_result.one.return_value = user
+
+ mock_session = MagicMock(spec=["scalars", "remove"])
+ mock_session.scalars.side_effect = [original_exc, retry_query_result]
+ auth_manager_with_appbuilder.cache.pop(user.id, None)
+
+ with self._patched_session(auth_manager_with_appbuilder, mock_session):
+ result = auth_manager_with_appbuilder.deserialize_user({"sub":
str(user.id)})
+
+ assert result == user
+ assert mock_session.scalars.call_count == 2
mock_session.remove.assert_called_once()