This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b38d59bdce Revert "Fix future DagRun rarely triggered by race
conditions when max_active_runs reached its upper limit. (#31414)" (#37596)
b38d59bdce is described below
commit b38d59bdce302ba393398f85f5bdf95d6ae25f5b
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Feb 21 18:00:37 2024 +0100
Revert "Fix future DagRun rarely triggered by race conditions when
max_active_runs reached its upper limit. (#31414)" (#37596)
This reverts commit b53e2aeefc1714d306f93e58d211ad9d52356470.
---
airflow/jobs/scheduler_job_runner.py | 17 +++--------------
1 file changed, 3 insertions(+), 14 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index c0c9474913..32cc9f5a63 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -33,7 +33,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection,
Iterable, Iterator
from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient,
selectinload
+from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
from airflow import settings
@@ -1418,22 +1418,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
callback: DagCallbackRequest | None = None
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
- # Adopt row locking to account for inconsistencies when
next_dagrun_create_after = None
- query = (
- select(DagModel).where(DagModel.dag_id ==
dag_run.dag_id).options(joinedload(DagModel.parent_dag))
- )
- dag_model = session.scalars(
- with_row_locks(query, of=DagModel, session=session,
skip_locked=True)
- ).one_or_none()
+ dag_model = DM.get_dagmodel(dag_run.dag_id, session)
- if not dag:
+ if not dag or not dag_model:
self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
return callback
- if not dag_model:
- self.log.info(
- "DAG %s scheduling was skipped, probably because the DAG
record was locked", dag_run.dag_id
- )
- return callback
if (
dag_run.start_date