vandonr-amz commented on code in PR #30704:
URL: https://github.com/apache/airflow/pull/30704#discussion_r1170303898


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1083,8 +1083,13 @@ def _do_scheduling(self, session: Session) -> int:
             callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
 
         # Send the callbacks after we commit to ensure the context is up to date when it gets run
+        # cache saves time during scheduling of many dag_runs for same dag
+        cached_dags: dict = {}
         for dag_run, callback_to_run in callback_tuples:
-            dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+            if dag_run.dag_id not in cached_dags.keys():
+                cached_dags[dag_run.dag_id] = self.dagbag.get_dag(dag_run.dag_id, session=session)
+            
+            dag = cached_dags[dag_run.dag_id]

Review Comment:
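   nit: checking `x in dict.keys()` and then indexing the dict performs an
   extra lookup on every cache hit, and it can be avoided. (The `.keys()` call
   is also redundant: `x in dict` does the same membership test.)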
   
   Here is how you can rewrite it to minimize dict accesses:
   ```
   dag = cached_dags.get(dag_run.dag_id)  # returns None on miss
   if dag is None:
       dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
       cached_dags[dag_run.dag_id] = dag
   ```
   
   Also, this code is duplicated between the two methods. It would be nice to
   wrap it in a `get_or_insert(dict, key, lambda)` function that could go in
   some utils file.
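
A minimal sketch of what such a helper might look like. The name `get_or_insert` comes from the suggestion above; the signature, the type hints, and the `load_dag` stand-in for `self.dagbag.get_dag` are assumptions for illustration, not existing Airflow code. Unlike the `.get()` snippet, this version also caches a `None` result, which may or may not be what the scheduler wants.

```python
from typing import Callable, Dict, Hashable, List, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def get_or_insert(cache: Dict[K, V], key: K, factory: Callable[[], V]) -> V:
    """Return cache[key], calling factory() to fill the entry on a miss.

    Hypothetical helper: a cache hit costs a single dict lookup, and the
    factory is only invoked when the key is absent.
    """
    try:
        return cache[key]
    except KeyError:
        value = factory()
        cache[key] = value
        return value


# Stand-in for self.dagbag.get_dag (assumption, for demonstration only).
calls: List[str] = []


def load_dag(dag_id: str) -> str:
    calls.append(dag_id)  # record each real load
    return f"dag:{dag_id}"


cached_dags: Dict[str, str] = {}
for dag_id in ["a", "b", "a"]:
    dag = get_or_insert(cached_dags, dag_id, lambda: load_dag(dag_id))
```

In the loop from the diff, the call would presumably read `dag = get_or_insert(cached_dags, dag_run.dag_id, lambda: self.dagbag.get_dag(dag_run.dag_id, session=session))`, and both call sites could share it.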


