pierrejeambrun commented on code in PR #43947:
URL: https://github.com/apache/airflow/pull/43947#discussion_r1840803943
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -239,6 +240,146 @@ def inner(order_by: str = default or
self.get_primary_key_string()) -> SortParam
return inner
+class FilterOptionEnum(Enum):
+ """Filter options for FilterParam."""
+
+ EQUAL = "eq"
+ NOT_EQUAL = "ne"
+ LESS_THAN = "lt"
+ LESS_THAN_EQUAL = "le"
+ GREATER_THAN = "gt"
+ GREATER_THAN_EQUAL = "ge"
+ IN = "in"
+ NOT_IN = "not_in"
+
+
+class FilterParam(BaseParam[T]):
+ """Filter on attribute."""
+
+ def __init__(
+ self,
+ attribute: ColumnElement,
+ value: T | None = None,
+ filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
+ skip_none: bool = True,
+ ) -> None:
+ super().__init__(value, skip_none)
+ self.attribute: ColumnElement = attribute
+ self.value: T | None = value
+ self.filter_option: FilterOptionEnum = filter_option
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
Review Comment:
When you do the actually implementation, careful, some filters do not allow
to set `skip_none` to False, and raise an exception.
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -239,6 +240,146 @@ def inner(order_by: str = default or
self.get_primary_key_string()) -> SortParam
return inner
+class FilterOptionEnum(Enum):
+ """Filter options for FilterParam."""
+
+ EQUAL = "eq"
+ NOT_EQUAL = "ne"
+ LESS_THAN = "lt"
+ LESS_THAN_EQUAL = "le"
+ GREATER_THAN = "gt"
+ GREATER_THAN_EQUAL = "ge"
+ IN = "in"
+ NOT_IN = "not_in"
+
+
+class FilterParam(BaseParam[T]):
+ """Filter on attribute."""
+
+ def __init__(
+ self,
+ attribute: ColumnElement,
+ value: T | None = None,
+ filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
+ skip_none: bool = True,
+ ) -> None:
+ super().__init__(value, skip_none)
+ self.attribute: ColumnElement = attribute
+ self.value: T | None = value
+ self.filter_option: FilterOptionEnum = filter_option
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+
+ if isinstance(self.value, list):
+ if self.filter_option == FilterOptionEnum.IN:
+ return select.where(self.attribute.in_(self.value))
+ if self.filter_option == FilterOptionEnum.NOT_IN:
+ return select.where(self.attribute.notin_(self.value))
Review Comment:
I think for tags, pool, etc..., there something a bit more complicated with
```
conditions = [DagModel.tags.any(DagTag.name == tag) for tag in
self.value]
return select.where(or_(*conditions))
```
That is equivalent to a generic `ArrayContains`
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -239,6 +240,146 @@ def inner(order_by: str = default or
self.get_primary_key_string()) -> SortParam
return inner
+class FilterOptionEnum(Enum):
+ """Filter options for FilterParam."""
+
+ EQUAL = "eq"
+ NOT_EQUAL = "ne"
+ LESS_THAN = "lt"
+ LESS_THAN_EQUAL = "le"
+ GREATER_THAN = "gt"
+ GREATER_THAN_EQUAL = "ge"
+ IN = "in"
+ NOT_IN = "not_in"
+
+
+class FilterParam(BaseParam[T]):
+ """Filter on attribute."""
+
+ def __init__(
+ self,
+ attribute: ColumnElement,
+ value: T | None = None,
+ filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
+ skip_none: bool = True,
+ ) -> None:
+ super().__init__(value, skip_none)
+ self.attribute: ColumnElement = attribute
+ self.value: T | None = value
+ self.filter_option: FilterOptionEnum = filter_option
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+
+ if isinstance(self.value, list):
+ if self.filter_option == FilterOptionEnum.IN:
+ return select.where(self.attribute.in_(self.value))
+ if self.filter_option == FilterOptionEnum.NOT_IN:
+ return select.where(self.attribute.notin_(self.value))
+ raise HTTPException(
+ 400, f"Invalid filter option {self.filter_option} for list
value {self.value}"
+ )
+
+ if self.filter_option == FilterOptionEnum.EQUAL:
+ return select.where(self.attribute == self.value)
+ if self.filter_option == FilterOptionEnum.NOT_EQUAL:
+ return select.where(self.attribute != self.value)
+ if self.filter_option == FilterOptionEnum.LESS_THAN:
+ return select.where(self.attribute < self.value)
+ if self.filter_option == FilterOptionEnum.LESS_THAN_EQUAL:
+ return select.where(self.attribute <= self.value)
+ if self.filter_option == FilterOptionEnum.GREATER_THAN:
+ return select.where(self.attribute > self.value)
+ if self.filter_option == FilterOptionEnum.GREATER_THAN_EQUAL:
+ return select.where(self.attribute >= self.value)
+ raise ValueError(f"Invalid filter option {self.filter_option} for
value {self.value}")
+
+ def depends(self, *args: Any, **kwargs: Any) -> Self:
+ raise NotImplementedError("Use filter_param_factory instead , depends
is not implemented.")
Review Comment:
When we remove everything from the old system, we can remove that as well.
##########
airflow/api_fastapi/core_api/routes/public/event_logs.py:
##########
@@ -90,46 +96,47 @@ def get_event_logs(
).dynamic_depends()
),
],
- dag_id: str | None = None,
- task_id: str | None = None,
- run_id: str | None = None,
- map_index: int | None = None,
- try_number: int | None = None,
- owner: str | None = None,
- event: str | None = None,
- excluded_events: list[str] | None = Query(None),
- included_events: list[str] | None = Query(None),
- before: datetime | None = None,
- after: datetime | None = None,
+ dag_id: Annotated[FilterParam[str | None],
Depends(str_filter_param_factory(Log.dag_id))],
+ task_id: Annotated[FilterParam[str | None],
Depends(str_filter_param_factory(Log.task_id))],
+ run_id: Annotated[FilterParam[str | None],
Depends(str_filter_param_factory(Log.run_id))],
+ map_index: Annotated[FilterParam[int | None],
Depends(int_filter_param_factory(Log.map_index))],
+ try_number: Annotated[FilterParam[int | None],
Depends(int_filter_param_factory(Log.try_number))],
+ owner: Annotated[FilterParam[str | None],
Depends(str_filter_param_factory(Log.owner))],
+ event: Annotated[FilterParam[str | None],
Depends(str_filter_param_factory(Log.event))],
Review Comment:
This we need to think about. It's a little bit verbous and not super easy to
write. Maybe we can have only on meta `factory` that will call the appropriate
`type_factory` based on the given type ?
Maybe we can just extract that in to the parameter file, so we can just
reuse some:
```
LogTaskIdFilter = Annotated[FilterParam[str | None],
Depends(str_filter_param_factory(Log.dag_id))]
```
To not have to copy past that line between routes. Not sure, but we can try
to make this easier for the developer, that would be great.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]