1fanwang opened a new pull request, #66790:
URL: https://github.com/apache/airflow/pull/66790
## Problem
A sensor in `mode="reschedule"` loses its first-poke `start_date` on every
re-execution. The supervisor sends `start_date=utcnow()` as part of the
`ti_run` execution-API payload on each poke, and the endpoint writes that value
through unconditionally. Net effect: `dagrun.first_task_scheduling_delay`
(computed from `start_date - queued_at`) collapses to ~0 for any DAG fronted by
a reschedule-mode sensor, even when the sensor waited minutes or hours.
A guard already exists for deferred tasks at
`airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:180-182`:
```python
if ti.next_kwargs:
    data.pop("start_date")
```
There is no equivalent for rescheduled tasks. A legacy guard exists in
`_check_and_change_state_before_execution`
(`airflow-core/src/airflow/models/taskinstance.py:1315`) but is gated on
`ti.state == UP_FOR_RESCHEDULE`, which never holds at the time the worker runs
the check: the scheduler transitions `UP_FOR_RESCHEDULE -> QUEUED` before the
worker picks up the task, so `refresh_from_db` returns `QUEUED` and the lookup
is skipped.
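The ordering problem can be illustrated with a toy model. This is only a sketch built from plain Python stand-ins: the `State` enum, the `ToyTI` class, and the `scheduler_requeue` and `gated_lookup_runs` helpers are hypothetical names, not Airflow's actual classes or functions. It shows why a check gated on `UP_FOR_RESCHEDULE` does not fire once the scheduler has already moved the task to `QUEUED`.

```python
from dataclasses import dataclass
from enum import Enum


class State(str, Enum):
    # Hypothetical stand-in for the two task instance states named above.
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    QUEUED = "queued"


@dataclass
class ToyTI:
    # Hypothetical stand-in for a task instance row.
    state: State


def scheduler_requeue(ti: ToyTI) -> None:
    # The scheduler moves a rescheduled task back to QUEUED
    # before any worker picks it up.
    if ti.state == State.UP_FOR_RESCHEDULE:
        ti.state = State.QUEUED


def gated_lookup_runs(ti: ToyTI) -> bool:
    # Same shape as the legacy gate: the start_date lookup only
    # happens while the state is still UP_FOR_RESCHEDULE.
    return ti.state == State.UP_FOR_RESCHEDULE


ti = ToyTI(state=State.UP_FOR_RESCHEDULE)
scheduler_requeue(ti)                # step 1: scheduler transition
lookup_ran = gated_lookup_runs(ti)   # step 2: worker-side check
print(lookup_ran)                    # prints False: the lookup is skipped
```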
## Fix
Two changes:
1. **`task_instances.py::ti_run`** (production path) — When `start_date` is
present in the payload and the task has prior `TaskReschedule` rows, restore
`start_date` from the first row instead of accepting `utcnow()`. Mirror the
same value into `context.start_date` on the response so the supervisor pins
`context["ti"].start_date` to the first poke as well.
2. **`taskinstance.py::_check_and_change_state_before_execution`** (legacy /
test-utility path) — Drop the `ti.state == UP_FOR_RESCHEDULE` gate. The lookup
is scoped by `ti.id`, and `prepare_db_for_next_try` clears `TaskReschedule`
rows and rotates `ti.id` on each retry, so try-number scoping is implicit and
the query is harmless for non-rescheduled tasks.
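The decision rule in change 1 can be sketched in isolation as follows. This is a minimal stand-alone model, not the code in this PR: `RescheduleRow` and `resolve_start_date` are hypothetical names, the rows are assumed to be ordered oldest first, and mirroring the value into `context.start_date` is not modelled.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, Sequence


@dataclass
class RescheduleRow:
    # Hypothetical stand-in for a TaskReschedule row; only the
    # field needed for this sketch is modelled.
    start_date: datetime


def resolve_start_date(
    payload_start_date: Optional[datetime],
    reschedule_rows: Sequence[RescheduleRow],
) -> Optional[datetime]:
    """Pick the start_date to persist for a run request."""
    if payload_start_date is not None and reschedule_rows:
        # Assumption: rows are ordered oldest first, so index 0
        # holds the start_date of the first poke.
        return reschedule_rows[0].start_date
    # No prior reschedules (or no start_date in the payload):
    # keep whatever the payload carried.
    return payload_start_date


first_poke = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
later_poke = first_poke + timedelta(minutes=30)
rows = [
    RescheduleRow(first_poke),
    RescheduleRow(first_poke + timedelta(minutes=10)),
]

pinned = resolve_start_date(later_poke, rows)  # rescheduled task: first poke wins
fresh = resolve_start_date(later_poke, [])     # no rows: payload value is kept
```

With these inputs, `pinned` equals `first_poke` and `fresh` equals `later_poke`, which corresponds to the two API-path cases exercised by the tests listed under Tests.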
## Why try_number scoping is implicit
`prepare_db_for_next_try`
(`airflow-core/src/airflow/models/taskinstance.py:973-979`):
```python
def prepare_db_for_next_try(self, session: Session):
    ...
    session.execute(delete(TaskReschedule).filter_by(ti_id=self.id))
    self.id = uuid7()
```
On every retry, all `TaskReschedule` rows for the previous try's `ti.id` are
deleted and the TI gets a fresh UUID. So rows with `ti_id == current
task_instance_id` always belong to the current try. No additional `try_number`
filter is needed.
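The same argument can be checked with a small in-memory simulation. Everything here is a hypothetical stand-in (`ToyTaskInstance`, `reschedule_table`, and the helper functions are not Airflow APIs), and `uuid4` is used in place of `uuid7` purely to keep the sketch dependency-free.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyTaskInstance:
    # Hypothetical stand-in for a task instance; only the id matters here.
    id: uuid.UUID = field(default_factory=uuid.uuid4)


# Hypothetical in-memory "table": ti_id -> reschedule start dates.
reschedule_table: Dict[uuid.UUID, List[str]] = {}


def record_reschedule(ti: ToyTaskInstance, start_date: str) -> None:
    reschedule_table.setdefault(ti.id, []).append(start_date)


def prepare_for_next_try(ti: ToyTaskInstance) -> None:
    # Same two steps as the snippet above: delete the rows keyed by
    # the current id, then rotate the id.
    reschedule_table.pop(ti.id, None)
    ti.id = uuid.uuid4()  # stand-in for uuid7()


def rows_for(ti: ToyTaskInstance) -> List[str]:
    # Lookup scoped only by ti.id, with no try_number filter.
    return reschedule_table.get(ti.id, [])


ti = ToyTaskInstance()
record_reschedule(ti, "12:00")
record_reschedule(ti, "12:10")
rows_before_retry = list(rows_for(ti))  # two rows for the current try

prepare_for_next_try(ti)
rows_after_retry = rows_for(ti)         # empty: the new id has no rows
```

After the retry the lookup returns no rows, so an id-scoped query cannot pick up a `start_date` from an earlier try.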
## Reproducer
```python
from airflow.providers.standard.sensors.python import PythonSensor

PythonSensor(
    task_id="poll",
    mode="reschedule",
    poke_interval=10,
    timeout=300,
    python_callable=lambda: False,  # forces reschedule until timeout
)
```
Before the fix, `ti.start_date` advances by ~`poke_interval` on every
reschedule. After the fix, `ti.start_date` stays pinned at the first-poke value
and `dagrun.first_task_scheduling_delay` reflects the actual wait.
## Tests
- `airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py::TestTIRunState::test_ti_run_restores_start_date_for_rescheduled_task` -- the API path restores `start_date` from `TaskReschedule` on a subsequent poke
- `airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py::TestTIRunState::test_ti_run_uses_payload_start_date_when_no_reschedule_rows` -- non-rescheduled tasks preserve the payload value
- `airflow-core/tests/unit/models/test_taskinstance.py::TestTaskInstance::test_check_and_change_state_before_execution_restores_reschedule_start_date` -- the legacy method restores `start_date` even when the scheduler has already advanced state to `QUEUED`
Closes #66784