ephraimbuddy commented on code in PR #32645:
URL: https://github.com/apache/airflow/pull/32645#discussion_r1269045606


##########
airflow/models/serialized_dag.py:
##########
@@ -275,7 +272,7 @@ def has_dag(cls, dag_id: str, session: Session = NEW_SESSION) -> bool:
         :param dag_id: the DAG to check
         :param session: ORM Session
         """
-        return session.query(literal(True)).filter(cls.dag_id == dag_id).first() is not None
+        return session.scalars(select(literal(True)).where(cls.dag_id == dag_id)).first() is not None

Review Comment:
   Same question here: would adding `.limit(1)` to the select be better than relying on `.first()` alone?
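
A minimal sketch of the `limit` variant being asked about. `SerializedDag` below is a simplified, hypothetical stand-in for the real model, and the in-memory SQLite engine is only there to make the example runnable. As far as I know, the legacy `Query.first()` adds a `LIMIT 1` to the generated SQL, while `.first()` on a 2.0-style result only fetches the first row and discards the rest, so an explicit `.limit(1)` keeps the old behaviour:

```python
from sqlalchemy import Column, String, create_engine, literal, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SerializedDag(Base):
    """Hypothetical, simplified stand-in for the real serialized DAG model."""

    __tablename__ = "serialized_dag"
    dag_id = Column(String, primary_key=True)


# In-memory SQLite, only so that the sketch can run on its own.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)


def exists_stmt(dag_id):
    # Explicit LIMIT 1: the database stops after the first matching row.
    return select(literal(True)).where(SerializedDag.dag_id == dag_id).limit(1)


def has_dag(session, dag_id):
    # session.scalar() returns the first column of the first row, or None.
    return session.scalar(exists_stmt(dag_id)) is not None


with Session(engine) as session:
    session.add(SerializedDag(dag_id="example_dag"))
    session.commit()
    found = has_dag(session, "example_dag")
    missing = has_dag(session, "no_such_dag")
```

If `dag_id` is unique in the table, the limit changes little in practice; it matters more for queries that can match many rows.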



##########
airflow/models/serialized_dag.py:
##########
@@ -144,23 +145,19 @@ def write_dag(
         # If Yes, does nothing
         # If No or the DAG does not exists, updates / writes Serialized DAG to DB
         if min_update_interval is not None:
-            if (
-                session.query(literal(True))
-                .filter(
-                    and_(
-                        cls.dag_id == dag.dag_id,
-                        (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
-                    )
+            if session.scalar(
+                select(literal(True)).where(
+                    cls.dag_id == dag.dag_id,
+                    (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated,
                 )
-                .scalar()
             ):
                 return False
 
         log.debug("Checking if DAG (%s) changed", dag.dag_id)
         new_serialized_dag = cls(dag, processor_subdir)
-        serialized_dag_db = (
-            session.query(cls.dag_hash, cls.processor_subdir).filter(cls.dag_id == dag.dag_id).first()
-        )
+        serialized_dag_db = session.execute(
+            select(cls.dag_hash, cls.processor_subdir).where(cls.dag_id == dag.dag_id)
+        ).first()

Review Comment:
   Should we add `.limit(1)` to this select?



##########
airflow/models/serialized_dag.py:
##########
@@ -350,7 +347,7 @@ def get_max_last_updated_datetime(cls, session: Session = 
NEW_SESSION) -> dateti
 
         :param session: ORM Session
         """
-        return session.query(func.max(cls.last_updated)).scalar()
+        return session.scalar(func.max(cls.last_updated))

Review Comment:
   We might need a `select` here, i.e. `session.scalar(select(func.max(cls.last_updated)))`, but if it works as is, then no issue.
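
A minimal sketch of the explicit `select` form suggested above. The `SerializedDag` model is a simplified, hypothetical stand-in, and the in-memory SQLite engine and sample rows are assumptions made only so the example can run:

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SerializedDag(Base):
    """Hypothetical, simplified stand-in for the real serialized DAG model."""

    __tablename__ = "serialized_dag"
    dag_id = Column(String, primary_key=True)
    last_updated = Column(DateTime, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)


def get_max_last_updated(session):
    # Wrapping the aggregate in select() makes the statement explicit.
    return session.scalar(select(func.max(SerializedDag.last_updated)))


with Session(engine) as session:
    # MAX over an empty table yields NULL, which comes back as None.
    empty_max = get_max_last_updated(session)

    session.add_all(
        [
            SerializedDag(dag_id="dag_a", last_updated=datetime(2023, 7, 1, 12, 0)),
            SerializedDag(dag_id="dag_b", last_updated=datetime(2023, 7, 20, 8, 30)),
        ]
    )
    session.commit()
    latest = get_max_last_updated(session)
```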



##########
airflow/models/serialized_dag.py:
##########
@@ -379,7 +376,10 @@ def get_latest_version_hash_and_updated_datetime(
         :param session: ORM Session
         :return: A tuple of DAG Hash and last updated datetime, or None if the DAG is not found
         """
-        return session.query(cls.dag_hash, cls.last_updated).filter(cls.dag_id == dag_id).one_or_none()
+        try:
+            return session.execute(select(cls.dag_hash, cls.last_updated).where(cls.dag_id == dag_id)).one()
+        except NoResultFound:

Review Comment:
   I think the result returned by `execute` has `one_or_none` too; we can use it instead of the try/except.
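
A minimal sketch of the `one_or_none` variant, which avoids catching `NoResultFound`. The `SerializedDag` model is a simplified, hypothetical stand-in, and the sample row and in-memory SQLite engine are assumptions made only so the example can run:

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class SerializedDag(Base):
    """Hypothetical, simplified stand-in for the real serialized DAG model."""

    __tablename__ = "serialized_dag"
    dag_id = Column(String, primary_key=True)
    dag_hash = Column(String, nullable=False)
    last_updated = Column(DateTime, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)


def get_hash_and_updated(session, dag_id):
    # one_or_none() returns a single row, or None when nothing matches;
    # it still raises MultipleResultsFound if more than one row matches.
    return session.execute(
        select(SerializedDag.dag_hash, SerializedDag.last_updated).where(
            SerializedDag.dag_id == dag_id
        )
    ).one_or_none()


with Session(engine) as session:
    session.add(
        SerializedDag(
            dag_id="example_dag",
            dag_hash="abc123",
            last_updated=datetime(2023, 7, 20, 8, 30),
        )
    )
    session.commit()
    found = get_hash_and_updated(session, "example_dag")
    missing = get_hash_and_updated(session, "no_such_dag")
```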



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
