[ https://issues.apache.org/jira/browse/AIRFLOW-1195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618013#comment-16618013 ]

ASF GitHub Bot commented on AIRFLOW-1195:
-----------------------------------------

kaxil closed pull request #3907: [AIRFLOW-1195] Add feature to clear tasks in Parent Dag
URL: https://github.com/apache/incubator-airflow/pull/3907
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fd8765588a..fb9ddbe2b0 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -705,7 +705,9 @@ def clear(args):
         only_failed=args.only_failed,
         only_running=args.only_running,
         confirm_prompt=not args.no_confirm,
-        include_subdags=not args.exclude_subdags)
+        include_subdags=not args.exclude_subdags,
+        include_parentdag=not args.exclude_parentdag,
+    )
 
 
 def get_num_ready_workers_running(gunicorn_master_proc):
@@ -1604,6 +1606,10 @@ class CLIFactory(object):
         'exclude_subdags': Arg(
             ("-x", "--exclude_subdags"),
             "Exclude subdags", "store_true"),
+        'exclude_parentdag': Arg(
+            ("-xp", "--exclude_parentdag"),
+            "Exclude ParentDAGS if the task cleared is a part of a SubDAG",
+            "store_true"),
         'dag_regex': Arg(
             ("-dx", "--dag_regex"),
             "Search dag_id as regex instead of exact string", "store_true"),
@@ -1936,7 +1942,7 @@ class CLIFactory(object):
             'args': (
                 'dag_id', 'task_regex', 'start_date', 'end_date', 'subdir',
                 'upstream', 'downstream', 'no_confirm', 'only_failed',
-                'only_running', 'exclude_subdags', 'dag_regex'),
+                'only_running', 'exclude_subdags', 'exclude_parentdag', 'dag_regex'),
         }, {
             'func': pause,
             'help': "Pause a DAG",
diff --git a/airflow/models.py b/airflow/models.py
index d703810a77..1e4949e563 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3798,9 +3798,11 @@ def clear(
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            include_parentdag=True,
             reset_dag_runs=True,
             dry_run=False,
             session=None,
+            get_tis=False,
     ):
         """
         Clears a set of task instances associated with the current dag for
@@ -3821,6 +3823,25 @@ def clear(
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
             tis = tis.filter(TI.task_id.in_(self.task_ids))
 
+        if include_parentdag and self.is_subdag:
+
+            p_dag = self.parent_dag.sub_dag(
+                task_regex=self.dag_id.split('.')[1],
+                include_upstream=False,
+                include_downstream=True)
+
+            tis = tis.union(p_dag.clear(
+                start_date=start_date, end_date=end_date,
+                only_failed=only_failed,
+                only_running=only_running,
+                confirm_prompt=confirm_prompt,
+                include_subdags=include_subdags,
+                include_parentdag=False,
+                reset_dag_runs=reset_dag_runs,
+                get_tis=True,
+                session=session,
+            ))
+
         if start_date:
             tis = tis.filter(TI.execution_date >= start_date)
         if end_date:
@@ -3832,6 +3853,9 @@ def clear(
         if only_running:
             tis = tis.filter(TI.state == State.RUNNING)
 
+        if get_tis:
+            return tis
+
         if dry_run:
             tis = tis.all()
             session.expunge_all()
@@ -3875,6 +3899,7 @@ def clear_dags(
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            include_parentdag=False,
             reset_dag_runs=True,
             dry_run=False,
     ):
@@ -3887,6 +3912,7 @@ def clear_dags(
                 only_running=only_running,
                 confirm_prompt=False,
                 include_subdags=include_subdags,
+                include_parentdag=include_parentdag,
                 reset_dag_runs=reset_dag_runs,
                 dry_run=True)
             all_tis.extend(tis)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index aa2530e458..be11b11376 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1111,7 +1111,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             count = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
-                include_subdags=recursive)
+                include_subdags=recursive,
+                include_parentdag=recursive,
+            )
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
@@ -1120,7 +1122,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             start_date=start_date,
             end_date=end_date,
             include_subdags=recursive,
-            dry_run=True)
+            dry_run=True,
+            include_parentdag=recursive,
+        )
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 3dc3400968..38835998e8 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -837,7 +837,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             count = dag.clear(
                 start_date=start_date,
                 end_date=end_date,
-                include_subdags=recursive)
+                include_subdags=recursive,
+                include_parentdag=recursive,
+            )
 
             flash("{0} task instances have been cleared".format(count))
             return redirect(origin)
@@ -846,7 +848,9 @@ def _clear_dag_tis(self, dag, start_date, end_date, origin,
             start_date=start_date,
             end_date=end_date,
             include_subdags=recursive,
-            dry_run=True)
+            include_parentdag=recursive,
+            dry_run=True,
+        )
         if not tis:
             flash("No task instances to clear", 'error')
             response = redirect(origin)
diff --git a/tests/core.py b/tests/core.py
index a517070614..b937178a9e 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1429,8 +1429,18 @@ def test_subdag_clear(self):
             'clear', 'example_subdag_operator', '--no_confirm', '--exclude_subdags'])
         cli.clear(args)
 
+    def test_parentdag_downstream_clear(self):
+        args = self.parser.parse_args([
+            'clear', 'example_subdag_operator.section-1', '--no_confirm'])
+        cli.clear(args)
+        args = self.parser.parse_args([
+            'clear', 'example_subdag_operator.section-1', '--no_confirm',
+            '--exclude_parentdag'])
+        cli.clear(args)
+
     def test_get_dags(self):
-        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator', '-c']))
+        dags = cli.get_dags(self.parser.parse_args(['clear', 'example_subdag_operator',
+                                                    '-c']))
         self.assertEqual(len(dags), 1)
 
         dags = cli.get_dags(self.parser.parse_args(['clear', 'subdag', '-dx', '-c']))
@@ -1942,6 +1952,34 @@ def test_dag_views(self):
         response = self.app.get(url)
         self.assertIn("Wait a minute", response.data.decode('utf-8'))
         response = self.app.get(url + "&confirmed=true")
+        url = (
+            "/admin/airflow/clear?task_id=section-1-task-1&"
+            "dag_id=example_subdag_operator.section-1&future=false&past=false&"
+            "upstream=false&downstream=true&recursive=true&"
+            "execution_date={}&"
+            "origin=/admin".format(DEFAULT_DATE_DS))
+        response = self.app.get(url)
+        self.assertIn("Wait a minute", response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.end",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-1.section-1-task-1",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-1",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-1",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-2",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-3",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-4",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.section-2.section-2-task-5",
+                      response.data.decode('utf-8'))
+        self.assertIn("example_subdag_operator.some-other-task",
+                      response.data.decode('utf-8'))
         url = (
             "/admin/airflow/run?task_id=runme_0&"
             
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
diff --git a/tests/jobs.py b/tests/jobs.py
index dc3381e8e0..e18a87e6cf 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -901,6 +901,59 @@ def test_backfill_execute_subdag(self):
         subdag.clear()
         dag.clear()
 
+    def test_subdag_clear_parentdag_downstream_clear(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor,
+                          donot_pickle=True)
+
+        with timeout(seconds=30):
+            job.run()
+
+        ti0 = TI(
+            task=subdag.get_task('section-1-task-1'),
+            execution_date=DEFAULT_DATE)
+        ti0.refresh_from_db()
+        self.assertEqual(ti0.state, State.SUCCESS)
+
+        sdag = subdag.sub_dag(
+            task_regex='section-1-task-1',
+            include_downstream=True,
+            include_upstream=False)
+
+        sdag.clear(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            include_parentdag=True)
+
+        ti0.refresh_from_db()
+        self.assertEquals(State.NONE, ti0.state)
+
+        ti1 = TI(
+            task=dag.get_task('some-other-task'),
+            execution_date=DEFAULT_DATE)
+        self.assertEquals(State.NONE, ti1.state)
+
+        # Checks that all the Downstream tasks for Parent DAG
+        # have been cleared
+        for task in subdag_op_task.downstream_list:
+            ti = TI(
+                task=dag.get_task(task.task_id),
+                execution_date=DEFAULT_DATE
+            )
+            self.assertEquals(State.NONE, ti.state)
+
+        subdag.clear()
+        dag.clear()
+
     def test_backfill_execute_subdag_with_removed_task(self):
         """
         Ensure that subdag operators execute properly in the case where
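
For reference, a minimal sketch of how the new flag is exercised from Python,
mirroring the test added to tests/core.py above (the parser setup via
cli.CLIFactory.get_parser() is assumed to match the existing test harness):

    from airflow.bin import cli

    parser = cli.CLIFactory.get_parser()

    # By default, clearing tasks inside a SubDAG now also clears the
    # SubDagOperator in the parent DAG plus its downstream tasks ...
    args = parser.parse_args([
        'clear', 'example_subdag_operator.section-1', '--no_confirm'])
    cli.clear(args)

    # ... unless the new -xp/--exclude_parentdag flag is passed.
    args = parser.parse_args([
        'clear', 'example_subdag_operator.section-1', '--no_confirm',
        '--exclude_parentdag'])
    cli.clear(args)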


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Cleared tasks in SubDagOperator do not trigger Parent dag_runs
> --------------------------------------------------------------
>
>                 Key: AIRFLOW-1195
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1195
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: subdag
>    Affects Versions: 1.8.1
>            Reporter: Paul Zaczkieiwcz
>            Assignee: Kaxil Naik
>            Priority: Minor
>         Attachments: example_subdag_operator.not-cleared.png, 
> example_subdag_operator.section-2.cleared.png
>
>
> Let's say that you had a task fail in a SubDag.  You fix the underlying issue 
> and want Airflow to resume the DagRun where it left off.  If this were a flat 
> DAG, then all you need to do is clear the failed TaskInstance and its 
> downstream dependencies. The GUI will happily clear all of them for you in a 
> single PUT request!  In order to resume a SubDag, you must clear the 
> TaskInstance + downstream dependencies AND you must clear the SubDagOperator 
> + downstream dependencies WITHOUT clearing its recursive dependencies. There 
> should be an option to recursively clear task instances in upstream SubDags.
> The attached files use the example_subdag_operator DAG to illustrate the 
> problem.  Before taking the screenshots, I ran the operator to completion, then 
> cleared {{example_subdag_operator.section-2.section-2-task-5}}. Notice that 
> {{example_subdag_operator.section-2}} is in the `running` state, but 
> {{example_subdag_operator}} is still in the `success` state.
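
With this change, the behaviour requested above is also available programmatically
through DAG.clear(include_parentdag=True). A rough sketch, based on the new test in
tests/jobs.py (it assumes the example DAGs are loadable via a default DagBag):

    from airflow.models import DagBag

    dagbag = DagBag()
    parent = dagbag.get_dag('example_subdag_operator')
    subdag = parent.get_task('section-1').subdag

    # Restrict the subdag to the fixed task and everything downstream of it ...
    sdag = subdag.sub_dag(
        task_regex='section-1-task-1',
        include_upstream=False,
        include_downstream=True)

    # ... and clear it. include_parentdag=True (the new default) also clears the
    # SubDagOperator in the parent DAG and its downstream tasks; the test
    # additionally passes start_date/end_date to limit the execution dates.
    sdag.clear(include_parentdag=True)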



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
