ashb commented on a change in pull request #5283: [AIRFLOW-4521] Pause dag also 
pause its subdags
URL: https://github.com/apache/airflow/pull/5283#discussion_r288530195
 
 

 ##########
 File path: airflow/models/dag.py
 ##########
 @@ -1522,3 +1522,46 @@ def create_dagrun(self,
                                             external_trigger=external_trigger,
                                             conf=conf,
                                             session=session)
+
+    @classmethod
+    def _find_dag_ids_including_subdags(cls, dag: DAG):
+        from airflow.operators.subdag_operator import SubDagOperator  # Avoid 
circular imports
+        dag_ids = [dag.dag_id]
+        for task in dag.tasks:
+            if isinstance(task, SubDagOperator):
+                subdag = task.subdag
+                dag_ids.extend(cls._find_dag_ids_including_subdags(subdag))
+        return dag_ids
+
+    @classmethod
+    @provide_session
+    def set_is_paused(cls,
+                      dag_id: str,
+                      is_paused: bool,
+                      including_subdags: bool = True,
+                      subdir: str = None,
+                      session=None) -> None:
+        """
+        Pause/Un-pause a DAG.
+
+        :param dag_id: DAG ID
+        :param is_paused: Is the DAG paused
+        :param including_subdags: whether to include the DAG's subdags
+        :param subdir: where to find the DAG files
+        :param session: session
+        """
+        dag_ids = []  # type: List[str]
+        if including_subdags:
+            dagbag = DagBag(dag_folder=subdir)
 
 Review comment:
   @milton0825 Creating a full DagBag is a potentially very expensive 
operation. Please can you create a new PR (against the same Jira ticket) to 
change this so that we don't parse all Dags here?
   
   Additionally there is already a `subdags` property on the DAG class that we 
should use instead of duplicating this logic in 
_find_dag_ids_including_subdags. (We possiibly don't need that fn anymore 
either? `[dag.dag_id] + [ dag.dag_id for dag in dag.subdags]`. (The subdags 
will recurse in to find nested subdags already)
   
    Could this be done by looking at the DAG model only? Or at least only 
loading the one specific subdag, or at least not loading all dags even when 
pausing a dag that doesn't contain any subdags?
   
   Creating a DagBag is probably a code smell now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to