jason810496 commented on code in PR #58092:
URL: https://github.com/apache/airflow/pull/58092#discussion_r2517532839


##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -184,6 +184,44 @@ def depends(cls, *args: Any, **kwargs: Any) -> Self:
         raise NotImplementedError("Use search_param_factory instead , depends 
is not implemented.")
 
 
+class QueryTITaskDisplayNameOrGroupPattern(_SearchParam):
+    """Task display name search with task group expansion."""
+
+    def __init__(self, attribute: ColumnElement, dag=None, skip_none: bool = 
True):
+        super().__init__(attribute, skip_none)
+        self.dag = dag
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        conditions = [self.attribute.ilike(f"%{self.value}%")]
+
+        # Task group expansion: check if value matches a task group ID
+        if self.dag and hasattr(self.dag, "task_group"):
+            task_groups = self.dag.task_group.get_task_group_dict()
+            if target_group := task_groups.get(self.value):

Review Comment:
   If we are searching by pattern, should we collect all keys that contain the
pattern instead of looking the value up directly as an exact key?



##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -184,6 +184,44 @@ def depends(cls, *args: Any, **kwargs: Any) -> Self:
         raise NotImplementedError("Use search_param_factory instead , depends 
is not implemented.")
 
 
+class QueryTITaskDisplayNameOrGroupPattern(_SearchParam):
+    """Task display name search with task group expansion."""
+
+    def __init__(self, attribute: ColumnElement, dag=None, skip_none: bool = 
True):
+        super().__init__(attribute, skip_none)
+        self.dag = dag
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        conditions = [self.attribute.ilike(f"%{self.value}%")]
+
+        # Task group expansion: check if value matches a task group ID
+        if self.dag and hasattr(self.dag, "task_group"):
+            task_groups = self.dag.task_group.get_task_group_dict()
+            if target_group := task_groups.get(self.value):
+                task_ids = [task.task_id for task in target_group.iter_tasks()]
+                conditions.append(TaskInstance.task_id.in_(task_ids))
+
+        return select.where(or_(*conditions))
+
+    @classmethod
+    def depends(
+        cls,
+        value: str | None = Query(
+            alias="task_display_name_pattern",
+            default=None,
+            description="SQL LIKE expression to filter by task display name — 
use `%` / `_` wildcards "
+            "(e.g. `%customer_%`). Regular expressions are **not** supported. "
+            "Can also be a task group ID to filter by all tasks within that 
group.",

Review Comment:
   Or maybe we could add a separate filter just for task groups?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to