BasPH commented on a change in pull request #5303:  [AIRFLOW-4535] Break jobs.py into multiple files
URL: https://github.com/apache/airflow/pull/5303#discussion_r285373230
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -79,1622 +68,7 @@
 # created at runtime
 TEMP_DAG_FILENAME = "temp_dag.py"
 TEST_DAGS_FOLDER = os.path.join(
-    os.path.dirname(os.path.realpath(__file__)), 'dags')
-
-
-class BaseJobTest(unittest.TestCase):
-    class TestJob(BaseJob):
-        __mapper_args__ = {
-            'polymorphic_identity': 'TestJob'
-        }
-
-        def __init__(self, cb):
-            self.cb = cb
-            super().__init__()
-
-        def _execute(self):
-            return self.cb()
-
-    def test_state_success(self):
-        job = self.TestJob(lambda: True)
-        job.run()
-
-        self.assertEqual(job.state, State.SUCCESS)
-        self.assertIsNotNone(job.end_date)
-
-    def test_state_sysexit(self):
-        import sys
-        job = self.TestJob(lambda: sys.exit(0))
-        job.run()
-
-        self.assertEqual(job.state, State.SUCCESS)
-        self.assertIsNotNone(job.end_date)
-
-    def test_state_failed(self):
-        def abort():
-            raise RuntimeError("fail")
-
-        job = self.TestJob(abort)
-        with self.assertRaises(RuntimeError):
-            job.run()
-
-        self.assertEqual(job.state, State.FAILED)
-        self.assertIsNotNone(job.end_date)
-
-
-class BackfillJobTest(unittest.TestCase):
-
-    def _get_dummy_dag(self, dag_id, pool=None, task_concurrency=None):
-        dag = DAG(
-            dag_id=dag_id,
-            start_date=DEFAULT_DATE,
-            schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(
-                task_id='op',
-                pool=pool,
-                task_concurrency=task_concurrency,
-                dag=dag)
-
-        dag.clear()
-        return dag
-
-    def _times_called_with(self, method, class_):
-        count = 0
-        for args in method.call_args_list:
-            if isinstance(args[0][0], class_):
-                count += 1
-        return count
-
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag(include_examples=True)
-
-    def setUp(self):
-        clear_db_runs()
-        clear_db_pools()
-
-        self.parser = cli.CLIFactory.get_parser()
-
-    def test_unfinished_dag_runs_set_to_failed(self):
-        dag = self._get_dummy_dag('dummy_dag')
-
-        dag_run = dag.create_dagrun(
-            run_id='test',
-            state=State.RUNNING,
-        )
-
-        job = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=8),
-            ignore_first_depends_on_past=True
-        )
-
-        job._set_unfinished_dag_runs_to_failed([dag_run])
-
-        dag_run.refresh_from_db()
-
-        self.assertEquals(State.FAILED, dag_run.state)
-
-    def test_dag_run_with_finished_tasks_set_to_success(self):
-        dag = self._get_dummy_dag('dummy_dag')
-
-        dag_run = dag.create_dagrun(
-            run_id='test',
-            state=State.RUNNING,
-        )
-
-        for ti in dag_run.get_task_instances():
-            ti.set_state(State.SUCCESS)
-
-        job = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=8),
-            ignore_first_depends_on_past=True
-        )
-
-        job._set_unfinished_dag_runs_to_failed([dag_run])
-
-        dag_run.refresh_from_db()
-
-        self.assertEquals(State.SUCCESS, dag_run.state)
-
-    @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
-                     "concurrent access not supported in sqlite")
-    def test_trigger_controller_dag(self):
-        dag = self.dagbag.get_dag('example_trigger_controller_dag')
-        target_dag = self.dagbag.get_dag('example_trigger_target_dag')
-        target_dag.sync_to_db()
-
-        scheduler = SchedulerJob()
-        task_instances_list = Mock()
-        scheduler._process_task_instances(target_dag, task_instances_list=task_instances_list)
-        self.assertFalse(task_instances_list.append.called)
-
-        job = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            ignore_first_depends_on_past=True
-        )
-        job.run()
-
-        scheduler._process_task_instances(target_dag, task_instances_list=task_instances_list)
-
-        self.assertTrue(task_instances_list.append.called)
-
-    @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
-                     "concurrent access not supported in sqlite")
-    def test_backfill_multi_dates(self):
-        dag = self.dagbag.get_dag('example_bash_operator')
-
-        end_date = DEFAULT_DATE + datetime.timedelta(days=1)
-
-        executor = TestExecutor()
-        job = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=end_date,
-            executor=executor,
-            ignore_first_depends_on_past=True
-        )
-
-        job.run()
-
-        expected_execution_order = [
-            ("runme_0", DEFAULT_DATE),
-            ("runme_1", DEFAULT_DATE),
-            ("runme_2", DEFAULT_DATE),
-            ("runme_0", end_date),
-            ("runme_1", end_date),
-            ("runme_2", end_date),
-            ("also_run_this", DEFAULT_DATE),
-            ("also_run_this", end_date),
-            ("run_after_loop", DEFAULT_DATE),
-            ("run_after_loop", end_date),
-            ("run_this_last", DEFAULT_DATE),
-            ("run_this_last", end_date),
-        ]
-        self.maxDiff = None
-        self.assertListEqual(
-            [((dag.dag_id, task_id, when, 1), State.SUCCESS)
-             for (task_id, when) in expected_execution_order],
-            executor.sorted_tasks
-        )
-
-        session = settings.Session()
-        drs = session.query(DagRun).filter(
-            DagRun.dag_id == dag.dag_id
-        ).order_by(DagRun.execution_date).all()
-
-        self.assertTrue(drs[0].execution_date == DEFAULT_DATE)
-        self.assertTrue(drs[0].state == State.SUCCESS)
-        self.assertTrue(drs[1].execution_date ==
-                        DEFAULT_DATE + datetime.timedelta(days=1))
-        self.assertTrue(drs[1].state == State.SUCCESS)
-
-        dag.clear()
-        session.close()
-
-    @unittest.skipIf(
-        "sqlite" in configuration.conf.get("core", "sql_alchemy_conn"),
-        "concurrent access not supported in sqlite",
-    )
-    @parameterized.expand(
-        [
-            [
-                "example_branch_operator",
-                (
-                    "run_this_first",
-                    "branching",
-                    "branch_a",
-                    "branch_b",
-                    "branch_c",
-                    "branch_d",
-                    "follow_branch_a",
-                    "follow_branch_b",
-                    "follow_branch_c",
-                    "follow_branch_d",
-                    "join",
-                ),
-            ],
-            [
-                "example_bash_operator",
-                ("runme_0", "runme_1", "runme_2", "also_run_this", 
"run_after_loop", "run_this_last"),
-            ],
-            [
-                "example_skip_dag",
-                (
-                    "always_true_1",
-                    "always_true_2",
-                    "skip_operator_1",
-                    "skip_operator_2",
-                    "all_success",
-                    "one_success",
-                    "final_1",
-                    "final_2",
-                ),
-            ],
-            ["latest_only", ("latest_only", "task1")],
-        ]
-    )
-    def test_backfill_examples(self, dag_id, expected_execution_order):
-        """
-        Test backfilling example dags
-
-        Try to backfill some of the example dags. Be careful, not all dags are suitable
-        for doing this. For example, a dag that sleeps forever, or does not have a
-        schedule won't work here since you simply can't backfill them.
-        """
-        self.maxDiff = None
-        dag = self.dagbag.get_dag(dag_id)
-
-        logger.info('*** Running example DAG: %s', dag.dag_id)
-        executor = TestExecutor()
-        job = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            executor=executor,
-            ignore_first_depends_on_past=True)
-
-        job.run()
-        self.assertListEqual(
-            [((dag_id, task_id, DEFAULT_DATE, 1), State.SUCCESS)
-             for task_id in expected_execution_order],
-            executor.sorted_tasks
-        )
-
-    def test_backfill_conf(self):
-        dag = self._get_dummy_dag('test_backfill_conf')
-
-        executor = TestExecutor()
-
-        conf = json.loads("""{"key": "value"}""")
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          conf=conf)
-        job.run()
-
-        dr = DagRun.find(dag_id='test_backfill_conf')
-
-        self.assertEqual(conf, dr[0].conf)
-
-    @patch('airflow.jobs.LoggingMixin.log')
-    def test_backfill_respect_task_concurrency_limit(self, mock_log):
-        task_concurrency = 2
-        dag = self._get_dummy_dag(
-            'test_backfill_respect_task_concurrency_limit',
-            task_concurrency=task_concurrency,
-        )
-
-        executor = TestExecutor()
-
-        job = BackfillJob(
-            dag=dag,
-            executor=executor,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        job.run()
-
-        self.assertTrue(0 < len(executor.history))
-
-        task_concurrency_limit_reached_at_least_once = False
-
-        num_running_task_instances = 0
-        for running_task_instances in executor.history:
-            self.assertLessEqual(len(running_task_instances), task_concurrency)
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == task_concurrency:
-                task_concurrency_limit_reached_at_least_once = True
-
-        self.assertEquals(8, num_running_task_instances)
-        self.assertTrue(task_concurrency_limit_reached_at_least_once)
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        self.assertEquals(0, times_pool_limit_reached_in_debug)
-        self.assertEquals(0, times_dag_concurrency_limit_reached_in_debug)
-        self.assertGreater(times_task_concurrency_limit_reached_in_debug, 0)
-
-    @patch('airflow.jobs.LoggingMixin.log')
-    def test_backfill_respect_dag_concurrency_limit(self, mock_log):
-
-        dag = self._get_dummy_dag('test_backfill_respect_dag_concurrency_limit')
-        dag.concurrency = 2
-
-        executor = TestExecutor()
-
-        job = BackfillJob(
-            dag=dag,
-            executor=executor,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        job.run()
-
-        self.assertTrue(0 < len(executor.history))
-
-        concurrency_limit_reached_at_least_once = False
-
-        num_running_task_instances = 0
-
-        for running_task_instances in executor.history:
-            self.assertLessEqual(len(running_task_instances), dag.concurrency)
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == dag.concurrency:
-                concurrency_limit_reached_at_least_once = True
-
-        self.assertEquals(8, num_running_task_instances)
-        self.assertTrue(concurrency_limit_reached_at_least_once)
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        self.assertEquals(0, times_pool_limit_reached_in_debug)
-        self.assertEquals(0, times_task_concurrency_limit_reached_in_debug)
-        self.assertGreater(times_dag_concurrency_limit_reached_in_debug, 0)
-
-    @patch('airflow.jobs.LoggingMixin.log')
-    @patch('airflow.jobs.conf.getint')
-    def test_backfill_with_no_pool_limit(self, mock_getint, mock_log):
-        non_pooled_backfill_task_slot_count = 2
-
-        def getint(section, key):
-            if section.lower() == 'core' and \
-                    'non_pooled_backfill_task_slot_count' == key.lower():
-                return non_pooled_backfill_task_slot_count
-            else:
-                return configuration.conf.getint(section, key)
-
-        mock_getint.side_effect = getint
-
-        dag = self._get_dummy_dag('test_backfill_with_no_pool_limit')
-
-        executor = TestExecutor()
-
-        job = BackfillJob(
-            dag=dag,
-            executor=executor,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        job.run()
-
-        self.assertTrue(0 < len(executor.history))
-
-        non_pooled_task_slot_count_reached_at_least_once = False
-
-        num_running_task_instances = 0
-
-        # if no pool is specified, the number of tasks running in
-        # parallel per backfill should be less than
-        # non_pooled_backfill_task_slot_count at any point of time.
-        for running_task_instances in executor.history:
-            self.assertLessEqual(
-                len(running_task_instances),
-                non_pooled_backfill_task_slot_count,
-            )
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == non_pooled_backfill_task_slot_count:
-                non_pooled_task_slot_count_reached_at_least_once = True
-
-        self.assertEquals(8, num_running_task_instances)
-        self.assertTrue(non_pooled_task_slot_count_reached_at_least_once)
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        self.assertEquals(0, times_dag_concurrency_limit_reached_in_debug)
-        self.assertEquals(0, times_task_concurrency_limit_reached_in_debug)
-        self.assertGreater(times_pool_limit_reached_in_debug, 0)
-
-    def test_backfill_pool_not_found(self):
-        dag = self._get_dummy_dag(
-            dag_id='test_backfill_pool_not_found',
-            pool='king_pool',
-        )
-
-        executor = TestExecutor()
-
-        job = BackfillJob(
-            dag=dag,
-            executor=executor,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        try:
-            job.run()
-        except AirflowException:
-            return
-
-        self.fail()
-
-    @patch('airflow.jobs.LoggingMixin.log')
-    def test_backfill_respect_pool_limit(self, mock_log):
-        session = settings.Session()
-
-        slots = 2
-        pool = Pool(
-            pool='pool_with_two_slots',
-            slots=slots,
-        )
-        session.add(pool)
-        session.commit()
-
-        dag = self._get_dummy_dag(
-            dag_id='test_backfill_respect_pool_limit',
-            pool=pool.pool,
-        )
-
-        executor = TestExecutor()
-
-        job = BackfillJob(
-            dag=dag,
-            executor=executor,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=7),
-        )
-
-        job.run()
-
-        self.assertTrue(0 < len(executor.history))
-
-        pool_was_full_at_least_once = False
-        num_running_task_instances = 0
-
-        for running_task_instances in executor.history:
-            self.assertLessEqual(len(running_task_instances), slots)
-            num_running_task_instances += len(running_task_instances)
-            if len(running_task_instances) == slots:
-                pool_was_full_at_least_once = True
-
-        self.assertEquals(8, num_running_task_instances)
-        self.assertTrue(pool_was_full_at_least_once)
-
-        times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            DagConcurrencyLimitReached,
-        )
-
-        times_pool_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            NoAvailablePoolSlot,
-        )
-
-        times_task_concurrency_limit_reached_in_debug = self._times_called_with(
-            mock_log.debug,
-            TaskConcurrencyLimitReached,
-        )
-
-        self.assertEquals(0, times_task_concurrency_limit_reached_in_debug)
-        self.assertEquals(0, times_dag_concurrency_limit_reached_in_debug)
-        self.assertGreater(times_pool_limit_reached_in_debug, 0)
-
-    def test_backfill_run_rescheduled(self):
-        dag = DAG(
-            dag_id='test_backfill_run_rescheduled',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(
-                task_id='test_backfill_run_rescheduled_task-1',
-                dag=dag,
-            )
-
-        dag.clear()
-
-        executor = TestExecutor()
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          )
-        job.run()
-
-        ti = TI(task=dag.get_task('test_backfill_run_rescheduled_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        ti.set_state(State.UP_FOR_RESCHEDULE)
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          rerun_failed_tasks=True
-                          )
-        job.run()
-        ti = TI(task=dag.get_task('test_backfill_run_rescheduled_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.SUCCESS)
-
-    def test_backfill_rerun_failed_tasks(self):
-        dag = DAG(
-            dag_id='test_backfill_rerun_failed',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(
-                task_id='test_backfill_rerun_failed_task-1',
-                dag=dag)
-
-        dag.clear()
-
-        executor = TestExecutor()
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          )
-        job.run()
-
-        ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        ti.set_state(State.FAILED)
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          rerun_failed_tasks=True
-                          )
-        job.run()
-        ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.SUCCESS)
-
-    def test_backfill_rerun_upstream_failed_tasks(self):
-        dag = DAG(
-            dag_id='test_backfill_rerun_upstream_failed',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@daily')
-
-        with dag:
-            t1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1',
-                               dag=dag)
-            t2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2',
-                               dag=dag)
-            t1.set_upstream(t2)
-
-        dag.clear()
-        executor = TestExecutor()
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          )
-        job.run()
-
-        ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        ti.set_state(State.UPSTREAM_FAILED)
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          rerun_failed_tasks=True
-                          )
-        job.run()
-        ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.SUCCESS)
-
-    def test_backfill_rerun_failed_tasks_without_flag(self):
-        dag = DAG(
-            dag_id='test_backfill_rerun_failed',
-            start_date=DEFAULT_DATE,
-            schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(
-                task_id='test_backfill_rerun_failed_task-1',
-                dag=dag)
-
-        dag.clear()
-
-        executor = TestExecutor()
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          )
-        job.run()
-
-        ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'),
-                execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        ti.set_state(State.FAILED)
-
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          rerun_failed_tasks=False
-                          )
-
-        with self.assertRaises(AirflowException):
-            job.run()
-
-    def test_backfill_ordered_concurrent_execute(self):
-        dag = DAG(
-            dag_id='test_backfill_ordered_concurrent_execute',
-            start_date=DEFAULT_DATE,
-            schedule_interval="@daily")
-
-        with dag:
-            op1 = DummyOperator(task_id='leave1')
-            op2 = DummyOperator(task_id='leave2')
-            op3 = DummyOperator(task_id='upstream_level_1')
-            op4 = DummyOperator(task_id='upstream_level_2')
-            op5 = DummyOperator(task_id='upstream_level_3')
-            # order randomly
-            op2.set_downstream(op3)
-            op1.set_downstream(op3)
-            op4.set_downstream(op5)
-            op3.set_downstream(op4)
-
-        dag.clear()
-
-        executor = TestExecutor()
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
-                          )
-        job.run()
-
-        d0 = DEFAULT_DATE
-        d1 = d0 + datetime.timedelta(days=1)
-        d2 = d1 + datetime.timedelta(days=1)
-
-        # test executor history keeps a list
-        history = executor.history
-
-        self.maxDiff = None
-        self.assertListEqual(
-            # key[0] is dag id, key[3] is try_number, we don't care about either of those here
-            [sorted([item[-1].key[1:3] for item in batch]) for batch in history],
-            [
-                [
-                    ('leave1', d0),
-                    ('leave1', d1),
-                    ('leave1', d2),
-                    ('leave2', d0),
-                    ('leave2', d1),
-                    ('leave2', d2)
-                ],
-                [('upstream_level_1', d0), ('upstream_level_1', d1), ('upstream_level_1', d2)],
-                [('upstream_level_2', d0), ('upstream_level_2', d1), ('upstream_level_2', d2)],
-                [('upstream_level_3', d0), ('upstream_level_3', d1), ('upstream_level_3', d2)],
-            ]
-        )
-
-    def test_backfill_pooled_tasks(self):
-        """
-        Test that queued tasks are executed by BackfillJob
-        """
-        session = settings.Session()
-        pool = Pool(pool='test_backfill_pooled_task_pool', slots=1)
-        session.add(pool)
-        session.commit()
-
-        dag = self.dagbag.get_dag('test_backfill_pooled_task_dag')
-        dag.clear()
-
-        job = BackfillJob(
-            dag=dag,
-            executor=TestExecutor(),
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE)
-
-        # run with timeout because this creates an infinite loop if not
-        # caught
-        with timeout(seconds=30):
-            job.run()
-
-        ti = TI(
-            task=dag.get_task('test_backfill_pooled_task'),
-            execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.SUCCESS)
-
-    def test_backfill_depends_on_past(self):
-        """
-        Test that backfill respects ignore_depends_on_past
-        """
-        dag = self.dagbag.get_dag('test_depends_on_past')
-        dag.clear()
-        run_date = DEFAULT_DATE + datetime.timedelta(days=5)
-
-        # backfill should deadlock
-        self.assertRaisesRegexp(
-            AirflowException,
-            'BackfillJob is deadlocked',
-            BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run)
-
-        BackfillJob(
-            dag=dag,
-            start_date=run_date,
-            end_date=run_date,
-            executor=TestExecutor(),
-            ignore_first_depends_on_past=True).run()
-
-        # ti should have succeeded
-        ti = TI(dag.tasks[0], run_date)
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.SUCCESS)
-
-    def test_run_ignores_all_dependencies(self):
-        """
-        Test that run respects ignore_all_dependencies
-        """
-        dag_id = 'test_run_ignores_all_dependencies'
-
-        dag = self.dagbag.get_dag('test_run_ignores_all_dependencies')
-        dag.clear()
-
-        task0_id = 'test_run_dependent_task'
-        args0 = ['run',
-                 '-A',
-                 dag_id,
-                 task0_id,
-                 DEFAULT_DATE.isoformat()]
-        cli.run(self.parser.parse_args(args0))
-        ti_dependent0 = TI(
-            task=dag.get_task(task0_id),
-            execution_date=DEFAULT_DATE)
-
-        ti_dependent0.refresh_from_db()
-        self.assertEqual(ti_dependent0.state, State.FAILED)
-
-        task1_id = 'test_run_dependency_task'
-        args1 = ['run',
-                 '-A',
-                 dag_id,
-                 task1_id,
-                 (DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()]
-        cli.run(self.parser.parse_args(args1))
-
-        ti_dependency = TI(
-            task=dag.get_task(task1_id),
-            execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
-        ti_dependency.refresh_from_db()
-        self.assertEqual(ti_dependency.state, State.FAILED)
-
-        task2_id = 'test_run_dependent_task'
-        args2 = ['run',
-                 '-A',
-                 dag_id,
-                 task2_id,
-                 (DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()]
-        cli.run(self.parser.parse_args(args2))
-
-        ti_dependent = TI(
-            task=dag.get_task(task2_id),
-            execution_date=DEFAULT_DATE + datetime.timedelta(days=1))
-        ti_dependent.refresh_from_db()
-        self.assertEqual(ti_dependent.state, State.SUCCESS)
-
-    def test_backfill_depends_on_past_backwards(self):
-        """
-        Test that CLI respects -B argument and raises on interaction with depends_on_past
-        """
-        dag_id = 'test_depends_on_past'
-        start_date = DEFAULT_DATE + datetime.timedelta(days=1)
-        end_date = start_date + datetime.timedelta(days=1)
-        kwargs = dict(
-            start_date=start_date,
-            end_date=end_date,
-        )
-        dag = self.dagbag.get_dag(dag_id)
-        dag.clear()
-
-        executor = TestExecutor()
-        job = BackfillJob(dag=dag,
-                          executor=executor,
-                          ignore_first_depends_on_past=True,
-                          **kwargs)
-        job.run()
-
-        ti = TI(dag.get_task('test_dop_task'), end_date)
-        ti.refresh_from_db()
-        # runs fine forwards
-        self.assertEqual(ti.state, State.SUCCESS)
-
-        # raises backwards
-        expected_msg = 'You cannot backfill backwards because one or more tasks depend_on_past: {}'.format(
-            'test_dop_task')
-        with self.assertRaisesRegexp(AirflowException, expected_msg):
-            executor = TestExecutor()
-            job = BackfillJob(dag=dag,
-                              executor=executor,
-                              run_backwards=True,
-                              **kwargs)
-            job.run()
-
-    def test_cli_receives_delay_arg(self):
-        """
-        Tests that the --delay argument is passed correctly to the BackfillJob
-        """
-        dag_id = 'example_bash_operator'
-        run_date = DEFAULT_DATE
-        args = [
-            'backfill',
-            dag_id,
-            '-s',
-            run_date.isoformat(),
-            '--delay_on_limit',
-            '0.5',
-        ]
-        parsed_args = self.parser.parse_args(args)
-        self.assertEqual(0.5, parsed_args.delay_on_limit)
-
-    def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1):
-        dag = DAG(
-            dag_id=dag_id,
-            start_date=DEFAULT_DATE,
-            schedule_interval="@hourly",
-            max_active_runs=max_active_runs
-        )
-
-        with dag:
-            op1 = DummyOperator(task_id='leave1')
-            op2 = DummyOperator(task_id='leave2')
-            op3 = DummyOperator(task_id='upstream_level_1')
-            op4 = DummyOperator(task_id='upstream_level_2')
-
-            op1 >> op2 >> op3
-            op4 >> op3
-
-        dag.clear()
-        return dag
-
-    def test_backfill_max_limit_check_within_limit(self):
-        dag = self._get_dag_test_max_active_limits(
-            'test_backfill_max_limit_check_within_limit',
-            max_active_runs=16)
-
-        start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
-        end_date = DEFAULT_DATE
-
-        executor = TestExecutor()
-        job = BackfillJob(dag=dag,
-                          start_date=start_date,
-                          end_date=end_date,
-                          executor=executor,
-                          donot_pickle=True)
-        job.run()
-
-        dagruns = DagRun.find(dag_id=dag.dag_id)
-        self.assertEqual(2, len(dagruns))
-        self.assertTrue(all([run.state == State.SUCCESS for run in dagruns]))
-
-    def test_backfill_max_limit_check(self):
-        dag_id = 'test_backfill_max_limit_check'
-        run_id = 'test_dagrun'
-        start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
-        end_date = DEFAULT_DATE
-
-        dag_run_created_cond = threading.Condition()
-
-        def run_backfill(cond):
-            cond.acquire()
-            try:
-                dag = self._get_dag_test_max_active_limits(dag_id)
-
-                # this session object is different than the one in the main thread
-                thread_session = settings.Session()
-
-                # Existing dagrun that is not within the backfill range
-                dag.create_dagrun(
-                    run_id=run_id,
-                    state=State.RUNNING,
-                    execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
-                    start_date=DEFAULT_DATE,
-                )
-
-                thread_session.commit()
-                cond.notify()
-            finally:
-                cond.release()
-
-            executor = TestExecutor()
-            job = BackfillJob(dag=dag,
-                              start_date=start_date,
-                              end_date=end_date,
-                              executor=executor,
-                              donot_pickle=True)
-            job.run()
-
-            thread_session.close()
-
-        backfill_job_thread = threading.Thread(target=run_backfill,
-                                               name="run_backfill",
-                                               args=(dag_run_created_cond,))
-
-        dag_run_created_cond.acquire()
-        session = settings.Session()
-        backfill_job_thread.start()
-        try:
-            # at this point backfill can't run since the max_active_runs has been
-            # reached, so it is waiting
-            dag_run_created_cond.wait(timeout=1.5)
-            dagruns = DagRun.find(dag_id=dag_id)
-            dr = dagruns[0]
-            self.assertEqual(1, len(dagruns))
-            self.assertEqual(dr.run_id, run_id)
-
-            # allow the backfill to execute by setting the existing dag run to SUCCESS,
-            # backfill will execute dag runs 1 by 1
-            dr.set_state(State.SUCCESS)
-            session.merge(dr)
-            session.commit()
-            session.close()
-
-            backfill_job_thread.join()
-
-            dagruns = DagRun.find(dag_id=dag_id)
-            self.assertEqual(3, len(dagruns))  # 2 from backfill + 1 existing
-            self.assertEqual(dagruns[-1].run_id, dr.run_id)
-        finally:
-            dag_run_created_cond.release()
-
-    def test_backfill_max_limit_check_no_count_existing(self):
-        dag = self._get_dag_test_max_active_limits(
-            'test_backfill_max_limit_check_no_count_existing')
-        start_date = DEFAULT_DATE
-        end_date = DEFAULT_DATE
-
-        # Existing dagrun that is within the backfill range
-        dag.create_dagrun(run_id="test_existing_backfill",
-                          state=State.RUNNING,
-                          execution_date=DEFAULT_DATE,
-                          start_date=DEFAULT_DATE)
-
-        executor = TestExecutor()
-        job = BackfillJob(dag=dag,
-                          start_date=start_date,
-                          end_date=end_date,
-                          executor=executor,
-                          donot_pickle=True)
-        job.run()
-
-        # BackfillJob will run since the existing DagRun does not count for the max
-        # active limit since it's within the backfill date range.
-        dagruns = DagRun.find(dag_id=dag.dag_id)
-        # will only be able to run 1 (the existing one) since there's just
-        # one dag run slot left given the max_active_runs limit
-        self.assertEqual(1, len(dagruns))
-        self.assertEqual(State.SUCCESS, dagruns[0].state)
-
-    def test_backfill_max_limit_check_complete_loop(self):
-        dag = self._get_dag_test_max_active_limits(
-            'test_backfill_max_limit_check_complete_loop')
-        start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
-        end_date = DEFAULT_DATE
-
-        # Given the max limit to be 1 in active dag runs, we need to run the
-        # backfill job 3 times
-        success_expected = 2
-        executor = TestExecutor()
-        job = BackfillJob(dag=dag,
-                          start_date=start_date,
-                          end_date=end_date,
-                          executor=executor,
-                          donot_pickle=True)
-        job.run()
-
-        success_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.SUCCESS))
-        running_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING))
-        self.assertEqual(success_expected, success_dagruns)
-        self.assertEqual(0, running_dagruns)  # no dag_runs in running state are left
-
-    def test_sub_set_subdag(self):
-        dag = DAG(
-            'test_sub_set_subdag',
-            start_date=DEFAULT_DATE,
-            default_args={'owner': 'owner1'})
-
-        with dag:
-            op1 = DummyOperator(task_id='leave1')
-            op2 = DummyOperator(task_id='leave2')
-            op3 = DummyOperator(task_id='upstream_level_1')
-            op4 = DummyOperator(task_id='upstream_level_2')
-            op5 = DummyOperator(task_id='upstream_level_3')
-            # order randomly
-            op2.set_downstream(op3)
-            op1.set_downstream(op3)
-            op4.set_downstream(op5)
-            op3.set_downstream(op4)
-
-        dag.clear()
-        dr = dag.create_dagrun(run_id="test",
-                               state=State.RUNNING,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE)
-
-        executor = TestExecutor()
-        sub_dag = dag.sub_dag(task_regex="leave*",
-                              include_downstream=False,
-                              include_upstream=False)
-        job = BackfillJob(dag=sub_dag,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE,
-                          executor=executor)
-        job.run()
-
-        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
-        # the run_id should have changed, so a refresh won't work
-        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
-        dr = drs[0]
-
-        self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
-                         dr.run_id)
-        for ti in dr.get_task_instances():
-            if ti.task_id == 'leave1' or ti.task_id == 'leave2':
-                self.assertEqual(State.SUCCESS, ti.state)
-            else:
-                self.assertEqual(State.NONE, ti.state)
-
-    def test_backfill_fill_blanks(self):
-        dag = DAG(
-            'test_backfill_fill_blanks',
-            start_date=DEFAULT_DATE,
-            default_args={'owner': 'owner1'},
-        )
-
-        with dag:
-            op1 = DummyOperator(task_id='op1')
-            op2 = DummyOperator(task_id='op2')
-            op3 = DummyOperator(task_id='op3')
-            op4 = DummyOperator(task_id='op4')
-            op5 = DummyOperator(task_id='op5')
-            op6 = DummyOperator(task_id='op6')
-
-        dag.clear()
-        dr = dag.create_dagrun(run_id='test',
-                               state=State.RUNNING,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE)
-        executor = TestExecutor()
-
-        session = settings.Session()
-
-        tis = dr.get_task_instances()
-        for ti in tis:
-            if ti.task_id == op1.task_id:
-                ti.state = State.UP_FOR_RETRY
-                ti.end_date = DEFAULT_DATE
-            elif ti.task_id == op2.task_id:
-                ti.state = State.FAILED
-            elif ti.task_id == op3.task_id:
-                ti.state = State.SKIPPED
-            elif ti.task_id == op4.task_id:
-                ti.state = State.SCHEDULED
-            elif ti.task_id == op5.task_id:
-                ti.state = State.UPSTREAM_FAILED
-            # op6 = None
-            session.merge(ti)
-        session.commit()
-        session.close()
-
-        job = BackfillJob(dag=dag,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE,
-                          executor=executor)
-        self.assertRaisesRegexp(
-            AirflowException,
-            'Some task instances failed',
-            job.run)
-
-        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
-        # the run_id should have changed, so a refresh won't work
-        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
-        dr = drs[0]
-
-        self.assertEqual(dr.state, State.FAILED)
-
-        tis = dr.get_task_instances()
-        for ti in tis:
-            if ti.task_id in (op1.task_id, op4.task_id, op6.task_id):
-                self.assertEqual(ti.state, State.SUCCESS)
-            elif ti.task_id == op2.task_id:
-                self.assertEqual(ti.state, State.FAILED)
-            elif ti.task_id == op3.task_id:
-                self.assertEqual(ti.state, State.SKIPPED)
-            elif ti.task_id == op5.task_id:
-                self.assertEqual(ti.state, State.UPSTREAM_FAILED)
-
-    def test_backfill_execute_subdag(self):
-        dag = self.dagbag.get_dag('example_subdag_operator')
-        subdag_op_task = dag.get_task('section-1')
-
-        subdag = subdag_op_task.subdag
-        subdag.schedule_interval = '@daily'
-
-        start_date = timezone.utcnow()
-        executor = TestExecutor()
-        job = BackfillJob(dag=subdag,
-                          start_date=start_date,
-                          end_date=start_date,
-                          executor=executor,
-                          donot_pickle=True)
-        job.run()
-
-        history = executor.history
-        subdag_history = history[0]
-
-        # check that all 5 task instances of the subdag 'section-1' were executed
-        self.assertEqual(5, len(subdag_history))
-        for sdh in subdag_history:
-            ti = sdh[3]
-            self.assertIn('section-1-task-', ti.task_id)
-
-        subdag.clear()
-        dag.clear()
-
-    def test_subdag_clear_parentdag_downstream_clear(self):
-        dag = self.dagbag.get_dag('example_subdag_operator')
-        subdag_op_task = dag.get_task('section-1')
-
-        subdag = subdag_op_task.subdag
-        subdag.schedule_interval = '@daily'
-
-        executor = TestExecutor()
-        job = BackfillJob(dag=subdag,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE,
-                          executor=executor,
-                          donot_pickle=True)
-
-        with timeout(seconds=30):
-            job.run()
-
-        ti0 = TI(
-            task=subdag.get_task('section-1-task-1'),
-            execution_date=DEFAULT_DATE)
-        ti0.refresh_from_db()
-        self.assertEqual(ti0.state, State.SUCCESS)
-
-        sdag = subdag.sub_dag(
-            task_regex='section-1-task-1',
-            include_downstream=True,
-            include_upstream=False)
-
-        sdag.clear(
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE,
-            include_parentdag=True)
-
-        ti0.refresh_from_db()
-        self.assertEqual(State.NONE, ti0.state)
-
-        ti1 = TI(
-            task=dag.get_task('some-other-task'),
-            execution_date=DEFAULT_DATE)
-        self.assertEqual(State.NONE, ti1.state)
-
-        # Checks that all the Downstream tasks for Parent DAG
-        # have been cleared
-        for task in subdag_op_task.downstream_list:
-            ti = TI(
-                task=dag.get_task(task.task_id),
-                execution_date=DEFAULT_DATE
-            )
-            self.assertEqual(State.NONE, ti.state)
-
-        subdag.clear()
-        dag.clear()
-
-    def test_backfill_execute_subdag_with_removed_task(self):
-        """
-        Ensure that subdag operators execute properly in the case where
-        an associated task of the subdag has been removed from the dag
-        definition, but has instances in the database from previous runs.
-        """
-        dag = self.dagbag.get_dag('example_subdag_operator')
-        subdag = dag.get_task('section-1').subdag
-
-        executor = TestExecutor()
-        job = BackfillJob(dag=subdag,
-                          start_date=DEFAULT_DATE,
-                          end_date=DEFAULT_DATE,
-                          executor=executor,
-                          donot_pickle=True)
-
-        removed_task_ti = TI(
-            task=DummyOperator(task_id='removed_task'),
-            execution_date=DEFAULT_DATE,
-            state=State.REMOVED)
-        removed_task_ti.dag_id = subdag.dag_id
-
-        session = settings.Session()
-        session.merge(removed_task_ti)
-
-        with timeout(seconds=30):
-            job.run()
-
-        for task in subdag.tasks:
-            instance = session.query(TI).filter(
-                TI.dag_id == subdag.dag_id,
-                TI.task_id == task.task_id,
-                TI.execution_date == DEFAULT_DATE).first()
-
-            self.assertIsNotNone(instance)
-            self.assertEqual(instance.state, State.SUCCESS)
-
-        removed_task_ti.refresh_from_db()
-        self.assertEqual(removed_task_ti.state, State.REMOVED)
-
-        subdag.clear()
-        dag.clear()
-
-    def test_update_counters(self):
-        dag = DAG(
-            dag_id='test_manage_executor_state',
-            start_date=DEFAULT_DATE)
-
-        task1 = DummyOperator(
-            task_id='dummy',
-            dag=dag,
-            owner='airflow')
-
-        job = BackfillJob(dag=dag)
-
-        session = settings.Session()
-        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
-                               state=State.RUNNING,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE,
-                               session=session)
-        ti = TI(task1, dr.execution_date)
-        ti.refresh_from_db()
-
-        ti_status = BackfillJob._DagRunTaskStatus()
-
-        # test for success
-        ti.set_state(State.SUCCESS, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
-        self.assertTrue(len(ti_status.running) == 0)
-        self.assertTrue(len(ti_status.succeeded) == 1)
-        self.assertTrue(len(ti_status.skipped) == 0)
-        self.assertTrue(len(ti_status.failed) == 0)
-        self.assertTrue(len(ti_status.to_run) == 0)
-
-        ti_status.succeeded.clear()
-
-        # test for skipped
-        ti.set_state(State.SKIPPED, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
-        self.assertTrue(len(ti_status.running) == 0)
-        self.assertTrue(len(ti_status.succeeded) == 0)
-        self.assertTrue(len(ti_status.skipped) == 1)
-        self.assertTrue(len(ti_status.failed) == 0)
-        self.assertTrue(len(ti_status.to_run) == 0)
-
-        ti_status.skipped.clear()
-
-        # test for failed
-        ti.set_state(State.FAILED, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
-        self.assertTrue(len(ti_status.running) == 0)
-        self.assertTrue(len(ti_status.succeeded) == 0)
-        self.assertTrue(len(ti_status.skipped) == 0)
-        self.assertTrue(len(ti_status.failed) == 1)
-        self.assertTrue(len(ti_status.to_run) == 0)
-
-        ti_status.failed.clear()
-
-        # test for retry
-        ti.set_state(State.UP_FOR_RETRY, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
-        self.assertTrue(len(ti_status.running) == 0)
-        self.assertTrue(len(ti_status.succeeded) == 0)
-        self.assertTrue(len(ti_status.skipped) == 0)
-        self.assertTrue(len(ti_status.failed) == 0)
-        self.assertTrue(len(ti_status.to_run) == 1)
-
-        ti_status.to_run.clear()
-
-        # test for reschedule
-        ti.set_state(State.UP_FOR_RESCHEDULE, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
-        self.assertTrue(len(ti_status.running) == 0)
-        self.assertTrue(len(ti_status.succeeded) == 0)
-        self.assertTrue(len(ti_status.skipped) == 0)
-        self.assertTrue(len(ti_status.failed) == 0)
-        self.assertTrue(len(ti_status.to_run) == 1)
-
-        ti_status.to_run.clear()
-
-        # test for none
-        ti.set_state(State.NONE, session)
-        ti_status.running[ti.key] = ti
-        job._update_counters(ti_status=ti_status)
-        self.assertTrue(len(ti_status.running) == 0)
-        self.assertTrue(len(ti_status.succeeded) == 0)
-        self.assertTrue(len(ti_status.skipped) == 0)
-        self.assertTrue(len(ti_status.failed) == 0)
-        self.assertTrue(len(ti_status.to_run) == 1)
-
-        ti_status.to_run.clear()
-
-        session.close()
-
-    def test_dag_get_run_dates(self):
-
-        def get_test_dag_for_backfill(schedule_interval=None):
-            dag = DAG(
-                dag_id='test_get_dates',
-                start_date=DEFAULT_DATE,
-                schedule_interval=schedule_interval)
-            DummyOperator(
-                task_id='dummy',
-                dag=dag,
-                owner='airflow',
-            )
-            return dag
-
-        test_dag = get_test_dag_for_backfill()
-        self.assertEqual([DEFAULT_DATE], test_dag.get_run_dates(
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE))
-
-        test_dag = get_test_dag_for_backfill(schedule_interval="@hourly")
-        self.assertEqual([DEFAULT_DATE - datetime.timedelta(hours=3),
-                          DEFAULT_DATE - datetime.timedelta(hours=2),
-                          DEFAULT_DATE - datetime.timedelta(hours=1),
-                          DEFAULT_DATE],
-                         test_dag.get_run_dates(
-                             start_date=DEFAULT_DATE - datetime.timedelta(hours=3),
-                             end_date=DEFAULT_DATE,))
-
-    def test_backfill_run_backwards(self):
-        dag = self.dagbag.get_dag("test_start_date_scheduling")
-        dag.clear()
-
-        job = BackfillJob(
-            dag=dag,
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE + datetime.timedelta(days=1),
-            run_backwards=True
-        )
-        job.run()
-
-        session = settings.Session()
-        tis = session.query(TI).filter(
-            TI.dag_id == 'test_start_date_scheduling', TI.task_id == 'dummy'
-        ).order_by(TI.execution_date).all()
-
-        queued_times = [ti.queued_dttm for ti in tis]
-        self.assertTrue(queued_times == sorted(queued_times, reverse=True))
-        self.assertTrue(all([ti.state == State.SUCCESS for ti in tis]))
-
-        dag.clear()
-        session.close()
-
-
-class LocalTaskJobTest(unittest.TestCase):
-    def setUp(self):
-        clear_db_runs()
-
-    def test_localtaskjob_essential_attr(self):
-        """
-        Check whether essential attributes
-        of LocalTaskJob can be assigned with
-        proper values without intervention
-        """
-        dag = DAG(
-            'test_localtaskjob_essential_attr',
-            start_date=DEFAULT_DATE,
-            default_args={'owner': 'owner1'})
-
-        with dag:
-            op1 = DummyOperator(task_id='op1')
-
-        dag.clear()
-        dr = dag.create_dagrun(run_id="test",
-                               state=State.SUCCESS,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE)
-        ti = dr.get_task_instance(task_id=op1.task_id)
-
-        job1 = LocalTaskJob(task_instance=ti,
-                            ignore_ti_state=True,
-                            executor=SequentialExecutor())
-
-        essential_attr = ["dag_id", "job_type", "start_date", "hostname"]
-
-        check_result_1 = [hasattr(job1, attr) for attr in essential_attr]
-        self.assertTrue(all(check_result_1))
-
-        check_result_2 = [getattr(job1, attr) is not None for attr in essential_attr]
-        self.assertTrue(all(check_result_2))
-
-    @patch('os.getpid')
-    def test_localtaskjob_heartbeat(self, mock_pid):
-        session = settings.Session()
-        dag = DAG(
-            'test_localtaskjob_heartbeat',
-            start_date=DEFAULT_DATE,
-            default_args={'owner': 'owner1'})
-
-        with dag:
-            op1 = DummyOperator(task_id='op1')
-
-        dag.clear()
-        dr = dag.create_dagrun(run_id="test",
-                               state=State.SUCCESS,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE,
-                               session=session)
-        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
-        ti.state = State.RUNNING
-        ti.hostname = "blablabla"
-        session.commit()
-
-        job1 = LocalTaskJob(task_instance=ti,
-                            ignore_ti_state=True,
-                            executor=SequentialExecutor())
-        self.assertRaises(AirflowException, job1.heartbeat_callback)
-
-        mock_pid.return_value = 1
-        ti.state = State.RUNNING
-        ti.hostname = get_hostname()
-        ti.pid = 1
-        session.merge(ti)
-        session.commit()
-
-        ret = job1.heartbeat_callback()
-        self.assertEqual(ret, None)
-
-        mock_pid.return_value = 2
-        self.assertRaises(AirflowException, job1.heartbeat_callback)
-
-    @unittest.skipIf('mysql' in configuration.conf.get('core', 'sql_alchemy_conn'),
-                     "flaky when run on mysql")
-    @unittest.skipIf('postgresql' in configuration.conf.get('core', 'sql_alchemy_conn'),
-                     'flaky when run on postgresql')
-    def test_mark_success_no_kill(self):
-        """
-        Test that ensures that mark_success in the UI doesn't cause
-        the task to fail, and that the task exits
-        """
-        dagbag = models.DagBag(
-            dag_folder=TEST_DAG_FOLDER,
-            include_examples=False,
-        )
-        dag = dagbag.dags.get('test_mark_success')
-        task = dag.get_task('task1')
-
-        session = settings.Session()
-
-        dag.clear()
-        dag.create_dagrun(run_id="test",
-                          state=State.RUNNING,
-                          execution_date=DEFAULT_DATE,
-                          start_date=DEFAULT_DATE,
-                          session=session)
-        ti = TI(task=task, execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
-        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True)
-        process = multiprocessing.Process(target=job1.run)
-        process.start()
-        ti.refresh_from_db()
-        for i in range(0, 50):
-            if ti.state == State.RUNNING:
-                break
-            time.sleep(0.1)
-            ti.refresh_from_db()
-        self.assertEqual(State.RUNNING, ti.state)
-        ti.state = State.SUCCESS
-        session.merge(ti)
-        session.commit()
-
-        process.join(timeout=10)
-        self.assertFalse(process.is_alive())
-        ti.refresh_from_db()
-        self.assertEqual(State.SUCCESS, ti.state)
-
-    def test_localtaskjob_double_trigger(self):
-        dagbag = models.DagBag(
-            dag_folder=TEST_DAG_FOLDER,
-            include_examples=False,
-        )
-        dag = dagbag.dags.get('test_localtaskjob_double_trigger')
-        task = dag.get_task('test_localtaskjob_double_trigger_task')
-
-        session = settings.Session()
-
-        dag.clear()
-        dr = dag.create_dagrun(run_id="test",
-                               state=State.SUCCESS,
-                               execution_date=DEFAULT_DATE,
-                               start_date=DEFAULT_DATE,
-                               session=session)
-        ti = dr.get_task_instance(task_id=task.task_id, session=session)
-        ti.state = State.RUNNING
-        ti.hostname = get_hostname()
-        ti.pid = 1
-        session.commit()
-
-        ti_run = TI(task=task, execution_date=DEFAULT_DATE)
-        job1 = LocalTaskJob(task_instance=ti_run,
-                            ignore_ti_state=True,
-                            executor=SequentialExecutor())
-        with patch.object(BaseTaskRunner, 'start', return_value=None) as mock_method:
-            job1.run()
-            mock_method.assert_not_called()
-
-        ti = dr.get_task_instance(task_id=task.task_id, session=session)
-        self.assertEqual(ti.pid, 1)
-        self.assertEqual(ti.state, State.RUNNING)
-
-        session.close()
+    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'dags')
 
 Review comment:
   In other test scripts we do `from tests.core import TEST_DAG_FOLDER`; shall we stick to that method everywhere?
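   For illustration, a minimal sketch contrasting the two approaches (Option 2 assumes `tests.core` exports `TEST_DAG_FOLDER`, as the other test scripts mentioned above do; the import path comes from those scripts, not from this diff):

```python
import os

# Option 1 (as in this diff): rebuild the path relative to __file__.
# The extra dirname() is needed because the test file moved from tests/
# into tests/jobs/, one directory further away from tests/dags.
TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'dags')

# Option 2 (the suggestion above): import the shared constant instead,
# so the path logic lives in one place and survives future file moves.
# from tests.core import TEST_DAG_FOLDER
```

   Option 2 would keep the directory layout encoded in a single module, which is the argument for standardizing on it.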

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
