This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0c2079585e4 Fix DagFileProcessorManager silent hang on DB lock contention (#68118)
0c2079585e4 is described below
commit 0c2079585e4eee5154ab94a6876830aabe15e171
Author: Subham <[email protected]>
AuthorDate: Sun Jun 7 13:18:36 2026 +0530
Fix DagFileProcessorManager silent hang on DB lock contention (#68118)
* Fix DagFileProcessorManager silent hang on DB lock contention
* Change unsupported lock timeout warning to debug log
---
airflow-core/src/airflow/dag_processing/manager.py | 68 ++++++++++++++++------
airflow-core/src/airflow/utils/sqlalchemy.py | 44 +++++++++++++-
.../tests/unit/dag_processing/test_manager.py | 56 ++++++++++++++++++
3 files changed, 148 insertions(+), 20 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index fc00b6730c8..37f0496a89c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -40,6 +40,7 @@ from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast
import attrs
import structlog
from sqlalchemy import select, update
+from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only
from tabulate import tabulate
from uuid6 import uuid7
@@ -76,7 +77,12 @@ from airflow.utils.process_utils import (
)
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, create_session, provide_session
-from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
+from airflow.utils.sqlalchemy import (
+ is_lock_not_available_error,
+ prohibit_commit,
+ with_db_lock_timeout,
+ with_row_locks,
+)
if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Iterator, Sequence
@@ -453,15 +459,26 @@ class DagFileProcessorManager(LoggingMixin):
to_deactivate.add(dag.dag_id)
if to_deactivate:
- deactivated_dagmodel = session.execute(
- update(DagModel)
- .where(DagModel.dag_id.in_(to_deactivate))
- .values(is_stale=True)
- .execution_options(synchronize_session="fetch")
- )
- deactivated = getattr(deactivated_dagmodel, "rowcount", 0)
- if deactivated:
- self.log.info("Deactivated %i DAGs which are no longer present
in file.", deactivated)
+ try:
+ with with_db_lock_timeout(session=session, lock_timeout=30):
+ deactivated_dagmodel = session.execute(
+ update(DagModel)
+ .where(DagModel.dag_id.in_(to_deactivate))
+ .values(is_stale=True)
+ .execution_options(synchronize_session="fetch")
+ )
+ deactivated = getattr(deactivated_dagmodel, "rowcount", 0)
+ if deactivated:
+ self.log.info("Deactivated %i DAGs which are no longer
present in file.", deactivated)
+ except OperationalError as e:
+ if is_lock_not_available_error(e):
+ self.log.warning(
+ "Lock not available when deactivating stale DAGs. "
+ "Skipping this iteration to prevent processor hang."
+ )
+ session.rollback()
+ else:
+ raise
def _run_parsing_loop(self):
# initialize cache to mutualize calls to Variable.get in DAGs
@@ -903,15 +920,28 @@ class DagFileProcessorManager(LoggingMixin):
"""Deactivate DAGs that come from files that are no longer present in
bundle."""
observed_filelocs = self._get_observed_filelocs(present)
with create_session() as session:
- any_deactivated = DagModel.deactivate_deleted_dags(
- bundle_name=bundle_name,
- rel_filelocs=observed_filelocs,
- session=session,
- )
- # Only run cleanup if we actually deactivated any DAGs
- # This avoids unnecessary DELETE queries in the common case where
no DAGs were deleted
- if any_deactivated:
- remove_references_to_deleted_dags(session=session)
+ try:
+ with with_db_lock_timeout(session=session, lock_timeout=30):
+ any_deactivated = DagModel.deactivate_deleted_dags(
+ bundle_name=bundle_name,
+ rel_filelocs=observed_filelocs,
+ session=session,
+ )
+ # Only run cleanup if we actually deactivated any DAGs
+ # This avoids unnecessary DELETE queries in the common
case where no DAGs were deleted
+ if any_deactivated:
+ remove_references_to_deleted_dags(session=session)
+ session.flush()
+ except OperationalError as e:
+ if is_lock_not_available_error(e):
+ self.log.warning(
+ "Lock not available when deactivating deleted DAGs for
bundle %s. "
+ "Skipping this iteration to prevent processor hang.",
+ bundle_name,
+ )
+ session.rollback()
+ else:
+ raise
def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
"""Occasionally print out stats about how fast the files are getting
processed."""
diff --git a/airflow-core/src/airflow/utils/sqlalchemy.py b/airflow-core/src/airflow/utils/sqlalchemy.py
index 35a6f4ee05b..a9508712cbc 100644
--- a/airflow-core/src/airflow/utils/sqlalchemy.py
+++ b/airflow-core/src/airflow/utils/sqlalchemy.py
@@ -24,7 +24,7 @@ import logging
from collections.abc import Generator
from typing import TYPE_CHECKING, Any
-from sqlalchemy import TIMESTAMP, PickleType, String, event, nullsfirst
+from sqlalchemy import TIMESTAMP, PickleType, String, event, nullsfirst, text
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.compiler import compiles
@@ -461,6 +461,48 @@ def lock_rows(query: Select, session: Session) -> Generator[None, None, None]:
del locked_rows
+@contextlib.contextmanager
+def with_db_lock_timeout(session: Session, lock_timeout: int = 30) ->
Generator[None, None, None]:
+ """
+ Context manager to set the database lock timeout for the current session.
+
+ This prevents long-running operations from blocking indefinitely if they
encounter
+ lock contention. Only supported on PostgreSQL and MySQL.
+
+ :param session: ORM Session
+ :param lock_timeout: Lock timeout in seconds.
+ """
+ if lock_timeout <= 0:
+ raise ValueError("lock_timeout must be a positive integer number of
seconds")
+
+ try:
+ dialect_name = get_dialect_name(session)
+ except ValueError:
+ dialect_name = None
+
+ old_mysql_timeout = None
+
+ if dialect_name == "postgresql":
+ # SET LOCAL applies only to the current transaction and resets on
COMMIT/ROLLBACK.
+ session.execute(text(f"SET LOCAL lock_timeout = '{lock_timeout}s'"))
+ elif dialect_name == "mysql":
+ old_mysql_timeout = session.execute(text("SELECT
@@SESSION.innodb_lock_wait_timeout")).scalar()
+ session.execute(text(f"SET SESSION innodb_lock_wait_timeout =
{lock_timeout}"))
+ else:
+ log.debug(
+ "Database lock timeout is not supported for dialect '%s'. "
+ "The requested timeout of %ss will not be applied.",
+ dialect_name,
+ lock_timeout,
+ )
+
+ try:
+ yield
+ finally:
+ if dialect_name == "mysql" and old_mysql_timeout is not None:
+ session.execute(text(f"SET SESSION innodb_lock_wait_timeout =
{old_mysql_timeout}"))
+
+
class CommitProhibitorGuard:
"""Context manager class that powers prohibit_commit."""
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 941d090f229..5b6fc1608e2 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -39,6 +39,7 @@ import msgspec
import pytest
import time_machine
from sqlalchemy import func, select
+from sqlalchemy.exc import OperationalError
from uuid6 import uuid7
from airflow._shared.timezones import timezone
@@ -1040,6 +1041,61 @@ class TestDagFileProcessorManager:
)
assert is_stale_by_dag == {"dag_in_inactive_bundle": True,
"dag_in_active_bundle": False}
+ @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error")
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_deactivate_stale_dags_handles_lock_timeout(self,
mock_is_lock_not_available, session, caplog):
+ """Dags deactivation should gracefully handle database lock
timeouts."""
+ from sqlalchemy.exc import OperationalError
+
+ session.add(DagBundleModel(name="gone-bundle"))
+ session.flush()
+ session.execute(
+ DagBundleModel.__table__.update().where(DagBundleModel.name ==
"gone-bundle").values(active=False)
+ )
+ session.add(
+ DagModel(
+ dag_id="dag_in_inactive_bundle",
+ bundle_name="gone-bundle",
+ relative_fileloc="some_file.py",
+ last_parsed_time=timezone.utcnow(),
+ is_stale=False,
+ )
+ )
+ session.flush()
+
+ manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 *
60)
+
+ # Mock session.execute to raise OperationalError only when updating
DagModel
+ original_execute = session.execute
+
+ def mock_execute(*args, **kwargs):
+ if hasattr(args[0], "table") and getattr(args[0].table, "name",
"") == "dag":
+ raise OperationalError("Lock wait timeout exceeded",
params={}, orig=Exception())
+ return original_execute(*args, **kwargs)
+
+ mock_is_lock_not_available.return_value = True
+
+ with mock.patch.object(session, "execute", side_effect=mock_execute):
+ manager.deactivate_stale_dags(last_parsed={})
+
+ assert "Lock not available when deactivating stale DAGs" in caplog.text
+
+ @mock.patch("airflow.dag_processing.manager.is_lock_not_available_error")
+ @mock.patch("airflow.models.dag.DagModel.deactivate_deleted_dags")
+ def test_deactivate_deleted_dags_handles_lock_timeout(
+ self, mock_deactivate_deleted_dags, mock_is_lock_not_available, caplog
+ ):
+ """Dags deactivation of deleted dags should gracefully handle database
lock timeouts."""
+ mock_deactivate_deleted_dags.side_effect = OperationalError(
+ "Lock wait timeout exceeded", params={}, orig=Exception()
+ )
+ mock_is_lock_not_available.return_value = True
+
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.deactivate_deleted_dags(bundle_name="testing_bundle",
present=set())
+
+ assert "Lock not available when deactivating deleted DAGs for bundle
testing_bundle" in caplog.text
+
@mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
manager = DagFileProcessorManager(max_runs=1)