KevinYang21 commented on a change in pull request #5908: Revert "[AIRFLOW-4797] 
Improve performance and behaviour of zombie de…
URL: https://github.com/apache/airflow/pull/5908#discussion_r321985815
 
 

 ##########
 File path: tests/utils/test_dag_processing.py
 ##########
 @@ -169,6 +179,126 @@ def 
test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
         manager.set_file_paths(['abc.txt'])
         self.assertDictEqual(manager._processors, {'abc.txt': mock_processor})
 
+    def test_find_zombies(self):
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            file_paths=['abc.txt'],
+            max_runs=1,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            async_mode=True)
+
+        dagbag = DagBag(TEST_DAG_FOLDER)
+        with create_session() as session:
+            session.query(LJ).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TI(task, DEFAULT_DATE, State.RUNNING)
+            lj = LJ(ti)
+            lj.state = State.SHUTDOWN
+            lj.id = 1
+            ti.job_id = lj.id
+
+            session.add(lj)
+            session.add(ti)
+            session.commit()
+
+            manager._last_zombie_query_time = timezone.utcnow() - timedelta(
+                seconds=manager._zombie_threshold_secs + 1)
+            manager._find_zombies()
+            zombies = manager._zombies
+            self.assertEqual(1, len(zombies))
+            self.assertIsInstance(zombies[0], SimpleTaskInstance)
+            self.assertEqual(ti.dag_id, zombies[0].dag_id)
+            self.assertEqual(ti.task_id, zombies[0].task_id)
+            self.assertEqual(ti.execution_date, zombies[0].execution_date)
+
+            session.query(TI).delete()
+            session.query(LJ).delete()
+
+    def test_zombies_are_correctly_passed_to_dag_file_processor(self):
+        """
+        Check that the same set of zombies are passed to the dag
+        file processors until the next zombie detection logic is invoked.
+        :return:
+        """
+
+        dagbag = DagBag(os.path.join(TEST_DAG_FOLDER, 
'test_example_bash_operator'), include_examples=False)
 
 Review comment:
   My bad there: a local change wasn't committed, so the `.py` is missing from
the file name. I was using the test DAG in tests/dags, so I wanted to prevent
loading the examples, for less redundant logging and shorter parsing time.
Ideally we would maintain one DAG file for each such test case, to make it more
of a true unit test, but that convention might introduce too many files, so I
decided to reuse the test DAG file for now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to