This is an automated email from the ASF dual-hosted git repository.

phanikumv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a17819ca8a Sqlalchemy 2.0 changes (#35120)
a17819ca8a is described below

commit a17819ca8a4542a92bd2ead7689c07e10809eb7c
Author: Phani Kumar <[email protected]>
AuthorDate: Tue Oct 24 17:24:27 2023 +0530

    Sqlalchemy 2.0 changes (#35120)
---
 airflow/models/xcom_arg.py          |  8 ++++----
 airflow/operators/trigger_dagrun.py | 10 ++++------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index cf0e95f460..7dc5577e60 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -21,7 +21,7 @@ import contextlib
 import inspect
 from typing import TYPE_CHECKING, Any, Callable, Iterable, Iterator, Mapping, Sequence, Union, overload
 
-from sqlalchemy import func, or_
+from sqlalchemy import func, or_, select
 
 from airflow.exceptions import AirflowException, XComNotFound
 from airflow.models.abstractoperator import AbstractOperator
@@ -385,7 +385,7 @@ class PlainXComArg(XComArg):
             )
             if unfinished_ti_exists:
                 return None  # Not all of the expanded tis are done yet.
-            query = session.query(func.count(XCom.map_index)).filter(
+            query = select(func.count(XCom.map_index)).where(
                 XCom.dag_id == task.dag_id,
                 XCom.run_id == run_id,
                 XCom.task_id == task.task_id,
@@ -393,13 +393,13 @@ class PlainXComArg(XComArg):
                 XCom.key == XCOM_RETURN_KEY,
             )
         else:
-            query = session.query(TaskMap.length).filter(
+            query = select(TaskMap.length).where(
                 TaskMap.dag_id == task.dag_id,
                 TaskMap.run_id == run_id,
                 TaskMap.task_id == task.task_id,
                 TaskMap.map_index < 0,
             )
-        return query.scalar()
+        return session.scalar(query)
 
     @provide_session
     def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index b8a61290dc..39e2737d6d 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -22,6 +22,7 @@ import json
 import time
 from typing import TYPE_CHECKING, Any, Sequence, cast
 
+from sqlalchemy import select
 from sqlalchemy.orm.exc import NoResultFound
 
 from airflow.api.common.trigger_dag import trigger_dag
@@ -227,14 +228,11 @@ class TriggerDagRunOperator(BaseOperator):
         # This execution date is parsed from the return trigger event
         provided_execution_date = event[1]["execution_dates"][0]
         try:
-            dag_run = (
-                session.query(DagRun)
-                .filter(
+            dag_run = session.execute(
+                select(DagRun).where(
                     DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == provided_execution_date
                 )
-                .one()
-            )
-
+            ).scalar_one()
         except NoResultFound:
             raise AirflowException(
                 f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}"

Reply via email to