This is an automated email from the ASF dual-hosted git repository.
phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a17819ca8a Sqlalchemy 2.0 changes (#35120)
a17819ca8a is described below
commit a17819ca8a4542a92bd2ead7689c07e10809eb7c
Author: Phani Kumar <[email protected]>
AuthorDate: Tue Oct 24 17:24:27 2023 +0530
Sqlalchemy 2.0 changes (#35120)
---
airflow/models/xcom_arg.py | 8 ++++----
airflow/operators/trigger_dagrun.py | 10 ++++------
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index cf0e95f460..7dc5577e60 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -21,7 +21,7 @@ import contextlib
import inspect
from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Mapping,
Sequence, Union, overload
-from sqlalchemy import func, or_
+from sqlalchemy import func, or_, select
from airflow.exceptions import AirflowException, XComNotFound
from airflow.models.abstractoperator import AbstractOperator
@@ -385,7 +385,7 @@ class PlainXComArg(XComArg):
)
if unfinished_ti_exists:
return None # Not all of the expanded tis are done yet.
- query = session.query(func.count(XCom.map_index)).filter(
+ query = select(func.count(XCom.map_index)).where(
XCom.dag_id == task.dag_id,
XCom.run_id == run_id,
XCom.task_id == task.task_id,
@@ -393,13 +393,13 @@ class PlainXComArg(XComArg):
XCom.key == XCOM_RETURN_KEY,
)
else:
- query = session.query(TaskMap.length).filter(
+ query = select(TaskMap.length).where(
TaskMap.dag_id == task.dag_id,
TaskMap.run_id == run_id,
TaskMap.task_id == task.task_id,
TaskMap.map_index < 0,
)
- return query.scalar()
+ return session.scalar(query)
@provide_session
def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
diff --git a/airflow/operators/trigger_dagrun.py
b/airflow/operators/trigger_dagrun.py
index b8a61290dc..39e2737d6d 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -22,6 +22,7 @@ import json
import time
from typing import TYPE_CHECKING, Any, Sequence, cast
+from sqlalchemy import select
from sqlalchemy.orm.exc import NoResultFound
from airflow.api.common.trigger_dag import trigger_dag
@@ -227,14 +228,11 @@ class TriggerDagRunOperator(BaseOperator):
# This execution date is parsed from the return trigger event
provided_execution_date = event[1]["execution_dates"][0]
try:
- dag_run = (
- session.query(DagRun)
- .filter(
+ dag_run = session.execute(
+ select(DagRun).where(
DagRun.dag_id == self.trigger_dag_id,
DagRun.execution_date == provided_execution_date
)
- .one()
- )
-
+ ).scalar_one()
except NoResultFound:
raise AirflowException(
f"No DAG run found for DAG {self.trigger_dag_id} and execution
date {self.execution_date}"