This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 47a1eb85a26 Fix MyPy type errors in api_fastapi remaining files for
Sqlalchemy 2 migration (#57466)
47a1eb85a26 is described below
commit 47a1eb85a26aef8eccfebacc6b8874dec7e612aa
Author: Anusha Kovi <[email protected]>
AuthorDate: Fri Oct 31 21:34:32 2025 +0530
Fix MyPy type errors in api_fastapi remaining files for Sqlalchemy 2
migration (#57466)
* Fix MyPy type errors in api_fastapi remaining files for Sqlalchemy 2
migration
* add imports at the top
* remove duplicate import
---
airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py | 5 ++++-
airflow-core/src/airflow/api_fastapi/common/parameters.py | 12 ++++++++----
.../airflow/api_fastapi/core_api/services/public/dag_run.py | 4 ++--
.../tests/unit/api_fastapi/core_api/routes/ui/test_dags.py | 2 +-
4 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 1ca5f1d261b..2efb6967653 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -17,7 +17,10 @@
from __future__ import annotations
+from typing import cast
+
from sqlalchemy import func, select
+from sqlalchemy.sql import ColumnElement
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
@@ -27,7 +30,7 @@ dagruns_select_with_state_count = (
DagRun.dag_id,
DagRun.state,
DagModel.dag_display_name,
- func.count(DagRun.state),
+ cast("ColumnElement[int]", func.count(DagRun.state).label("count")),
)
.join(DagModel, DagRun.dag_id == DagModel.dag_id)
.group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name)
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index 6674dd55fef..dd7b0d808a7 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -28,6 +28,7 @@ from typing import (
Generic,
Literal,
TypeVar,
+ cast,
overload,
)
@@ -67,6 +68,7 @@ if TYPE_CHECKING:
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql import ColumnElement, Select
+
T = TypeVar("T")
@@ -75,7 +77,7 @@ class BaseParam(OrmClause[T], ABC):
def __init__(self, value: T | None = None, skip_none: bool = True) -> None:
super().__init__(value)
- self.attribute: ColumnElement | None = None
+ self.attribute: ColumnElement | InstrumentedAttribute | None = None
self.skip_none = skip_none
def set_value(self, value: T | None) -> Self:
@@ -387,7 +389,7 @@ class FilterParam(BaseParam[T]):
def filter_param_factory(
- attribute: ColumnElement,
+ attribute: ColumnElement | InstrumentedAttribute,
_type: type,
filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
filter_name: str | None = None,
@@ -399,7 +401,7 @@ def filter_param_factory(
description: str | None = None,
) -> Callable[[T | None], FilterParam[T | None]]:
# if filter_name is not provided, use the attribute name as the default
- filter_name = filter_name or attribute.name
+ filter_name = filter_name or getattr(attribute, "name", str(attribute))
# can only set either default_value or default_factory
query = (
Query(alias=filter_name, default_factory=default_factory, description=description)
@@ -410,7 +412,9 @@ def filter_param_factory(
def depends_filter(value: T | None = query) -> FilterParam[T | None]:
if transform_callable:
value = transform_callable(value)
- return FilterParam(attribute, value, filter_option, skip_none)
+ # Cast to InstrumentedAttribute for type compatibility
+ attr = cast("InstrumentedAttribute", attribute)
+ return FilterParam(attr, value, filter_option, skip_none)
# add type hint to value at runtime
depends_filter.__annotations__["value"] = _type
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index 37a5d761b30..5a08ed1c3b0 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -57,7 +57,7 @@ class DagRunWaiter:
task_ids=self.result_task_ids,
dag_ids=self.dag_id,
)
- xcom_query = self.session.scalars(xcom_query.order_by(XComModel.task_id, XComModel.map_index)).all()
+ xcom_results = self.session.scalars(xcom_query.order_by(XComModel.task_id, XComModel.map_index))
def _group_xcoms(g: Iterator[XComModel]) -> Any:
entries = list(g)
@@ -67,7 +67,7 @@ class DagRunWaiter:
return {
task_id: _group_xcoms(g)
- for task_id, g in itertools.groupby(xcom_query, key=operator.attrgetter("task_id"))
+ for task_id, g in itertools.groupby(xcom_results, key=operator.attrgetter("task_id"))
}
def _serialize_response(self, dag_run: DagRun) -> str:
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
index 2d1a896542d..5f2c0e9c808 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
@@ -69,7 +69,7 @@ class TestGetDagRuns(TestPublicDagEndpoint):
triggered_by=DagRunTriggeredByType.TEST,
)
if dag_run.start_date is not None:
- dag_run.end_date = dag_run.start_date.add(hours=1)
+ dag_run.end_date = dag_run.start_date + pendulum.duration(hours=1)
session.add(dag_run)
session.commit()