This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 47a1eb85a26 Fix MyPy type errors in api_fastapi remaining files for 
Sqlalchemy 2 migration (#57466)
47a1eb85a26 is described below

commit 47a1eb85a26aef8eccfebacc6b8874dec7e612aa
Author: Anusha Kovi <[email protected]>
AuthorDate: Fri Oct 31 21:34:32 2025 +0530

    Fix MyPy type errors in api_fastapi remaining files for Sqlalchemy 2 
migration (#57466)
    
    * Fix MyPy type errors in api_fastapi remaining files for Sqlalchemy 2 
migration
    
    * add imports at the top
    
    * remove duplicate import
---
 airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py   |  5 ++++-
 airflow-core/src/airflow/api_fastapi/common/parameters.py    | 12 ++++++++----
 .../airflow/api_fastapi/core_api/services/public/dag_run.py  |  4 ++--
 .../tests/unit/api_fastapi/core_api/routes/ui/test_dags.py   |  2 +-
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 1ca5f1d261b..2efb6967653 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -17,7 +17,10 @@
 
 from __future__ import annotations
 
+from typing import cast
+
 from sqlalchemy import func, select
+from sqlalchemy.sql import ColumnElement
 
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
@@ -27,7 +30,7 @@ dagruns_select_with_state_count = (
         DagRun.dag_id,
         DagRun.state,
         DagModel.dag_display_name,
-        func.count(DagRun.state),
+        cast("ColumnElement[int]", func.count(DagRun.state).label("count")),
     )
     .join(DagModel, DagRun.dag_id == DagModel.dag_id)
     .group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name)
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py 
b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index 6674dd55fef..dd7b0d808a7 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -28,6 +28,7 @@ from typing import (
     Generic,
     Literal,
     TypeVar,
+    cast,
     overload,
 )
 
@@ -67,6 +68,7 @@ if TYPE_CHECKING:
     from sqlalchemy.orm.attributes import InstrumentedAttribute
     from sqlalchemy.sql import ColumnElement, Select
 
+
 T = TypeVar("T")
 
 
@@ -75,7 +77,7 @@ class BaseParam(OrmClause[T], ABC):
 
     def __init__(self, value: T | None = None, skip_none: bool = True) -> None:
         super().__init__(value)
-        self.attribute: ColumnElement | None = None
+        self.attribute: ColumnElement | InstrumentedAttribute | None = None
         self.skip_none = skip_none
 
     def set_value(self, value: T | None) -> Self:
@@ -387,7 +389,7 @@ class FilterParam(BaseParam[T]):
 
 
 def filter_param_factory(
-    attribute: ColumnElement,
+    attribute: ColumnElement | InstrumentedAttribute,
     _type: type,
     filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
     filter_name: str | None = None,
@@ -399,7 +401,7 @@ def filter_param_factory(
     description: str | None = None,
 ) -> Callable[[T | None], FilterParam[T | None]]:
     # if filter_name is not provided, use the attribute name as the default
-    filter_name = filter_name or attribute.name
+    filter_name = filter_name or getattr(attribute, "name", str(attribute))
     # can only set either default_value or default_factory
     query = (
         Query(alias=filter_name, default_factory=default_factory, 
description=description)
@@ -410,7 +412,9 @@ def filter_param_factory(
     def depends_filter(value: T | None = query) -> FilterParam[T | None]:
         if transform_callable:
             value = transform_callable(value)
-        return FilterParam(attribute, value, filter_option, skip_none)
+        # Cast to InstrumentedAttribute for type compatibility
+        attr = cast("InstrumentedAttribute", attribute)
+        return FilterParam(attr, value, filter_option, skip_none)
 
     # add type hint to value at runtime
     depends_filter.__annotations__["value"] = _type
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index 37a5d761b30..5a08ed1c3b0 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -57,7 +57,7 @@ class DagRunWaiter:
             task_ids=self.result_task_ids,
             dag_ids=self.dag_id,
         )
-        xcom_query = 
self.session.scalars(xcom_query.order_by(XComModel.task_id, 
XComModel.map_index)).all()
+        xcom_results = 
self.session.scalars(xcom_query.order_by(XComModel.task_id, 
XComModel.map_index))
 
         def _group_xcoms(g: Iterator[XComModel]) -> Any:
             entries = list(g)
@@ -67,7 +67,7 @@ class DagRunWaiter:
 
         return {
             task_id: _group_xcoms(g)
-            for task_id, g in itertools.groupby(xcom_query, 
key=operator.attrgetter("task_id"))
+            for task_id, g in itertools.groupby(xcom_results, 
key=operator.attrgetter("task_id"))
         }
 
     def _serialize_response(self, dag_run: DagRun) -> str:
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
index 2d1a896542d..5f2c0e9c808 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
@@ -69,7 +69,7 @@ class TestGetDagRuns(TestPublicDagEndpoint):
                     triggered_by=DagRunTriggeredByType.TEST,
                 )
                 if dag_run.start_date is not None:
-                    dag_run.end_date = dag_run.start_date.add(hours=1)
+                    dag_run.end_date = dag_run.start_date + 
pendulum.duration(hours=1)
                 session.add(dag_run)
         session.commit()
 

Reply via email to