doiken commented on code in PR #31414:
URL: https://github.com/apache/airflow/pull/31414#discussion_r1201514769


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1375,10 +1375,21 @@ def _schedule_dag_run(
 
         dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
         dag_model = DM.get_dagmodel(dag_run.dag_id, session)
+        # Adopt row locking to account for inconsistencies when next_dagrun_create_after = None
+        query = (
+            session.query(DagModel)
+            .filter(DagModel.dag_id == dag_run.dag_id)
+            .options(joinedload(DagModel.parent_dag))
+        )
+        dag_model = with_row_locks(
+            query, of=DagModel, session=session, **skip_locked(session=session)
+        ).one_or_none()
 
-        if not dag or not dag_model:
+        if not dag:
             self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id)
             return callback
+        if not dag_model:
+            return callback

Review Comment:
   I took your advice and added a log message:
   https://github.com/apache/airflow/pull/31414/commits/82c976a52af763a7463cc69b60198822df3e04fe
   
   While doing that, I noticed that I had forgotten to remove the existing `get_dagmodel` call, so I removed it:
   https://github.com/apache/airflow/pull/31414/commits/8492e3df0d7bb95a17bba1191786addab794db99
   
   I know some of the tests are failing, but I don't think the failures are related to my changes.
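
For context on why the `dag_model` check is handled separately in the hunk above: with row locking combined with SKIP LOCKED, a single-row lookup can come back empty because another transaction holds the lock, not only because the row is absent. Below is a minimal, stdlib-only sketch of that control flow. The names `fetch_skip_locked` and `schedule_dag_run`, the plain dicts standing in for the DagBag and the database, the string return values, and the debug message are all hypothetical illustrations, not Airflow's actual implementation.

```python
import logging
from typing import Optional

log = logging.getLogger("scheduler_sketch")


def fetch_skip_locked(rows: dict, locked: set, dag_id: str) -> Optional[dict]:
    """Mimic a single-row SELECT ... FOR UPDATE SKIP LOCKED (hypothetical stand-in).

    A row that another transaction has locked is skipped, so the caller
    sees None exactly as if the row did not exist.
    """
    if dag_id in locked:
        return None
    return rows.get(dag_id)


def schedule_dag_run(dagbag: dict, rows: dict, locked: set, dag_id: str) -> str:
    """Apply the two guard clauses from the diff to in-memory stand-ins."""
    dag = dagbag.get(dag_id)
    dag_model = fetch_skip_locked(rows, locked, dag_id)

    if not dag:
        # The DAG itself is unknown: this is a genuine error.
        log.error("Couldn't find DAG %s in DAG bag or database!", dag_id)
        return "missing_dag"
    if not dag_model:
        # The row is missing or locked elsewhere: skip quietly (assumed message).
        log.debug("DagModel row for %s is unavailable; skipping", dag_id)
        return "skipped"
    return "scheduled"
```

In this sketch, a locked row leads to a quiet skip rather than the "Couldn't find DAG" error, which is the distinction the split `if` blocks appear to draw.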



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
