uranusjr commented on code in PR #31414:
URL: https://github.com/apache/airflow/pull/31414#discussion_r1200342583
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1375,10 +1375,21 @@ def _schedule_dag_run(
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id,
session=session)
dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+ # Adopt row locking to account for inconsistencies when next_dagrun_create_after = None
+ query = (
+ session.query(DagModel)
+ .filter(DagModel.dag_id == dag_run.dag_id)
+ .options(joinedload(DagModel.parent_dag))
+ )
+ dag_model = with_row_locks(
+ query, of=DagModel, session=session, **skip_locked(session=session)
+ ).one_or_none()
- if not dag or not dag_model:
+ if not dag:
self.log.error("Couldn't find DAG %s in DAG bag or database!",
dag_run.dag_id)
return callback
+ if not dag_model:
+ return callback
Review Comment:
Why exclude `dag_model` from the log message?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]