hussein-awala commented on code in PR #34665:
URL: https://github.com/apache/airflow/pull/34665#discussion_r1340522124


##########
airflow/dag_processing/manager.py:
##########
@@ -526,13 +527,11 @@ def deactivate_stale_dags(
         Deactivate them and remove them in the serialized_dag table.
         """
         to_deactivate = set()
-        query = session.query(DagModel.dag_id, DagModel.fileloc, 
DagModel.last_parsed_time).filter(
-            DagModel.is_active
-        )
+        query = select(DagModel.dag_id, DagModel.fileloc, 
DagModel.last_parsed_time).where(DagModel.is_active)
         standalone_dag_processor = conf.getboolean("scheduler", 
"standalone_dag_processor")
         if standalone_dag_processor:
-            query = query.filter(DagModel.processor_subdir == dag_directory)
-        dags_parsed = query.all()
+            query = query.where(DagModel.processor_subdir == dag_directory)
+        dags_parsed = session.execute(query)

Review Comment:
   Based on the migration doc, you need to add `.scalars().all()`
   ```suggestion
           dags_parsed = session.execute(query).scalars().all()
   ```



##########
airflow/dag_processing/manager.py:
##########
@@ -547,11 +546,13 @@ def deactivate_stale_dags(
                 to_deactivate.add(dag.dag_id)
 
         if to_deactivate:
-            deactivated = (
-                session.query(DagModel)
-                .filter(DagModel.dag_id.in_(to_deactivate))
-                .update({DagModel.is_active: False}, 
synchronize_session="fetch")
+            deactivated_dagmodel = session.execute(
+                update(DagModel)
+                .where(DagModel.dag_id.in_(to_deactivate))
+                .values(is_active=False)
+                .execution_options(synchronize_session="fetch")
             )
+            deactivated = len([dagmodel for dagmodel, in deactivated_dagmodel])

Review Comment:
   I wonder if we can just get the number of affected rows; something like 
`deactivated_dagmodel.rowcount` instead of unnecessary `fetch`



##########
airflow/dag_processing/manager.py:
##########
@@ -820,10 +821,12 @@ def clear_nonexistent_import_errors(file_paths: list[str] 
| None, session=NEW_SE
         :param file_paths: list of paths to DAG definition files
         :param session: session for ORM operations
         """
-        query = session.query(errors.ImportError)
         if file_paths:
-            query = query.filter(~errors.ImportError.filename.in_(file_paths))
-        query.delete(synchronize_session="fetch")
+            query = 
delete(errors.ImportError).where(~errors.ImportError.filename.in_(file_paths))
+        else:
+            query = delete(errors.ImportError)

Review Comment:
   you can define the delete before the if, and add the where in the if block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to