dstandish commented on code in PR #42082:
URL: https://github.com/apache/airflow/pull/42082#discussion_r1801195560


##########
airflow/models/taskinstance.py:
##########
@@ -1760,16 +1758,16 @@ def _handle_reschedule(
     ti.end_date = timezone.utcnow()
     ti.set_duration()
 
-    # Lock DAG run to be sure not to get into a deadlock situation when trying 
to insert
-    # TaskReschedule which apparently also creates lock on corresponding 
DagRun entity
-    with_row_locks(
-        session.query(DagRun).filter_by(
-            dag_id=ti.dag_id,
-            run_id=ti.run_id,
-        ),
-        session=session,
-    ).one()
-    # Log reschedule request
+    # set state
+    ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+
+    ti.clear_next_method_args()
+
+    session.merge(ti)
+    session.commit()

Review Comment:
   One other nuance here.
   
   Before, we wait to take an exclusive lock on dag run with select for update.
   
   And all other tasks that are trying to write to TaskReschedule must do the 
same.  Thus if there are multiple sensors in same dag, they will block each 
other and pile up.  But if we remove the dag run lock, then the only lock 
obtained is the implicit SELECT FOR SHARE taken internally by mysql when it 
inserts the record into TR.  And this does not block other sensors from taking 
the same lock.
   
   So to me, this would just have a positive effect on throughput and not 
really any meaningful downside.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to