This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5f85eef2cc252affc7049278f507579e2f6f6814
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Oct 14 16:09:38 2021 +0100

    Row lock TI query in SchedulerJob._process_executor_events (#18975)
    
    Using multiple schedulers causes Deadlock in _process_executor_events.
    This PR fixes it.
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    (cherry picked from commit 52cc84c66afc2c73e8a2a80aae46f2208f07c4cb)
---
 airflow/jobs/scheduler_job.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2ea0bb5..1e74e99 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -27,7 +27,7 @@ import time
 import warnings
 from collections import defaultdict
 from datetime import timedelta
-from typing import Collection, DefaultDict, Dict, List, Optional, Tuple
+from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, 
Tuple
 
 from sqlalchemy import and_, func, not_, or_, tuple_
 from sqlalchemy.exc import OperationalError
@@ -510,7 +510,15 @@ class SchedulerJob(BaseJob):
 
         # Check state of finished tasks
         filter_for_tis = TI.filter_for_tis(tis_with_right_state)
-        tis: List[TI] = 
session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
+        query = 
session.query(TI).filter(filter_for_tis).options(selectinload('dag_model'))
+        # row lock this entire set of taskinstances to make sure the scheduler 
doesn't fail when we have
+        # multi-schedulers
+        tis: Iterator[TI] = with_row_locks(
+            query,
+            of=TI,
+            session=session,
+            **skip_locked(session=session),
+        )
         for ti in tis:
             try_number = ti_primary_key_to_try_number_map[ti.key.primary]
             buffer_key = ti.key.with_try_number(try_number)

Reply via email to