This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 87d35caaf99 Fix duplicate deadline callbacks with HA scheduler 
replicas (#64737)
87d35caaf99 is described below

commit 87d35caaf993d60b86f483f216b74638c9e70ec3
Author: Shivam Rastogi <[email protected]>
AuthorDate: Mon Apr 6 15:31:27 2026 -0700

    Fix duplicate deadline callbacks with HA scheduler replicas (#64737)
    
    The scheduler's deadline-check loop selected expired, unhandled
    Deadline rows without any row-level lock. Under HA deployments with
    multiple scheduler replicas, two schedulers could read the same row
    concurrently, both call handle_miss, and both create Trigger records
    for the same deadline -- causing the deadline breach callback to fire
    multiple times.
    
    Wrap the SELECT in with_row_locks(..., of=Deadline, skip_locked=True,
    key_share=False) so the first scheduler to reach a row holds a
    FOR UPDATE lock until commit, and any other scheduler silently skips
that row. key_share=False is required because with_row_locks otherwise
emits the weaker FOR NO KEY UPDATE mode (its default is key_share=True)
instead of a plain FOR UPDATE.
    
    closes: #64710
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 16 ++++--
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 62 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 67006a7d8ee..d949012d4f5 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1624,13 +1624,23 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                         self.log.exception("Something went wrong when trying 
to save task event logs.")
 
                 with create_session() as session:
-                    # Only retrieve expired deadlines that haven't been 
processed yet.
-                    # `missed` is False by default until the handler sets it.
-                    for deadline in session.scalars(
+                    # Lock expired, unhandled deadlines with FOR UPDATE SKIP 
LOCKED so
+                    # concurrent HA scheduler replicas don't both process the 
same row
+                    # and create duplicate callbacks.
+                    deadline_query = (
                         select(Deadline)
                         .where(Deadline.deadline_time < 
datetime.now(timezone.utc))
                         .where(~Deadline.missed)
                         .options(selectinload(Deadline.callback), 
selectinload(Deadline.dagrun))
+                    )
+                    for deadline in session.scalars(
+                        with_row_locks(
+                            deadline_query,
+                            of=Deadline,
+                            session=session,
+                            skip_locked=True,
+                            key_share=False,
+                        )
                     ):
                         deadline.handle_miss(session)
 
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 5ccbb7e4056..b0f72b91db3 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -101,6 +101,7 @@ from airflow.serialization.definitions.dag import 
SerializedDAG
 from airflow.serialization.serialized_objects import LazyDeserializedDAG
 from airflow.timetables.base import DagRunInfo, DataInterval
 from airflow.utils.session import create_session, provide_session
+from airflow.utils.sqlalchemy import with_row_locks
 from airflow.utils.state import CallbackState, DagRunState, State, 
TaskInstanceState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -7897,6 +7898,67 @@ class TestSchedulerJob:
         # The handler should not be called, but no exceptions should be raised 
either.`
         mock_handle_miss.assert_not_called()
 
+    @mock.patch("airflow.models.Deadline.handle_miss")
+    def test_expired_deadline_locked_by_other_scheduler_is_skipped(
+        self, mock_handle_miss, session, dag_maker
+    ):
+        """The scheduler's deadline loop must skip rows another replica 
already holds."""
+        if session.get_bind().dialect.name == "sqlite":
+            pytest.skip("SQLite does not support row-level locking (SKIP 
LOCKED)")
+
+        past_date = timezone.utcnow() - timedelta(minutes=5)
+        dag_id = "test_deadline_locked_by_other_scheduler"
+        callback_path = "classpath.notify"
+
+        with dag_maker(dag_id=dag_id):
+            EmptyOperator(task_id="empty")
+        dagrun_id = dag_maker.create_dagrun().id
+
+        serialized_dag = 
session.scalar(select(SerializedDagModel).where(SerializedDagModel.dag_id == 
dag_id))
+        assert serialized_dag is not None
+
+        deadline_alert = DeadlineAlert(
+            serialized_dag_id=serialized_dag.id,
+            name="Test Skip Locked",
+            reference={"type": "dag", "dag_id": dag_id},
+            interval=300.0,
+            callback_def={"classpath": callback_path, "kwargs": {}},
+        )
+        session.add(deadline_alert)
+        session.flush()
+
+        session.add(
+            Deadline(
+                deadline_time=past_date,
+                callback=AsyncCallback(callback_path),
+                dagrun_id=dagrun_id,
+                dag_id=dag_id,
+                deadline_alert_id=deadline_alert.id,
+            )
+        )
+        session.commit()
+
+        # scoped=False gives an independent session with its own connection; 
the
+        # default scoped_session would reuse this thread's session and locks 
held
+        # by "self" do not block "self".
+        with create_session(scoped=False) as competing_session:
+            locked_rows = competing_session.scalars(
+                with_row_locks(
+                    select(Deadline).where(~Deadline.missed),
+                    of=Deadline,
+                    session=competing_session,
+                    skip_locked=True,
+                    key_share=False,
+                )
+            ).all()
+            assert len(locked_rows) == 1
+
+            scheduler_job = Job()
+            self.job_runner = SchedulerJobRunner(job=scheduler_job, 
num_runs=1, executors=[MockExecutor()])
+            self.job_runner._execute()
+
+            mock_handle_miss.assert_not_called()
+
     def test_emit_running_dags_metric(self, dag_maker, monkeypatch):
         """Test that the running_dags metric is emitted correctly."""
         with dag_maker("metric_dag") as dag:

Reply via email to