pierrejeambrun commented on code in PR #43947:
URL: https://github.com/apache/airflow/pull/43947#discussion_r1840803943


##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -239,6 +240,146 @@ def inner(order_by: str = default or 
self.get_primary_key_string()) -> SortParam
         return inner
 
 
+class FilterOptionEnum(Enum):
+    """Filter options for FilterParam."""
+
+    EQUAL = "eq"
+    NOT_EQUAL = "ne"
+    LESS_THAN = "lt"
+    LESS_THAN_EQUAL = "le"
+    GREATER_THAN = "gt"
+    GREATER_THAN_EQUAL = "ge"
+    IN = "in"
+    NOT_IN = "not_in"
+
+
+class FilterParam(BaseParam[T]):
+    """Filter on attribute."""
+
+    def __init__(
+        self,
+        attribute: ColumnElement,
+        value: T | None = None,
+        filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
+        skip_none: bool = True,
+    ) -> None:
+        super().__init__(value, skip_none)
+        self.attribute: ColumnElement = attribute
+        self.value: T | None = value
+        self.filter_option: FilterOptionEnum = filter_option
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:

Review Comment:
   When you do the actually implementation, careful, some filters do not allow 
to set `skip_none` to False, and raise an exception.



##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -239,6 +240,146 @@ def inner(order_by: str = default or 
self.get_primary_key_string()) -> SortParam
         return inner
 
 
+class FilterOptionEnum(Enum):
+    """Filter options for FilterParam."""
+
+    EQUAL = "eq"
+    NOT_EQUAL = "ne"
+    LESS_THAN = "lt"
+    LESS_THAN_EQUAL = "le"
+    GREATER_THAN = "gt"
+    GREATER_THAN_EQUAL = "ge"
+    IN = "in"
+    NOT_IN = "not_in"
+
+
+class FilterParam(BaseParam[T]):
+    """Filter on attribute."""
+
+    def __init__(
+        self,
+        attribute: ColumnElement,
+        value: T | None = None,
+        filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
+        skip_none: bool = True,
+    ) -> None:
+        super().__init__(value, skip_none)
+        self.attribute: ColumnElement = attribute
+        self.value: T | None = value
+        self.filter_option: FilterOptionEnum = filter_option
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        if isinstance(self.value, list):
+            if self.filter_option == FilterOptionEnum.IN:
+                return select.where(self.attribute.in_(self.value))
+            if self.filter_option == FilterOptionEnum.NOT_IN:
+                return select.where(self.attribute.notin_(self.value))

Review Comment:
   I think for tags, pool, etc..., there something a bit more complicated with 
   ```
           conditions = [DagModel.tags.any(DagTag.name == tag) for tag in 
self.value]
           return select.where(or_(*conditions))
   ```
   
   That is equivalent to a generic `ArrayContains`



##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -239,6 +240,146 @@ def inner(order_by: str = default or 
self.get_primary_key_string()) -> SortParam
         return inner
 
 
+class FilterOptionEnum(Enum):
+    """Filter options for FilterParam."""
+
+    EQUAL = "eq"
+    NOT_EQUAL = "ne"
+    LESS_THAN = "lt"
+    LESS_THAN_EQUAL = "le"
+    GREATER_THAN = "gt"
+    GREATER_THAN_EQUAL = "ge"
+    IN = "in"
+    NOT_IN = "not_in"
+
+
+class FilterParam(BaseParam[T]):
+    """Filter on attribute."""
+
+    def __init__(
+        self,
+        attribute: ColumnElement,
+        value: T | None = None,
+        filter_option: FilterOptionEnum = FilterOptionEnum.EQUAL,
+        skip_none: bool = True,
+    ) -> None:
+        super().__init__(value, skip_none)
+        self.attribute: ColumnElement = attribute
+        self.value: T | None = value
+        self.filter_option: FilterOptionEnum = filter_option
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        if isinstance(self.value, list):
+            if self.filter_option == FilterOptionEnum.IN:
+                return select.where(self.attribute.in_(self.value))
+            if self.filter_option == FilterOptionEnum.NOT_IN:
+                return select.where(self.attribute.notin_(self.value))
+            raise HTTPException(
+                400, f"Invalid filter option {self.filter_option} for list 
value {self.value}"
+            )
+
+        if self.filter_option == FilterOptionEnum.EQUAL:
+            return select.where(self.attribute == self.value)
+        if self.filter_option == FilterOptionEnum.NOT_EQUAL:
+            return select.where(self.attribute != self.value)
+        if self.filter_option == FilterOptionEnum.LESS_THAN:
+            return select.where(self.attribute < self.value)
+        if self.filter_option == FilterOptionEnum.LESS_THAN_EQUAL:
+            return select.where(self.attribute <= self.value)
+        if self.filter_option == FilterOptionEnum.GREATER_THAN:
+            return select.where(self.attribute > self.value)
+        if self.filter_option == FilterOptionEnum.GREATER_THAN_EQUAL:
+            return select.where(self.attribute >= self.value)
+        raise ValueError(f"Invalid filter option {self.filter_option} for 
value {self.value}")
+
+    def depends(self, *args: Any, **kwargs: Any) -> Self:
+        raise NotImplementedError("Use filter_param_factory instead , depends 
is not implemented.")

Review Comment:
   When we remove everything from the old system, we can remove that as well.



##########
airflow/api_fastapi/core_api/routes/public/event_logs.py:
##########
@@ -90,46 +96,47 @@ def get_event_logs(
             ).dynamic_depends()
         ),
     ],
-    dag_id: str | None = None,
-    task_id: str | None = None,
-    run_id: str | None = None,
-    map_index: int | None = None,
-    try_number: int | None = None,
-    owner: str | None = None,
-    event: str | None = None,
-    excluded_events: list[str] | None = Query(None),
-    included_events: list[str] | None = Query(None),
-    before: datetime | None = None,
-    after: datetime | None = None,
+    dag_id: Annotated[FilterParam[str | None], 
Depends(str_filter_param_factory(Log.dag_id))],
+    task_id: Annotated[FilterParam[str | None], 
Depends(str_filter_param_factory(Log.task_id))],
+    run_id: Annotated[FilterParam[str | None], 
Depends(str_filter_param_factory(Log.run_id))],
+    map_index: Annotated[FilterParam[int | None], 
Depends(int_filter_param_factory(Log.map_index))],
+    try_number: Annotated[FilterParam[int | None], 
Depends(int_filter_param_factory(Log.try_number))],
+    owner: Annotated[FilterParam[str | None], 
Depends(str_filter_param_factory(Log.owner))],
+    event: Annotated[FilterParam[str | None], 
Depends(str_filter_param_factory(Log.event))],

Review Comment:
   This we need to think about. It's a little bit verbous and not super easy to 
write. Maybe we can have only on meta `factory` that will call the appropriate 
`type_factory` based on the given type ?
   
   Maybe we can just extract that in to the parameter file, so we can just 
reuse some:
   ```
   LogTaskIdFilter = Annotated[FilterParam[str | None], 
Depends(str_filter_param_factory(Log.dag_id))]
   ```
   
   To not have to copy past that line between routes. Not sure, but we can try 
to make this easier for the developer, that would be great.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to