This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cf709d929a0 Remove global variables in airflow.settings (#67070)
cf709d929a0 is described below

commit cf709d929a0faf22420f6a207da31eb5d729483f
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon May 18 00:57:54 2026 +0200

    Remove global variables in airflow.settings (#67070)
---
 airflow-core/src/airflow/migrations/env.py         |  2 +-
 airflow-core/src/airflow/settings.py               | 94 ++++++++++++++++------
 airflow-core/src/airflow/utils/db.py               | 11 +--
 airflow-core/src/airflow/utils/db_manager.py       |  2 +-
 airflow-core/tests/unit/core/test_settings.py      | 20 +++--
 .../tests/unit/core/test_sqlalchemy_config.py      | 16 ++--
 airflow-core/tests/unit/utils/test_db.py           | 32 +++++---
 .../providers/fab/auth_manager/models/db.py        | 17 ++--
 .../src/airflow/providers/fab/migrations/env.py    |  4 +-
 .../src/airflow/providers/fab/version_compat.py    |  1 +
 .../tests/unit/fab/auth_manager/models/test_db.py  | 11 ++-
 .../src/airflow/sdk/execution_time/supervisor.py   |  4 +-
 12 files changed, 135 insertions(+), 79 deletions(-)

diff --git a/airflow-core/src/airflow/migrations/env.py 
b/airflow-core/src/airflow/migrations/env.py
index dfb46ab6c37..2f4f8307cc8 100644
--- a/airflow-core/src/airflow/migrations/env.py
+++ b/airflow-core/src/airflow/migrations/env.py
@@ -80,7 +80,7 @@ def run_migrations_offline():
 
     """
     context.configure(
-        url=settings.SQL_ALCHEMY_CONN,
+        url=settings.get_sql_alchemy_conn(),
         target_metadata=target_metadata,
         literal_binds=True,
         compare_type=compare_type,
diff --git a/airflow-core/src/airflow/settings.py 
b/airflow-core/src/airflow/settings.py
index 739e2437f6a..a824227b23b 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -110,8 +110,15 @@ HEADER = "\n".join(
 
 SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
 
-SQL_ALCHEMY_CONN: str | None = None
-SQL_ALCHEMY_CONN_ASYNC: str | None = None
+
+class _AirflowSettings:
+    """Class to hold Airflow settings. This is used to avoid circular imports 
and to allow for lazy loading of settings."""
+
+    block_orm_access: bool = False
+    sql_alchemy_conn: str | None = None
+    sql_alchemy_conn_async: str | None = None
+
+
 PLUGINS_FOLDER: str | None = None
 DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", 
"DAGS_FOLDER"))
 
@@ -126,7 +133,25 @@ async_engine: AsyncEngine | None = None
 AsyncSession: Callable[..., SAAsyncSession] | None = None
 
 
-def get_engine():
+def get_sql_alchemy_conn() -> str:
+    """Get the configured SQLAlchemy connection string, raising an error if 
not configured."""
+    if _AirflowSettings.block_orm_access:
+        raise AttributeError("Access to the Airflow Metadatabase from dags is 
not allowed!")
+    if _AirflowSettings.sql_alchemy_conn is None:
+        raise RuntimeError("SQLAlchemy connection string not configured. Call 
configure_vars() first.")
+    return _AirflowSettings.sql_alchemy_conn
+
+
+def get_sql_alchemy_conn_async() -> str:
+    """Get the configured SQLAlchemy connection string, raising an error if 
not configured."""
+    if _AirflowSettings.block_orm_access:
+        raise AttributeError("Access to the Airflow Metadatabase from dags is 
not allowed!")
+    if _AirflowSettings.sql_alchemy_conn_async is None:
+        raise RuntimeError("SQLAlchemy async connection string not configured. 
Call configure_vars() first.")
+    return _AirflowSettings.sql_alchemy_conn_async
+
+
+def get_engine() -> Engine:
     """Get the configured engine, raising an error if not configured."""
     if engine is None:
         raise RuntimeError("Engine not configured. Call configure_orm() 
first.")
@@ -140,6 +165,11 @@ def get_session():
     return Session
 
 
+def block_orm_access():
+    """Block access to the ORM by marking state."""
+    _AirflowSettings.block_orm_access = True
+
+
 # The JSON library to use for DAG Serialization and De-Serialization
 json = json_lib
 
@@ -250,16 +280,16 @@ def _get_async_conn_uri_from_sync(sync_uri):
 
 def configure_vars():
     """Configure Global Variables from airflow.cfg."""
-    global SQL_ALCHEMY_CONN
-    global SQL_ALCHEMY_CONN_ASYNC
     global DAGS_FOLDER
     global PLUGINS_FOLDER
 
-    SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
+    _AirflowSettings.sql_alchemy_conn = conf.get("database", 
"sql_alchemy_conn")
     if conf.has_option("database", "sql_alchemy_conn_async"):
-        SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
+        _AirflowSettings.sql_alchemy_conn_async = conf.get("database", 
"sql_alchemy_conn_async")
     else:
-        SQL_ALCHEMY_CONN_ASYNC = 
_get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)
+        _AirflowSettings.sql_alchemy_conn_async = 
_get_async_conn_uri_from_sync(
+            sync_uri=_AirflowSettings.sql_alchemy_conn
+        )
 
     DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
 
@@ -402,7 +432,8 @@ def _configure_async_session() -> None:
     """
     global AsyncSession, async_engine
 
-    if not SQL_ALCHEMY_CONN_ASYNC:
+    async_conn = _AirflowSettings.sql_alchemy_conn_async
+    if not async_conn:
         async_engine = None
         AsyncSession = None
         return
@@ -414,14 +445,14 @@ def _configure_async_session() -> None:
     engine_args: dict[str, Any] = {}
     if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
         engine_args["poolclass"] = NullPool
-    elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
+    elif not async_conn.startswith("sqlite"):
         engine_args["pool_size"] = conf.getint("database", 
"SQL_ALCHEMY_POOL_SIZE", fallback=5)
         engine_args["pool_recycle"] = conf.getint("database", 
"SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
         engine_args["pool_pre_ping"] = conf.getboolean("database", 
"SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
         engine_args["max_overflow"] = conf.getint("database", 
"SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
 
     async_engine = create_async_metadata_engine(
-        SQL_ALCHEMY_CONN_ASYNC,
+        async_conn,
         connect_args=_get_connect_args("async"),
         engine_args=engine_args,
     )
@@ -437,11 +468,12 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     """Configure ORM using SQLAlchemy."""
     from airflow._shared.secrets_masker import mask_secret
 
-    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
+    conn_str = get_sql_alchemy_conn()
+    if _is_sqlite_db_path_relative(conn_str):
         from airflow.exceptions import AirflowConfigException
 
         raise AirflowConfigException(
-            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to 
sqlite. "
+            f"Cannot use relative path: `{conn_str}` to connect to sqlite. "
             "Please use absolute path such as `sqlite:////tmp/airflow.db`."
         )
 
@@ -458,17 +490,13 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     engine_args = prepare_engine_args(disable_connection_pool, pool_class)
 
     connect_args = _get_connect_args("sync")
-    if SQL_ALCHEMY_CONN.startswith("sqlite"):
+    if conn_str.startswith("sqlite"):
         # FastAPI runs sync endpoints in a separate thread. SQLite does not 
allow
         # to use objects created in another threads by default. Allowing that 
in test
         # to so the `test` thread and the tested endpoints can use common 
objects.
         connect_args["check_same_thread"] = False
 
-    engine = create_metadata_engine(
-        SQL_ALCHEMY_CONN,
-        engine_args=engine_args,
-        connect_args=connect_args,
-    )
+    engine = create_metadata_engine(conn_str, engine_args=engine_args, 
connect_args=connect_args)
     _configure_async_session()
     mask_secret(engine.url.password)
     setup_event_handlers(engine)
@@ -530,8 +558,9 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
     }
 
     default_args = {}
+    conn_str = get_sql_alchemy_conn()
     for dialect, default in DEFAULT_ENGINE_ARGS.items():
-        if SQL_ALCHEMY_CONN.startswith(dialect):
+        if conn_str.startswith(dialect):
             default_args = default.copy()
             break
 
@@ -543,7 +572,7 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
     elif disable_connection_pool or not conf.getboolean("database", 
"SQL_ALCHEMY_POOL_ENABLED"):
         engine_args["poolclass"] = NullPool
         log.debug("settings.prepare_engine_args(): Using NullPool")
-    elif _is_sqlite_in_memory(SQL_ALCHEMY_CONN):
+    elif _is_sqlite_in_memory(conn_str):
         # In-memory SQLite uses SingletonThreadPool which doesn't support 
pool_size/max_overflow.
         log.debug("settings.prepare_engine_args(): Skipping pool settings for 
in-memory SQLite")
     else:
@@ -596,7 +625,7 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
     # 'READ COMMITTED' is the default value for PostgreSQL.
     # More information here:
     # 
https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
-    if SQL_ALCHEMY_CONN.startswith("mysql"):
+    if conn_str.startswith("mysql"):
         engine_args["isolation_level"] = "READ COMMITTED"
 
     return engine_args
@@ -645,12 +674,13 @@ def configure_adapters():
     """Register Adapters and DB Converters."""
     from pendulum import DateTime as Pendulum
 
-    if SQL_ALCHEMY_CONN.startswith("sqlite"):
+    conn_str = get_sql_alchemy_conn()
+    if conn_str.startswith("sqlite"):
         from sqlite3 import register_adapter
 
         register_adapter(Pendulum, lambda val: val.isoformat(" "))
 
-    if SQL_ALCHEMY_CONN.startswith("mysql"):
+    if conn_str.startswith("mysql"):
         try:
             try:
                 import MySQLdb.converters
@@ -727,6 +757,22 @@ def __getattr__(name: str):
 
     from airflow.exceptions import RemovedInAirflow4Warning
 
+    if name == "SQL_ALCHEMY_CONN":
+        warnings.warn(
+            "settings.SQL_ALCHEMY_CONN has been replaced by 
get_sql_alchemy_conn(). This shim is just for compatibility. "
+            "Please upgrade your provider or integration.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return get_sql_alchemy_conn()
+    if name == "SQL_ALCHEMY_CONN_ASYNC":
+        warnings.warn(
+            "settings.SQL_ALCHEMY_CONN_ASYNC has been replaced by 
get_sql_alchemy_conn_async(). "
+            "This shim is just for compatibility. Please upgrade your provider 
or integration.",
+            RemovedInAirflow4Warning,
+            stacklevel=2,
+        )
+        return get_sql_alchemy_conn_async()
     if name == "MASK_SECRETS_IN_LOGS":
         warnings.warn(
             "settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns 
default value of False. "
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 4caa9901bfb..e611be90d0b 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -654,7 +654,8 @@ class AutocommitEngineForMySQL:
     """
 
     def __init__(self):
-        self.is_mysql = settings.SQL_ALCHEMY_CONN and 
settings.SQL_ALCHEMY_CONN.lower().startswith("mysql")
+        conn_str = settings.get_sql_alchemy_conn()
+        self.is_mysql = conn_str and conn_str.lower().startswith("mysql")
         self.original_prepare_engine_args = None
 
     def __enter__(self):
@@ -879,7 +880,7 @@ def _get_alembic_config():
     else:
         config = Config(os.path.join(package_dir, alembic_file))
     config.set_main_option("script_location", directory.replace("%", "%%"))
-    config.set_main_option("sqlalchemy.url", 
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
+    config.set_main_option("sqlalchemy.url", 
settings.get_sql_alchemy_conn().replace("%", "%%"))
     return config
 
 
@@ -1244,9 +1245,6 @@ def upgradedb(
     if from_revision and not show_sql_only:
         raise AirflowException("`from_revision` only supported with 
`sql_only=True`.")
 
-    if not settings.SQL_ALCHEMY_CONN:
-        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a 
critical assertion.")
-
     from alembic import command
 
     import_all_models()
@@ -1383,9 +1381,6 @@ def downgrade(*, to_revision, from_revision=None, 
show_sql_only=False, session:
             "downgrade from current revision."
         )
 
-    if not settings.SQL_ALCHEMY_CONN:
-        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
-
     # alembic adds significant import time, so we import it lazily
     from alembic import command
 
diff --git a/airflow-core/src/airflow/utils/db_manager.py 
b/airflow-core/src/airflow/utils/db_manager.py
index 57173341e88..8aa76bd131b 100644
--- a/airflow-core/src/airflow/utils/db_manager.py
+++ b/airflow-core/src/airflow/utils/db_manager.py
@@ -100,7 +100,7 @@ class BaseDBManager(LoggingMixin):
 
         config = Config(self.alembic_file)
         config.set_main_option("script_location", 
self.migration_dir.replace("%", "%%"))
-        config.set_main_option("sqlalchemy.url", 
settings.SQL_ALCHEMY_CONN.replace("%", "%%"))
+        config.set_main_option("sqlalchemy.url", 
settings.get_sql_alchemy_conn().replace("%", "%%"))
         return config
 
     def get_script_object(self, config=None) -> ScriptDirectory:
diff --git a/airflow-core/tests/unit/core/test_settings.py 
b/airflow-core/tests/unit/core/test_settings.py
index 588567181c5..0126e9a4f40 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -243,7 +243,7 @@ class TestMetadataEngineHooks:
 
         with (
             patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "false"}),
-            patch("airflow.settings.SQL_ALCHEMY_CONN", "sqlite://"),
+            patch("airflow.settings._AirflowSettings.sql_alchemy_conn", 
"sqlite://"),
             patch("airflow.settings.Session"),
             patch("airflow.settings.engine"),
             patch("airflow.settings.setup_event_handlers"),
@@ -269,7 +269,7 @@ class TestMetadataEngineHooks:
         mock_create_async_engine.return_value = MagicMock()
 
         with (
-            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"sqlite+aiosqlite://"),
+            patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async", 
"sqlite+aiosqlite://"),
             patch("airflow.settings.conf") as mock_conf,
         ):
             # Pool enabled but sqlite -- pool args should be skipped
@@ -291,7 +291,10 @@ class TestMetadataEngineHooks:
         mock_create_async_engine.return_value = MagicMock()
 
         with (
-            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"postgresql+asyncpg://localhost/airflow"),
+            patch(
+                "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
+                "postgresql+asyncpg://localhost/airflow",
+            ),
             patch("airflow.settings.conf") as mock_conf,
         ):
             mock_conf.getint.side_effect = lambda section, key, fallback=None: 
{
@@ -317,7 +320,10 @@ class TestMetadataEngineHooks:
         mock_create_async_engine.return_value = MagicMock()
 
         with (
-            patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", 
"postgresql+asyncpg://localhost/airflow"),
+            patch(
+                "airflow.settings._AirflowSettings.sql_alchemy_conn_async",
+                "postgresql+asyncpg://localhost/airflow",
+            ),
             patch("airflow.settings.conf") as mock_conf,
         ):
             mock_conf.getboolean.return_value = False
@@ -330,10 +336,10 @@ class TestMetadataEngineHooks:
 
     @patch("airflow.settings.create_async_metadata_engine")
     def test_configure_async_session_skips_when_no_async_conn(self, 
mock_create_async_engine):
-        """_configure_async_session() must not call the hook when 
SQL_ALCHEMY_CONN_ASYNC is empty."""
+        """_configure_async_session() must not call the hook when 
``sql_alchemy_conn_async`` is empty."""
         from airflow import settings
 
-        with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC", ""):
+        with patch("airflow.settings._AirflowSettings.sql_alchemy_conn_async", 
""):
             settings._configure_async_session()
 
         assert mock_create_async_engine.mock_calls == []
@@ -426,7 +432,7 @@ def test_sqlite_relative_path(value, expectation):
 
     with (
         patch("os.environ", {"_AIRFLOW_SKIP_DB_TESTS": "true"}),
-        patch("airflow.settings.SQL_ALCHEMY_CONN", value),
+        patch("airflow.settings._AirflowSettings.sql_alchemy_conn", value),
         patch("airflow.settings.Session"),
         patch("airflow.settings.engine"),
     ):
diff --git a/airflow-core/tests/unit/core/test_sqlalchemy_config.py 
b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
index db211bab2c5..15dbb5a1148 100644
--- a/airflow-core/tests/unit/core/test_sqlalchemy_config.py
+++ b/airflow-core/tests/unit/core/test_sqlalchemy_config.py
@@ -38,8 +38,8 @@ class TestSqlAlchemySettings:
         try:
             with pytest.MonkeyPatch.context() as mp:
                 mp.setattr(
-                    settings,
-                    "SQL_ALCHEMY_CONN",
+                    settings._AirflowSettings,
+                    "sql_alchemy_conn",
                     
"mysql+foobar://user:pass@host/dbname?inline=param&another=param",
                 )
                 yield
@@ -56,7 +56,7 @@ class TestSqlAlchemySettings:
         settings.configure_orm()
         expected_kwargs = dict(
             connect_args={}
-            if not settings.SQL_ALCHEMY_CONN.startswith("sqlite")
+            if not settings.get_sql_alchemy_conn().startswith("sqlite")
             else {"check_same_thread": False},
             max_overflow=10,
             pool_pre_ping=True,
@@ -66,7 +66,7 @@ class TestSqlAlchemySettings:
             future=True,
         )
         mock_create_engine.assert_called_once_with(
-            settings.SQL_ALCHEMY_CONN,
+            settings.get_sql_alchemy_conn(),
             **expected_kwargs,
         )
 
@@ -83,7 +83,7 @@ class TestSqlAlchemySettings:
         monkeypatch,
     ):
         """SQLAlchemy 2.0+ uses QueuePool for file-based SQLite, so pool 
settings should be applied."""
-        monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", 
"sqlite:////tmp/airflow.db")
+        monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn", 
"sqlite:////tmp/airflow.db")
         settings.configure_orm()
         expected_kwargs = dict(
             connect_args={"check_same_thread": False},
@@ -114,7 +114,7 @@ class TestSqlAlchemySettings:
     )
     def test_prepare_engine_args_sqlite_in_memory_skips_pool_settings(self, 
conn_str, monkeypatch):
         """In-memory SQLite uses SingletonThreadPool which doesn't support 
pool_size/max_overflow."""
-        monkeypatch.setattr(settings, "SQL_ALCHEMY_CONN", conn_str)
+        monkeypatch.setattr(settings._AirflowSettings, "sql_alchemy_conn", 
conn_str)
         engine_args = settings.prepare_engine_args()
         assert "pool_size" not in engine_args
         assert "max_overflow" not in engine_args
@@ -139,7 +139,7 @@ class TestSqlAlchemySettings:
         with conf_vars(config):
             settings.configure_orm()
             engine_args = {"arg": 1}
-            if settings.SQL_ALCHEMY_CONN.startswith("mysql"):
+            if settings.get_sql_alchemy_conn().startswith("mysql"):
                 engine_args["isolation_level"] = "READ COMMITTED"
             expected_kwargs = dict(
                 connect_args=SQL_ALCHEMY_CONNECT_ARGS,
@@ -148,7 +148,7 @@ class TestSqlAlchemySettings:
                 **engine_args,
             )
             mock_create_engine.assert_called_once_with(
-                settings.SQL_ALCHEMY_CONN,
+                settings.get_sql_alchemy_conn(),
                 **expected_kwargs,
             )
 
diff --git a/airflow-core/tests/unit/utils/test_db.py 
b/airflow-core/tests/unit/utils/test_db.py
index d2f13e27e9e..600f08a548e 100644
--- a/airflow-core/tests/unit/utils/test_db.py
+++ b/airflow-core/tests/unit/utils/test_db.py
@@ -406,7 +406,9 @@ class TestAutocommitEngineForMySQL:
     def test_non_mysql_database_is_noop(self, mocker):
         """Test that non-MySQL databases don't trigger any changes."""
         # Mock settings to use PostgreSQL
-        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"postgresql://user:pass@localhost/db")
+        mocker.patch.object(
+            settings._AirflowSettings, "sql_alchemy_conn", 
"postgresql://user:pass@localhost/db"
+        )
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
 
@@ -428,7 +430,9 @@ class TestAutocommitEngineForMySQL:
     def test_mysql_database_modifies_engine(self, mocker):
         """Test that MySQL databases trigger AUTOCOMMIT mode."""
         # Mock settings to use MySQL
-        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql+mysqlconnector://user:pass@localhost/db")
+        mocker.patch.object(
+            settings._AirflowSettings, "sql_alchemy_conn", 
"mysql+mysqlconnector://user:pass@localhost/db"
+        )
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
 
@@ -470,7 +474,7 @@ class TestAutocommitEngineForMySQL:
         ]
 
         for conn_string in mysql_variants:
-            mocker.patch.object(settings, "SQL_ALCHEMY_CONN", conn_string)
+            mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
conn_string)
             mock_dispose.reset_mock()
             mock_configure.reset_mock()
 
@@ -485,18 +489,21 @@ class TestAutocommitEngineForMySQL:
 
     def test_none_sql_alchemy_conn(self, mocker):
         """Test behavior when SQL_ALCHEMY_CONN is None."""
-        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", None)
+        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
None)
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
 
-        with AutocommitEngineForMySQL():
-            # Should be a no-op when connection string is None
-            mock_dispose.assert_not_called()
-            mock_configure.assert_not_called()
+        with pytest.raises(RuntimeError, match="connection string not 
configured"):
+            with AutocommitEngineForMySQL():
+                pass
+
+        # Failure should not trigger any ORM disposal or reconfiguration side 
effects.
+        mock_dispose.assert_not_called()
+        mock_configure.assert_not_called()
 
     def test_prepare_engine_args_with_parameters(self, mocker):
         """Test that prepare_engine_args parameters are properly forwarded."""
-        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql://user:pass@localhost/db")
+        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
"mysql://user:pass@localhost/db")
         mocker.patch.object(settings, "dispose_orm")
         mocker.patch.object(settings, "configure_orm")
 
@@ -527,7 +534,7 @@ class TestAutocommitEngineForMySQL:
 
     def test_exception_during_context(self, mocker):
         """Test that cleanup happens even if an exception occurs."""
-        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql://user:pass@localhost/db")
+        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
"mysql://user:pass@localhost/db")
         mock_dispose = mocker.patch.object(settings, "dispose_orm")
         mock_configure = mocker.patch.object(settings, "configure_orm")
         original_prepare = mocker.Mock()
@@ -550,7 +557,7 @@ class TestAutocommitEngineForMySQL:
 
     def test_logging_messages(self, mocker):
         """Test that appropriate log messages are generated."""
-        mocker.patch.object(settings, "SQL_ALCHEMY_CONN", 
"mysql://user:pass@localhost/db")
+        mocker.patch.object(settings._AirflowSettings, "sql_alchemy_conn", 
"mysql://user:pass@localhost/db")
         mocker.patch.object(settings, "dispose_orm")
         mocker.patch.object(settings, "configure_orm")
 
@@ -571,7 +578,8 @@ class TestAutocommitEngineForMySQL:
         """Test integration using a fully mocked settings module."""
         # Setup mock settings module
         mock_settings = mocker.Mock()
-        mock_settings.SQL_ALCHEMY_CONN = "mysql://user:pass@localhost/db"
+        mock_settings._AirflowSettings = mocker.Mock()
+        mock_settings._AirflowSettings.sql_alchemy_conn = 
"mysql://user:pass@localhost/db"
         mock_settings.dispose_orm = mocker.Mock()
         mock_settings.configure_orm = mocker.Mock()
         original_prepare = mocker.Mock(return_value={"test": "value"})
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py 
b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
index 10b83612339..a62cbcc409a 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/db.py
@@ -24,6 +24,7 @@ from sqlalchemy.engine.url import make_url
 from airflow import settings
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.fab.auth_manager.models import model_metadata
+from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
 from airflow.utils.db import _offline_migration, print_happy_cat
 from airflow.utils.db_manager import BaseDBManager
 
@@ -67,7 +68,8 @@ class FABDBManager(BaseDBManager):
 
     def create_db_from_orm(self):
         super().create_db_from_orm()
-        db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
+        sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
settings.SQL_ALCHEMY_CONN
+        db, flask_app = _get_flask_db(sql_conn)
         with flask_app.app_context():
             db.create_all()
 
@@ -127,8 +129,6 @@ class FABDBManager(BaseDBManager):
         _release_metadata_locks_if_supported(self)
 
         # alembic adds significant import time, so we import it lazily
-        if not settings.SQL_ALCHEMY_CONN:
-            raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is 
a critical assertion.")
         from alembic import command
 
         current_revision = self.get_current_revision()
@@ -147,7 +147,10 @@ class FABDBManager(BaseDBManager):
             config = self.get_alembic_config()
 
         if show_sql_only:
-            if make_url(settings.SQL_ALCHEMY_CONN).get_backend_name() == 
"sqlite":
+            sql_conn: str = (
+                settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
str(settings.SQL_ALCHEMY_CONN)
+            )
+            if make_url(sql_conn).get_backend_name() == "sqlite":
                 raise SystemExit("Offline migration not supported for SQLite.")
             if not from_revision:
                 from_revision = current_revision
@@ -172,9 +175,6 @@ class FABDBManager(BaseDBManager):
                 "downgrade from current revision."
             )
 
-        if not settings.SQL_ALCHEMY_CONN:
-            raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
-
         # alembic adds significant import time, so we import it lazily
         from alembic import command
 
@@ -200,6 +200,7 @@ class FABDBManager(BaseDBManager):
 
     def drop_tables(self, connection):
         super().drop_tables(connection)
-        db, flask_app = _get_flask_db(settings.SQL_ALCHEMY_CONN)
+        sql_conn = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
settings.SQL_ALCHEMY_CONN
+        db, flask_app = _get_flask_db(sql_conn)
         with flask_app.app_context():
             db.drop_all()
diff --git a/providers/fab/src/airflow/providers/fab/migrations/env.py 
b/providers/fab/src/airflow/providers/fab/migrations/env.py
index 903057ba602..7a812c56e7a 100644
--- a/providers/fab/src/airflow/providers/fab/migrations/env.py
+++ b/providers/fab/src/airflow/providers/fab/migrations/env.py
@@ -24,6 +24,7 @@ from alembic import context
 
 from airflow import settings
 from airflow.providers.fab.auth_manager.models.db import FABDBManager
+from airflow.providers.fab.version_compat import AIRFLOW_V_3_3_PLUS
 
 # this is the Alembic Config object, which provides
 # access to the values within the .ini file in use.
@@ -67,8 +68,9 @@ def run_migrations_offline():
     script output.
 
     """
+    url: str = settings.get_sql_alchemy_conn() if AIRFLOW_V_3_3_PLUS else 
str(settings.SQL_ALCHEMY_CONN)
     context.configure(
-        url=settings.SQL_ALCHEMY_CONN,
+        url=url,
         target_metadata=target_metadata,
         literal_binds=True,
         compare_type=True,
diff --git a/providers/fab/src/airflow/providers/fab/version_compat.py 
b/providers/fab/src/airflow/providers/fab/version_compat.py
index 92feca19762..419ebf48568 100644
--- a/providers/fab/src/airflow/providers/fab/version_compat.py
+++ b/providers/fab/src/airflow/providers/fab/version_compat.py
@@ -36,3 +36,4 @@ AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 
1, 0)
 AIRFLOW_V_3_1_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 1)
 AIRFLOW_V_3_1_8_PLUS = get_base_airflow_version_tuple() >= (3, 1, 8)
 AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS = get_base_airflow_version_tuple() >= (3, 3, 0)
diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py 
b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
index 452e5522fb7..69154acd5b4 100644
--- a/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
+++ b/providers/fab/tests/unit/fab/auth_manager/models/test_db.py
@@ -32,6 +32,8 @@ from airflow.utils.db import (
     compare_type,
 )
 
+from tests_common.test_utils.config import conf_vars
+
 pytestmark = [pytest.mark.db_test]
 try:
     from airflow.providers.fab.auth_manager.models.db import FABDBManager
@@ -109,14 +111,11 @@ try:
             actual = mock_om.call_args.kwargs["revision"]
             assert actual == "abc"
 
+        @conf_vars({("database", "sql_alchemy_conn"): 
"sqlite:////tmp/fab-test.db"})
         @mock.patch.object(FABDBManager, "get_current_revision")
         def test_sqlite_offline_upgrade_raises_with_revision(self, mock_gcr, 
session):
-            with mock.patch(
-                
"airflow.providers.fab.auth_manager.models.db.settings.SQL_ALCHEMY_CONN",
-                "sqlite:////tmp/fab-test.db",
-            ):
-                with pytest.raises(SystemExit, match="Offline migration not 
supported for SQLite"):
-                    FABDBManager(session).upgradedb(from_revision=None, 
to_revision=None, show_sql_only=True)
+            with pytest.raises(SystemExit, match="Offline migration not 
supported for SQLite"):
+                FABDBManager(session).upgradedb(from_revision=None, 
to_revision=None, show_sql_only=True)
 
         @mock.patch("alembic.command.upgrade")
         @mock.patch.object(FABDBManager, "create_db_from_orm")
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 3e6236c5786..2269f22420f 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -344,6 +344,7 @@ def block_orm_access():
             conf.set("database", "sql_alchemy_conn", conn)
             conf.set("database", "sql_alchemy_conn_cmd", "/bin/false")
             conf.set("database", "sql_alchemy_conn_secret", 
"db-access-blocked")
+        settings.block_orm_access()
 
         # This only gets called when the module does not already have an 
attribute, and for these values
         # lets give a custom error message
@@ -354,9 +355,6 @@ def block_orm_access():
 
         settings.__getattr__ = __getattr__
 
-        settings.SQL_ALCHEMY_CONN = conn
-        settings.SQL_ALCHEMY_CONN_ASYNC = conn
-
     os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = conn
     os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = conn
 

Reply via email to