1fanwang opened a new pull request, #66790:
URL: https://github.com/apache/airflow/pull/66790

   ## Problem
   
   A sensor in `mode="reschedule"` loses its first-poke `start_date` on every 
re-execution. The supervisor sends `start_date=utcnow()` as part of the 
`ti_run` execution-API payload on each poke, and the endpoint writes that value 
through unconditionally. Net effect: `dagrun.first_task_scheduling_delay` 
(computed from `start_date - queued_at`) collapses to ~0 for any DAG fronted by 
a reschedule-mode sensor, even when the sensor waited minutes or hours.
   
   A guard already exists for deferred tasks at 
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:180-182`:
   
   ```python
   if ti.next_kwargs:
       data.pop("start_date")
   ```
   
   There is no equivalent for rescheduled tasks. A legacy guard exists in 
`_check_and_change_state_before_execution` 
(`airflow-core/src/airflow/models/taskinstance.py:1315`) but is gated on 
`ti.state == UP_FOR_RESCHEDULE`, which never holds at the time the worker runs 
the check: the scheduler transitions `UP_FOR_RESCHEDULE -> QUEUED` before the 
worker picks up the task, so `refresh_from_db` returns `QUEUED` and the lookup 
is skipped.
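
The ordering problem can be modelled in a few lines. This is a toy sketch, not Airflow code. The `State` enum and the `legacy_guard_fires` helper are hypothetical stand-ins for the real task-instance states and for the pre-fix condition in `_check_and_change_state_before_execution`.

```python
from enum import Enum


class State(str, Enum):
    # Hypothetical subset of task-instance states, for illustration only.
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    QUEUED = "queued"


def legacy_guard_fires(state_seen_by_worker: State) -> bool:
    # Pre-fix condition: the TaskReschedule lookup only ran when the
    # refreshed state was UP_FOR_RESCHEDULE.
    return state_seen_by_worker is State.UP_FOR_RESCHEDULE


# One reschedule cycle, in the order described above.
transitions = [
    State.UP_FOR_RESCHEDULE,  # sensor poke returned False
    State.QUEUED,             # scheduler re-queues the TI
]

# The worker refreshes from the DB only after it has picked up the task,
# i.e. after the scheduler's transition, so it sees the last state.
state_seen_by_worker = transitions[-1]
print(legacy_guard_fires(state_seen_by_worker))  # False
```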
   
   ## Fix
   
   Two changes:
   
   1. **`task_instances.py::ti_run`** (production path): when `start_date` is present in the payload and the task has prior `TaskReschedule` rows, restore `start_date` from the first row instead of accepting `utcnow()`. Mirror the same value into `context.start_date` on the response so the supervisor pins `context["ti"].start_date` to the first poke as well.
   
   2. **`taskinstance.py::_check_and_change_state_before_execution`** (legacy / test-utility path): drop the `ti.state == UP_FOR_RESCHEDULE` gate. The lookup is scoped by `ti.id`, and `prepare_db_for_next_try` clears `TaskReschedule` rows and rotates `ti.id` on each retry, so try-number scoping is implicit. For a task that was never rescheduled, the lookup finds no rows, so running it unconditionally is harmless.
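
The selection rule in change 1 can be sketched in plain Python. This illustrates the rule and is not the PR's code. `RescheduleRow` and `resolve_start_date` are hypothetical names, and the rows live in a list instead of being queried from the database. The sketch also approximates "first row" as the row with the earliest `start_date`; the real query may order by primary key instead.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class RescheduleRow:
    # Hypothetical stand-in for a TaskReschedule row (only the fields used here).
    ti_id: str
    start_date: datetime


def resolve_start_date(
    ti_id: str,
    payload_start_date: datetime,
    rows: list[RescheduleRow],
) -> datetime:
    """Pick the start_date to persist for this run (sketch only)."""
    own_rows = [row for row in rows if row.ti_id == ti_id]
    if not own_rows:
        # Never rescheduled in this try: accept the supervisor's value.
        return payload_start_date
    # Rescheduled: pin to the first poke instead of utcnow().
    return min(row.start_date for row in own_rows)


first_poke = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
rows = [
    RescheduleRow("ti-1", first_poke),
    RescheduleRow("ti-1", first_poke + timedelta(seconds=10)),
]
payload_now = first_poke + timedelta(seconds=20)  # what utcnow() would send

print(resolve_start_date("ti-1", payload_now, rows))  # 2025-01-01 12:00:00+00:00
print(resolve_start_date("ti-2", payload_now, rows))  # 2025-01-01 12:00:20+00:00
```

The sketch leaves out the second half of change 1, which mirrors the resolved value into the response's `context.start_date`.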
   
   ## Why try_number scoping is implicit
   
   `prepare_db_for_next_try` 
(`airflow-core/src/airflow/models/taskinstance.py:973-979`):
   
   ```python
   def prepare_db_for_next_try(self, session: Session):
       ...
       session.execute(delete(TaskReschedule).filter_by(ti_id=self.id))
       self.id = uuid7()
   ```
   
   On every retry, all `TaskReschedule` rows for the previous try's `ti.id` are deleted and the TI gets a fresh UUID, so rows with `ti_id == current task_instance_id` always belong to the current try. No additional `try_number` filter is needed.
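
A toy in-memory model makes the argument concrete. It is a sketch under stated assumptions, not Airflow code. `FakeTI`, the `reschedules` dict, and the helper functions are all hypothetical. `uuid.uuid4()` stands in for `uuid7()`, because the only thing that matters here is that the id changes.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class FakeTI:
    # Hypothetical task instance; `id` plays the role of ti.id.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


# ti_id -> reschedule start dates (stand-in for the TaskReschedule table).
reschedules: dict[str, list[datetime]] = {}


def record_reschedule(ti: FakeTI, when: datetime) -> None:
    reschedules.setdefault(ti.id, []).append(when)


def prepare_for_next_try(ti: FakeTI) -> None:
    # Same two steps as the quoted method: delete rows for the old id,
    # then rotate the id.
    reschedules.pop(ti.id, None)
    ti.id = str(uuid.uuid4())


def earliest_reschedule(ti: FakeTI) -> datetime | None:
    # Lookup scoped only by ti.id, with no try_number filter.
    rows = reschedules.get(ti.id, [])
    return min(rows) if rows else None


t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
ti = FakeTI()
record_reschedule(ti, t0)
record_reschedule(ti, t0 + timedelta(seconds=10))
print(earliest_reschedule(ti))  # 2025-01-01 00:00:00+00:00

prepare_for_next_try(ti)
print(earliest_reschedule(ti))  # None: the new try starts with no history
```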
   
   ## Reproducer
   
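   ```python
   from datetime import datetime, timezone
   
   from airflow.providers.standard.sensors.python import PythonSensor
   from airflow.sdk import DAG
   
   with DAG(
       dag_id="reschedule_start_date_repro",  # illustrative DAG id
       start_date=datetime(2025, 1, 1, tzinfo=timezone.utc),
       schedule=None,
   ):
       PythonSensor(
           task_id="poll",
           mode="reschedule",
           poke_interval=10,
           timeout=300,
           python_callable=lambda: False,  # forces reschedule until timeout
       )
   ```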
   
   Before the fix, `ti.start_date` advances by ~`poke_interval` on every 
reschedule. After the fix, `ti.start_date` stays pinned at the first-poke value 
and `dagrun.first_task_scheduling_delay` reflects the actual wait.
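
An idealized timeline for the reproducer's settings shows the difference. This is a simulation, not a measurement. It assumes every poke happens exactly `poke_interval` after the previous one with zero scheduling latency, and it uses an arbitrary first-poke time. `simulate` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

POKE_INTERVAL = timedelta(seconds=10)  # poke_interval=10 in the reproducer
TIMEOUT = timedelta(seconds=300)       # timeout=300 in the reproducer
FIRST_POKE = datetime(2025, 1, 1, tzinfo=timezone.utc)  # arbitrary


def simulate(pin_first_poke: bool) -> list[datetime]:
    """start_date the TI carries on each poke, under idealized timing."""
    observed = []
    now = FIRST_POKE
    while now - FIRST_POKE <= TIMEOUT:
        # Pre-fix: each run writes utcnow() through; post-fix: first poke is kept.
        observed.append(FIRST_POKE if pin_first_poke else now)
        now += POKE_INTERVAL
    return observed


before, after = simulate(pin_first_poke=False), simulate(pin_first_poke=True)
print(before[-1] - before[0])             # 0:05:00
print(len(set(before)), len(set(after)))  # 31 1
```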
   
   ## Tests
   
   - `airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py::TestTIRunState::test_ti_run_restores_start_date_for_rescheduled_task`: the API path restores `start_date` from `TaskReschedule` on a subsequent poke.
   - `airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py::TestTIRunState::test_ti_run_uses_payload_start_date_when_no_reschedule_rows`: non-rescheduled tasks preserve the payload value.
   - `airflow-core/tests/unit/models/test_taskinstance.py::TestTaskInstance::test_check_and_change_state_before_execution_restores_reschedule_start_date`: the legacy method restores `start_date` even when the scheduler has already advanced the state to `QUEUED`.
   
   Closes #66784


-- 
This is an automated message from the Apache Git Service.