pierrejeambrun commented on code in PR #45420:
URL: https://github.com/apache/airflow/pull/45420#discussion_r1906847776
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -82,8 +93,25 @@ def get_dags(
session: SessionDep,
) -> DAGCollectionResponse:
"""Get all DAGs."""
+ dag_runs_select, total_entries = paginated_select(
+ statement=select(DagRun),
+ filters=[
+ dag_run_start_date_range,
+ dag_run_end_date_range,
+ dag_run_state,
+ ],
+ session=session,
+ )
+
dags_select, total_entries = paginated_select(
- statement=dags_select_with_latest_dag_run,
+ statement=generate_dag_select_query(
Review Comment:
I think we should do this the other way around: first generate the base
query with `generate_dag_select_query`, then pass it to the `paginated_select`
wrapper. That avoids needing another base `select(DagRun)` to pass to
`paginated_select`, as well as the `stmt` argument in
`generate_dag_select_query`.
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -324,21 +328,34 @@ def depends_filter(value: T | None = query) ->
FilterParam[T | None]:
return depends_filter
-class _TagsFilter(BaseParam[list[str]]):
+class _TagFilterModel(BaseModel):
+ """Tag Filter Model with a match mode parameter."""
+
+ tags: list[str]
+ tags_match_mode: str | None
+
+
+class _TagsFilter(BaseParam[_TagFilterModel]):
"""Filter on tags."""
def to_orm(self, select: Select) -> Select:
if self.skip_none is False:
raise ValueError(f"Cannot set 'skip_none' to False on a
{type(self)}")
- if not self.value:
+ if not self.value or not self.value.tags:
return select
- conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value]
- return select.where(or_(*conditions))
+ if not self.value.tags_match_mode or self.value.tags_match_mode ==
"any":
+ conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value.tags]
+ return select.where(or_(*conditions))
Review Comment:
This line is duplicated and can be moved out of the `if`/`else`; only the
operator differs between the two branches:
```
conditions = ...
operator = or_  # or_ for "any", and_ for "all"
return select.where(operator(*conditions))
```
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -324,21 +328,34 @@ def depends_filter(value: T | None = query) ->
FilterParam[T | None]:
return depends_filter
-class _TagsFilter(BaseParam[list[str]]):
+class _TagFilterModel(BaseModel):
+ """Tag Filter Model with a match mode parameter."""
+
+ tags: list[str]
+ tags_match_mode: str | None
+
+
+class _TagsFilter(BaseParam[_TagFilterModel]):
"""Filter on tags."""
def to_orm(self, select: Select) -> Select:
if self.skip_none is False:
raise ValueError(f"Cannot set 'skip_none' to False on a
{type(self)}")
- if not self.value:
+ if not self.value or not self.value.tags:
return select
- conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value]
- return select.where(or_(*conditions))
+ if not self.value.tags_match_mode or self.value.tags_match_mode ==
"any":
+ conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value.tags]
+ return select.where(or_(*conditions))
+ else:
+ conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value.tags]
+ return select.where(and_(*conditions))
- def depends(self, tags: list[str] = Query(default_factory=list)) ->
_TagsFilter:
- return self.set_value(tags)
+ def depends(
+ self, tags: list[str] = Query(default_factory=list), tags_match_mode:
str | None = Query(default=None)
Review Comment:
```suggestion
self, tags: list[str] = Query(default_factory=list),
tags_match_mode: str | None = None)
```
Only list parameters need an explicit `Query`. One reason is the
`default_factory`. The other is that, without additional information, FastAPI
assumes a list parameter comes from the request body rather than from the query parameters.
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -324,21 +328,34 @@ def depends_filter(value: T | None = query) ->
FilterParam[T | None]:
return depends_filter
-class _TagsFilter(BaseParam[list[str]]):
+class _TagFilterModel(BaseModel):
+ """Tag Filter Model with a match mode parameter."""
+
+ tags: list[str]
+ tags_match_mode: str | None
+
+
+class _TagsFilter(BaseParam[_TagFilterModel]):
"""Filter on tags."""
def to_orm(self, select: Select) -> Select:
if self.skip_none is False:
raise ValueError(f"Cannot set 'skip_none' to False on a
{type(self)}")
- if not self.value:
+ if not self.value or not self.value.tags:
return select
- conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value]
- return select.where(or_(*conditions))
+ if not self.value.tags_match_mode or self.value.tags_match_mode ==
"any":
+ conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value.tags]
+ return select.where(or_(*conditions))
+ else:
+ conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value.tags]
+ return select.where(and_(*conditions))
- def depends(self, tags: list[str] = Query(default_factory=list)) ->
_TagsFilter:
- return self.set_value(tags)
+ def depends(
+ self, tags: list[str] = Query(default_factory=list), tags_match_mode:
str | None = Query(default=None)
+ ) -> _TagsFilter:
Review Comment:
`tags_match_mode` shouldn't be a `str` but a `Literal` of `"any"` and
`"all"`.
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -473,6 +490,13 @@ def depends_float(
return depends_float
+def is_range_filter_active(range_filter: RangeFilter) -> bool:
+ """Check if a range filter has any active bounds."""
+ if range_filter is not None and range_filter.value is not None:
+ return range_filter.value.lower_bound is not None or
range_filter.value.upper_bound is not None
+ return False
Review Comment:
That should be a method of `RangeFilter`; this way we avoid having a free
function lying around:
`range_filter.is_active()`, or even a property
`range_filter.is_active`.
##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from sqlalchemy import func, select
+if TYPE_CHECKING:
+ from sqlalchemy.sql import Select
+
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
-latest_dag_run_per_dag_id_cte = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where()
- .group_by(DagRun.dag_id)
- .cte()
-)
-
-dags_select_with_latest_dag_run = (
- select(DagModel)
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
+def generate_dag_select_query(
+ stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
+ latest_dag_run_per_dag_id_cte = (
+ select(stmt.c.dag_id, func.max(stmt.c.start_date).label("start_date"))
Review Comment:
`stmt` is not a descriptive name; consider `dag_runs_cte`.
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -69,6 +73,13 @@ def get_dags(
only_active: QueryOnlyActiveFilter,
paused: QueryPausedFilter,
last_dag_run_state: QueryLastDagRunStateFilter,
+ dag_run_start_date_range: Annotated[
+ RangeFilter, Depends(datetime_range_filter_factory("start_date",
DagRun))
+ ],
+ dag_run_end_date_range: Annotated[
+ RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))
+ ],
+ dag_run_state: QueryDagRunStateFilter,
Review Comment:
This ends up in the query string as `&state=...`.
I think that's confusing, since there's no `state` on the `Dag` resource;
`&dag_run_state=` would be more explicit.
I wouldn't reuse `QueryDagRunStateFilter`; instead, I would inline a
custom filter here:
```python
Annotated[
FilterParam[list[str]],
Depends(
filter_param_factory(
DagRun.state,
list[str],
FilterOptionEnum.ANY_EQUAL,
"dag_run_state", # here is the alias, comment to delete.
default_factory=list,
transform_callable=_transform_dag_run_states,
)
),
]
```
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -82,8 +93,25 @@ def get_dags(
session: SessionDep,
) -> DAGCollectionResponse:
"""Get all DAGs."""
+ dag_runs_select, total_entries = paginated_select(
Review Comment:
```suggestion
dag_runs_select, _ = paginated_select(
```
##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from sqlalchemy import func, select
+if TYPE_CHECKING:
+ from sqlalchemy.sql import Select
+
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
-latest_dag_run_per_dag_id_cte = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where()
- .group_by(DagRun.dag_id)
- .cte()
-)
-
-dags_select_with_latest_dag_run = (
- select(DagModel)
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
+def generate_dag_select_query(
+ stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
Review Comment:
+1 for that.
##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from sqlalchemy import func, select
+if TYPE_CHECKING:
+ from sqlalchemy.sql import Select
+
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
-latest_dag_run_per_dag_id_cte = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where()
- .group_by(DagRun.dag_id)
- .cte()
-)
-
-dags_select_with_latest_dag_run = (
- select(DagModel)
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
+def generate_dag_select_query(
+ stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
+ latest_dag_run_per_dag_id_cte = (
+ select(stmt.c.dag_id, func.max(stmt.c.start_date).label("start_date"))
+ .where()
+ .group_by(stmt.c.dag_id)
+ .cte()
Review Comment:
We should keep the original `latest_dag_run_per_dag_id_cte` on one hand.
On the other, we should add a separate `historically_filtered_dag_run_per_dag_id_cte`
or similar. The name is not accurate, but you get the idea.
Then do the appropriate join with that extra historical CTE.
##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from sqlalchemy import func, select
+if TYPE_CHECKING:
+ from sqlalchemy.sql import Select
+
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
-latest_dag_run_per_dag_id_cte = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where()
- .group_by(DagRun.dag_id)
- .cte()
-)
-
-dags_select_with_latest_dag_run = (
- select(DagModel)
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
+def generate_dag_select_query(
+ stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
+ latest_dag_run_per_dag_id_cte = (
+ select(stmt.c.dag_id, func.max(stmt.c.start_date).label("start_date"))
+ .where()
+ .group_by(stmt.c.dag_id)
+ .cte()
Review Comment:
I don't think `latest_dag_run` and the other `dag_run_*` filters should interfere.
Here, `last_dag_run_state` operates on the result set of DagRuns filtered
on:
- dag_run_start_date_range
- dag_run_end_date_range
- dag_run_state
I think it shouldn't. `latest_dag_run` should always refer to the latest
DAG run, regardless of the other DAG run filters applied to 'historical' data.
For instance, suppose we provide both:
- last_dag_run_state
- dag_run_start_date_range
The user might want "the latest DAG run state is X, and the DAG has DAG runs within
that range". The query actually does "the latest DAG run state **within that range**
is X, and the DAG has DAG runs within that range". We should add a test for
that.
This also means that `last_dag_run_state` makes no sense when `dag_run_state`
is used. For instance, we pre-filter on 'running' DAG runs, then try to get the
DAGs whose last DAG run is `failed`. I think this will always return
nothing.
##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -82,8 +93,25 @@ def get_dags(
session: SessionDep,
) -> DAGCollectionResponse:
"""Get all DAGs."""
+ dag_runs_select, total_entries = paginated_select(
+ statement=select(DagRun),
+ filters=[
+ dag_run_start_date_range,
+ dag_run_end_date_range,
+ dag_run_state,
+ ],
+ session=session,
+ )
+
dags_select, total_entries = paginated_select(
- statement=dags_select_with_latest_dag_run,
+ statement=generate_dag_select_query(
+ dag_runs_select.cte(),
+ use_outer_join=not (
+ is_range_filter_active(dag_run_start_date_range)
+ or is_range_filter_active(dag_run_end_date_range)
+ )
Review Comment:
I think this can be improved by moving the check earlier, to determine
whether any `dag_run_*` filters are set:
```python
dag_runs_select = None
if "any dag run filters":
dag_runs_select, _ = paginated...
dag_runs_select = dag_runs_select.cte()
```
Then you can pass `dag_runs_select` to `generate_dag_select_query`, which acts
depending on whether it is None. If `dag_runs_select` is None, the function has
no extra join to make. If it is not None, the function needs to do the inner
join. This gets rid of the `use_outer_join` parameter.
##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from sqlalchemy import func, select
+if TYPE_CHECKING:
+ from sqlalchemy.sql import Select
+
from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun
-latest_dag_run_per_dag_id_cte = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where()
- .group_by(DagRun.dag_id)
- .cte()
-)
-
-dags_select_with_latest_dag_run = (
- select(DagModel)
- .join(
- latest_dag_run_per_dag_id_cte,
- DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
- isouter=True,
+def generate_dag_select_query(
Review Comment:
The function name should be improved. It does not generate just any DAG
select query, but a very specific one, so the name should be more descriptive.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]