1fanwang commented on code in PR #66773:
URL: https://github.com/apache/airflow/pull/66773#discussion_r3232486967


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2938,7 +2944,10 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
                 .join(DM, TI.dag_id == DM.dag_id)
                 .where(
                     TI.state.in_((TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING)),
-                    TI.last_heartbeat_at < limit_dttm,
+                    or_(
+                        TI.last_heartbeat_at < limit_dttm,

Review Comment:
Fair point; let me walk through it. The `/run` endpoint you linked does set `state=RUNNING` and `last_heartbeat_at=utcnow()` atomically in the same `UPDATE`, so in steady-state Airflow 3 a freshly running TI never has a NULL `last_heartbeat_at` on that path.
   
The case I'm targeting is the **Airflow 2 → 3 upgrade legacy state**. Migration `0045_3_0_0_add_last_heartbeat_at_directly_to_ti` adds the column as `nullable=True` without a backfill. Any TI that was already `RUNNING` at upgrade time has `last_heartbeat_at IS NULL` until something writes to it. The codebase already acknowledges this exact state inside `adopt_or_reset_orphaned_tasks`:
   
```python
# scheduler_job_runner.py:2854-2856
# If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now
if ti.last_heartbeat_at is None:
    ti.last_heartbeat_at = timezone.utcnow()
```
   
`adopt_or_reset` only runs at scheduler startup and on the scheduler-lock timer. `_find_task_instances_without_heartbeats` runs on a tighter loop and currently has no matching fallback. Because `NULL < limit_dttm` evaluates to NULL (unknown) rather than TRUE under SQL's three-valued logic, a TI in the migration state fails the `WHERE` clause, is invisible to the cleanup query, and stays `RUNNING` indefinitely. This PR is the heartbeat-cleanup-path counterpart to the existing `adopt_or_reset` fallback.
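
To make the NULL-comparison point concrete, here is a minimal sketch using Python's built-in `sqlite3` with an in-memory table standing in for `task_instance`. The table name, the `ti_id` column, and storing timestamps as ISO-8601 text are simplifying assumptions, not Airflow's real schema; the `ALTER TABLE ... ADD COLUMN` step mimics a nullable column added without a backfill. The bare `IS NULL` branch is only a simplified stand-in for the second half of the `or_(...)`, whose exact condition the diff excerpt above cuts off.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for task_instance (assumed layout, not Airflow's schema).
conn.execute("CREATE TABLE task_instance (ti_id TEXT PRIMARY KEY, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("legacy_ti", "running"), ("stale_ti", "running")],
)

# Simulate the upgrade: nullable column, no default, no backfill.
conn.execute("ALTER TABLE task_instance ADD COLUMN last_heartbeat_at TEXT")

# After the upgrade only stale_ti ever gets a heartbeat written (an old one);
# legacy_ti keeps last_heartbeat_at = NULL.
conn.execute(
    "UPDATE task_instance SET last_heartbeat_at = ? WHERE ti_id = ?",
    ("2024-01-01T00:00:00", "stale_ti"),
)

limit_dttm = "2024-01-01T00:05:00"

# Three-valued logic: NULL < anything is NULL (unknown), not TRUE.
null_comparison = conn.execute("SELECT NULL < ?", (limit_dttm,)).fetchone()[0]

# Heartbeat-only predicate: the legacy row silently drops out.
before_fix = [
    row[0]
    for row in conn.execute(
        "SELECT ti_id FROM task_instance "
        "WHERE state IN ('running', 'restarting') AND last_heartbeat_at < ? "
        "ORDER BY ti_id",
        (limit_dttm,),
    )
]

# Same query with an explicit IS NULL branch, analogous to wrapping the
# comparison in or_(...).
with_null_branch = [
    row[0]
    for row in conn.execute(
        "SELECT ti_id FROM task_instance "
        "WHERE state IN ('running', 'restarting') "
        "AND (last_heartbeat_at < ? OR last_heartbeat_at IS NULL) "
        "ORDER BY ti_id",
        (limit_dttm,),
    )
]

print(null_comparison)   # None
print(before_fix)        # ['stale_ti']
print(with_null_branch)  # ['legacy_ti', 'stale_ti']
```

A bare `IS NULL` branch would also pick up a NULL-heartbeat TI that only just started, which is what the `..._fresh_start` test guards against, so the real second branch presumably carries an additional time bound.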
   
I captured the regression deterministically by reverting the new `or_(...)` predicate and rerunning the regression test on `main`:
   
```
FAILED ::test_find_and_purge_task_instances_without_heartbeats_null_last_heartbeat
    AssertionError: assert ti.key not in MockExecutor.running
    (the cleanup query silently skipped the NULL row; the TI key is still in executor.running)
```
   
With the fix in place the test passes, and the companion `..._null_last_heartbeat_fresh_start` case pins down that a newly started TI still inside its first timeout window is left alone. I've also updated the PR body with the full before/after snippet and the steady-state walkthrough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
