This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3aa6ba4d24 Sqlalchemy 2.0 changes to `DagFileProcessorManager` (#34665)
3aa6ba4d24 is described below

commit 3aa6ba4d248dc2d39a5d0237a06b2a740a59cc50
Author: Phani Kumar <[email protected]>
AuthorDate: Wed Oct 4 15:34:26 2023 +0530

    Sqlalchemy 2.0 changes to `DagFileProcessorManager` (#34665)
---
 airflow/dag_processing/manager.py | 36 ++++++++++++++++++++----------------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3d93daf66b..ddf5e93f89 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -36,6 +36,7 @@ from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast
 
 from setproctitle import setproctitle
+from sqlalchemy import delete, select, update
 from tabulate import tabulate
 
 import airflow.models
@@ -525,13 +526,11 @@ class DagFileProcessorManager(LoggingMixin):
         Deactivate them and remove them in the serialized_dag table.
         """
         to_deactivate = set()
-        query = session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).filter(
-            DagModel.is_active
-        )
+        query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active)
         standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
         if standalone_dag_processor:
-            query = query.filter(DagModel.processor_subdir == dag_directory)
-        dags_parsed = query.all()
+            query = query.where(DagModel.processor_subdir == dag_directory)
+        dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
             # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
@@ -546,11 +545,13 @@ class DagFileProcessorManager(LoggingMixin):
                 to_deactivate.add(dag.dag_id)
 
         if to_deactivate:
-            deactivated = (
-                session.query(DagModel)
-                .filter(DagModel.dag_id.in_(to_deactivate))
-                .update({DagModel.is_active: False}, synchronize_session="fetch")
+            deactivated_dagmodel = session.execute(
+                update(DagModel)
+                .where(DagModel.dag_id.in_(to_deactivate))
+                .values(is_active=False)
+                .execution_options(synchronize_session="fetch")
             )
+            deactivated = deactivated_dagmodel.rowcount
             if deactivated:
                 cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)
 
@@ -695,15 +696,16 @@ class DagFileProcessorManager(LoggingMixin):
         """Fetch callbacks from database and add them to the internal queue for execution."""
         self.log.debug("Fetching callbacks from the database.")
         with prohibit_commit(session) as guard:
-            query = session.query(DbCallbackRequest)
+            query = select(DbCallbackRequest)
             if self.standalone_dag_processor:
-                query = query.filter(
+                query = query.where(
                     DbCallbackRequest.processor_subdir == self.get_dag_directory(),
                 )
             query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(max_callbacks)
-            callbacks = with_row_locks(
+            query = with_row_locks(
                 query, of=DbCallbackRequest, session=session, **skip_locked(session=session)
-            ).all()
+            )
+            callbacks = session.scalars(query)
             for callback in callbacks:
                 try:
                     self._add_callback_to_queue(callback.get_callback_request())
@@ -819,10 +821,12 @@ class DagFileProcessorManager(LoggingMixin):
         :param file_paths: list of paths to DAG definition files
         :param session: session for ORM operations
         """
-        query = session.query(errors.ImportError)
+        query = delete(errors.ImportError)
+
         if file_paths:
-            query = query.filter(~errors.ImportError.filename.in_(file_paths))
-        query.delete(synchronize_session="fetch")
+            query = query.where(~errors.ImportError.filename.in_(file_paths))
+
+        session.execute(query.execution_options(synchronize_session="fetch"))
         session.commit()
 
     def _log_file_processing_stats(self, known_file_paths):

Reply via email to