pierrejeambrun commented on code in PR #54263:
URL: https://github.com/apache/airflow/pull/54263#discussion_r2266981466


##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py:
##########
@@ -839,3 +938,85 @@ def test_delete_dag_should_response_401(self, unauthenticated_test_client):
     def test_delete_dag_should_response_403(self, unauthorized_test_client):
         response = unauthorized_test_client.delete(f"{API_PREFIX}/{DAG1_ID}")
         assert response.status_code == 403
+
+
+class TestDagAssetFilters(TestDagEndpoint):
+    """Unit tests for DAG asset-based filters."""
+
+    @pytest.fixture(autouse=True)
+    @provide_session
+    def setup_with_assets(self, dag_maker, session=None) -> None:
+        """Set up test data including asset-based DAGs."""
+        # Clear and set up base data first
+        self._clear_db()
+
+        with dag_maker(
+            DAG1_ID,
+            dag_display_name=DAG1_DISPLAY_NAME,

Review Comment:
   There is already a test class for listing DAGs, and its `test_should_return_200` covers different query parameters. It would be nice to integrate these tests into it so we have one centralized test case for filtering.
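A minimal sketch of what the centralized version could look like. It assumes the existing test is parametrized over `(query_params, expected_total_entries, expected_ids)`. Those parameter names, the DAG IDs, and the in-memory `list_dags` helper (standing in for the HTTP call to the list endpoint) are all hypothetical and not copied from `test_dags.py`. The asset cases become extra rows in the same `pytest.mark.parametrize` list instead of a separate class with its own setup fixture.

```python
import pytest

# Hypothetical in-memory DAG rows standing in for the DB data the real test sets up.
FAKE_DAGS = [
    {"dag_id": "test_dag1", "tags": ["tag1"], "has_asset_schedule": False},
    {"dag_id": "test_dag2", "tags": [], "has_asset_schedule": True},
]


def list_dags(query_params: dict) -> dict:
    """Hypothetical stand-in for a GET on the DAG list endpoint."""
    dags = FAKE_DAGS
    if "tags" in query_params:
        wanted = set(query_params["tags"])
        dags = [d for d in dags if wanted & set(d["tags"])]
    if "has_asset_schedule" in query_params:
        dags = [d for d in dags if d["has_asset_schedule"] == query_params["has_asset_schedule"]]
    return {"total_entries": len(dags), "dags": dags}


@pytest.mark.parametrize(
    "query_params, expected_total_entries, expected_ids",
    [
        # Existing-style filter cases...
        ({}, 2, ["test_dag1", "test_dag2"]),
        ({"tags": ["tag1"]}, 1, ["test_dag1"]),
        # ...and the asset-based cases appended to the same list.
        ({"has_asset_schedule": True}, 1, ["test_dag2"]),
        ({"has_asset_schedule": False}, 1, ["test_dag1"]),
    ],
)
def test_should_return_200(query_params, expected_total_entries, expected_ids):
    body = list_dags(query_params)
    assert body["total_entries"] == expected_total_entries
    assert [d["dag_id"] for d in body["dags"]] == expected_ids
```

In the real file this would stay a method on the existing listing test class, reusing its setup data rather than clearing and rebuilding the database in a new fixture.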



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py:
##########
@@ -839,3 +938,85 @@ def test_delete_dag_should_response_401(self, unauthenticated_test_client):
     def test_delete_dag_should_response_403(self, unauthorized_test_client):
         response = unauthorized_test_client.delete(f"{API_PREFIX}/{DAG1_ID}")
         assert response.status_code == 403
+
+
+class TestDagAssetFilters(TestDagEndpoint):
+    """Unit tests for DAG asset-based filters."""
+
+    @pytest.fixture(autouse=True)
+    @provide_session

Review Comment:
   ```suggestion
   ```
   
   This shouldn't be needed: `session` is already a fixture. We should probably remove `@provide_session` everywhere in this file.



##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -606,6 +606,65 @@ def depends_float(
 QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter.depends)]
 QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter.depends)]
 
+
+class _HasAssetScheduleFilter(BaseParam[bool]):
+    """Filter DAGs that have asset-based scheduling."""
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        if self.value:
+            # Filter DAGs that have asset-based scheduling
+            return select.where(
+                and_(
+                    DagModel.asset_expression.is_not(None),
+                    func.cast(DagModel.asset_expression, String) != "null",

Review Comment:
   That's not really accurate: `asset_expression` is not a string but a JSON field. I'm not sure, but there might be a cleaner way to achieve this.
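The distinction this comment hinges on is SQL `NULL` versus a JSON `null` value stored in the column. Below is a minimal sketch using the standard-library `sqlite3` module and a hypothetical one-column `dag` table, not Airflow's real `DagModel`. It assumes the column can end up holding either form. `IS NOT NULL` alone lets the JSON-`null` row through, while SQLite's JSON-aware `json_type()` rejects it without casting the value to a string. `json_type()` is SQLite-specific (PostgreSQL has `json_typeof`/`jsonb_typeof`), so this only illustrates the idea. A portable fix across Airflow's supported backends would need more thought.

```python
import json
import sqlite3

# Hypothetical minimal table; in Airflow the column is DagModel.asset_expression.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, asset_expression TEXT)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [
        ("no_expr_sql_null", None),  # stored as SQL NULL
        ("no_expr_json_null", json.dumps(None)),  # stored as the JSON literal null
        # Illustrative payload only, not Airflow's real serialization format.
        ("asset_scheduled", json.dumps({"asset": {"uri": "s3://some_bucket"}})),
    ],
)


def matching_ids(where: str) -> list:
    """Return the sorted dag_ids whose row satisfies the given SQL condition."""
    return sorted(row[0] for row in conn.execute(f"SELECT dag_id FROM dag WHERE {where}"))


# IS NOT NULL alone still lets the JSON-null row through.
naive = matching_ids("asset_expression IS NOT NULL")
# json_type() inspects the parsed JSON value instead of comparing a string cast.
typed = matching_ids("asset_expression IS NOT NULL AND json_type(asset_expression) != 'null'")

print(naive)  # ['asset_scheduled', 'no_expr_json_null']
print(typed)  # ['asset_scheduled']
```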



##########
airflow-core/src/airflow/api_fastapi/common/parameters.py:
##########
@@ -606,6 +606,65 @@ def depends_float(
 QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter.depends)]
 QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter.depends)]
 
+
+class _HasAssetScheduleFilter(BaseParam[bool]):
+    """Filter DAGs that have asset-based scheduling."""
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        if self.value:
+            # Filter DAGs that have asset-based scheduling
+            return select.where(
+                and_(
+                    DagModel.asset_expression.is_not(None),
+                    func.cast(DagModel.asset_expression, String) != "null",
+                )
+            )
+        # Filter DAGs that do NOT have asset-based scheduling
+        return select.where(
+            or_(DagModel.asset_expression.is_(None), func.cast(DagModel.asset_expression, String) == "null")
+        )
+
+    @classmethod
+    def depends(
+        cls,
+        has_asset_schedule: bool | None = Query(None, description="Filter DAGs with asset-based scheduling"),
+    ) -> _HasAssetScheduleFilter:
+        return cls().set_value(has_asset_schedule)
+
+
+class _AssetDependencyFilter(BaseParam[str]):
+    """Filter DAGs by specific asset dependencies."""
+
+    def to_orm(self, select: Select) -> Select:
+        if self.value is None and self.skip_none:
+            return select
+
+        # Join with DagScheduleAssetReference and AssetModel to filter by asset name/URI
+        select = select.join(
+            DagScheduleAssetReference, DagModel.dag_id == DagScheduleAssetReference.dag_id, isouter=False
+        ).join(AssetModel, DagScheduleAssetReference.asset_id == AssetModel.id, isouter=False)
+

Review Comment:
   I suspect this will duplicate DAG rows when a DAG has more than one `DagScheduleAssetReference`, for instance two references with similar URIs that match the same search pattern. Example: one DAG with two asset references, `some_bucket` and `some_bucket2`, and we search for `some_bucket`. Both references match, so the join will most likely return that DAG twice.
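The duplication can be reproduced with plain SQL. Below is a minimal sketch using the standard-library `sqlite3` module. The table and column names loosely mirror `DagModel`, `DagScheduleAssetReference` and `AssetModel` but are assumptions, and `LIKE` stands in for whatever matching the PR actually uses. The inner join returns the DAG once per matching reference, while an `EXISTS` subquery returns it once.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY);
    CREATE TABLE asset (id INTEGER PRIMARY KEY, uri TEXT);
    CREATE TABLE dag_schedule_asset_reference (dag_id TEXT, asset_id INTEGER);

    INSERT INTO dag VALUES ('dag_a');
    INSERT INTO asset VALUES (1, 's3://some_bucket'), (2, 's3://some_bucket2');
    -- One DAG scheduled on two assets whose URIs both match the search pattern.
    INSERT INTO dag_schedule_asset_reference VALUES ('dag_a', 1), ('dag_a', 2);
    """
)

PATTERN = "%some_bucket%"

# Inner join, as in the PR: one output row per matching reference.
joined = conn.execute(
    """
    SELECT dag.dag_id FROM dag
    JOIN dag_schedule_asset_reference ref ON dag.dag_id = ref.dag_id
    JOIN asset ON ref.asset_id = asset.id
    WHERE asset.uri LIKE ?
    """,
    (PATTERN,),
).fetchall()

# EXISTS subquery: each DAG appears at most once, however many references match.
deduped = conn.execute(
    """
    SELECT dag.dag_id FROM dag
    WHERE EXISTS (
        SELECT 1 FROM dag_schedule_asset_reference ref
        JOIN asset ON ref.asset_id = asset.id
        WHERE ref.dag_id = dag.dag_id AND asset.uri LIKE ?
    )
    """,
    (PATTERN,),
).fetchall()

print(joined)  # [('dag_a',), ('dag_a',)]
print(deduped)  # [('dag_a',)]
```

In SQLAlchemy terms, one option would be to keep the outer query on `DagModel` and filter with `select.where(exists().where(...))` instead of joining. Adding `.distinct()` to the joined query is another option, though any row count computed from the joined query would also be inflated and need the same treatment.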



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
