1fanwang commented on code in PR #66773:
URL: https://github.com/apache/airflow/pull/66773#discussion_r3232486967


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2938,7 +2944,10 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
                 .join(DM, TI.dag_id == DM.dag_id)
                 .where(
                     TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
-                    TI.last_heartbeat_at < limit_dttm,
+                    or_(
+                        TI.last_heartbeat_at < limit_dttm,

Review Comment:
   **Update 2026-05-19:** the timing assumption in this argument was wrong. 
Live repro on a freshly-migrated A3 deployment showed 
`adopt_or_reset_orphaned_tasks` fires within ~14 ms of scheduler startup, 
rotates the NULL-heartbeat TIs into `task_instance_history`, and 
`_find_task_instances_without_heartbeats` only ever runs against the fresh rows 
that already have `last_heartbeat_at` populated by `/run`. The cleanup query 
never observes the NULL state, so the predicate this PR adds is unreachable on 
a normal restart path. See the 
[close-comment](https://github.com/apache/airflow/pull/66773#discussion_r3269906184)
 for the trace and the real-bug hypothesis on #58307.
   
   <details>
   <summary>Original argument, preserved for thread continuity</summary>
   
   Fair point — let me walk through it. The `/run` endpoint you linked does set 
`state=RUNNING` and `last_heartbeat_at=utcnow()` atomically in the same 
`UPDATE`, so in steady-state Airflow 3 a freshly-running TI is not NULL on that 
path.
   
   The case I'm targeting is the Airflow 2 → 3 upgrade legacy state. Migration 
`0045_3_0_0_add_last_heartbeat_at_directly_to_ti` adds the column as 
`nullable=True` without a backfill. Any TI that was already RUNNING at upgrade 
time has `last_heartbeat_at IS NULL` until something writes to it. The codebase 
already acknowledges this exact state inside `adopt_or_reset_orphaned_tasks`:
   
   ```python
   # scheduler_job_runner.py:2854-2856
   # If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
   if ti.last_heartbeat_at is None:
       ti.last_heartbeat_at = timezone.utcnow()
   ```
   
   `adopt_or_reset` only runs at scheduler startup / on the scheduler-lock 
timer. `_find_task_instances_without_heartbeats` runs on a tighter loop and 
currently has no matching fallback — a TI in the migration state is invisible 
to the cleanup query and stays RUNNING forever. This PR is the 
heartbeat-cleanup-path counterpart to the existing `adopt_or_reset` fallback.
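
   The "invisible to the cleanup query" point follows from SQL three-valued logic: `NULL < limit_dttm` evaluates to NULL, not true, so the row never matches. Below is a minimal sketch of that behaviour using an in-memory SQLite table rather than Airflow's models. The table `ti`, its rows, and the `start_date` fallback arm are all assumptions made for illustration; the quoted hunk is cut off after the first `or_` arm, so this is not necessarily the predicate the PR adds.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for the task_instance table; only the columns
# needed for the illustration are modelled.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ti (task_id TEXT, state TEXT, start_date TEXT, last_heartbeat_at TEXT)"
)

now = datetime(2026, 5, 1, 12, 0, tzinfo=timezone.utc)
limit_dttm = now - timedelta(minutes=5)  # heartbeat timeout cutoff


def ts(dt):
    # ISO-8601 strings with a fixed offset compare correctly as text.
    return dt.isoformat()


rows = [
    # Heartbeat older than the cutoff: should be purged.
    ("stale", "running", ts(now - timedelta(hours=1)), ts(now - timedelta(minutes=30))),
    # Recent heartbeat: should be left alone.
    ("healthy", "running", ts(now - timedelta(hours=1)), ts(now - timedelta(seconds=10))),
    # Migration-state row: started long ago, heartbeat never written.
    ("legacy_null", "running", ts(now - timedelta(hours=2)), None),
    # Newly started row still inside its first timeout window.
    ("fresh_null", "running", ts(now - timedelta(seconds=30)), None),
]
conn.executemany("INSERT INTO ti VALUES (?, ?, ?, ?)", rows)


def select_ids(where):
    sql = f"SELECT task_id FROM ti WHERE state = 'running' AND ({where}) ORDER BY task_id"
    return [r[0] for r in conn.execute(sql, {"cutoff": ts(limit_dttm)})]


# Original predicate: the comparison against NULL is never true.
original = select_ids("last_heartbeat_at < :cutoff")

# Assumed fallback arm (illustrative only): treat a NULL heartbeat as
# stale once the task's start_date is itself older than the cutoff.
with_fallback = select_ids(
    "last_heartbeat_at < :cutoff "
    "OR (last_heartbeat_at IS NULL AND start_date < :cutoff)"
)

print(original)       # ['stale']
print(with_fallback)  # ['legacy_null', 'stale']
```

   In this sketch the plain comparison returns only the `stale` row, while the fallback arm also picks up `legacy_null` and still skips `fresh_null`, which mirrors the two regression cases described in this comment.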
   
   Captured the regression deterministically — reverting the new `or_(...)` 
predicate and rerunning the regression test on `main`:
   
   ```
   FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
       AssertionError: assert ti.key not in MockExecutor.running
       (the cleanup query silently skipped the NULL row; the TI key is still in executor.running)
   ```
   
   With the fix in place, the test passes — and the companion 
`..._null_last_heartbeat_fresh_start` case pins that a newly-started TI inside 
its first timeout window is left alone. Updated the PR body with the full 
before/after snippet and the steady-state walkthrough.
   
   </details>


