1fanwang commented on code in PR #66773:
URL: https://github.com/apache/airflow/pull/66773#discussion_r3232486967
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2938,7 +2944,10 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
.join(DM, TI.dag_id == DM.dag_id)
.where(
TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
- TI.last_heartbeat_at < limit_dttm,
+ or_(
+ TI.last_heartbeat_at < limit_dttm,
Review Comment:
Fair point, let me walk through it. The `/run` endpoint you linked does set
`state=RUNNING` and `last_heartbeat_at=utcnow()` atomically in the same
`UPDATE`, so in steady-state Airflow 3 a freshly started TI never has a NULL
`last_heartbeat_at` on that path.
The case I'm targeting is the **Airflow 2 → 3 upgrade legacy state**.
Migration `0045_3_0_0_add_last_heartbeat_at_directly_to_ti` adds the column as
`nullable=True` without a backfill. Any TI that was already `RUNNING` at
upgrade time has `last_heartbeat_at IS NULL` until something writes to it. The
codebase already acknowledges this exact state inside
`adopt_or_reset_orphaned_tasks`:
```python
# scheduler_job_runner.py:2854-2856
# If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
if ti.last_heartbeat_at is None:
ti.last_heartbeat_at = timezone.utcnow()
```
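The legacy state is easy to reproduce outside Airflow. Below is a minimal sqlite3 sketch, assuming a hypothetical two-column stand-in for `task_instance` (not the real schema) and a raw `ALTER TABLE` in place of the actual Alembic migration. It shows that adding a nullable column with no default and no backfill leaves every pre-existing row with NULL in that column.

```python
import sqlite3

# Hypothetical, heavily simplified stand-in for task_instance (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, state TEXT)")

# A TI that was already RUNNING when the upgrade happened.
conn.execute("INSERT INTO task_instance (state) VALUES ('running')")

# Nullable column, no server default, no backfill UPDATE afterwards.
conn.execute("ALTER TABLE task_instance ADD COLUMN last_heartbeat_at TIMESTAMP")

row = conn.execute("SELECT state, last_heartbeat_at FROM task_instance").fetchone()
print(row)  # ('running', None)
```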
`adopt_or_reset` only runs at scheduler startup and on the scheduler-lock
timer. `_find_task_instances_without_heartbeats` runs on a tighter loop and
currently has no matching fallback, so a TI in this migration state never
matches the cleanup query and stays `RUNNING` forever. This PR adds the
heartbeat-cleanup-path counterpart to the existing `adopt_or_reset` fallback.
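Such a row never matches because of SQL three-valued logic: `NULL < limit_dttm` evaluates to NULL (unknown), and a `WHERE` clause keeps only rows whose condition is TRUE. Here is a minimal sqlite3 sketch of the pre-fix predicate, assuming a hypothetical simplified table and integer epoch seconds in place of real timestamps:

```python
import sqlite3

# Hypothetical minimal schema; timestamps are epoch seconds for simplicity.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "id INTEGER PRIMARY KEY, state TEXT, last_heartbeat_at INTEGER)"
)
now = 10_000
limit_dttm = now - 300  # assumed 300 s heartbeat timeout

conn.executemany(
    "INSERT INTO task_instance (id, state, last_heartbeat_at) VALUES (?, ?, ?)",
    [
        (1, "running", now - 1_000),  # stale heartbeat
        (2, "running", now - 10),     # healthy heartbeat
        (3, "running", None),         # legacy Airflow 2 row: NULL heartbeat
    ],
)

# Pre-fix predicate: for row 3, NULL < limit_dttm is NULL, not TRUE,
# so WHERE drops the row and it is never cleaned up.
old = conn.execute(
    "SELECT id FROM task_instance "
    "WHERE state = 'running' AND last_heartbeat_at < ? ORDER BY id",
    (limit_dttm,),
).fetchall()
print(old)  # [(1,)]

# The comparison on its own yields NULL (None in Python), not FALSE.
unknown = conn.execute("SELECT NULL < 5").fetchone()
print(unknown)  # (None,)
```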
I reproduced the regression deterministically by reverting the new `or_(...)`
predicate and rerunning the regression test on `main`:
```
FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
AssertionError: assert ti.key not in MockExecutor.running
(the cleanup query silently skipped the NULL row; the TI key is still in executor.running)
```
With the fix in place the test passes, and the companion
`..._null_last_heartbeat_fresh_start` case pins down that a newly started TI
inside its first timeout window is left alone. I've updated the PR body with the
full before/after snippet and the steady-state walkthrough.
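The hunk above only shows the first branch of the new `or_(...)`. As a hedged illustration, here is one predicate that would satisfy both tests. It assumes, hypothetically, that the NULL-heartbeat branch falls back to `start_date`; the PR's actual second branch may differ. The helper `find_without_heartbeats` and the schema are also hypothetical, with integer epoch seconds standing in for timestamps.

```python
import sqlite3


def find_without_heartbeats(conn, limit_dttm):
    """Hypothetical sketch: ids of RUNNING/RESTARTING TIs considered stale.

    Rows with a NULL heartbeat fall back to start_date (an assumption),
    so a TI that only just started is left alone.
    """
    rows = conn.execute(
        """
        SELECT id FROM task_instance
        WHERE state IN ('running', 'restarting')
          AND (
            last_heartbeat_at < :cutoff
            OR (last_heartbeat_at IS NULL AND start_date < :cutoff)
          )
        ORDER BY id
        """,
        {"cutoff": limit_dttm},
    )
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "id INTEGER PRIMARY KEY, state TEXT, start_date INTEGER, last_heartbeat_at INTEGER)"
)
now = 10_000
limit_dttm = now - 300  # assumed 300 s heartbeat timeout
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        (1, "running", now - 5_000, now - 1_000),  # stale heartbeat
        (2, "running", now - 5_000, now - 10),     # healthy heartbeat
        (3, "running", now - 5_000, None),         # legacy NULL, started long ago
        (4, "running", now - 30, None),            # NULL, inside first timeout window
    ],
)
print(find_without_heartbeats(conn, limit_dttm))  # [1, 3]
```

Row 3 mirrors the legacy case and row 4 the fresh-start case. Guarding the NULL branch with a second timestamp, rather than matching `last_heartbeat_at IS NULL` on its own, is what keeps a recently started TI from being reaped.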