pierrejeambrun commented on code in PR #62399:
URL: https://github.com/apache/airflow/pull/62399#discussion_r2853207282


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -152,6 +154,22 @@ def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
     session.delete(dag_run)
 
 
+@dag_run_router.patch(
+    "",
+    dependencies=[
+        Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.RUN)),
+        Depends(action_logging()),
+    ],
+)
+def bulk_dag_runs(
+    dag_id: str,
+    request: BulkBody[BulkDagRunBody],

Review Comment:
   This isn't consistent with the other implementations. It should be:
   
   
   ```suggestion
       request: BulkBody[DAGRunPatchBody],
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -152,6 +154,22 @@ def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
     session.delete(dag_run)
 
 
+@dag_run_router.patch(
+    "",
+    dependencies=[
+        Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.RUN)),

Review Comment:
   The permission is wrong. It should be `requires_access_dag_run_bulk`, and you need to implement that similarly to connections, variables and pools.



##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -89,3 +99,84 @@ async def wait(self) -> AsyncGenerator[str, None]:
             await asyncio.sleep(self.interval)
             yield self._serialize_response(dag_run := await self._get_dag_run())
             yield "\n"
+
+
+class BulkDagRunService(BulkService[BulkDagRunBody]):
+    """Service for handling bulk operations on dag runs."""
+
+    def _categorize_dag_runs(
+        self, dag_run_keys: set[tuple[str, str]]

Review Comment:
   Maybe extract the `tuple[str, str]` into a reusable type `DagRunKey`, with a comment there explaining it. That will be more readable in function signatures.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -152,6 +154,22 @@ def delete_dag_run(dag_id: str, dag_run_id: str, session: SessionDep):
     session.delete(dag_run)
 
 
+@dag_run_router.patch(
+    "",
+    dependencies=[
+        Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.RUN)),
+        Depends(action_logging()),
+    ],
+)
+def bulk_dag_runs(
+    dag_id: str,
+    request: BulkBody[BulkDagRunBody],
+    session: SessionDep,
+) -> BulkResponse:
+    """Bulk delete dag runs."""

Review Comment:
   The docstring is not in line with the method name. The method name suggests a generic 'bulk' operation, but the docstring only mentions 'delete'.
   
   Just state that the other bulk operations are not implemented, and raise a 'not implemented' error for them.
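   
   A rough, framework-free sketch of the idea. `BulkAction` and `handle_bulk_action` are hypothetical names used only for illustration, and the real service may prefer to surface this as an HTTP error rather than a plain Python exception:

```python
from enum import Enum


class BulkAction(str, Enum):
    """Hypothetical set of bulk actions, for illustration only."""

    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"


def handle_bulk_action(action: BulkAction, run_ids: list[str]) -> list[str]:
    """Bulk operations on dag runs. Only delete is implemented; other actions raise."""
    if action is BulkAction.DELETE:
        # Stand-in for the real deletion logic.
        return [f"deleted {run_id}" for run_id in run_ids]
    raise NotImplementedError(f"Bulk {action.value} is not implemented for dag runs.")
```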



##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -89,3 +99,84 @@ async def wait(self) -> AsyncGenerator[str, None]:
             await asyncio.sleep(self.interval)
             yield self._serialize_response(dag_run := await self._get_dag_run())
             yield "\n"
+
+
+class BulkDagRunService(BulkService[BulkDagRunBody]):
+    """Service for handling bulk operations on dag runs."""
+
+    def _categorize_dag_runs(

Review Comment:
   Why a private function? The categorize function in the other services isn't private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
