This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f5d8201ea7935d17cecaf25fc90d4ef0ccdd627b
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Aug 13 21:21:43 2023 +0200

    Remove user sessions when resetting password (#33347)
    
    * Remove user sessions when resetting password
    
    When user's password is reset, we also remove all DB sessions
    for that user - for database session backend.
    
    In case we are using securecookie mechanism, resetting password does
    not invalidate old sessions, so instead we are displaying warning to
    the user performing the reset that in order to clear existing
    sessions of the user, the secure_key needs to be changed and it
    will invalidate all sessions for all users.
    
    Protection has been added in case the number of sessions in the DB
    is too big to effectively scan and remove sessions for the user. In
    such case we print warning for the user that sessions have not
    been reset, and we suggest to improve the way their deployment
    mechanisms create too many sessions - by either changing the way
    how automation of the API calls is done and/or by purging the
    sessions regularly by "airflow db clean".
    
    * Update airflow/auth/managers/fab/security_manager/override.py
    
    Co-authored-by: Hussein Awala <[email protected]>
    
    ---------
    
    Co-authored-by: Hussein Awala <[email protected]>
    (cherry picked from commit 2caa186935151683076b74357daad83d2538a3f6)
---
 .../auth/managers/fab/security_manager/override.py |  53 +++++++++-
 airflow/config_templates/config.yml                |  14 ++-
 airflow/utils/db_cleanup.py                        |   4 +
 tests/www/views/test_views_custom_user_views.py    | 112 +++++++++++++++++++++
 4 files changed, 181 insertions(+), 2 deletions(-)

diff --git a/airflow/auth/managers/fab/security_manager/override.py 
b/airflow/auth/managers/fab/security_manager/override.py
index f452fe912c..089b449422 100644
--- a/airflow/auth/managers/fab/security_manager/override.py
+++ b/airflow/auth/managers/fab/security_manager/override.py
@@ -19,14 +19,26 @@ from __future__ import annotations
 
 from functools import cached_property
 
-from flask import g
+from flask import flash, g
 from flask_appbuilder.const import AUTH_DB, AUTH_LDAP, AUTH_OAUTH, AUTH_OID, 
AUTH_REMOTE_USER
 from flask_babel import lazy_gettext
 from flask_jwt_extended import JWTManager
 from flask_login import LoginManager
+from itsdangerous import want_bytes
+from markupsafe import Markup
 from werkzeug.security import generate_password_hash
 
+from airflow.auth.managers.fab.models import User
 from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser
+from airflow.www.session import AirflowDatabaseSessionInterface
+
+# This is the limit of DB user sessions that we consider as "healthy". If you 
have more sessions that this
+# number then we will refuse to delete sessions that have expired and old user 
sessions when resetting
+# user's password, and raise a warning in the UI instead. Usually when you 
have that many sessions, it means
+# that there is something wrong with your deployment - for example you have an 
automated API call that
+# continuously creates new sessions. Such setup should be fixed by reusing 
sessions or by periodically
+# purging the old sessions by using `airflow db clean` command.
+MAX_NUM_DATABASE_USER_SESSIONS = 50000
 
 
 class FabAirflowSecurityManagerOverride:
@@ -230,8 +242,47 @@ class FabAirflowSecurityManagerOverride:
         """
         user = self.get_user_by_id(userid)
         user.password = generate_password_hash(password)
+        self.reset_user_sessions(user)
         self.update_user(user)
 
+    def reset_user_sessions(self, user: User) -> None:
+        if isinstance(self.appbuilder.get_app.session_interface, 
AirflowDatabaseSessionInterface):
+            interface = self.appbuilder.get_app.session_interface
+            session = interface.db.session
+            user_session_model = interface.sql_session_model
+            num_sessions = session.query(user_session_model).count()
+            if num_sessions > MAX_NUM_DATABASE_USER_SESSIONS:
+                flash(
+                    Markup(
+                        f"The old sessions for user {user.username} have 
<b>NOT</b> been deleted!<br>"
+                        f"You have a lot ({num_sessions}) of user sessions in 
the 'SESSIONS' table in "
+                        f"your database.<br> "
+                        "This indicates that this deployment might have an 
automated API calls that create "
+                        "and not reuse sessions.<br>You should consider 
reusing sessions or cleaning them "
+                        "periodically using db clean.<br>"
+                        "Make sure to reset password for the user again after 
cleaning the session table "
+                        "to remove old sessions of the user."
+                    ),
+                    "warning",
+                )
+            else:
+                for s in session.query(user_session_model):
+                    session_details = 
interface.serializer.loads(want_bytes(s.data))
+                    if session_details.get("_user_id") == user.id:
+                        session.delete(s)
+        else:
+            flash(
+                Markup(
+                    "Since you are using `securecookie` session backend 
mechanism, we cannot prevent "
+                    f"some old sessions for user {user.username} to be 
reused.<br> If you want to make sure "
+                    "that the user is logged out from all sessions, you should 
consider using "
+                    "`database` session backend mechanism.<br> You can also 
change the 'secret_key` "
+                    "webserver configuration for all your webserver instances 
and restart the webserver. "
+                    "This however will logout all users from all sessions."
+                ),
+                "warning",
+            )
+
     def load_user(self, user_id):
         """Load user by ID."""
         return self.get_user_by_id(int(user_id))
diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index dc89dcc814..b9ea0c0e3f 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1387,7 +1387,19 @@ webserver:
       default: ""
     session_backend:
       description: |
-        The type of backend used to store web session data, can be 'database' 
or 'securecookie'
+        The type of backend used to store web session data, can be `database` 
or `securecookie`. For the
+        `database` backend, sessions are store in the database (in `session` 
table) and they can be
+        managed there (for example when you reset password of the user, all 
sessions for that user are
+        deleted). For the `securecookie` backend, sessions are stored in 
encrypted cookies on the client
+        side. The `securecookie` mechanism is 'lighter' than database backend, 
but sessions are not deleted
+        when you reset password of the user, which means that other than 
waiting for expiry time, the only
+        way to invalidate all sessions for a user is to change secret_key and 
restart webserver (which
+        also invalidates and logs out all other user's sessions).
+
+        When you are using `database` backend, make sure to keep your database 
session table small
+        by periodically running `airflow db clean --table session` command, 
especially if you have
+        automated API calls that will create a new session for each call 
rather than reuse the sessions
+        stored in browser cookies.
       version_added: 2.2.4
       type: string
       example: securecookie
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index f9dae37242..79a804a9e5 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -38,6 +38,7 @@ from sqlalchemy.sql.expression import ClauseElement, 
Executable, tuple_
 
 from airflow import AirflowException
 from airflow.cli.simple_table import AirflowConsole
+from airflow.configuration import conf
 from airflow.models import Base
 from airflow.utils import timezone
 from airflow.utils.db import reflect_tables
@@ -115,6 +116,9 @@ config_list: list[_TableConfig] = [
     _TableConfig(table_name="celery_tasksetmeta", 
recency_column_name="date_done"),
 ]
 
+if conf.get("webserver", "session_backend") == "database":
+    config_list.append(_TableConfig(table_name="session", 
recency_column_name="expiry"))
+
 config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in 
sorted(config_list)}
 
 
diff --git a/tests/www/views/test_views_custom_user_views.py 
b/tests/www/views/test_views_custom_user_views.py
index 71ab551ab7..771a103d69 100644
--- a/tests/www/views/test_views_custom_user_views.py
+++ b/tests/www/views/test_views_custom_user_views.py
@@ -17,7 +17,11 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime, timedelta
+from unittest import mock
+
 import pytest
+from flask.sessions import SecureCookieSessionInterface
 from flask_appbuilder import SQLA
 
 from airflow import settings
@@ -166,3 +170,111 @@ class TestSecurity:
         check_content_in_response("Deleted Row", response)
         check_content_not_in_response(user_to_delete.username, response)
         assert bool(self.security_manager.get_user_by_id(user_to_delete.id)) 
is False
+
+
+# type: ignore[attr-defined]
+
+
+class TestResetUserSessions:
+    @classmethod
+    def setup_class(cls):
+        settings.configure_orm()
+
+    def setup_method(self):
+        # We cannot reuse the app in tests (on class level) as in Flask 2.2 
this causes
+        # an exception because app context teardown is removed and if even 
single request is run via app
+        # it cannot be re-intialized again by passing it as constructor to SQLA
+        # This makes the tests slightly slower (but they work with Flask 2.1 
and 2.2
+        self.app = application.create_app(testing=True)
+        self.appbuilder = self.app.appbuilder
+        self.app.config["WTF_CSRF_ENABLED"] = False
+        self.security_manager = self.appbuilder.sm
+        self.interface = self.app.session_interface
+        self.model = self.interface.sql_session_model
+        self.serializer = self.interface.serializer
+        self.db = self.interface.db
+        self.db.session.query(self.model).delete()
+        self.db.session.commit()
+        self.db.session.flush()
+        self.user_1 = create_user(
+            self.app,
+            username="user_to_delete_1",
+            role_name="user_to_delete",
+        )
+        self.user_2 = create_user(
+            self.app,
+            username="user_to_delete_2",
+            role_name="user_to_delete",
+        )
+        self.db.session.commit()
+        self.db.session.flush()
+
+    def create_user_db_session(self, session_id: str, time_delta: timedelta, 
user_id: int):
+        self.db.session.add(
+            self.model(
+                session_id=session_id,
+                data=self.serializer.dumps({"_user_id": user_id}),
+                expiry=datetime.now() + time_delta,
+            )
+        )
+
+    @pytest.mark.parametrize(
+        "time_delta, user_sessions_deleted",
+        [
+            pytest.param(timedelta(days=-1), True, id="Both expired"),
+            pytest.param(timedelta(hours=1), True, id="Both fresh"),
+            pytest.param(timedelta(days=1), True, id="Both future"),
+        ],
+    )
+    def test_reset_user_sessions_delete(self, time_delta: timedelta, 
user_sessions_deleted: bool):
+
+        self.create_user_db_session("session_id_1", time_delta, self.user_1.id)
+        self.create_user_db_session("session_id_2", time_delta, self.user_2.id)
+        self.db.session.commit()
+        self.db.session.flush()
+        assert self.db.session.query(self.model).count() == 2
+        assert self.get_session_by_id("session_id_1") is not None
+        assert self.get_session_by_id("session_id_2") is not None
+
+        self.security_manager.reset_password(self.user_1.id, "new_password")
+        self.db.session.commit()
+        self.db.session.flush()
+        if user_sessions_deleted:
+            assert self.db.session.query(self.model).count() == 1
+            assert self.get_session_by_id("session_id_1") is None
+        else:
+            assert self.db.session.query(self.model).count() == 2
+            assert self.get_session_by_id("session_id_1") is not None
+
+    def get_session_by_id(self, session_id: str):
+        return self.db.session.query(self.model).filter(self.model.session_id 
== session_id).scalar()
+
+    @mock.patch("airflow.auth.managers.fab.security_manager.override.flash")
+    
@mock.patch("airflow.auth.managers.fab.security_manager.override.MAX_NUM_DATABASE_USER_SESSIONS",
 1)
+    def test_refuse_delete(self, flash_mock):
+        self.create_user_db_session("session_id_1", timedelta(days=1), 
self.user_1.id)
+        self.create_user_db_session("session_id_2", timedelta(days=1), 
self.user_2.id)
+        self.db.session.commit()
+        self.db.session.flush()
+        assert self.db.session.query(self.model).count() == 2
+        assert self.get_session_by_id("session_id_1") is not None
+        assert self.get_session_by_id("session_id_2") is not None
+        self.security_manager.reset_password(self.user_1.id, "new_password")
+        assert flash_mock.called
+        assert (
+            "The old sessions for user user_to_delete_1 have <b>NOT</b> been 
deleted!"
+            in flash_mock.call_args[0][0]
+        )
+        assert self.db.session.query(self.model).count() == 2
+        assert self.get_session_by_id("session_id_1") is not None
+        assert self.get_session_by_id("session_id_2") is not None
+
+    @mock.patch("airflow.auth.managers.fab.security_manager.override.flash")
+    def test_warn_securecookie(self, flash_mock):
+        self.app.session_interface = SecureCookieSessionInterface()
+        self.security_manager.reset_password(self.user_1.id, "new_password")
+        assert flash_mock.called
+        assert (
+            "Since you are using `securecookie` session backend mechanism, we 
cannot"
+            in flash_mock.call_args[0][0]
+        )

Reply via email to