ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r657505551



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2124,6 +2124,59 @@ def 
test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def 
test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be 
failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 
'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, 
read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        mock_has_dag = mock.MagicMock(return_value=False)
+        dagbag.has_dag = mock_has_dag
+
+        dag = DagBag(read_dags_from_db=True, 
include_examples=False).get_dag(dag_id)
+        # Create DAG run with RUNNING state
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; we hit the missing_dag_cleanup_interval instead
+        self.scheduler_job = SchedulerJob(num_runs=2, 
processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        executor.queued_tasks

Review comment:
       It's actually needed to simulate the executor. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to