SamWheating commented on a change in pull request #19758:
URL: https://github.com/apache/airflow/pull/19758#discussion_r755581047



##########
File path: airflow/api_connexion/endpoints/dag_endpoint.py
##########
@@ -108,6 +108,48 @@ def patch_dag(session, dag_id, update_mask=None):
     session.commit()
     return dag_schema.dump(dag)
 
+@security.requires_access([(permissions.ACTION_CAN_READ, 
permissions.RESOURCE_DAG)])
+@format_parameters({'limit': check_limit})
+@provide_session
+def patch_dags(limit, session, offset=0, only_active=True, tags=None, 
dag_id_pattern=None, update_mask=None):
+    """Patch multiple DAGs."""
+    if only_active:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag, 
DagModel.is_active)
+    else:
+        dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
+
+    if dag_id_pattern:
+        dags_query = 
dags_query.filter(DagModel.dag_id.ilike(f'%{dag_id_pattern}%'))
+
+    editable_dags = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+
+    dags_query = dags_query.filter(DagModel.dag_id.in_(editable_dags))
+    if tags:
+        cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags]
+        dags_query = dags_query.filter(or_(*cond))
+
+    total_entries = len(dags_query.all())
+
+    dags = 
dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
+
+    try:
+        patch_body = dag_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest("Invalid Dag schema", detail=str(err.messages))
+    if update_mask:
+        patch_body_ = {}
+        if len(update_mask) > 1:
+            raise BadRequest(detail="Only `is_paused` field can be updated 
through the REST API")
+        update_mask = update_mask[0]
+        if update_mask != 'is_paused':
+            raise BadRequest(detail="Only `is_paused` field can be updated 
through the REST API")
+        patch_body_[update_mask] = patch_body[update_mask]
+        patch_body = patch_body_
+    for dag in dags:

Review comment:
       Would it be possible to do this in a single operation rather than a 
`for` loop? I know that MySQL doesn't support `UPDATE ... RETURNING` queries, 
but maybe we could update all of the DAGs with a single `query.update()` call 
and then fetch the list separately?
   
   I think we could do something like:
   ```python
   dags_query = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit)
   
   dags_query.update({DagModel.is_paused: is_paused}, 
synchronize_session='fetch')
   
   dags = dags_query.all()
   ```
   
   However, I don't think the set of DAGs that gets updated will necessarily be 
the same as the set of DAGs returned by the follow-up query. I guess it depends 
on the isolation level of the database 🤔 
   
   Anyway, let me know if you have any suggestions; otherwise, I think it's 
fine to leave this as an O(n) operation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to