This is an automated email from the ASF dual-hosted git repository.

msumit pushed a commit to branch sumit/ro_session
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6b3d36ced7a6f267d506eae6d50749177b7b030a
Author: Sumit Maheshwari <[email protected]>
AuthorDate: Wed Aug 20 15:29:21 2025 +0530

    [AIP-94] Add support for read-only sql_alchemy sessions
---
 .../src/airflow/config_templates/config.yml        | 21 +++++
 airflow-core/src/airflow/models/deadline.py        |  4 +-
 airflow-core/src/airflow/models/serialized_dag.py  | 10 +--
 airflow-core/src/airflow/settings.py               | 95 +++++++++++++++++++++-
 airflow-core/src/airflow/utils/session.py          | 66 +++++++++++++++
 5 files changed, 188 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 3a824fbcda6..22487e6d63b 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -544,6 +544,27 @@ database:
       sensitive: true
       example: "postgresql+asyncpg://postgres:airflow@postgres/airflow"
       default: ~
+    sql_alchemy_conn_readonly:
+      description: |
+        The SQLAlchemy connection string to a read-only replica of the 
metadata database.
+        This is used for read-heavy operations to offload traffic from the 
master database.
+        If not set, read operations will use the same connection as 
``sql_alchemy_conn``.
+        The read-only database should be a replica or read-only instance of 
the master database.
+      version_added: 3.1.0
+      type: string
+      sensitive: true
+      example: "postgresql://postgres:airflow@postgres-readonly/airflow"
+      default: ~
+    sql_alchemy_conn_readonly_async:
+      description: |
+        The SQLAlchemy connection string to a read-only replica of the 
metadata database for async connections.
+        If not set, Airflow will automatically derive this from 
``sql_alchemy_conn_readonly`` if available,
+        or fall back to ``sql_alchemy_conn_async``.
+      version_added: 3.1.0
+      type: string
+      sensitive: true
+      example: 
"postgresql+asyncpg://postgres:airflow@postgres-readonly/airflow"
+      default: ~
     sql_alchemy_engine_args:
       description: |
         Extra engine specific keyword args passed to SQLAlchemy's 
create_engine, as a JSON-encoded value
diff --git a/airflow-core/src/airflow/models/deadline.py 
b/airflow-core/src/airflow/models/deadline.py
index 8dc48acf314..556a3e6449a 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -38,7 +38,7 @@ from airflow.serialization.serde import deserialize, serialize
 from airflow.settings import json
 from airflow.triggers.deadline import PAYLOAD_STATUS_KEY, 
DeadlineCallbackTrigger
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import provide_session, provide_readonly_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
 if TYPE_CHECKING:
@@ -356,7 +356,7 @@ class ReferenceModels:
 DeadlineReferenceType = ReferenceModels.BaseDeadlineReference
 
 
-@provide_session
+@provide_readonly_session
 def _fetch_from_db(model_reference: Column, session=None, **conditions) -> 
datetime:
     """
     Fetch a datetime value from the database using the provided model 
reference and filtering conditions.
diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index 6444d5720de..fe3fe05c821 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -48,7 +48,7 @@ from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.settings import COMPRESS_SERIALIZED_DAGS, json
 from airflow.utils.hashlib_wrapper import md5
-from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.session import NEW_SESSION, NEW_READONLY_SESSION, 
provide_session, provide_readonly_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
 if TYPE_CHECKING:
@@ -472,9 +472,9 @@ class SerializedDagModel(Base):
         return select(cls).where(cls.dag_id == 
dag_id).order_by(cls.created_at.desc()).limit(1)
 
     @classmethod
-    @provide_session
+    @provide_readonly_session
     def get_latest_serialized_dags(
-        cls, *, dag_ids: list[str], session: Session = NEW_SESSION
+        cls, *, dag_ids: list[str], session: Session = NEW_READONLY_SESSION
     ) -> list[SerializedDagModel]:
         """
         Get the latest serialized dags of given DAGs.
