1fanwang commented on code in PR #66820:
URL: https://github.com/apache/airflow/pull/66820#discussion_r3232841122
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1776,6 +1776,15 @@ def _do_scheduling(self, session: Session) -> int:
self._start_queued_dagruns(session)
guard.commit()
+ # Clear DagRun objects loaded by phase 1 from the identity map so
+ # phase 2 reloads them fresh. Otherwise stale rows can be re-dirtied
+ # by flush/merge in _schedule_all_dag_runs and committed in a row-lock
+ # order that differs from what other scheduler replicas are taking
+ # for their own work, producing A-B / B-A deadlocks on dag_run and
+ # task_instance under HA scheduler deployments. See
+ # https://github.com/apache/airflow/issues/66817.
+ session.expunge_all()
Review Comment:
Follow-up with the investigation. Leading with the captured probe output —
that's the load-bearing artefact; the rest is supporting characterisation.
## Captured probe: identity map + dirty set + SQL emitted
Wrote a temp pytest probe that registers `before_flush` on the session and
`before_cursor_execute` on the engine, then runs
`SchedulerJobRunner._do_scheduling()` end-to-end over a single QUEUED → RUNNING
→ SCHEDULED cycle. Ran it twice: once on the fix branch, and once with the
`session.expunge_all()` call reverted, which matches the behaviour on `main`.
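For readers who want to reproduce the capture, here is a minimal sketch of how the two hooks can be wired. It is not the actual probe: `ToyRun` is a hypothetical stand-in for `DagRun`, the database is in-memory SQLite, and the two `commit()` calls only imitate the phase-1 / phase-2 boundary. The `before_flush` and `before_cursor_execute` events themselves are standard SQLAlchemy.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyRun(Base):
    """Hypothetical stand-in for DagRun; not Airflow's real model."""

    __tablename__ = "toy_run"
    id = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")  # in-memory database, assumption for the sketch
Base.metadata.create_all(engine)

flushes = []     # one (dirty, new) snapshot per flush
statements = []  # UPDATE statements seen by the engine


def record_flush(session, flush_context, instances):
    # Snapshot what the unit of work is about to write.
    dirty = [(type(obj).__name__, obj.id) for obj in session.dirty]
    new = [(type(obj).__name__, obj.id) for obj in session.new]
    flushes.append((dirty, new))


def record_sql(conn, cursor, statement, parameters, context, executemany):
    if statement.lstrip().upper().startswith("UPDATE"):
        statements.append(statement)


event.listen(engine, "before_cursor_execute", record_sql)

with Session(engine) as session:
    event.listen(session, "before_flush", record_flush)

    # "Phase 1": load/create the row and commit. Keeping a reference matters,
    # because the identity map only holds weak references.
    phase1_run = ToyRun(id=1, state="queued")
    session.add(phase1_run)
    session.commit()

    # Boundary: inspect what is still in the identity map.
    flushes.clear()
    identity_keys = [(key[0].__name__, key[1][0]) for key in session.identity_map.keys()]

    # "Phase 2": fetch the row again and modify it.
    run = session.get(ToyRun, 1)
    run.state = "running"
    session.commit()

print(identity_keys)
print(flushes)
print(statements)
```

In this toy run the object created in "phase 1" is still in the identity map at the boundary, and it is that same object that shows up in the dirty set of the next flush.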
**Without the fix (`main`):**
```
=== Identity map at the phase-1 -> phase-2 boundary ===
('DagVersion', UUID('019e208a-aadd-...'))
('SerializedDagModel', UUID('019e208a-aae4-...'))
('DagCode', UUID('019e208a-aae3-...'))
('DagModel', 'capture_phase2_sql')
('DagRun', 1)
('TaskInstance', UUID('019e208a-aaec-...'))
=== Dirty / new sets captured at each before_flush in phase 2 ===
flush[0]: dirty=[('DagRun', 1)] new=[]
=== UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
UPDATE task_instance SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
UPDATE dag_run SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
```
**With the fix:**
```
=== Identity map at the phase-1 -> phase-2 boundary ===
(empty)
=== Dirty / new sets captured at each before_flush in phase 2 ===
(none)
=== UPDATEs emitted to dag_run / task_instance during _do_scheduling ===
UPDATE dag_run SET start_date=?, state=?, updated_at=? WHERE dag_run.id = ?
UPDATE task_instance SET start_date=?, end_date=?, duration=?, state=?, try_number=(task_instance.try_number + ?), updated_at=? WHERE task_instance.id IN (?) AND ...
UPDATE dag_run SET last_scheduling_decision=?, updated_at=? WHERE dag_run.id = ?
```
What this shows:
- **Six leaked entries** in the identity map at the phase boundary on
`main`, including `DagRun(1)` and its `TaskInstance` — exactly the rows phase 2
is about to write to. With the fix, the identity map is empty entering phase 2.
- **`flush[0]: dirty=[('DagRun', 1)]`** on `main` — phase 2's flush
re-dirties the *phase-1-loaded* `DagRun(1)` instance via the
SELECT-returns-identity-map-entry path, not a fresh phase-2 fetch. With the
fix, phase 2 fetches fresh and nothing from phase 1 ends up in its flush set.
- **Same UPDATE statements** either way in a single-process run, because the
*symptom* (deadlock) only fires when two replicas lock the same rows in
opposing order at the same time. The cause is observable in one process; the
deadlock requires two.
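The SELECT-returns-identity-map-entry behaviour from the second bullet can be seen in isolation. A minimal sketch, assuming a hypothetical `ToyRun` model and in-memory SQLite rather than Airflow's real models: a SELECT for a row already in the identity map hands back the existing object, and after `expunge_all()` the same SELECT builds a fresh one.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyRun(Base):
    """Hypothetical stand-in for DagRun; not Airflow's real model."""

    __tablename__ = "toy_run"
    id = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")  # in-memory database, assumption for the sketch
Base.metadata.create_all(engine)

with Session(engine) as session:
    phase1_obj = ToyRun(id=1, state="queued")
    session.add(phase1_obj)
    session.commit()

    # The row is already in the identity map, so the SELECT returns the
    # existing Python object instead of constructing a new one.
    reused = session.execute(select(ToyRun).where(ToyRun.id == 1)).scalar_one()
    same_without_expunge = reused is phase1_obj

    # Detach everything, as the fix does at the phase boundary.
    session.expunge_all()
    map_size_after_expunge = len(session.identity_map)

    # The same SELECT now builds a fresh object.
    fresh = session.execute(select(ToyRun).where(ToyRun.id == 1)).scalar_one()
    same_after_expunge = fresh is phase1_obj

print(same_without_expunge, map_size_after_expunge, same_after_expunge)
```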
So the cause-effect chain, pinned to this captured trace, is:
> phase-1 entries leak into the identity map → phase-2's flush walks them
first → row-lock acquisition order across replicas is driven by *phase-1's*
per-replica load order → two replicas grab the same rows in opposite orders →
A-B / B-A deadlock on `dag_run` / `task_instance`.
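
The last two steps of that chain are ordinary lock-ordering behaviour and can be illustrated without a database. The sketch below is an analogy only: two `threading.Lock` objects stand in for row locks on two `dag_run` rows, two threads stand in for scheduler replicas, and an acquire timeout stands in for the database's deadlock detector. All names are hypothetical.

```python
import threading


def run_two_workers(order_1, order_2, force_overlap, timeout=0.2):
    """Run two workers that each take two locks; report who obtained both."""
    locks = {"A": threading.Lock(), "B": threading.Lock()}
    holding_first = threading.Barrier(2)
    tried_second = threading.Barrier(2)
    got_both = {}

    def worker(name, order):
        first, second = locks[order[0]], locks[order[1]]
        with first:
            if force_overlap:
                # Make sure both workers hold their first lock before
                # either one reaches for its second lock.
                holding_first.wait()
            acquired = second.acquire(timeout=timeout)
            got_both[name] = acquired
            if force_overlap:
                # Keep the first lock held until both attempts have finished.
                tried_second.wait()
            if acquired:
                second.release()

    threads = [
        threading.Thread(target=worker, args=("w1", order_1)),
        threading.Thread(target=worker, args=("w2", order_2)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return got_both


# Opposing order (A-B vs B-A): neither worker can get its second lock.
opposing = run_two_workers(("A", "B"), ("B", "A"), force_overlap=True)

# Consistent order (A-B vs A-B): the workers serialise and both finish.
consistent = run_two_workers(("A", "B"), ("A", "B"), force_overlap=False)

print(opposing, consistent)
```

A real database aborts one of the two transactions instead of timing out, which is the 1213 / `DeadlockDetected` error shown in the next section.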
## Production-shape scheduler log (characteristic, with caveat)
I can't copy production scheduler logs out due to company policy, but the
deadlock shape we hit lands in this form on MySQL/InnoDB:
```
sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE dag_run.id = %s]
```
and on Postgres:
```
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process N waits for ShareLock on transaction M; blocked by process P.
Process P waits for ShareLock on transaction Q; blocked by process N.
CONTEXT: while updating tuple (B, T) in relation "dag_run"
[SQL: UPDATE dag_run SET last_scheduling_decision=%s, updated_at=%s WHERE dag_run.id = %s]
```
Two markers that identify this as the same bug rather than an unrelated deadlock:
- The offending statement is `UPDATE dag_run` (or `UPDATE task_instance`),
both written by `_schedule_dag_run`'s flush.
- The two conflicting transactions belong to different scheduler jobs
(different scheduler job ids) and fall inside the same heartbeat window.
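For triaging logs or exceptions against the first marker, a small helper along these lines may be useful. It is a sketch, not part of the PR: the function names are hypothetical, and it relies only on the MySQL error number 1213 shown above, the Postgres SQLSTATE `40P01` (`deadlock_detected`, exposed by psycopg2 as `pgcode`), and the statement text. The second marker cannot be checked from the exception alone; it needs the scheduler logs of both replicas.

```python
import re

MYSQL_DEADLOCK_ERRNO = 1213            # ER_LOCK_DEADLOCK
POSTGRES_DEADLOCK_SQLSTATE = "40P01"   # deadlock_detected

_TARGET_UPDATE = re.compile(r"^\s*UPDATE\s+(dag_run|task_instance)\b", re.IGNORECASE)


def is_deadlock(driver_exc):
    """Return True if the DBAPI exception looks like a deadlock report."""
    if getattr(driver_exc, "pgcode", None) == POSTGRES_DEADLOCK_SQLSTATE:
        return True
    args = getattr(driver_exc, "args", ())
    return bool(args) and args[0] == MYSQL_DEADLOCK_ERRNO


def matches_first_marker(driver_exc, statement):
    """Deadlock raised while updating dag_run or task_instance."""
    return is_deadlock(driver_exc) and bool(_TARGET_UPDATE.match(statement or ""))
```

With SQLAlchemy, the two arguments would typically come from the caught `OperationalError` as `exc.orig` and `exc.statement`.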
## What I'd need to capture a literal 1213 + INNODB STATUS
A two-thread Docker MySQL repro that forces opposing-order row locks on two
`dag_run` rows. Happy to spin one up if the captured mechanism above isn't
enough. It's a few minutes of setup; I'm calling it out as a separate ask
because the mechanism is already evidenced by the captured probe section above.