potiuk commented on pull request #20894:
URL: https://github.com/apache/airflow/pull/20894#issuecomment-1016765008


> What made me try this change was because a similar thing happened with multiple schedulers and #18975 solved it.
   
Yeah, but this is really the difference between SELECT and UPDATE. #18975 fixed a SELECT query:
   
```python
tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
```
   
Whereas here we have an UPDATE query:
   
```python
(
    session.query(TI)
    .filter(
        TI.dag_id == self.dag_id,
        TI.run_id == self.run_id,
        TI.task_id.in_(schedulable_ti_ids),
    )
    .update({TI.state: State.SCHEDULED}, synchronize_session=False)
)
```
   
An UPDATE query obtains locks automatically on all the rows it accesses: the first thing it does is try to lock every row it touches, so `with_row_locks` has no effect here.
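A toy, in-memory model of that difference may help. The class `ToyLockManager` and its methods are hypothetical, not Airflow or database code: SKIP LOCKED lets a SELECT silently skip rows another transaction holds, while a plain UPDATE has to lock every row it matches and cannot skip any, so it waits (here: reports) the first row held elsewhere.

```python
from typing import Dict, Iterable, List, Optional


class ToyLockManager:
    """Toy model of exclusive row locks; NOT how a real database implements locking."""

    def __init__(self) -> None:
        # Maps a row id to the transaction that currently holds its lock.
        self.owner: Dict[str, str] = {}

    def select_for_update_skip_locked(self, txn: str, rows: Iterable[str]) -> List[str]:
        """Mimics SELECT ... FOR UPDATE SKIP LOCKED: lock free rows, skip rows held by others."""
        locked = []
        for row in rows:
            holder = self.owner.get(row)
            if holder is None or holder == txn:
                self.owner[row] = txn
                locked.append(row)
        return locked

    def update(self, txn: str, rows: Iterable[str]) -> Optional[str]:
        """Mimics a plain UPDATE: every matching row must be locked, none can be skipped.

        Returns the first row held by another transaction (where a real database
        would block and wait), or None if all rows were locked successfully.
        """
        for row in rows:
            holder = self.owner.get(row)
            if holder is not None and holder != txn:
                return row
            self.owner[row] = txn
        return None


if __name__ == "__main__":
    locks = ToyLockManager()
    tis = ["ti_a", "ti_b", "ti_c"]
    locks.select_for_update_skip_locked("scheduler_1", ["ti_a"])
    print(locks.select_for_update_skip_locked("scheduler_2", tis))  # ['ti_b', 'ti_c']
    print(locks.update("scheduler_3", tis))  # ti_a (a real database would block here)
```

Wrapping the UPDATE in a row-lock helper does not change this: the UPDATE still has to take every row lock it needs.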
   
> I'm of the opinion that if dag_run lock is the problem, then the log would be around dagrun and not task_instance
   
The dag_run is not the problem. In theory (that is how the DAG_RUN lock was designed), you should not even attempt to get a lock on any TASK_INSTANCE row before first obtaining the DAG_RUN lock. That is what the `with_row_locks` SELECT FROM DAG_RUN with SKIP LOCKED, a few methods up the stack, should be doing before we get here.
   
This is what `next_dagruns_to_examine` does:
   
```python
query = (
    session.query(cls)
    .filter(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
    .join(DagModel, DagModel.dag_id == cls.dag_id)
    .filter(DagModel.is_paused == false(), DagModel.is_active == true())
)
# ....

return with_row_locks(
    query.limit(max_number), of=cls, session=session, **skip_locked(session=session)
)
```
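The invariant this design is meant to give can be sketched with a toy, in-memory model. The functions `lock_dagruns` and `schedulable_tis` are hypothetical, not Airflow APIs, and only roughly mimic the query above: each scheduler skips DAG runs another scheduler already holds and only touches task instances of runs it holds, so two schedulers never update the same TASK_INSTANCE rows.

```python
from typing import Dict, List, Set


def lock_dagruns(owner: Dict[str, str], scheduler: str, candidates: List[str], max_number: int) -> List[str]:
    """Toy stand-in, roughly, for with_row_locks(query.limit(max_number), ..., **skip_locked(...))."""
    locked: List[str] = []
    for run_id in candidates:
        if len(locked) == max_number:
            break
        if run_id not in owner:  # runs already locked by another scheduler are skipped
            owner[run_id] = scheduler
            locked.append(run_id)
    return locked


def schedulable_tis(tis_by_run: Dict[str, List[str]], locked_runs: List[str]) -> Set[str]:
    """Only task instances belonging to DAG runs this scheduler has locked may be updated."""
    return {ti for run_id in locked_runs for ti in tis_by_run[run_id]}


if __name__ == "__main__":
    owner: Dict[str, str] = {}
    runs = ["run_1", "run_2", "run_3"]
    tis_by_run = {run: [f"{run}.extract", f"{run}.load"] for run in runs}
    s1 = lock_dagruns(owner, "scheduler_1", runs, max_number=2)  # ['run_1', 'run_2']
    s2 = lock_dagruns(owner, "scheduler_2", runs, max_number=2)  # ['run_3']
    print(schedulable_tis(tis_by_run, s1) & schedulable_tis(tis_by_run, s2))  # set()
```

The guarantee only holds if every writer of TASK_INSTANCE rows goes through the DAG_RUN lock first; one writer that bypasses it breaks the invariant.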
   
@ashb, is that correct?
   
What happens here: we already hold the DAG_RUN lock from a few methods up, so we should be safe to update TASK_INSTANCES. But for some reason someone else, who did not have the DAG_RUN lock, also tries to update those instances.
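To make that failure mode concrete, here is a toy wait-for graph. The helpers `wait_for_graph` and `has_deadlock` are hypothetical, and the scenario is an assumption (the comment does not identify who the other writer is): if two transactions lock the same task-instance rows in a different order, each waits for a row the other holds, which is the cycle a database reports as a deadlock. A writer that respects the DAG_RUN lock never holds those rows, so no cycle can form.

```python
from typing import Dict, List


def wait_for_graph(holds: Dict[str, List[str]], wants: Dict[str, str]) -> Dict[str, str]:
    """Edge txn -> other: `txn` waits for a row lock that `other` currently holds."""
    owner = {row: txn for txn, rows in holds.items() for row in rows}
    return {txn: owner[row] for txn, row in wants.items() if row in owner and owner[row] != txn}


def has_deadlock(graph: Dict[str, str]) -> bool:
    """Each transaction waits on at most one other here, so follow edges looking for a cycle."""
    for start in graph:
        seen = set()
        node = start
        while node in graph:
            if node in seen:
                return True
            seen.add(node)
            node = graph[node]
    return False


if __name__ == "__main__":
    # scheduler_1 holds the DAG_RUN lock, has updated ti_1 and now needs ti_2;
    # "other" bypassed the DAG_RUN lock, has updated ti_2 and now needs ti_1.
    clash = wait_for_graph({"scheduler_1": ["ti_1"], "other": ["ti_2"]},
                           {"scheduler_1": "ti_2", "other": "ti_1"})
    print(has_deadlock(clash))  # True
    # If "other" respected the DAG_RUN lock, it would hold no task-instance rows.
    ok = wait_for_graph({"scheduler_1": ["ti_1"]}, {"scheduler_1": "ti_2"})
    print(has_deadlock(ok))  # False
```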
   
   


-- 
This is an automated message from the Apache Git Service.