seelmann commented on a change in pull request #5908: Revert "[AIRFLOW-4797] 
Improve performance and behaviour of zombie de…
URL: https://github.com/apache/airflow/pull/5908#discussion_r321969740
 
 

 ##########
 File path: tests/utils/test_dag_processing.py
 ##########
 @@ -169,6 +179,126 @@ def 
test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
         manager.set_file_paths(['abc.txt'])
         self.assertDictEqual(manager._processors, {'abc.txt': mock_processor})
 
+    def test_find_zombies(self):
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            max_runs=1,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            async_mode=True)
+
+        dagbag = DagBag(TEST_DAG_FOLDER)
+        with create_session() as session:
+            session.query(LJ).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TI(task, DEFAULT_DATE, State.RUNNING)
+            lj = LJ(ti)
+            lj.state = State.SHUTDOWN
+            lj.id = 1
+            ti.job_id = lj.id
+
+            session.add(lj)
+            session.add(ti)
+            session.commit()
+
+            manager._last_zombie_query_time = timezone.utcnow() - timedelta(
+                seconds=manager._zombie_threshold_secs + 1)
+            manager._find_zombies()
+            zombies = manager._zombies
+            self.assertEqual(1, len(zombies))
+            self.assertIsInstance(zombies[0], SimpleTaskInstance)
+            self.assertEqual(ti.dag_id, zombies[0].dag_id)
+            self.assertEqual(ti.task_id, zombies[0].task_id)
+            self.assertEqual(ti.execution_date, zombies[0].execution_date)
+
+            session.query(TI).delete()
+            session.query(LJ).delete()
+
+    def test_zombies_are_correctly_passed_to_dag_file_processor(self):
+        """
+        Check that the same set of zombies are passed to the dag
+        file processors until the next zombie detection logic is invoked.
+        :return:
+        """
+
+        dagbag = DagBag(os.path.join(TEST_DAG_FOLDER, 
'test_example_bash_operator'), include_examples=False)
 
 Review comment:
   It seems the test requires `include_examples=True` in order to load the test DAG
below.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to