This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b38d59bdce Revert "Fix future DagRun rarely triggered by race 
conditions when max_active_runs reached its upper limit. (#31414)" (#37596)
b38d59bdce is described below

commit b38d59bdce302ba393398f85f5bdf95d6ae25f5b
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Feb 21 18:00:37 2024 +0100

    Revert "Fix future DagRun rarely triggered by race conditions when 
max_active_runs reached its upper limit. (#31414)" (#37596)
    
    This reverts commit b53e2aeefc1714d306f93e58d211ad9d52356470.
---
 airflow/jobs/scheduler_job_runner.py | 17 +++--------------
 1 file changed, 3 insertions(+), 14 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index c0c9474913..32cc9f5a63 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, 
Iterable, Iterator
 
 from sqlalchemy import and_, delete, func, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, 
selectinload
+from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
 from sqlalchemy.sql import expression
 
 from airflow import settings
@@ -1418,22 +1418,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         callback: DagCallbackRequest | None = None
 
         dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, 
session=session)
-        # Adopt row locking to account for inconsistencies when 
next_dagrun_create_after = None
-        query = (
-            select(DagModel).where(DagModel.dag_id == 
dag_run.dag_id).options(joinedload(DagModel.parent_dag))
-        )
-        dag_model = session.scalars(
-            with_row_locks(query, of=DagModel, session=session, 
skip_locked=True)
-        ).one_or_none()
+        dag_model = DM.get_dagmodel(dag_run.dag_id, session)
 
-        if not dag:
+        if not dag or not dag_model:
             self.log.error("Couldn't find DAG %s in DAG bag or database!", 
dag_run.dag_id)
             return callback
-        if not dag_model:
-            self.log.info(
-                "DAG %s scheduling was skipped, probably because the DAG 
record was locked", dag_run.dag_id
-            )
-            return callback
 
         if (
             dag_run.start_date

Reply via email to