pierrejeambrun commented on code in PR #45420:
URL: https://github.com/apache/airflow/pull/45420#discussion_r1906847776


##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -82,8 +93,25 @@ def get_dags(
     session: SessionDep,
 ) -> DAGCollectionResponse:
     """Get all DAGs."""
+    dag_runs_select, total_entries = paginated_select(
+        statement=select(DagRun),
+        filters=[
+            dag_run_start_date_range,
+            dag_run_end_date_range,
+            dag_run_state,
+        ],
+        session=session,
+    )
+
     dags_select, total_entries = paginated_select(
-        statement=dags_select_with_latest_dag_run,
+        statement=generate_dag_select_query(

Review Comment:
   I think we should do that the other way around: first generate the base 
query with `generate_dag_select_query`, then give it to the `paginated_select` 
wrapper. That will avoid having to provide another base select 
`select(DagRun)` to `paginated_select`, and having the `stmt` arg in 
`generate_dag_select_query`.



##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -324,21 +328,34 @@ def depends_filter(value: T | None = query) -> 
FilterParam[T | None]:
     return depends_filter
 
 
-class _TagsFilter(BaseParam[list[str]]):
+class _TagFilterModel(BaseModel):
+    """Tag Filter Model with a match mode parameter."""
+
+    tags: list[str]
+    tags_match_mode: str | None
+
+
+class _TagsFilter(BaseParam[_TagFilterModel]):
     """Filter on tags."""
 
     def to_orm(self, select: Select) -> Select:
         if self.skip_none is False:
             raise ValueError(f"Cannot set 'skip_none' to False on a 
{type(self)}")
 
-        if not self.value:
+        if not self.value or not self.value.tags:
             return select
 
-        conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value]
-        return select.where(or_(*conditions))
+        if not self.value.tags_match_mode or self.value.tags_match_mode == 
"any":
+            conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value.tags]
+            return select.where(or_(*conditions))

Review Comment:
   This line is duplicated and can be taken out of the `if, else`.
   ```
   conditions = ...
   operator = or_
   
   
   return select.where(operator(*conditions))
   ```



##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -324,21 +328,34 @@ def depends_filter(value: T | None = query) -> 
FilterParam[T | None]:
     return depends_filter
 
 
-class _TagsFilter(BaseParam[list[str]]):
+class _TagFilterModel(BaseModel):
+    """Tag Filter Model with a match mode parameter."""
+
+    tags: list[str]
+    tags_match_mode: str | None
+
+
+class _TagsFilter(BaseParam[_TagFilterModel]):
     """Filter on tags."""
 
     def to_orm(self, select: Select) -> Select:
         if self.skip_none is False:
             raise ValueError(f"Cannot set 'skip_none' to False on a 
{type(self)}")
 
-        if not self.value:
+        if not self.value or not self.value.tags:
             return select
 
-        conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value]
-        return select.where(or_(*conditions))
+        if not self.value.tags_match_mode or self.value.tags_match_mode == 
"any":
+            conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value.tags]
+            return select.where(or_(*conditions))
+        else:
+            conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value.tags]
+            return select.where(and_(*conditions))
 
-    def depends(self, tags: list[str] = Query(default_factory=list)) -> 
_TagsFilter:
-        return self.set_value(tags)
+    def depends(
+        self, tags: list[str] = Query(default_factory=list), tags_match_mode: 
str | None = Query(default=None)

Review Comment:
   ```suggestion
           self, tags: list[str] = Query(default_factory=list), 
tags_match_mode: str | None = None
   ```
   
   Only lists need an explicit `Query`: for the `default_factory`, and because 
without additional information FastAPI assumes the value is coming from the 
`body` and not from the query parameters.



##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -324,21 +328,34 @@ def depends_filter(value: T | None = query) -> 
FilterParam[T | None]:
     return depends_filter
 
 
-class _TagsFilter(BaseParam[list[str]]):
+class _TagFilterModel(BaseModel):
+    """Tag Filter Model with a match mode parameter."""
+
+    tags: list[str]
+    tags_match_mode: str | None
+
+
+class _TagsFilter(BaseParam[_TagFilterModel]):
     """Filter on tags."""
 
     def to_orm(self, select: Select) -> Select:
         if self.skip_none is False:
             raise ValueError(f"Cannot set 'skip_none' to False on a 
{type(self)}")
 
-        if not self.value:
+        if not self.value or not self.value.tags:
             return select
 
-        conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value]
-        return select.where(or_(*conditions))
+        if not self.value.tags_match_mode or self.value.tags_match_mode == 
"any":
+            conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value.tags]
+            return select.where(or_(*conditions))
+        else:
+            conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value.tags]
+            return select.where(and_(*conditions))
 
