potiuk commented on issue #23361:
URL: https://github.com/apache/airflow/issues/23361#issuecomment-1193362751

   Thanks for the REALLY detailed investigation @dstaple.
   
   I finally had some time to take a look at this, and I think your assessment was correct.
   
   However, the solution you proposed is not good, because I think we DO want to run "SELECT FOR UPDATE" on the DagRun table. The whole scheduling is based on the fact that the DagRun row gets locked, and no changes happen to the DagRun or any TaskInstances of that DagRun while the Scheduler processes those task instances. And since local_task_run potentially changes the state of the task instance it runs (that's why it locks it FOR UPDATE), if the whole DagRun is currently "being processed" by any of the schedulers, we should hold off on running the task until the scheduler finishes processing this particular DagRun and releases the lock.
   
   And in this case "local_task_run" actually locks the DagRun row too (though why it does so is the one thing I do not understand completely - see below). So it does what it should, but with one small caveat - it locks the TaskInstance and the DagRun in REVERSE ORDER compared to what the Scheduler does. This is the root cause of ALL the deadlocks here (at least in Postgres; MySQL has its own fair share of other kinds of deadlocks) - inconsistent lock ordering. A deadlock appears when two threads want two (or more) resources and grab locks on them in opposite order. That inconsistent ordering is the classic recipe for deadlocks, and your investigation showed really nicely what's going on.
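
   To make that concrete, here is a minimal, self-contained sketch of the reverse-order deadlock (a hypothetical demo table and connection string, assuming a local Postgres and psycopg2 - nothing Airflow-specific):

   ```
   # Two threads lock the same two rows in OPPOSITE order - the exact pattern
   # described above. Postgres detects the cycle and aborts one transaction.
   import threading
   import time

   import psycopg2

   DSN = "dbname=test user=postgres host=localhost"  # hypothetical connection

   def setup():
       conn = psycopg2.connect(DSN)
       with conn, conn.cursor() as cur:
           cur.execute("DROP TABLE IF EXISTS demo")
           cur.execute("CREATE TABLE demo (id int PRIMARY KEY)")
           cur.execute("INSERT INTO demo VALUES (1), (2)")
       conn.close()

   def lock_in_order(first, second, name):
       conn = psycopg2.connect(DSN)
       try:
           with conn.cursor() as cur:
               cur.execute("SELECT id FROM demo WHERE id = %s FOR UPDATE", (first,))
               time.sleep(1)  # let the other thread grab its first lock
               cur.execute("SELECT id FROM demo WHERE id = %s FOR UPDATE", (second,))
           conn.commit()
           print(f"{name}: acquired both locks")
       except psycopg2.errors.DeadlockDetected:
           print(f"{name}: deadlock detected, transaction aborted")
       finally:
           conn.close()

   setup()
   t1 = threading.Thread(target=lock_in_order, args=(1, 2, "scheduler-like"))
   t2 = threading.Thread(target=lock_in_order, args=(2, 1, "task-run-like"))
   t1.start(); t2.start(); t1.join(); t2.join()
   ```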
   
   The solution to that is simple - since we are going to get the DagRun lock in a moment anyway in "refresh_from_db", we should simply grab the lock on the DagRun row FIRST. This should fix the problem, as we will then acquire locks in the same sequence in the scheduler and in task_run -> first the DagRun, then the TaskInstance. This is what my proposed #25266 does.
   
   The only thing I do not know is WHY `TaskInstance.refresh_from_db` actually emits the JOIN query:
   
   ```
   SELECT FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id ... FOR UPDATE
   ```
   
   The original query in the code looks like this:
   
   ```
           qry = session.query(TaskInstance).filter(
               TaskInstance.dag_id == self.dag_id,
               TaskInstance.task_id == self.task_id,
               TaskInstance.run_id == self.run_id,
               TaskInstance.map_index == self.map_index,
           )

           if lock_for_update:
               for attempt in run_with_db_retries(logger=self.log):
                   with attempt:
                       ti: Optional[TaskInstance] = qry.with_for_update().first()
   ```
   
   And there is no obvious reason why the last line would join the dag_run table.
   
   I hope someone else in this thread might shed some light on it. I have a suspicion that SQLAlchemy will add the join in case there is a ForeignKey with ON DELETE CASCADE involving dag_id (which we have) - but I could not find any reference or documentation that would point to such behaviour.
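
   One SQLAlchemy mechanism that I know can add exactly this kind of automatic JOIN (including the auto-generated `dag_run_1` alias) is joined eager loading on a relationship - whether that is actually what happens in our models is another thing I would like confirmed. A minimal sketch with hypothetical models (NOT Airflow's real ones) that reproduces the shape of the query:

   ```
   # Hypothetical minimal models - NOT Airflow's actual ones. The point is
   # only that a relationship configured with lazy="joined" makes SQLAlchemy
   # add the JOIN (with an alias like dag_run_1) to every TaskInstance query,
   # FOR UPDATE included.
   from sqlalchemy import Column, ForeignKey, String
   from sqlalchemy.orm import Session, declarative_base, relationship

   Base = declarative_base()

   class DagRun(Base):
       __tablename__ = "dag_run"
       run_id = Column(String, primary_key=True)

   class TaskInstance(Base):
       __tablename__ = "task_instance"
       task_id = Column(String, primary_key=True)
       run_id = Column(String, ForeignKey("dag_run.run_id"), primary_key=True)
       # Joined eager loading: the related DagRun is fetched in the SAME
       # SELECT, so every query against TaskInstance grows a JOIN to dag_run.
       dag_run = relationship("DagRun", lazy="joined", innerjoin=True)

   query = (
       Session()
       .query(TaskInstance)
       .filter(TaskInstance.task_id == "t1")
       .with_for_update()
   )
   # Printing compiles the query (no database needed) and shows roughly:
   #   SELECT ... FROM task_instance
   #   JOIN dag_run AS dag_run_1 ON dag_run_1.run_id = task_instance.run_id
   #   WHERE task_instance.task_id = ? FOR UPDATE
   print(query)
   ```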
   
   @RNHTTR - since you mentioned you can reproduce the issue - maybe you could apply my fix and see if it solves the problem (there is a bit of a leap of faith with this change).

