milton0825 commented on a change in pull request #5498: [AIRFLOW-4509] SubDagOperator using scheduler instead of backfill
URL: https://github.com/apache/airflow/pull/5498#discussion_r305694663
 
 

 ##########
 File path: tests/operators/test_subdag_operator.py
 ##########
 @@ -128,24 +139,100 @@ def test_subdag_pools_no_possible_conflict(self):
         session.delete(pool_10)
         session.commit()
 
-    def test_subdag_deadlock(self):
-        dagbag = DagBag()
-        dag = dagbag.get_dag('test_subdag_deadlock')
-        dag.clear()
-        subdag = dagbag.get_dag('test_subdag_deadlock.subdag')
-        subdag.clear()
+    def test_execute_create_dagrun_wait_until_success(self):
+        """
+        When SubDagOperator executes, it creates a DagRun if there is no existing one
+        and wait until the DagRun succeeds.
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.test', default_args=default_args)
+        subdag_task = SubDagOperator(task_id='test', subdag=subdag, dag=dag, poke_interval=1)
+
+        subdag.create_dagrun = Mock()
+        subdag.create_dagrun.return_value = self.dag_run_running
+
+        subdag_task._get_dagrun = Mock()
+        subdag_task._get_dagrun.side_effect = [None, self.dag_run_success, self.dag_run_success]
 
-        # first make sure subdag has failed
-        self.assertRaises(AirflowException, subdag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
+        subdag_task.execute(context={'execution_date': DEFAULT_DATE})
+        subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
 
-        # now make sure dag picks up the subdag error
-        self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        subdag.create_dagrun.assert_called_once_with(
+            run_id="scheduled__{}".format(DEFAULT_DATE.isoformat()),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=True,
+        )
 
-    def test_subdag_executor(self):
+        self.assertEqual(3, len(subdag_task._get_dagrun.mock_calls))
+
+    def test_execute_dagrun_failed(self):
         """
-        Test default subdag executor is SequentialExecutor
+        When the DagRun failed during the execution, it raises an Airflow Exception.
         """
         dag = DAG('parent', default_args=default_args)
-        subdag_good = DAG('parent.test', default_args=default_args)
-        subdag = SubDagOperator(task_id='test', dag=dag, subdag=subdag_good)
-        self.assertEqual(type(subdag.executor), SequentialExecutor)
+        subdag = DAG('parent.test', default_args=default_args)
+        subdag_task = SubDagOperator(task_id='test', subdag=subdag, dag=dag, poke_interval=1)
+
+        subdag.create_dagrun = Mock()
+        subdag.create_dagrun.return_value = self.dag_run_running
+
+        subdag_task._get_dagrun = Mock()
+        subdag_task._get_dagrun.side_effect = [None, self.dag_run_failed, self.dag_run_failed]
+
+        with self.assertRaises(AirflowException):
+            subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
+            subdag_task.execute(context={'execution_date': DEFAULT_DATE})
+            subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+
+    def test_execute_skip_if_dagrun_success(self):
+        """
+        When there is an existing DagRun in SUCCESS state, skip the execution.
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.test', default_args=default_args)
+
+        subdag.create_dagrun = Mock()
+        subdag_task = SubDagOperator(task_id='test', subdag=subdag, dag=dag, poke_interval=1)
+        subdag_task._get_dagrun = Mock()
+        subdag_task._get_dagrun.return_value = self.dag_run_success
+
+        subdag_task.pre_execute(context={'execution_date': DEFAULT_DATE})
+        subdag_task.execute(context={'execution_date': DEFAULT_DATE})
+        subdag_task.post_execute(context={'execution_date': DEFAULT_DATE})
+
+        subdag.create_dagrun.assert_not_called()
+        self.assertEqual(3, len(subdag_task._get_dagrun.mock_calls))
+
+    def test_rerun_failed_subdag(self):
+        """
+        When there is an existing DagRun with failed state, reset the DagRun and the
+        corresponding TaskInstances
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag = DAG('parent.test', default_args=default_args)
+        subdag_task = SubDagOperator(task_id='test', subdag=subdag, dag=dag, 
poke_interval=1)
+        dummy_task = DummyOperator(task_id='dummy', dag=subdag)
+
+        session = Session()
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to