potiuk commented on code in PR #42082:
URL: https://github.com/apache/airflow/pull/42082#discussion_r1799266761


##########
airflow/models/taskinstance.py:
##########
@@ -1760,16 +1758,16 @@ def _handle_reschedule(
     ti.end_date = timezone.utcnow()
     ti.set_duration()
 
-    # Lock DAG run to be sure not to get into a deadlock situation when trying 
to insert
-    # TaskReschedule which apparently also creates lock on corresponding 
DagRun entity
-    with_row_locks(
-        session.query(DagRun).filter_by(
-            dag_id=ti.dag_id,
-            run_id=ti.run_id,
-        ),
-        session=session,
-    ).one()
-    # Log reschedule request
+    # set state
+    ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+
+    ti.clear_next_method_args()
+
+    session.merge(ti)
+    session.commit()

Review Comment:
   The only potential drawback is that if the system crashes right after 
commit, the task status will be updated, but TaskReschedule will not be 
created. Likelihood of it is small, but I think it's worth to at least do a 
mental exercise of what happens then and whether it's worth it.
   
   Now, when I think of it, it's little worrying - because it actually happens 
in "_run_raw_task" - which means that it has **some** probabilty to happen (OOM 
and others). 
   
   BTW. This part should look pretty differently for Airflow 3 with AIP-72 
because reschedule will not happen in task but in the API component most 
likely, so I am not quite sure if that is targetted to be only back-ported? 
   
   @ashb - any concerns and comments here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to