This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3aa6ba4d24 Sqlalchemy 2.0 changes to `DagFileProcessorManager` (#34665)
3aa6ba4d24 is described below
commit 3aa6ba4d248dc2d39a5d0237a06b2a740a59cc50
Author: Phani Kumar <[email protected]>
AuthorDate: Wed Oct 4 15:34:26 2023 +0530
Sqlalchemy 2.0 changes to `DagFileProcessorManager` (#34665)
---
airflow/dag_processing/manager.py | 36 ++++++++++++++++++++----------------
1 file changed, 20 insertions(+), 16 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3d93daf66b..ddf5e93f89 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -36,6 +36,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast
from setproctitle import setproctitle
+from sqlalchemy import delete, select, update
from tabulate import tabulate
import airflow.models
@@ -525,13 +526,11 @@ class DagFileProcessorManager(LoggingMixin):
Deactivate them and remove them in the serialized_dag table.
"""
to_deactivate = set()
- query = session.query(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).filter(
- DagModel.is_active
- )
+ query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active)
standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
if standalone_dag_processor:
- query = query.filter(DagModel.processor_subdir == dag_directory)
- dags_parsed = query.all()
+ query = query.where(DagModel.processor_subdir == dag_directory)
+ dags_parsed = session.execute(query)
for dag in dags_parsed:
# The largest valid difference between a DagFileStat's last_finished_time and a DAG's
@@ -546,11 +545,13 @@ class DagFileProcessorManager(LoggingMixin):
to_deactivate.add(dag.dag_id)
if to_deactivate:
- deactivated = (
- session.query(DagModel)
- .filter(DagModel.dag_id.in_(to_deactivate))
- .update({DagModel.is_active: False}, synchronize_session="fetch")
+ deactivated_dagmodel = session.execute(
+ update(DagModel)
+ .where(DagModel.dag_id.in_(to_deactivate))
+ .values(is_active=False)
+ .execution_options(synchronize_session="fetch")
)
+ deactivated = deactivated_dagmodel.rowcount
if deactivated:
cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)
@@ -695,15 +696,16 @@ class DagFileProcessorManager(LoggingMixin):
"""Fetch callbacks from database and add them to the internal queue
for execution."""
self.log.debug("Fetching callbacks from the database.")
with prohibit_commit(session) as guard:
- query = session.query(DbCallbackRequest)
+ query = select(DbCallbackRequest)
if self.standalone_dag_processor:
- query = query.filter(
+ query = query.where(
DbCallbackRequest.processor_subdir == self.get_dag_directory(),
)
query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(max_callbacks)
- callbacks = with_row_locks(
+ query = with_row_locks(
query, of=DbCallbackRequest, session=session, **skip_locked(session=session)
- ).all()
+ )
+ callbacks = session.scalars(query)
for callback in callbacks:
try:
self._add_callback_to_queue(callback.get_callback_request())
@@ -819,10 +821,12 @@ class DagFileProcessorManager(LoggingMixin):
:param file_paths: list of paths to DAG definition files
:param session: session for ORM operations
"""
- query = session.query(errors.ImportError)
+ query = delete(errors.ImportError)
+
if file_paths:
- query = query.filter(~errors.ImportError.filename.in_(file_paths))
- query.delete(synchronize_session="fetch")
+ query = query.where(~errors.ImportError.filename.in_(file_paths))
+
+ session.execute(query.execution_options(synchronize_session="fetch"))
session.commit()
def _log_file_processing_stats(self, known_file_paths):