This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new c1b6e5729fc Fix async engine missing pool_recycle and pool_pre_ping configuration (#65276) (#66866)
c1b6e5729fc is described below
commit c1b6e5729fca70c788beec8320687611b80ddc40
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:48:45 2026 +0530
Fix async engine missing pool_recycle and pool_pre_ping configuration (#65276) (#66866)
The async SQLAlchemy engine was created without any pool health
settings, while the sync engine read pool_size, pool_recycle,
pool_pre_ping, and max_overflow from the [database] config section. As a
result, dead connections caused by PostgreSQL idle timeouts or pgbouncer
disconnects were never detected by the async pool.
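Read the same [database] config values for the async engine; SQLite
URLs get no pool arguments. Also respect SQL_ALCHEMY_POOL_ENABLED=False
by using NullPool, matching the sync engine behavior.
create_async_metadata_engine() gains an optional engine_args keyword
argument. Because _configure_async_session() now always passes it,
overrides of this hook in airflow_local_settings.py must accept
engine_args (or **kwargs).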
(cherry picked from commit 981b0e218c5cf18ffc74cff21a991e20e34611a8)
Co-authored-by: Kaxil Naik <[email protected]>
---
airflow-core/src/airflow/settings.py | 18 ++++++++
airflow-core/tests/unit/core/test_settings.py | 65 +++++++++++++++++++++++++--
2 files changed, 80 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/settings.py
b/airflow-core/src/airflow/settings.py
index e42aa3ceff6..739e2437f6a 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -374,16 +374,20 @@ def create_async_metadata_engine(
sql_alchemy_conn_async: str,
*,
connect_args: dict[str, Any],
+ engine_args: dict[str, Any] | None = None,
) -> AsyncEngine:
"""
Create the async SQLAlchemy Engine for the Airflow metadata database.
Override in ``airflow_local_settings.py`` to customize async engine
creation.
For ``do_connect`` handlers, register on ``engine.sync_engine``.
+
+ :param engine_args: Pool and engine configuration (pool_size,
pool_recycle, etc.).
"""
return create_async_engine(
sql_alchemy_conn_async,
connect_args=connect_args,
+ **(engine_args or {}),
future=True,
)
@@ -403,9 +407,23 @@ def _configure_async_session() -> None:
AsyncSession = None
return
+ # Apply the same pool health settings used by the sync engine.
+ # Without these, the async pool uses SQLAlchemy defaults (pool_recycle=-1,
+ # pool_pre_ping=False) which means dead connections from PostgreSQL idle
+ # timeouts or pgbouncer disconnects are never detected.
+ engine_args: dict[str, Any] = {}
+ if not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
+ engine_args["poolclass"] = NullPool
+ elif not SQL_ALCHEMY_CONN_ASYNC.startswith("sqlite"):
+ engine_args["pool_size"] = conf.getint("database",
"SQL_ALCHEMY_POOL_SIZE", fallback=5)
+ engine_args["pool_recycle"] = conf.getint("database",
"SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)
+ engine_args["pool_pre_ping"] = conf.getboolean("database",
"SQL_ALCHEMY_POOL_PRE_PING", fallback=True)
+ engine_args["max_overflow"] = conf.getint("database",
"SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)
+
async_engine = create_async_metadata_engine(
SQL_ALCHEMY_CONN_ASYNC,
connect_args=_get_connect_args("async"),
+ engine_args=engine_args,
)
AsyncSession = async_sessionmaker(
bind=async_engine,
diff --git a/airflow-core/tests/unit/core/test_settings.py
b/airflow-core/tests/unit/core/test_settings.py
index 22cb00b36bb..588567181c5 100644
--- a/airflow-core/tests/unit/core/test_settings.py
+++ b/airflow-core/tests/unit/core/test_settings.py
@@ -27,6 +27,7 @@ from unittest.mock import MagicMock, call, patch
import pytest
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import AsyncEngine
+from sqlalchemy.pool import NullPool
from airflow import settings
from airflow.exceptions import AirflowClusterPolicyViolation,
AirflowConfigException
@@ -262,18 +263,70 @@ class TestMetadataEngineHooks:
def test_configure_async_session_delegates_to_create_async_metadata_engine(
self, mock_create_async_engine
):
- """_configure_async_session() must call
create_async_metadata_engine."""
+ """_configure_async_session() must call create_async_metadata_engine
with no pool args for sqlite."""
from airflow import settings
mock_create_async_engine.return_value = MagicMock()
- with patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"sqlite+aiosqlite://"):
+ with (
+ patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"sqlite+aiosqlite://"),
+ patch("airflow.settings.conf") as mock_conf,
+ ):
+ # Pool enabled but sqlite -- pool args should be skipped
+ mock_conf.getboolean.return_value = True
settings._configure_async_session()
mock_create_async_engine.assert_called_once()
call_kwargs = mock_create_async_engine.call_args
assert call_kwargs[0][0] == "sqlite+aiosqlite://"
assert "connect_args" in call_kwargs[1]
+ # sqlite doesn't support pool size args
+ assert call_kwargs[1]["engine_args"] == {}
+
+ @patch("airflow.settings.create_async_metadata_engine")
+ def test_configure_async_session_passes_pool_args_for_non_sqlite(self,
mock_create_async_engine):
+ """_configure_async_session() must pass pool configuration for
non-sqlite backends."""
+ from airflow import settings
+
+ mock_create_async_engine.return_value = MagicMock()
+
+ with (
+ patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"postgresql+asyncpg://localhost/airflow"),
+ patch("airflow.settings.conf") as mock_conf,
+ ):
+ mock_conf.getint.side_effect = lambda section, key, fallback=None:
{
+ "SQL_ALCHEMY_POOL_SIZE": 10,
+ "SQL_ALCHEMY_POOL_RECYCLE": 900,
+ "SQL_ALCHEMY_MAX_OVERFLOW": 5,
+ }.get(key, fallback)
+ mock_conf.getboolean.return_value = True
+
+ settings._configure_async_session()
+
+ engine_args = mock_create_async_engine.call_args[1]["engine_args"]
+ assert engine_args["pool_size"] == 10
+ assert engine_args["pool_recycle"] == 900
+ assert engine_args["pool_pre_ping"] is True
+ assert engine_args["max_overflow"] == 5
+
+ @patch("airflow.settings.create_async_metadata_engine")
+ def test_configure_async_session_uses_nullpool_when_pool_disabled(self,
mock_create_async_engine):
+ """_configure_async_session() must use NullPool when
SQL_ALCHEMY_POOL_ENABLED is False."""
+ from airflow import settings
+
+ mock_create_async_engine.return_value = MagicMock()
+
+ with (
+ patch("airflow.settings.SQL_ALCHEMY_CONN_ASYNC",
"postgresql+asyncpg://localhost/airflow"),
+ patch("airflow.settings.conf") as mock_conf,
+ ):
+ mock_conf.getboolean.return_value = False
+
+ settings._configure_async_session()
+
+ engine_args = mock_create_async_engine.call_args[1]["engine_args"]
+ assert engine_args["poolclass"] is NullPool
+ assert "pool_size" not in engine_args
@patch("airflow.settings.create_async_metadata_engine")
def test_configure_async_session_skips_when_no_async_conn(self,
mock_create_async_engine):
@@ -313,12 +366,18 @@ class TestMetadataEngineHooks:
mock_sa_create_async.return_value = MagicMock()
connect_args = {"timeout": 30}
+ engine_args = {"pool_size": 5, "pool_recycle": 1800, "pool_pre_ping":
True}
- settings.create_async_metadata_engine("sqlite+aiosqlite://",
connect_args=connect_args)
+ settings.create_async_metadata_engine(
+ "sqlite+aiosqlite://", connect_args=connect_args,
engine_args=engine_args
+ )
mock_sa_create_async.assert_called_once_with(
"sqlite+aiosqlite://",
connect_args={"timeout": 30},
+ pool_size=5,
+ pool_recycle=1800,
+ pool_pre_ping=True,
future=True,
)