This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b53e2aeefc Fix future DagRun occasionally being triggered by a race condition when
max_active_runs has reached its upper limit. (#31414)
b53e2aeefc is described below
commit b53e2aeefc1714d306f93e58d211ad9d52356470
Author: doiken <[email protected]>
AuthorDate: Tue Aug 8 21:22:13 2023 +0900
Fix future DagRun occasionally being triggered by a race condition when max_active_runs
has reached its upper limit. (#31414)
* feat: select dag_model with a row lock
* fix: log that scheduling was skipped
* fix: remove unused get_dagmodel
* fix: reword log message to be more generic
---------
Co-authored-by: doiken <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
Co-authored-by: eladkal <[email protected]>
---
airflow/jobs/scheduler_job_runner.py | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 3b399d75b5..e47fc7bd86 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -35,7 +35,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection,
Iterable, Iterator
from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.engine import Result
from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import Query, Session, load_only, make_transient,
selectinload
+from sqlalchemy.orm import Query, Session, joinedload, load_only,
make_transient, selectinload
from sqlalchemy.sql import expression
from airflow import settings
@@ -1397,11 +1397,24 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
callback: DagCallbackRequest | None = None
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
- dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+ # Adopt row locking to account for inconsistencies when
next_dagrun_create_after = None
+ query = (
+ session.query(DagModel)
+ .filter(DagModel.dag_id == dag_run.dag_id)
+ .options(joinedload(DagModel.parent_dag))
+ )
+ dag_model = with_row_locks(
+ query, of=DagModel, session=session, **skip_locked(session=session)
+ ).one_or_none()
- if not dag or not dag_model:
+ if not dag:
self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
return callback
+ if not dag_model:
+ self.log.info(
+ "DAG %s scheduling was skipped, probably because the DAG
record was locked", dag_run.dag_id
+ )
+ return callback
if (
dag_run.start_date