This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5f85eef2cc252affc7049278f507579e2f6f6814
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Oct 14 16:09:38 2021 +0100

    Row lock TI query in SchedulerJob._process_executor_events (#18975)

    Using multiple schedulers causes Deadlock in _process_executor_events.
    This PR fixes it.

    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit 52cc84c66afc2c73e8a2a80aae46f2208f07c4cb)
---
 airflow/jobs/scheduler_job.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2ea0bb5..1e74e99 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -27,7 +27,7 @@ import time
 import warnings
 from collections import defaultdict
 from datetime import timedelta
-from typing import Collection, DefaultDict, Dict, List, Optional, Tuple
+from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Tuple
 
 from sqlalchemy import and_, func, not_, or_, tuple_
 from sqlalchemy.exc import OperationalError
@@ -510,7 +510,15 @@ class SchedulerJob(BaseJob):
 
         # Check state of finished tasks
         filter_for_tis = TI.filter_for_tis(tis_with_right_state)
-        tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
+        query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model'))
+        # row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have
+        # multi-schedulers
+        tis: Iterator[TI] = with_row_locks(
+            query,
+            of=TI,
+            session=session,
+            **skip_locked(session=session),
+        )
         for ti in tis:
             try_number = ti_primary_key_to_try_number_map[ti.key.primary]
             buffer_key = ti.key.with_try_number(try_number)