-    def depends(self, tags: list[str] = Query(default_factory=list)) -> 
_TagsFilter:
-        return self.set_value(tags)
+    def depends(
+        self, tags: list[str] = Query(default_factory=list), tags_match_mode: 
str | None = Query(default=None)
+    ) -> _TagsFilter:

Review Comment:
   `tags_match_mode` shouldn't be a plain `str` but a `Literal` of `"any"` and 
`"all"`.



##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -473,6 +490,13 @@ def depends_float(
     return depends_float
 
 
+def is_range_filter_active(range_filter: RangeFilter) -> bool:
+    """Check if a range filter has any active bounds."""
+    if range_filter is not None and range_filter.value is not None:
+        return range_filter.value.lower_bound is not None or 
range_filter.value.upper_bound is not None
+    return False

Review Comment:
   That should be a method of `RangeFilter`; this way we avoid having a free 
function out there.
   
   `range_filter.is_active()`, or even have it as a property 
`range_filter.is_active`.



##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
 
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from sqlalchemy import func, select
 
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 
-latest_dag_run_per_dag_id_cte = (
-    select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
-    .where()
-    .group_by(DagRun.dag_id)
-    .cte()
-)
 
-
-dags_select_with_latest_dag_run = (
-    select(DagModel)
-    .join(
-        latest_dag_run_per_dag_id_cte,
-        DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
-        isouter=True,
+def generate_dag_select_query(
+    stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
+    latest_dag_run_per_dag_id_cte = (
+        select(stmt.c.dag_id, func.max(stmt.c.start_date).label("start_date"))

Review Comment:
   `stmt` is a bad name; something like `dag_runs_cte` would be more descriptive.



##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -69,6 +73,13 @@ def get_dags(
     only_active: QueryOnlyActiveFilter,
     paused: QueryPausedFilter,
     last_dag_run_state: QueryLastDagRunStateFilter,
+    dag_run_start_date_range: Annotated[
+        RangeFilter, Depends(datetime_range_filter_factory("start_date", 
DagRun))
+    ],
+    dag_run_end_date_range: Annotated[
+        RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))
+    ],
+    dag_run_state: QueryDagRunStateFilter,

Review Comment:
   This ends up in the query as `&state=...`.
   
   I think that's confusing, since there's no state on the `Dag` resource. I 
think `&dag_run_state=` would be more explicit.
   
   I wouldn't reuse the `QueryDagRunStateFilter`; I would simply inline a 
custom one here:
   ```python
   Annotated[
       FilterParam[list[str]],
       Depends(
           filter_param_factory(
               DagRun.state,
               list[str],
               FilterOptionEnum.ANY_EQUAL,
               "dag_run_state", # here is the alias, comment to delete.
               default_factory=list,
               transform_callable=_transform_dag_run_states,
           )
       ),
   ]
   
   ```



##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -82,8 +93,25 @@ def get_dags(
     session: SessionDep,
 ) -> DAGCollectionResponse:
     """Get all DAGs."""
+    dag_runs_select, total_entries = paginated_select(

Review Comment:
   ```suggestion
       dag_runs_select, _ = paginated_select(
   ```



##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
 
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from sqlalchemy import func, select
 
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 
-latest_dag_run_per_dag_id_cte = (
-    select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
-    .where()
-    .group_by(DagRun.dag_id)
-    .cte()
-)
 
-
-dags_select_with_latest_dag_run = (
-    select(DagModel)
-    .join(
-        latest_dag_run_per_dag_id_cte,
-        DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
-        isouter=True,
+def generate_dag_select_query(
+    stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:

Review Comment:
   +1 for that.



##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
 
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from sqlalchemy import func, select
 
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 
-latest_dag_run_per_dag_id_cte = (
-    select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
-    .where()
-    .group_by(DagRun.dag_id)
-    .cte()
-)
 
-
-dags_select_with_latest_dag_run = (
-    select(DagModel)
-    .join(
-        latest_dag_run_per_dag_id_cte,
-        DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
-        isouter=True,
+def generate_dag_select_query(
+    stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
+    latest_dag_run_per_dag_id_cte = (
+        select(stmt.c.dag_id, func.max(stmt.c.start_date).label("start_date"))
+        .where()
+        .group_by(stmt.c.dag_id)
+        .cte()

Review Comment:
   We should have the original `latest_dag_run_per_dag_id_cte` on one hand.
   
   And another `historically_filtered_dag_run_per_dag_id_cte` or something like 
that. Naming is not accurate but you get the idea.
   
   And doing the appropriate join with that extra historical CTE.



##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
 
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from sqlalchemy import func, select
 
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 
-latest_dag_run_per_dag_id_cte = (
-    select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
-    .where()
-    .group_by(DagRun.dag_id)
-    .cte()
-)
 
-
-dags_select_with_latest_dag_run = (
-    select(DagModel)
-    .join(
-        latest_dag_run_per_dag_id_cte,
-        DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
-        isouter=True,
+def generate_dag_select_query(
+    stmt: Select = select(DagRun).where().cte(), use_outer_join: bool = True
+) -> Select:
+    latest_dag_run_per_dag_id_cte = (
+        select(stmt.c.dag_id, func.max(stmt.c.start_date).label("start_date"))
+        .where()
+        .group_by(stmt.c.dag_id)
+        .cte()

Review Comment:
   I don't think `latest_dag_run` and the other `dag_run_*` filters should interfere.
   
   Here `last_dag_run_state` operates on the result set of DagRuns that has 
already been filtered on:
   - dag_run_start_date_range
   - dag_run_end_date_range
   - dag_run_state
   
   Which I think it shouldn't. `latest_dag_run` should always be the actual 
latest dagrun, regardless of the other dagrun filters on 'historical' data.
   
   If for instance we provide both:
   - last_dag_run_state
   - dag_run_start_date_range
   
   User might want "Latest dag run state is something, and has dag_run within 
that range", While it actually does "Latest dag run state **within that range** 
is something, and has dag runs within that range". We should add a test for 
that.
   
   Also, as a consequence, when `dag_run_state` is used, `last_dag_run_state` 
does not make any sense: we pre-filter on 'running' dagruns, and then we try to 
get those that have their last dag_run in `failed`, for instance. This will 
always return an empty result, I think.



##########
airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -82,8 +93,25 @@ def get_dags(
     session: SessionDep,
 ) -> DAGCollectionResponse:
     """Get all DAGs."""
+    dag_runs_select, total_entries = paginated_select(
+        statement=select(DagRun),
+        filters=[
+            dag_run_start_date_range,
+            dag_run_end_date_range,
+            dag_run_state,
+        ],
+        session=session,
+    )
+
     dags_select, total_entries = paginated_select(
-        statement=dags_select_with_latest_dag_run,
+        statement=generate_dag_select_query(
+            dag_runs_select.cte(),
+            use_outer_join=not (
+                is_range_filter_active(dag_run_start_date_range)
+                or is_range_filter_active(dag_run_end_date_range)
+            )

Review Comment:
   This can be improved I think.
   
   Basically moving the check earlier to know if we have some `dag_run_*` 
filters or not:
   
   ```python
   dag_runs_select = None
   
   if "any dag run filters":
       dag_runs_select, _ = paginated...
       dag_runs_select = dag_runs_select.cte()
   ```
   
   Then you can provide the `dag_runs_select` to 
`generate_dag_select_query` and, depending on whether `dag_runs_select` is None 
or not, act on it. If `dag_runs_select` is None, there is no join to make in 
the function; if it is not None, then we need to do the inner join. This gets 
rid of the `use_outer_join` param.



##########
airflow/api_fastapi/common/db/dags.py:
##########
@@ -17,31 +17,40 @@
 
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from sqlalchemy import func, select
 
+if TYPE_CHECKING:
+    from sqlalchemy.sql import Select
+
 from airflow.models.dag import DagModel
 from airflow.models.dagrun import DagRun
 
-latest_dag_run_per_dag_id_cte = (
-    select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
-    .where()
-    .group_by(DagRun.dag_id)
-    .cte()
-)
 
-
-dags_select_with_latest_dag_run = (
-    select(DagModel)
-    .join(
-        latest_dag_run_per_dag_id_cte,
-        DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
-        isouter=True,
+def generate_dag_select_query(

Review Comment:
   The function name should be improved. It does not generate just any DAG 
select query, but a very specific one. The name should be more descriptive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to