ashb commented on code in PR #37436:
URL: https://github.com/apache/airflow/pull/37436#discussion_r1699925294


##########
airflow/www/views.py:
##########
@@ -839,21 +841,92 @@ def index(self):
         with create_session() as session:
             # read orm_dags from the db
             dags_query = select(DagModel).where(~DagModel.is_subdag, 
DagModel.is_active)
-
+            dags_query = dags_query.where(DagModel.dag_id.in_(filter_dag_ids))
             if arg_search_query:
-                escaped_arg_search_query = arg_search_query.replace("_", r"\_")
-                dags_query = dags_query.where(
-                    DagModel.dag_id.ilike("%" + escaped_arg_search_query + 
"%", escape="\\")
-                    | DagModel._dag_display_property_value.ilike(
-                        "%" + escaped_arg_search_query + "%", escape="\\"
+                prefix_search_match = re2.match(r"(?i)(dag|owner|task):\s*", 
arg_search_query)
+                if prefix_search_match:
+                    query_prefix = prefix_search_match[0].lower()
+                    query_value = arg_search_query[len(query_prefix) :]
+                    if query_value:
+                        escaped_query_value = "%" + query_value.replace("_", 
r"\_") + "%"
+                        if query_prefix.startswith("task:"):
+                            if session.bind.dialect.name == "mysql":
+                                dag_id_to_task_subq = (
+                                    select(SerializedDagModel.dag_id)
+                                    .where(
+                                        func.regexp_like(
+                                            func.json_keys(
+                                                func.json_extract(
+                                                    SerializedDagModel._data, 
"$.dag._task_group.children"
+                                                )
+                                            ),
+                                            f".*{query_value}.*",
+                                            "i",
+                                        )
+                                    )
+                                    .subquery()
+                                )
+                                dags_query = dags_query.join(
+                                    dag_id_to_task_subq, DagModel.dag_id == 
dag_id_to_task_subq.c.dag_id
+                                )
+                            elif session.bind.dialect.name == "postgresql":
+                                dag_id_to_task_subq = select(
+                                    SerializedDagModel.dag_id,
+                                    cast(
+                                        func.json_extract_path(
+                                            func.json_array_elements(
+                                                func.json_extract_path(
+                                                    SerializedDagModel._data, 
"dag", "tasks"
+                                                )
+                                            ),
+                                            "__var",
+                                            "task_id",
+                                        ),
+                                        String,
+                                    ).label("task_id"),
+                                ).subquery()
+                                dags_query = dags_query.join(
+                                    dag_id_to_task_subq, DagModel.dag_id == 
dag_id_to_task_subq.c.dag_id
+                                
).where(dag_id_to_task_subq.c.task_id.ilike(escaped_query_value, escape="\\"))
+                            else:
+                                filtered_dag_models = 
session.scalars(dags_query).all()
+                                dag_bag = get_airflow_app().dag_bag
+                                filtered_dags = [
+                                    dag_bag.get_dag(DM.dag_id, 
session=session) for DM in filtered_dag_models
+                                ]
+                                filtered_by_tasks_dag_ids = [
+                                    dag.dag_id
+                                    for dag in filtered_dags
+                                    if [
+                                        task
+                                        for task in dag.tasks
+                                        if re2.search(r"(?i)" + query_value, 
task.task_id)
+                                    ]
+                                ]
+                                dags_query = 
dags_query.where(DagModel.dag_id.in_(filtered_by_tasks_dag_ids))
+                        else:
+                            if query_prefix.startswith("dag:"):
+                                dags_query = dags_query.where(
+                                    DagModel.dag_id.ilike(escaped_query_value, 
escape="\\")
+                                    | 
DagModel._dag_display_property_value.ilike(
+                                        escaped_query_value, escape="\\"
+                                    )
+                                )
+                            elif query_prefix.startswith("dag:"):

Review Comment:
   ```
                               if query_prefix.startswith("dag:"):
                                   ...
                               elif query_prefix.startswith("dag:"):
   ```
   
   Something seems wrong here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to