This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 87d35caaf99 Fix duplicate deadline callbacks with HA scheduler
replicas (#64737)
87d35caaf99 is described below
commit 87d35caaf993d60b86f483f216b74638c9e70ec3
Author: Shivam Rastogi <[email protected]>
AuthorDate: Mon Apr 6 15:31:27 2026 -0700
Fix duplicate deadline callbacks with HA scheduler replicas (#64737)
The scheduler's deadline-check loop selected expired, unhandled
Deadline rows without any row-level lock. Under HA deployments with
multiple scheduler replicas, two schedulers could read the same row
concurrently, both call handle_miss, and both create Trigger records
for the same deadline -- causing the deadline breach callback to fire
multiple times.
Wrap the SELECT in with_row_locks(..., of=Deadline, skip_locked=True,
key_share=False) so the first scheduler to reach a row holds a
FOR UPDATE lock until commit, and any other scheduler silently skips
that row. key_share=False is required because the default FOR KEY SHARE
does not conflict with itself.
closes: #64710
---
.../src/airflow/jobs/scheduler_job_runner.py | 16 ++++--
airflow-core/tests/unit/jobs/test_scheduler_job.py | 62 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 67006a7d8ee..d949012d4f5 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1624,13 +1624,23 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.log.exception("Something went wrong when trying
to save task event logs.")
with create_session() as session:
- # Only retrieve expired deadlines that haven't been
processed yet.
- # `missed` is False by default until the handler sets it.
- for deadline in session.scalars(
+ # Lock expired, unhandled deadlines with FOR UPDATE SKIP
LOCKED so
+ # concurrent HA scheduler replicas don't both process the
same row
+ # and create duplicate callbacks.
+ deadline_query = (
select(Deadline)
.where(Deadline.deadline_time <
datetime.now(timezone.utc))
.where(~Deadline.missed)
.options(selectinload(Deadline.callback),
selectinload(Deadline.dagrun))
+ )
+ for deadline in session.scalars(
+ with_row_locks(
+ deadline_query,
+ of=Deadline,
+ session=session,
+ skip_locked=True,
+ key_share=False,
+ )
):
deadline.handle_miss(session)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 5ccbb7e4056..b0f72b91db3 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -101,6 +101,7 @@ from airflow.serialization.definitions.dag import
SerializedDAG
from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.timetables.base import DagRunInfo, DataInterval
from airflow.utils.session import create_session, provide_session
+from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.state import CallbackState, DagRunState, State,
TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -7897,6 +7898,67 @@ class TestSchedulerJob:
# The handler should not be called, but no exceptions should be raised
either.
mock_handle_miss.assert_not_called()
+ @mock.patch("airflow.models.Deadline.handle_miss")
+ def test_expired_deadline_locked_by_other_scheduler_is_skipped(
+ self, mock_handle_miss, session, dag_maker
+ ):
+ """The scheduler's deadline loop must skip rows another replica
already holds."""
+ if session.get_bind().dialect.name == "sqlite":
+ pytest.skip("SQLite does not support row-level locking (SKIP
LOCKED)")
+
+ past_date = timezone.utcnow() - timedelta(minutes=5)
+ dag_id = "test_deadline_locked_by_other_scheduler"
+ callback_path = "classpath.notify"
+
+ with dag_maker(dag_id=dag_id):
+ EmptyOperator(task_id="empty")
+ dagrun_id = dag_maker.create_dagrun().id
+
+ serialized_dag =
session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id ==
dag_id))
+ assert serialized_dag is not None
+
+ deadline_alert = DeadlineAlert(
+ serialized_dag_id=serialized_dag.id,
+ name="Test Skip Locked",
+ reference={"type": "dag", "dag_id": dag_id},
+ interval=300.0,
+ callback_def={"classpath": callback_path, "kwargs": {}},
+ )
+ session.add(deadline_alert)
+ session.flush()
+
+ session.add(
+ Deadline(
+ deadline_time=past_date,
+ callback=AsyncCallback(callback_path),
+ dagrun_id=dagrun_id,
+ dag_id=dag_id,
+ deadline_alert_id=deadline_alert.id,
+ )
+ )
+ session.commit()
+
+ # scoped=False gives an independent session with its own connection;
the
+ # default scoped_session would reuse this thread's session and locks
held
+ # by "self" do not block "self".
+ with create_session(scoped=False) as competing_session:
+ locked_rows = competing_session.scalars(
+ with_row_locks(
+ select(Deadline).where(~Deadline.missed),
+ of=Deadline,
+ session=competing_session,
+ skip_locked=True,
+ key_share=False,
+ )
+ ).all()
+ assert len(locked_rows) == 1
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
num_runs=1, executors=[MockExecutor()])
+ self.job_runner._execute()
+
+ mock_handle_miss.assert_not_called()
+
def test_emit_running_dags_metric(self, dag_maker, monkeypatch):
"""Test that the running_dags metric is emitted correctly."""
with dag_maker("metric_dag") as dag: