pierrejeambrun commented on code in PR #56022:
URL: https://github.com/apache/airflow/pull/56022#discussion_r2459532917


##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -815,6 +817,93 @@ def _transform_dag_run_types(types: list[str] | None) -> 
list[DagRunType | None]
     _SearchParam, Depends(search_param_factory(DagRun.triggering_user_name, 
"triggering_user"))
 ]
 
+
+class _DagRunConsumingAssetFilter(BaseParam[str | None]):
+    """
+    Filter DagRuns by an asset they consumed (triggered by).
+
+    Matches by asset name or URI substring on related consumed_asset_events.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(skip_none=True)
+
+    @classmethod
+    def depends(
+        cls,
+        consuming_asset: str | None = Query(
+            None, description="Filter DagRuns that consumed an asset (match by 
asset name or URI)"
+        ),
+    ) -> _DagRunConsumingAssetFilter:
+        return cls().set_value(consuming_asset)
+
+    def to_orm(self, select: Select) -> Select:  # type: ignore[name-defined]
+        if not self.value:
+            return select
+        asset_match = or_(AssetModel.name.ilike(f"%{self.value}%"), 
AssetModel.uri.ilike(f"%{self.value}%"))
+        exists_clause = (
+            sql_select(1)
+            .select_from(dagrun_asset_event_table)
+            .join(AssetEvent, AssetEvent.id == 
dagrun_asset_event_table.c.event_id)
+            .join(AssetEvent.asset)
+            .where(
+                and_(
+                    dagrun_asset_event_table.c.dag_run_id == DagRun.id,
+                    asset_match,
+                )
+            )
+            .exists()
+        )
+        return select.where(exists_clause)
+
+
+class _DagRunProducingAssetFilter(BaseParam[str | None]):
+    """
+    Filter DagRuns by an asset they produced (downstream created by tasks in 
the run).
+
+    This links from AssetEvent.source_dag_id/source_run_id back to the DagRun 
and then matches the
+    asset by name or URI.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(skip_none=True)
+
+    @classmethod
+    def depends(
+        cls,
+        producing_asset: str | None = Query(
+            None, description="Filter DagRuns that produced an asset (match by 
asset name or URI)"
+        ),
+    ) -> _DagRunProducingAssetFilter:
+        return cls().set_value(producing_asset)
+
+    def to_orm(self, select: Select) -> Select:  # type: ignore[name-defined]
+        if not self.value:
+            return select
+        asset_match = or_(AssetModel.name.ilike(f"%{self.value}%"), 
AssetModel.uri.ilike(f"%{self.value}%"))
+        exists_clause = (
+            sql_select(1)
+            .select_from(AssetEvent)
+            .join(AssetEvent.asset)
+            .where(
+                and_(
+                    AssetEvent.source_dag_id == DagRun.dag_id,
+                    AssetEvent.source_run_id == DagRun.run_id,
+                    asset_match,
+                )
+            )
+            .exists()
+        )
+        return select.where(exists_clause)
+
+
+QueryDagRunConsumingAssetFilter = Annotated[

Review Comment:
   Similarly to the other 'search' params operating on `ilike`, a `_pattern`
   suffix should be in the name; take a look at the other search params in the
   same file.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -626,9 +663,35 @@ def get_list_dag_runs_batch(
         session=session,
     )
 
-    dag_runs = session.scalars(dag_runs_select)
-
-    return DAGRunCollectionResponse(
-        dag_runs=dag_runs,
-        total_entries=total_entries,
-    )
+    dag_runs = list(session.scalars(dag_runs_select))
+
+    source_keys = {(dr.dag_id, dr.run_id) for dr in dag_runs}
+    produced_map: dict[tuple[str, str], list[AssetSummary]] = {}

Review Comment:
   You can use a `defaultdict`; that will avoid the `setdefault` call for each
   item below.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -359,13 +363,18 @@ def get_dag_runs(
         Depends(search_param_factory(DagRun.triggering_user_name, 
"triggering_user_name_pattern")),
     ],
     dag_id_pattern: Annotated[_SearchParam, 
Depends(search_param_factory(DagRun.dag_id, "dag_id_pattern"))],
+    consuming_asset: QueryDagRunConsumingAssetFilter,
+    producing_asset: QueryDagRunProducingAssetFilter,
 ) -> DAGRunCollectionResponse:
     """
     Get all DAG Runs.
 
     This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for 
all DAGs.
     """
-    query = select(DagRun)
+    query = select(DagRun).options(
+        
selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset),
+        joinedload(DagRun.dag_model),
+    )

Review Comment:
   I think we do need the first part,
   `selectinload(DagRun.consumed_asset_events)`, so that serialization does not
   emit N lazy DB queries.
   
   I believe the second part can be moved inside the `parameter` logic, because
   the parameters seem to be the ones using it, and there is no need to always
   do that eager loading when there is no filtering on `consuming` or
   `producing` assets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to