Ash Berlin-Taylor created AIRFLOW-4358:
------------------------------------------

             Summary: Decouple running actual tasks from test_jobs.py
                 Key: AIRFLOW-4358
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4358
             Project: Apache Airflow
          Issue Type: Test
          Components: tests
            Reporter: Ash Berlin-Taylor


In tests/test_jobs.py we have a lot of slow, fragile tests that test the 
behaviour of the "Jobs" - BackfillJob and SchedulerJob being the main two.

Since we already have tests that check that the executors actually _run_ tasks, 
the job tests do not need to run them again: we should stub out the executor in 
those tests, replacing it with one that just records which tasks it was asked to run.

This will hopefully speed up the tests and make them less fragile.

A possible starting point:

{code:title=test_jobs.diff}
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index e3ae8be1..aa919f90 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1384,6 +1384,14 @@ class LocalTaskJobTest(unittest.TestCase):

 class SchedulerJobTest(unittest.TestCase):

+    class NullExecutor(airflow.executors.base_executor.BaseExecutor):
+        def heartbeat(self, *args, **kwargs):
+            # This would normally pop tasks of the queue and run them.
+            pass
+
+        def end():
+            pass
+
     def setUp(self):
         clear_db_runs()
         clear_db_pools()
@@ -1391,6 +1399,10 @@ class SchedulerJobTest(unittest.TestCase):
         clear_db_sla_miss()
         clear_db_errors()

+        # Speed up some tests by not running the tasks, just look at what we
+        # enqueue!
+        self.null_exec = self.NullExecutor()
+
     @classmethod
     def setUpClass(cls):
         cls.dagbag = DagBag()
@@ -1409,8 +1421,7 @@ class SchedulerJobTest(unittest.TestCase):
     def tearDownClass(cls):
         cls.patcher.stop()

-    @staticmethod
-    def run_single_scheduler_loop_with_no_dags(dags_folder):
+    def run_single_scheduler_loop_with_no_dags(self, dags_folder):
         """
         Utility function that runs a single scheduler loop without actually
         changing/scheduling any dags. This is useful to simulate the other 
side effects of
@@ -1421,6 +1432,7 @@ class SchedulerJobTest(unittest.TestCase):
         :type directory: str
         """
         scheduler = SchedulerJob(
+            executor=self.null_exec,
             dag_id='this_dag_doesnt_exist',  # We don't want to actually run 
anything
             num_runs=1,
             subdir=os.path.join(dags_folder))
@@ -2483,6 +2495,7 @@ class SchedulerJobTest(unittest.TestCase):
             self.assertTrue(dag.start_date > datetime.datetime.utcnow())

             scheduler = SchedulerJob(dag_id,
+                                     executor=self.null_exec,
                                      subdir=os.path.join(TEST_DAG_FOLDER, 
'test_scheduler_dags.py'),
                                      num_runs=1)
             scheduler.run()
@@ -2491,6 +2504,7 @@ class SchedulerJobTest(unittest.TestCase):
             self.assertEqual(
                 len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
             session.commit()
+            self.assertEqual({}, self.null_exec.queued_tasks)

             # previously, running this backfill would kick off the Scheduler
             # because it would take the most recent run and start from there
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to