pierrejeambrun commented on code in PR #43407:
URL: https://github.com/apache/airflow/pull/43407#discussion_r1822800070
##########
airflow/api_fastapi/common/parameters.py:
##########
@@ -219,6 +219,55 @@ def inner(order_by: str = self.get_primary_key_string())
-> SortParam:
return inner
+_filter_options = Literal["in", "not_in", "eq", "ne", "lt", "le", "gt", "ge"]
+
+
+class FilterParam(BaseParam[T]):
+ """Filter on attribute."""
+
+ def __init__(
+ self,
+ attribute: ColumnElement,
+ value: T | None = None,
+ filter_option: _filter_options = "eq",
+ skip_none: bool = True,
+ ) -> None:
+ super().__init__(skip_none)
+ self.attribute: ColumnElement = attribute
+ self.value: T | None = value
+ self.filter_option: _filter_options = filter_option
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+
+ if isinstance(self.value, list):
+ if self.filter_option == "in":
+ return select.where(self.attribute.in_(self.value))
+ if self.filter_option == "not_in":
+ return select.where(self.attribute.notin_(self.value))
+ raise ValueError(f"Invalid filter option {self.filter_option} for
list value {self.value}")
+
+ if self.filter_option == "eq":
+ return select.where(self.attribute == self.value)
+ if self.filter_option == "ne":
+ return select.where(self.attribute != self.value)
+ if self.filter_option == "lt":
+ return select.where(self.attribute < self.value)
+ if self.filter_option == "le":
+ return select.where(self.attribute <= self.value)
+ if self.filter_option == "gt":
+ return select.where(self.attribute > self.value)
+ if self.filter_option == "ge":
+ return select.where(self.attribute >= self.value)
+ raise ValueError(f"Invalid filter option {self.filter_option} for
value {self.value}")
+
+ def depends(self, *args: Any, **kwargs: Any) -> Self:
+ raise NotImplementedError(
+ "Construct FilterParam directly within the router handler, depends
is not implemented."
+ )
Review Comment:
I think that with a `dynamic_depends` cf others parameters, dependency
injection might work. And it will be more consistent with the rest of the
application.
##########
airflow/api_fastapi/core_api/routes/public/event_logs.py:
##########
@@ -49,3 +59,75 @@ async def get_event_log(
event_log,
from_attributes=True,
)
+
+
+@event_logs_router.get("/", responses=create_openapi_http_exception_doc([401,
403]))
+async def get_event_logs(
+ limit: QueryLimit,
+ offset: QueryOffset,
+ session: Annotated[Session, Depends(get_session)],
+ order_by: Annotated[
+ SortParam,
+ Depends(
+ SortParam(
+ [
+ "id", # event_log_id
+ "dttm", # when
+ "dag_id",
+ "task_id",
+ "run_id",
+ "event",
+ "execution_date", # logical_date
+ "owner",
+ "extra",
+ ],
+ Log,
+ ).dynamic_depends()
+ ),
+ ],
+ dag_id: str | None = None,
+ task_id: str | None = None,
+ run_id: str | None = None,
+ map_index: int | None = None,
+ try_number: int | None = None,
+ owner: str | None = None,
+ event: str | None = None,
+ excluded_events: list[str] | None = Query(None),
+ included_events: list[str] | None = Query(None),
+ before: datetime | None = None,
+ after: datetime | None = None,
+) -> EventLogCollectionResponse:
+ """Get all Event Logs."""
+ base_select = select(Log).group_by(Log.id)
+ event_logs_select, total_entries = paginated_select(
+ base_select,
+ [
+ FilterParam(Log.dag_id, dag_id),
+ FilterParam(Log.task_id, task_id),
+ FilterParam(Log.run_id, run_id),
+ FilterParam(Log.map_index, map_index),
+ FilterParam(Log.event, event),
+ FilterParam(Log.try_number, try_number),
+ FilterParam(Log.owner, owner),
+ FilterParam(Log.event, excluded_events, "not_in"),
Review Comment:
Maybe we can create a concrete enum instead of a literal to avoid having
string duplicated all over the place for `not_in`, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]