@@ -501,8 +501,8 @@ class SerializedDagModel(Base):
         return latest_serdags or []
 
     @classmethod
-    @provide_session
-    def read_all_dags(cls, session: Session = NEW_SESSION) -> dict[str, 
SerializedDAG]:
+    @provide_readonly_session
+    def read_all_dags(cls, session: Session = NEW_READONLY_SESSION) -> 
dict[str, SerializedDAG]:
         """
         Read all DAGs in serialized_dag table.
 
diff --git a/airflow-core/src/airflow/settings.py 
b/airflow-core/src/airflow/settings.py
index db5ce959b1f..7f0cf1b0dd4 100644
--- a/airflow-core/src/airflow/settings.py
+++ b/airflow-core/src/airflow/settings.py
@@ -89,6 +89,8 @@ SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")
 
 SQL_ALCHEMY_CONN: str | None = None
 SQL_ALCHEMY_CONN_ASYNC: str | None = None
+SQL_ALCHEMY_CONN_READONLY: str | None = None
+SQL_ALCHEMY_CONN_READONLY_ASYNC: str | None = None
 PLUGINS_FOLDER: str | None = None
 LOGGING_CLASS_PATH: str | None = None
 DONOT_MODIFY_HANDLERS: bool | None = None
@@ -111,6 +113,13 @@ NonScopedSession: sessionmaker
 async_engine: AsyncEngine
 AsyncSession: Callable[..., SAAsyncSession]
 
+# Read-only database connection components
+readonly_engine: Engine
+ReadOnlySession: scoped_session
+NonScopedReadOnlySession: sessionmaker
+readonly_async_engine: AsyncEngine
+ReadOnlyAsyncSession: Callable[..., SAAsyncSession]
+
 # The JSON library to use for DAG Serialization and De-Serialization
 json = json
 
@@ -222,6 +231,8 @@ def configure_vars():
     """Configure Global Variables from airflow.cfg."""
     global SQL_ALCHEMY_CONN
     global SQL_ALCHEMY_CONN_ASYNC
+    global SQL_ALCHEMY_CONN_READONLY
+    global SQL_ALCHEMY_CONN_READONLY_ASYNC
     global DAGS_FOLDER
     global PLUGINS_FOLDER
     global DONOT_MODIFY_HANDLERS
@@ -232,6 +243,19 @@ def configure_vars():
     else:
         SQL_ALCHEMY_CONN_ASYNC = 
_get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)
 
+    # Configure read-only database connections
+    if conf.has_option("database", "sql_alchemy_conn_readonly"):
+        SQL_ALCHEMY_CONN_READONLY = conf.get("database", 
"sql_alchemy_conn_readonly")
+    else:
+        SQL_ALCHEMY_CONN_READONLY = SQL_ALCHEMY_CONN
+
+    if conf.has_option("database", "sql_alchemy_conn_readonly_async"):
+        SQL_ALCHEMY_CONN_READONLY_ASYNC = conf.get("database", 
"sql_alchemy_conn_readonly_async")
+    elif SQL_ALCHEMY_CONN_READONLY:
+        SQL_ALCHEMY_CONN_READONLY_ASYNC = 
_get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN_READONLY)
+    else:
+        SQL_ALCHEMY_CONN_READONLY_ASYNC = SQL_ALCHEMY_CONN_ASYNC
+
     DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))
 
     PLUGINS_FOLDER = conf.get("core", "plugins_folder", 
fallback=os.path.join(AIRFLOW_HOME, "plugins"))
@@ -360,6 +384,31 @@ def _configure_async_session() -> None:
     )
 
 
+def _configure_readonly_async_session() -> None:
+    """
+    Configure read-only async SQLAlchemy session.
+
+    This exists so tests can reconfigure the session. How SQLAlchemy configures
+    this does not work well with Pytest, and you can end up with issues when the
+    session runs in a different event loop from the test itself.
+    """
+    global ReadOnlyAsyncSession
+    global readonly_async_engine
+
+    readonly_async_engine = create_async_engine(
+        SQL_ALCHEMY_CONN_READONLY_ASYNC,
+        connect_args=_get_connect_args("async"),
+        future=True,
+    )
+    ReadOnlyAsyncSession = sessionmaker(
+        bind=readonly_async_engine,
+        autocommit=False,
+        autoflush=False,
+        class_=SAAsyncSession,
+        expire_on_commit=False,
+    )
+
+
 def configure_orm(disable_connection_pool=False, pool_class=None):
     """Configure ORM using SQLAlchemy."""
     from airflow.sdk.execution_time.secrets_masker import mask_secret
@@ -375,11 +424,16 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     global NonScopedSession
     global Session
     global engine
+    global NonScopedReadOnlySession
+    global ReadOnlySession
+    global readonly_engine
 
     if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
         # Skip DB initialization in unit tests, if DB tests are skipped
         Session = SkipDBTestsSession
+        ReadOnlySession = SkipDBTestsSession
         engine = None
+        readonly_engine = None
         return
     log.debug("Setting up DB connection pool (PID %s)", os.getpid())
     engine_args = prepare_engine_args(disable_connection_pool, pool_class)
@@ -401,6 +455,22 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
     mask_secret(engine.url.password)
     setup_event_handlers(engine)
 
+    # Configure read-only database connections
+    readonly_connect_args = _get_connect_args("sync")
+    if SQL_ALCHEMY_CONN_READONLY.startswith("sqlite"):
+        readonly_connect_args["check_same_thread"] = False
+
+    readonly_engine_args = prepare_engine_args(disable_connection_pool, 
pool_class)
+    readonly_engine = create_engine(
+        SQL_ALCHEMY_CONN_READONLY,
+        connect_args=readonly_connect_args,
+        **readonly_engine_args,
+        future=True,
+    )
+    _configure_readonly_async_session()
+    mask_secret(readonly_engine.url.password)
+    setup_event_handlers(readonly_engine)
+
     if conf.has_option("database", "sql_alchemy_session_maker"):
         _session_maker = conf.getimport("database", 
"sql_alchemy_session_maker")
     else:
@@ -412,6 +482,10 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
         )
     NonScopedSession = _session_maker(engine)
     Session = scoped_session(NonScopedSession)
+    
+    # Configure read-only sessions
+    NonScopedReadOnlySession = _session_maker(readonly_engine)
+    ReadOnlySession = scoped_session(NonScopedReadOnlySession)
 
     if register_at_fork := getattr(os, "register_at_fork", None):
         # 
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
@@ -421,6 +495,10 @@ def configure_orm(disable_connection_pool=False, 
pool_class=None):
                 engine.dispose(close=False)
             if async_engine := _globals.get("async_engine"):
                 async_engine.sync_engine.dispose(close=False)
+            if readonly_engine := _globals.get("readonly_engine"):
+                readonly_engine.dispose(close=False)
+            if readonly_async_engine := _globals.get("readonly_async_engine"):
+                readonly_async_engine.sync_engine.dispose(close=False)
 
         # Won't work on Windows
         register_at_fork(after_in_child=clean_in_fork)
@@ -515,9 +593,15 @@ def prepare_engine_args(disable_connection_pool=False, 
pool_class=None):
 def dispose_orm(do_log: bool = True):
     """Properly close pooled database connections."""
     global Session, engine, NonScopedSession
+    global ReadOnlySession, readonly_engine, NonScopedReadOnlySession
 
     _globals = globals()
-    if _globals.get("engine") is None and _globals.get("Session") is None:
+    if (
+        _globals.get("engine") is None 
+        and _globals.get("Session") is None
+        and _globals.get("readonly_engine") is None
+        and _globals.get("ReadOnlySession") is None
+    ):
         return
 
     if do_log:
@@ -531,10 +615,19 @@ def dispose_orm(do_log: bool = True):
         NonScopedSession = None
         close_all_sessions()
 
+    if "ReadOnlySession" in _globals and ReadOnlySession is not None:
+        ReadOnlySession.remove()
+        ReadOnlySession = None
+        NonScopedReadOnlySession = None
+
     if "engine" in _globals and engine is not None:
         engine.dispose()
         engine = None
 
+    if "readonly_engine" in _globals and readonly_engine is not None:
+        readonly_engine.dispose()
+        readonly_engine = None
+
 
 def reconfigure_orm(disable_connection_pool=False, pool_class=None):
     """Properly close database connections and re-configure ORM."""
diff --git a/airflow-core/src/airflow/utils/session.py 
b/airflow-core/src/airflow/utils/session.py
index 211c1645320..321387af68d 100644
--- a/airflow-core/src/airflow/utils/session.py
+++ b/airflow-core/src/airflow/utils/session.py
@@ -48,6 +48,26 @@ def create_session(scoped: bool = True) -> 
Generator[SASession, None, None]:
         session.close()
 
 
[email protected]
+def create_readonly_session(scoped: bool = True) -> Generator[SASession, None, 
None]:
+    """Contextmanager that will create and teardown a read-only session."""
+    if scoped:
+        ReadOnlySession = getattr(settings, "ReadOnlySession", None)
+    else:
+        ReadOnlySession = getattr(settings, "NonScopedReadOnlySession", None)
+    if ReadOnlySession is None:
+        raise RuntimeError("ReadOnlySession must be set before!")
+    session = ReadOnlySession()
+    try:
+        yield session
+        # Note: No commit() for read-only sessions
+    except Exception:
+        session.rollback()
+        raise
+    finally:
+        session.close()
+
+
 @contextlib.asynccontextmanager
 async def create_session_async():
     """
