potiuk commented on issue #23361:
URL: https://github.com/apache/airflow/issues/23361#issuecomment-1193362751
Thanks for the REALLY detailed investigation @dstaple.
I finally had some time to take a look at this, and I think your assessment
is correct.
However, the solution you proposed is not the right one, because I think we DO
want to run "SELECT FOR UPDATE" on the DagRun table. The whole scheduling is
based on the fact that the DagRun row gets locked, so that no changes happen to
the DagRun or any TaskInstances of that DagRun while the scheduler processes
those task instances. And since local_task_run potentially changes the state of
the task instance it runs (that's why it locks it for update), if the whole
DagRun is currently "being processed" by any of the schedulers, we should hold
off on running the task until the scheduler finishes processing that particular
DagRun and releases the lock.
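For context, this is roughly what the scheduler-side critical section looks
like in SQLAlchemy terms (a condensed sketch with simplified names, not the
literal scheduler code):
```
# Simplified sketch; the function name and flow are illustrative only.
from airflow.models.dagrun import DagRun

def process_dag_run(session, dag_id, run_id):
    # SELECT ... FOR UPDATE: the row lock is held until the transaction
    # ends, so nothing else can mutate this DagRun (or schedule its
    # TaskInstances) while we work on it.
    dag_run = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
        .with_for_update()
        .one()
    )
    # ... examine / update the task instances of dag_run here ...
    session.commit()  # the commit releases the row lock
```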
And in this case, "local_task_run" actually locks the DagRun table too (though
I am not entirely sure why; this is the one thing I do not understand
completely, see below). So it does what it should, but with one very important
caveat: it locks the TaskInstance and DagRun in the REVERSE ORDER compared to
what the Scheduler does. This is the root cause of ALL the deadlocks here (at
least in Postgres; MySQL has its own fair share of other kinds of deadlocks):
inconsistent lock ordering. A deadlock appears when two threads want the same
two (or more) resources and acquire the locks on them in opposite orders: each
ends up holding one lock while waiting for the other. This is essentially the
only way this kind of deadlock arises, and your investigation showed really
nicely that this is what's going on.
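To make this concrete, here is a minimal, self-contained reproduction of an
inconsistent-lock-order deadlock on Postgres (everything here is hypothetical
and for illustration only: the DSN, the scratch table `t` with rows 1 and 2,
and the worker names; this is not Airflow code):
```
# Assumes a throwaway Postgres database prepared with:
#   CREATE TABLE t (id int PRIMARY KEY);
#   INSERT INTO t VALUES (1), (2);
import threading

import psycopg2

DSN = "dbname=scratch"  # hypothetical scratch database
barrier = threading.Barrier(2)

def worker(first_id, second_id, name):
    conn = psycopg2.connect(DSN)
    try:
        with conn.cursor() as cur:
            # Take the first row lock.
            cur.execute("SELECT id FROM t WHERE id = %s FOR UPDATE", (first_id,))
            barrier.wait()  # ensure both workers hold their first lock
            # Each worker now waits for the row the other one holds;
            # Postgres detects the cycle and aborts one transaction.
            cur.execute("SELECT id FROM t WHERE id = %s FOR UPDATE", (second_id,))
            print(f"{name}: got both locks")
    except psycopg2.errors.DeadlockDetected as exc:
        print(f"{name}: deadlock detected, as expected: {exc}")
    finally:
        conn.rollback()
        conn.close()

a = threading.Thread(target=worker, args=(1, 2, "A"))  # locks row 1, then 2
b = threading.Thread(target=worker, args=(2, 1, "B"))  # locks row 2, then 1
a.start(); b.start(); a.join(); b.join()
```
One of the two workers gets aborted by Postgres with `DeadlockDetected`, which
is exactly the kind of error reported in this issue; the same thing happens
between the scheduler and local_task_run when they grab the DagRun and
TaskInstance locks in opposite orders.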
The solution to that is simple: since we are going to get the DagRun lock in a
moment anyway in "refresh_from_db", we should simply grab the lock on the
DagRun table FIRST. This should fix the problem, because the scheduler and
task_run will then acquire the locks in the same sequence: first DagRun, then
TaskInstance. This is what my proposed #25266 does.
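The core of the idea looks roughly like this (a sketch of the lock ordering
only; `refresh_from_db_locked` is a hypothetical name, and the real change is
in #25266):
```
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance

def refresh_from_db_locked(session, ti):
    # 1. Lock the DagRun row FIRST -- the same order the scheduler uses.
    session.query(DagRun).filter(
        DagRun.dag_id == ti.dag_id,
        DagRun.run_id == ti.run_id,
    ).with_for_update().first()
    # 2. Only then lock the TaskInstance row itself.
    return (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.task_id == ti.task_id,
            TaskInstance.run_id == ti.run_id,
            TaskInstance.map_index == ti.map_index,
        )
        .with_for_update()
        .first()
    )
```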
The only thing I do not know is WHY `TaskInstance.refresh_from_db`
actually emits a JOIN query:
```
SELECT ... FROM task_instance
JOIN dag_run AS dag_run_1
  ON dag_run_1.dag_id = task_instance.dag_id
 AND dag_run_1.run_id ...
FOR UPDATE
```
The original query in the code looks like this:
```
qry = session.query(TaskInstance).filter(
    TaskInstance.dag_id == self.dag_id,
    TaskInstance.task_id == self.task_id,
    TaskInstance.run_id == self.run_id,
    TaskInstance.map_index == self.map_index,
)

if lock_for_update:
    for attempt in run_with_db_retries(logger=self.log):
        with attempt:
            ti: Optional[TaskInstance] = qry.with_for_update().first()
```
And there is no obvious reason in that code for joining the dag_run table.
I hope someone else in this thread can shed some light on it. I have a
suspicion that SQLAlchemy adds the join when there is a ForeignKey with
ON DELETE CASCADE on dag_id (which we have), but I could not find any reference
or documentation that would point to such behaviour.
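For what it's worth, one mechanism that I know can produce exactly this shape
of query (including the `dag_run_1` alias) is SQLAlchemy's joined eager
loading, i.e. a relationship configured with `lazy="joined"`: it adds the join
to every query against the child model, FOR UPDATE ones included. A toy,
self-contained way to check this hypothesis (toy models, not the Airflow ones;
the engine URL is never connected, it is only used to render the SQL):
```
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)

class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id", ondelete="CASCADE"))
    # Joined eager loading: every SELECT against Child pulls Parent in
    # via a JOIN, aliased like "parent AS parent_1".
    parent = relationship("Parent", lazy="joined", innerjoin=True)

engine = create_engine("postgresql://localhost/scratch")  # rendering only
with Session(engine) as session:
    query = session.query(Child).filter(Child.id == 1).with_for_update()
    # Prints: SELECT ... FROM child JOIN parent AS parent_1
    #         ON parent_1.id = child.parent_id ... FOR UPDATE
    print(query)
```
So it might be worth checking whether the `TaskInstance.dag_run` relationship
is configured with `lazy="joined"`; that would explain the join without any
ForeignKey involvement, but it is a hypothesis to verify, not something I have
confirmed.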
@RNHTTR - since you mentioned you can reproduce the issue, maybe you could
apply my fix and see if it solves the problem (there is a bit of a leap of
faith in this change).