This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bc1d7b03ab0ba903718b4098196de24977ee1261
Author: doiken <[email protected]>
AuthorDate: Tue Aug 8 21:22:13 2023 +0900

    Fix future DagRun rarely triggered by race conditions when max_active_runs reached its upper limit. (#31414)
    
    * feat: select dag_model with row lock
    
    * fix: logging that scheduling was skipped
    
    * fix: remove unused get_dagmodel
    
    * fix: correct log message to more generic word
    
    ---------
    
    Co-authored-by: doiken <[email protected]>
    Co-authored-by: Tzu-ping Chung <[email protected]>
    Co-authored-by: eladkal <[email protected]>
    (cherry picked from commit b53e2aeefc1714d306f93e58d211ad9d52356470)
---
 airflow/jobs/scheduler_job_runner.py | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 3b399d75b5..e47fc7bd86 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -35,7 +35,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, 
Iterable, Iterator
 from sqlalchemy import and_, delete, func, not_, or_, select, text, update
 from sqlalchemy.engine import Result
 from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import Query, Session, load_only, make_transient, 
selectinload
+from sqlalchemy.orm import Query, Session, joinedload, load_only, 
make_transient, selectinload
 from sqlalchemy.sql import expression
 
 from airflow import settings
@@ -1397,11 +1397,24 @@ class SchedulerJobRunner(BaseJobRunner[Job], 
LoggingMixin):
         callback: DagCallbackRequest | None = None
 
         dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
-        dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+        # Adopt row locking to account for inconsistencies when next_dagrun_create_after = None
+        query = (
+            session.query(DagModel)
+            .filter(DagModel.dag_id == dag_run.dag_id)
+            .options(joinedload(DagModel.parent_dag))
+        )
+        dag_model = with_row_locks(
+            query, of=DagModel, session=session, **skip_locked(session=session)
+        ).one_or_none()
 
-        if not dag or not dag_model:
+        if not dag:
             self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
             return callback
+        if not dag_model:
+            self.log.info(
+                "DAG %s scheduling was skipped, probably because the DAG 
record was locked", dag_run.dag_id
+            )
+            return callback
 
         if (
             dag_run.start_date

Reply via email to