peachchen0716 opened a new pull request, #64816:
URL: https://github.com/apache/airflow/pull/64816

   ## Description
   
   Fixes `TaskInstance.start_date` not being restored to the original 
first-poke time for rescheduled sensors/tasks, which inflates 
`dagrun.first_task_start_delay` and `dagrun.first_task_scheduling_delay` 
metrics by the full reschedule wait time.
   
   ## Problem
   
   In `_check_and_change_state_before_execution`, the code that restores 
`start_date` to the first attempt's time is gated on `ti.state == 
UP_FOR_RESCHEDULE`:
   
   ```python
   ti.start_date = ti.start_date if ti.next_method else timezone.utcnow()
   if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE:  # always False in practice
       tr_start_date = session.scalar(TR.stmt_for_task_instance(...))
       if tr_start_date:
           ti.start_date = tr_start_date
   ```
   
   In the normal scheduler flow, the scheduler advances the state from 
`UP_FOR_RESCHEDULE` to `QUEUED` before dispatching the task to any executor. By 
the time a worker calls `_check_and_change_state_before_execution`, 
`ti.refresh_from_db()` has already loaded `QUEUED`, so the guard is always 
`False`. As a result, `start_date` is reset to `utcnow()` on every 
re-execution, so it records the time of the *last* poke, not the *first*.
   
   This bug is invisible in unit tests because they call `ti.run()` directly 
(bypassing the scheduler), which preserves the `UP_FOR_RESCHEDULE` state and 
lets the guard fire correctly. It manifests only when the scheduler dispatches 
the task, which in production happens with every executor type (Local, Celery, 
Kubernetes).
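
The difference between the two paths can be illustrated with a small, self-contained toy model. This is only a sketch: `ToyTI`, `State`, and `start_with_state_guard` are hypothetical stand-ins invented for illustration, not Airflow classes or functions, and the 30-minute gap is an arbitrary assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional


class State(str, Enum):
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    QUEUED = "queued"


@dataclass
class ToyTI:
    """Hypothetical stand-in for a task instance; not the Airflow model."""
    state: State
    start_date: Optional[datetime] = None


def start_with_state_guard(ti: ToyTI, first_poke: Optional[datetime], now: datetime) -> None:
    """Mimic the guarded restore: start_date is only restored to the first
    poke when the state is still UP_FOR_RESCHEDULE."""
    ti.start_date = now
    if ti.state == State.UP_FOR_RESCHEDULE and first_poke is not None:
        ti.start_date = first_poke


first_poke = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
later = first_poke + timedelta(minutes=30)  # assumed time of a later poke

# Direct run, as in unit tests: the state is still UP_FOR_RESCHEDULE.
direct = ToyTI(state=State.UP_FOR_RESCHEDULE)
start_with_state_guard(direct, first_poke, later)

# Scheduler path: the state was already advanced to QUEUED.
scheduled = ToyTI(state=State.QUEUED)
start_with_state_guard(scheduled, first_poke, later)

# How far start_date drifts from the first poke on the scheduler path.
inflation = scheduled.start_date - direct.start_date
```

In this model the direct run keeps the first-poke time, while the scheduler path records the later poke, so `inflation` equals the whole reschedule wait.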
   
   ## Fix
   
   Replace the state check with a `TaskReschedule` lookup, scoped to 
`try_number`, that runs whenever `ti.next_method` is not set, independent of 
the current state. The query returns `None` for non-rescheduled tasks (no 
behavior change for the common case); for rescheduled tasks it correctly 
restores `start_date` from the first poke regardless of the current state.
   
   ```python
   ti.start_date = ti.start_date if ti.next_method else timezone.utcnow()
   if not ti.next_method:
       tr_start_date = session.scalar(TR.stmt_for_task_instance(...))
       if tr_start_date:
           ti.start_date = tr_start_date
   ```
   
   ## Testing
   
   Added 
`test_reschedule_start_date_preserved_when_scheduler_advances_to_queued` which 
explicitly reproduces the production path:
   1. Runs one reschedule cycle → `start_date = date1`, state = 
`UP_FOR_RESCHEDULE`
   2. Manually advances state to `QUEUED` (simulating what the scheduler does)
   3. Runs the sensor to success
   4. Asserts `start_date == date1` (not `date2`)
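
The four steps above can be sketched against an in-memory toy model. This is not the actual test: `reschedules`, `record_reschedule`, `first_reschedule_start`, and `resolve_start_date` are hypothetical helpers standing in for the `TaskReschedule` table and the start-date logic, and the dates are arbitrary.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, int]  # (task_id, try_number); hypothetical lookup key

# Hypothetical in-memory stand-in for the TaskReschedule table.
reschedules: Dict[Key, List[datetime]] = {}


def record_reschedule(key: Key, start_date: datetime) -> None:
    """Store the start date of a poke that ended in a reschedule."""
    reschedules.setdefault(key, []).append(start_date)


def first_reschedule_start(key: Key) -> Optional[datetime]:
    """Return the earliest recorded start date, or None if never rescheduled."""
    rows = reschedules.get(key)
    return min(rows) if rows else None


def resolve_start_date(key: Key, now: datetime) -> datetime:
    """State-independent restore: prefer the first poke, else use `now`."""
    return first_reschedule_start(key) or now


date1 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
date2 = date1 + timedelta(minutes=30)
key: Key = ("my_sensor", 1)
ti = {"state": None, "start_date": None}

# 1. One reschedule cycle: start_date = date1, state = up_for_reschedule.
ti["start_date"] = resolve_start_date(key, now=date1)
record_reschedule(key, ti["start_date"])
ti["state"] = "up_for_reschedule"

# 2. Simulate the scheduler advancing the state to queued.
ti["state"] = "queued"

# 3. Run again at date2; the lookup never consults ti["state"].
ti["start_date"] = resolve_start_date(key, now=date2)
ti["state"] = "success"

# 4. start_date is still the first poke, not date2.
assert ti["start_date"] == date1

# A task that was never rescheduled simply gets `now`.
fresh_start = resolve_start_date(("other_task", 1), now=date2)
```

Because the lookup is keyed on the task and try number rather than on state, step 2 has no effect on the result.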

