potiuk commented on code in PR #42082:
URL: https://github.com/apache/airflow/pull/42082#discussion_r1799266761
##########
airflow/models/taskinstance.py:
##########
@@ -1760,16 +1758,16 @@ def _handle_reschedule(
ti.end_date = timezone.utcnow()
ti.set_duration()
- # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
- # TaskReschedule which apparently also creates lock on corresponding DagRun entity
- with_row_locks(
- session.query(DagRun).filter_by(
- dag_id=ti.dag_id,
- run_id=ti.run_id,
- ),
- session=session,
- ).one()
- # Log reschedule request
+ # set state
+ ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+
+ ti.clear_next_method_args()
+
+ session.merge(ti)
+ session.commit()
Review Comment:
The only potential drawback is that if the system crashes right after the
commit, the task state will be updated, but the TaskReschedule will not be
created. The likelihood is small, but I think it's worth at least doing a
mental exercise of what happens then and whether the trade-off is worth it.
Now that I think about it, it's a little worrying, because this actually happens
in "_run_raw_task", which means it has **some** probability of happening (OOM
and others).
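To make that failure window concrete, here is a minimal sketch that uses the standard-library `sqlite3` module in place of the Airflow session. Everything in it is a hypothetical stand-in: the two simplified tables, the `reschedule` helper, and `SimulatedCrash` are assumptions for illustration and not Airflow code. It only models the ordering of the commit relative to the TaskReschedule insert, not the locking behavior this PR is about.

```python
import sqlite3


class SimulatedCrash(Exception):
    """Hypothetical stand-in for the worker dying (for example, an OOM kill)."""


def make_db():
    # Simplified tables, not Airflow's real schema.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, state TEXT)")
    conn.execute("CREATE TABLE task_reschedule (ti_id INTEGER)")
    conn.execute("INSERT INTO task_instance VALUES (1, 'running')")
    conn.commit()
    return conn


def reschedule(conn, commit_early, crash):
    conn.execute("UPDATE task_instance SET state = 'up_for_reschedule' WHERE id = 1")
    if commit_early:
        conn.commit()  # the state change is now durable on its own
    try:
        if crash:
            raise SimulatedCrash
        conn.execute("INSERT INTO task_reschedule VALUES (1)")
        conn.commit()
    except SimulatedCrash:
        conn.rollback()  # a dead process loses all uncommitted work


def snapshot(conn):
    state = conn.execute("SELECT state FROM task_instance WHERE id = 1").fetchone()[0]
    count = conn.execute("SELECT COUNT(*) FROM task_reschedule").fetchone()[0]
    return state, count


if __name__ == "__main__":
    for commit_early in (True, False):
        conn = make_db()
        reschedule(conn, commit_early=commit_early, crash=True)
        print(f"commit_early={commit_early}: {snapshot(conn)}")
```

With the early commit, a crash leaves the task in `up_for_reschedule` with no reschedule row. With a single transaction, the same crash leaves the task in `running` and nothing half-written.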
BTW, this part should look quite different for Airflow 3 with AIP-72,
because the reschedule will most likely happen in the API component rather
than in the task, so I am not quite sure whether this is targeted to be
back-ported only?
@ashb - any concerns and comments here?