vandonr-amz commented on code in PR #30704:
URL: https://github.com/apache/airflow/pull/30704#discussion_r1171808855


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1348,8 +1354,14 @@ def _update_state(dag: DAG, dag_run: DagRun):
                     tags={"dag_id": dag.dag_id},
                 )
 
+        # cache saves time during scheduling of many dag_runs for same dag
+        cached_dags: dict = {}
+
         for dag_run in dag_runs:
-            dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+            dag = dag_run.dag = get_value_with_cache(
+                cached_dags, dag_run.dag_id, self.dagbag.get_dag, dag_run.dag_id, session=session
+            )

Review Comment:
   ```suggestion
               dag = dag_run.dag = get_value_with_cache(
                   cached_dags, dag_run.dag_id, lambda: self.dagbag.get_dag(dag_run.dag_id, session=session)
               )
   ```
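A minimal, self-contained sketch of the effect the suggested call site is after. It assumes the lambda-based helper signature proposed further down in this review, and it uses a hypothetical `FakeDagBag` stand-in (not Airflow's `DagBag`) that only counts lookups. With a per-loop `cached_dags` dict, `get_dag` runs once per distinct `dag_id` rather than once per DagRun.

```python
from typing import Any, Callable


def get_value_with_cache(cache: dict, key: str, do_get_value: Callable[[], Any]) -> Any:
    """Return cache[key], computing and storing it via do_get_value() on a miss."""
    return_value = cache.get(key)
    if return_value is None:
        return_value = cache[key] = do_get_value()
    return return_value


class FakeDagBag:
    """Hypothetical stand-in for a DagBag; records how often get_dag is called."""

    def __init__(self) -> None:
        self.calls = 0

    def get_dag(self, dag_id: str, session: object = None) -> str:
        self.calls += 1
        return f"DAG<{dag_id}>"


dagbag = FakeDagBag()
session = object()  # placeholder for a database session
dag_run_ids = ["etl", "etl", "etl", "report", "etl"]  # dag_id of each DagRun

cached_dags: dict = {}
dags = []
for dag_id in dag_run_ids:
    # The lambda packages the whole call; it is invoked immediately on a miss,
    # so capturing the loop variable is safe here.
    dags.append(
        get_value_with_cache(cached_dags, dag_id, lambda: dagbag.get_dag(dag_id, session=session))
    )

print(dagbag.calls)  # 2
```

Because the cache is created fresh for each scheduling pass, it cannot go stale across passes; it only deduplicates lookups within one loop.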



##########
airflow/utils/helpers.py:
##########
@@ -45,6 +45,15 @@
 S = TypeVar("S")
 
 
+def get_value_with_cache(cache: dict, key: str, insert_fn: Callable, *arg, **kwargs) -> Any:
+    """Returns value from cache or function"""
+    return_value = cache.get(key)
+    if not return_value:
+        return_value = cache[key] = insert_fn(*arg, **kwargs)
+
+    return return_value

Review Comment:
   I'll let you fix the test if you apply this change ;)
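If the lambda-based signature is adopted, a test for the helper no longer needs to forward positional or keyword arguments. A possible shape is sketched below; the test names are hypothetical and this is not the PR's actual test file. It uses `unittest.mock.Mock` as the zero-argument callable so call counts can be asserted.

```python
from typing import Any, Callable
from unittest import mock


def get_value_with_cache(cache: dict, key: str, do_get_value: Callable[[], Any]) -> Any:
    """Return cache[key], computing and storing it via do_get_value() on a miss."""
    return_value = cache.get(key)
    if return_value is None:
        return_value = cache[key] = do_get_value()
    return return_value


def test_get_value_with_cache_calls_function_once_per_key():
    cache: dict = {}
    do_get_value = mock.Mock(return_value="value")

    assert get_value_with_cache(cache, "a", do_get_value) == "value"
    assert get_value_with_cache(cache, "a", do_get_value) == "value"
    # The second lookup is served from the cache, and the callable takes no arguments.
    do_get_value.assert_called_once_with()
    assert cache == {"a": "value"}


def test_get_value_with_cache_does_not_call_function_on_hit():
    cache = {"a": "cached"}
    do_get_value = mock.Mock()

    assert get_value_with_cache(cache, "a", do_get_value) == "cached"
    do_get_value.assert_not_called()
```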



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1083,8 +1084,13 @@ def _do_scheduling(self, session: Session) -> int:
             callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
 
         # Send the callbacks after we commit to ensure the context is up to date when it gets run
+        # cache saves time during scheduling of many dag_runs for same dag
+        cached_dags: dict = {}
         for dag_run, callback_to_run in callback_tuples:
-            dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+            dag = get_value_with_cache(
+                cached_dags, dag_run.dag_id, self.dagbag.get_dag, dag_run.dag_id, session=session
+            )

Review Comment:
   Passing the params individually is a convoluted way to do it... Using a
lambda, you can just pass the whole packaged call as a Callable without the need
for extra parameters.
   ```suggestion
               dag = get_value_with_cache(
                   cached_dags, dag_run.dag_id, lambda: self.dagbag.get_dag(dag_run.dag_id, session=session)
               )
   ```
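The two calling conventions side by side, as a sketch. `get_value_with_cache_args` mirrors the PR's original argument-forwarding shape, `get_value_with_cache` mirrors the suggested zero-argument shape, and `get_dag` is a hypothetical stand-in for `DagBag.get_dag`. Both use the same cache check so only the call style differs. `functools.partial` is shown as a standard-library alternative to the lambda that also produces a zero-argument callable.

```python
import functools
from typing import Any, Callable


def get_value_with_cache_args(cache: dict, key: str, insert_fn: Callable, *args, **kwargs) -> Any:
    # Original shape: the helper must forward arguments it knows nothing about.
    value = cache.get(key)
    if value is None:
        value = cache[key] = insert_fn(*args, **kwargs)
    return value


def get_value_with_cache(cache: dict, key: str, do_get_value: Callable[[], Any]) -> Any:
    # Suggested shape: the caller packages the whole call as a zero-argument callable.
    value = cache.get(key)
    if value is None:
        value = cache[key] = do_get_value()
    return value


def get_dag(dag_id: str, session: object = None) -> str:
    """Hypothetical stand-in for DagBag.get_dag."""
    return f"DAG<{dag_id}>"


session = object()
a = get_value_with_cache_args({}, "etl", get_dag, "etl", session=session)
b = get_value_with_cache({}, "etl", lambda: get_dag("etl", session=session))
c = get_value_with_cache({}, "etl", functools.partial(get_dag, "etl", session=session))
```

All three produce the same value; the zero-argument form keeps the helper's signature independent of whatever the wrapped call needs.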



##########
airflow/utils/helpers.py:
##########
@@ -45,6 +45,15 @@
 S = TypeVar("S")
 
 
+def get_value_with_cache(cache: dict, key: str, insert_fn: Callable, *arg, **kwargs) -> Any:
+    """Returns value from cache or function"""
+    return_value = cache.get(key)
+    if not return_value:
+        return_value = cache[key] = insert_fn(*arg, **kwargs)
+
+    return return_value

Review Comment:
   By using a lambda, we don't need to handle parameters here. (Suggesting a
rename of the function passed as well? WDYT?)
   
   Also, `if not return_value` will trigger if the saved value is "falsy",
which is the case for None, but also for empty lists or dicts, for instance, which
can lead to surprising behavior. I think explicitly checking against None would
be better here.
   
   ```suggestion
   def get_value_with_cache(cache: dict, key: str, do_get_value: Callable) -> Any:
       """Returns value from cache or function"""
       return_value = cache.get(key)
       if return_value is None:
           return_value = cache[key] = do_get_value()
   
       return return_value
   ```
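A small sketch of the difference the `is None` check makes, with hypothetical helper names. It counts how often the wrapped function runs across three lookups of the same key. One further edge case, assuming the wrapped function can itself return None: the `is None` check then treats the stored None as a miss and re-calls every time. A `key in cache` membership check (one possible variant, not part of the suggestion above) treats any stored value, including None, as a hit.

```python
from typing import Any, Callable


def cached_falsy_check(cache: dict, key: str, do_get_value: Callable[[], Any]) -> Any:
    value = cache.get(key)
    if not value:  # original check: any falsy value counts as a miss
        value = cache[key] = do_get_value()
    return value


def cached_none_check(cache: dict, key: str, do_get_value: Callable[[], Any]) -> Any:
    value = cache.get(key)
    if value is None:  # suggested check: only None counts as a miss
        value = cache[key] = do_get_value()
    return value


def cached_membership_check(cache: dict, key: str, do_get_value: Callable[[], Any]) -> Any:
    if key not in cache:  # variant: a stored None is also treated as a hit
        cache[key] = do_get_value()
    return cache[key]


def count_calls(helper: Callable, result: Any) -> int:
    """Call helper three times for the same key and count how often the function runs."""
    calls = 0

    def do_get_value() -> Any:
        nonlocal calls
        calls += 1
        return result

    cache: dict = {}
    for _ in range(3):
        helper(cache, "k", do_get_value)
    return calls


print(count_calls(cached_falsy_check, []))         # 3: an empty list is falsy
print(count_calls(cached_none_check, []))          # 1
print(count_calls(cached_none_check, None))        # 3: a stored None looks like a miss
print(count_calls(cached_membership_check, None))  # 1
```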



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.