uranusjr commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r659635689



##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,33 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the 
configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        if dag_id in self.dags:
+            min_serialized_dag_fetch_secs = 
timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+            if (
+                dag_id in self.dags_last_fetched
+                and timezone.utcnow() > self.dags_last_fetched[dag_id] + 
min_serialized_dag_fetch_secs
+            ):
+                sd_last_updated_datetime = 
SerializedDagModel.get_last_updated_datetime(
+                    dag_id=dag_id,
+                    session=session,
+                )
+                if not sd_last_updated_datetime:
+                    self.log.warning("Serialized DAG %s no longer exists", 
dag_id)
+                    del self.dags[dag_id]
+                    del self.dags_last_fetched[dag_id]
+                    del self.dags_hash[dag_id]
+                    return False
+                return True
+        return SerializedDagModel.has_dag(dag_id=dag_id, session=session)

Review comment:
       ```suggestion
           sd_last_updated_datetime = 
SerializedDagModel.get_last_updated_datetime(
               dag_id=dag_id,
               session=session,
           )
           sd_has_dag = (sd_last_updated_datetime is not None)
           if dag_id not in self.dags:
               return sd_has_dag
           if dag_id not in self.dags_last_fetched:
               return sd_has_dag
           min_serialized_dag_fetch_secs = 
timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
           if timezone.utcnow() < self.dags_last_fetched[dag_id] + 
min_serialized_dag_fetch_secs:
               return sd_has_dag
           if sd_has_dag:
               return True
           self.log.warning("Serialized DAG %s no longer exists", dag_id)
           del self.dags[dag_id]
           del self.dags_last_fetched[dag_id]
           del self.dags_hash[dag_id]
           return False
   ```
   
   `SerializedDagModel.last_updated` is not nullable, so if 
`SerializedDagModel.get_last_updated_datetime()` returns `None`, the DAG 
has no corresponding row. Deriving the `SerializedDagModel` row’s existence 
from that single call saves us one database query, and early returns can 
then be used to make the logic easier to follow.
   
   (Note: I wrote the suggestion block directly in the text field, so please 
double-check that the code is correct.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to