potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
   I think we should always obtain a lock here. And I think the only error that can happen here is the same as in the Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen is failing to obtain the lock for a "long time", which should not usually happen unless someone takes the lock and never releases it (e.g. a manual DB query).
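   The "cannot obtain the lock for a long time" scenario can be sketched with a stdlib analogy. This is an illustration only, assuming a `threading.Lock` stands in for the `FOR UPDATE` row lock that `with_row_locks` takes on the DagRun row; the names are hypothetical, not Airflow code:

```python
import threading

# Hypothetical stand-in for the DagRun row lock (illustration only,
# not the actual SQLAlchemy row lock used by with_row_locks).
dag_run_lock = threading.Lock()

def try_handle_reschedule(timeout: float) -> bool:
    # Mimics waiting for the row lock: block for up to `timeout` seconds.
    acquired = dag_run_lock.acquire(timeout=timeout)
    if not acquired:
        return False  # someone holds the lock and has not released it
    try:
        return True   # safe to insert TaskReschedule here
    finally:
        dag_run_lock.release()

# Normal case: the lock is free, we get it immediately.
print(try_handle_reschedule(timeout=0.1))  # True

# Stuck case: another "session" holds the lock and never frees it.
dag_run_lock.acquire()
print(try_handle_reschedule(timeout=0.1))  # False
```

   In the real code there is no explicit timeout; the session simply waits on the database lock, which is why a stuck manual transaction is the main way this can go wrong.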
   
   Also I think one_or_none() is not good here. We insert a TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. If we tried to insert a TaskReschedule without a matching DagRun, it would fail anyway because of the relationship with `DagRun` and the foreign key pointing to it. So if "none" is returned here we are in deep trouble already. The whole point of this row is to obtain the lock on the DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to obtain the same DagRun row (and wait for the same lock).
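   The one() vs one_or_none() distinction boils down to failing loudly vs silently masking a missing DagRun. A pure-Python sketch of the SQLAlchemy semantics (the helper functions below are illustrative stand-ins, not the real `Query` API):

```python
class NoResultFound(Exception):
    """Stand-in for sqlalchemy.orm.exc.NoResultFound."""

def one(rows):
    # Like Query.one(): return exactly one row, or raise.
    if len(rows) != 1:
        raise NoResultFound(f"expected exactly one row, got {len(rows)}")
    return rows[0]

def one_or_none(rows):
    # Like Query.one_or_none(): an empty result silently becomes None.
    if not rows:
        return None
    return rows[0]

dag_runs = []  # simulate a DagRun that unexpectedly does not exist

print(one_or_none(dag_runs))  # None -- the problem goes unnoticed
try:
    one(dag_runs)             # raises -- the task fails loudly, as intended
except NoResultFound as exc:
    print(exc)
```

   (The real `Query.one()` additionally raises `MultipleResultsFound` for more than one row; the sketch collapses both failure modes into one exception for brevity.)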
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just 
an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case that would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour. 
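   The "let it propagate and FAIL the task" behaviour can be sketched like this (the names are hypothetical stand-ins, not the actual Airflow task-runner code):

```python
class OperationalError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError."""

def _handle_reschedule():
    # If the DagRun row lock cannot be obtained, the DB error escapes here
    # because we deliberately do NOT catch it (unlike the mini-scheduler).
    raise OperationalError("could not obtain lock on DagRun row")

def run_task() -> str:
    # Hypothetical task-runner: any unhandled exception fails the task.
    try:
        _handle_reschedule()
        return "UP_FOR_RESCHEDULE"
    except Exception:
        return "FAILED"

print(run_task())  # FAILED -- better than silently skipping the reschedule
```

   Swallowing the error instead (as the mini-scheduler does) would leave the task looking rescheduled without the TaskReschedule row actually written.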
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

