This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2b089c54bf Add session cleanup middleware to FAB FastAPI app (#61480)
b2b089c54bf is described below

commit b2b089c54bf96580ea89c2d72646f88748ec6c36
Author: Youil <[email protected]>
AuthorDate: Wed Feb 11 02:09:49 2026 +0900

    Add session cleanup middleware to FAB FastAPI app (#61480)
---
 .../providers/fab/auth_manager/fab_auth_manager.py | 20 ++++++
 .../unit/fab/auth_manager/test_fab_auth_manager.py | 82 ++++++++++++++++++++++
 2 files changed, 102 insertions(+)

diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
index eb0fb75275f..8219b262d72 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
@@ -222,6 +222,26 @@ class FabAuthManager(BaseAuthManager[User]):
         app.include_router(roles_router)
         app.include_router(users_router)
 
+        # Session cleanup middleware to prevent PendingRollbackError.
+        # FAB's Flask views (e.g., /users/list/, /roles/list/) are mounted below via
+        # WSGIMiddleware. These views use settings.Session (SQLAlchemy scoped_session),
+        # but unlike a native Flask app where teardown_appcontext calls Session.remove(),
+        # the WSGI wrapper does not trigger Flask's teardown hooks.
+        # Without explicit cleanup, sessions remain in "idle in transaction" state.
+        # When the database connection times out (e.g., PostgreSQL's
+        # idle_in_transaction_session_timeout), subsequent requests reusing the
+        # invalidated session raise PendingRollbackError.
+        @app.middleware("http")
+        async def cleanup_session_middleware(request, call_next):
+            try:
+                response = await call_next(request)
+                return response
+            finally:
+                from airflow import settings
+
+                if settings.Session:
+                    settings.Session.remove()
+
         app.mount("/", WSGIMiddleware(flask_app))
 
         return app
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
index 248c57e73c5..35856f7fb6e 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
@@ -957,3 +957,85 @@ def test_resetdb(
         mock_init.assert_not_called()
     else:
         mock_init.assert_called_once()
+
+
+class TestFabAuthManagerSessionCleanup:
+    """Test session cleanup middleware in FAB auth manager FastAPI app.
+
+    Background:
+    FAB auth manager's FastAPI app has the following route structure:
+    - /token, /logout: FastAPI routes (login_router)
+    - /users/*, /roles/*: FastAPI API routes
+    - /*: WSGIMiddleware -> Flask App (FAB views like /users/list/, /roles/list/)
+
+    Problem:
+    FAB's Flask views (e.g., /users/list/, /roles/list/) use settings.Session
+    (SQLAlchemy scoped_session). In a normal Flask app, teardown_appcontext
+    automatically calls Session.remove() after each request. However, when Flask
+    is mounted via WSGIMiddleware in FastAPI, teardown_appcontext does NOT trigger.
+
+    This leaves database sessions in "idle in transaction" state. When the database
+    connection times out (e.g., PostgreSQL's idle_in_transaction_session_timeout),
+    subsequent requests reusing the invalidated session raise PendingRollbackError.
+
+    Solution:
+    Add a FastAPI middleware that calls Session.remove() in the finally block,
+    ensuring session cleanup for ALL requests including those forwarded to Flask via WSGI.
+    """
+
+    @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app")
+    def test_session_cleanup_middleware_on_wsgi_route(self, mock_create_app):
+        """Test Session.remove() is called after requests to WSGI-mounted Flask routes.
+
+        This is the critical scenario: requests to Flask AppBuilder views like
+        /users/list/ and /roles/list/ go through WSGIMiddleware. Without the
+        cleanup middleware, these requests leave sessions in "idle in transaction"
+        state, eventually causing PendingRollbackError.
+        """
+        from unittest.mock import patch
+
+        from fastapi.testclient import TestClient
+
+        # Setup mock Flask app (simulates FAB's Flask app)
+        mock_flask_app = MagicMock()
+        mock_create_app.return_value = mock_flask_app
+
+        auth_manager = FabAuthManager()
+        fastapi_app = auth_manager.get_fastapi_app()
+
+        client = TestClient(fastapi_app, raise_server_exceptions=False)
+
+        with patch("airflow.settings.Session") as mock_session:
+            # Request to a path not handled by FastAPI routers goes to WSGIMiddleware -> Flask
+            # This simulates accessing /users/list/ or /roles/list/ which caused the original bug
+            client.get("/users/list/")
+
+            # Verify Session.remove() was called by the cleanup middleware
+            mock_session.remove.assert_called()
+
+    @mock.patch("airflow.providers.fab.auth_manager.fab_auth_manager.create_app")
+    def test_session_cleanup_middleware_on_fastapi_route(self, mock_create_app):
+        """Test Session.remove() is also called after FastAPI route requests.
+
+        Even though FastAPI routes may not directly use settings.Session,
+        the middleware should clean up any session that might have been
+        used during request processing (e.g., by dependencies or nested calls).
+        """
+        from unittest.mock import patch
+
+        from fastapi.testclient import TestClient
+
+        mock_flask_app = MagicMock()
+        mock_create_app.return_value = mock_flask_app
+
+        auth_manager = FabAuthManager()
+        fastapi_app = auth_manager.get_fastapi_app()
+
+        client = TestClient(fastapi_app, raise_server_exceptions=False)
+
+        with patch("airflow.settings.Session") as mock_session:
+            # Request to a FastAPI route (login endpoint)
+            client.post("/token", json={"username": "test", "password": "test"})
+
+            # Verify Session.remove() was called
+            mock_session.remove.assert_called()

Reply via email to