ashb commented on code in PR #44907:
URL: https://github.com/apache/airflow/pull/44907#discussion_r1886671257
##########
airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -147,7 +149,26 @@ def ti_update_state(
next_kwargs=ti_patch_payload.trigger_kwargs,
trigger_timeout=timeout,
)
+ elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
+ task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
+ actual_start_date = timezone.utcnow()
+ # add changes to TaskReschedule table to the session
+ session.add(
+ TaskReschedule(
+ task_instance.task_id,
+ task_instance.dag_id,
+ task_instance.run_id,
+ task_instance.try_number,
+ actual_start_date,
+ task_instance.end_date,
+ ti_patch_payload.reschedule_date,
+ task_instance.map_index,
+ )
+ )
+ query = update(TI).where(TI.id == ti_id_str)
+ # clear the next_method and next_kwargs so that none of the retries
pick them up
+ query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None,
next_kwargs=None)
Review Comment:
We've already got the TI row loaded, so lets do this:
```suggestion
ti.state = State.UP_FOR_RESCHEDULE
ti.next_method = None
ti.next_kwargs = None
session.flush()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]