@@ -66,6 +86,24 @@ async def create_session_async():
             raise
 
 
[email protected]
+async def create_readonly_session_async():
+    """
+    Context manager to create async read-only session.
+
+    :meta private:
+    """
+    from airflow.settings import ReadOnlyAsyncSession
+
+    async with ReadOnlyAsyncSession() as session:
+        try:
+            yield session
+            # Note: No commit() for read-only sessions
+        except Exception:
+            await session.rollback()
+            raise
+
+
 PS = ParamSpec("PS")
 RT = TypeVar("RT")
 
@@ -102,8 +140,36 @@ def provide_session(func: Callable[PS, RT]) -> 
Callable[PS, RT]:
     return wrapper
 
 
+def provide_readonly_session(func: Callable[PS, RT]) -> Callable[PS, RT]:
+    """
+    Provide a read-only session if it isn't provided.
+
+    This decorator is intended for read-only operations that can benefit from
+    using a read-only database replica to offload traffic from the master 
database.
+    If you want to reuse a session or run the function as part of a 
transaction,
+    you pass it to the function, if not this wrapper will create a read-only
+    session and close it for you.
+    """
+    session_args_idx = find_session_idx(func)
+
+    @wraps(func)
+    def wrapper(*args, **kwargs) -> RT:
+        if "session" in kwargs or session_args_idx < len(args):
+            return func(*args, **kwargs)
+        with create_readonly_session() as session:
+            return func(*args, session=session, **kwargs)
+
+    return wrapper
+
+
 # A fake session to use in functions decorated by provide_session. This allows
 # the 'session' argument to be of type Session instead of Session | None,
 # making it easier to type hint the function body without dealing with the None
 # case that can never happen at runtime.
 NEW_SESSION: SASession = cast("SASession", None)
+
+# A fake read-only session to use in functions decorated by 
provide_readonly_session.
+# This allows the 'session' argument to be of type Session instead of Session 
| None,
+# making it easier to type hint the function body without dealing with the None
+# case that can never happen at runtime.
+NEW_READONLY_SESSION: SASession = cast("SASession", None)

Reply via email to