ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663765528



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1475,6 +1475,55 @@ def 
test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def 
test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """This tests that if dags are missing in dagbag, then it should be 
failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 
'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, 
read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        dagbag.has_dag = mock.MagicMock(return_value=False)
+
+        dag = DagBag(read_dags_from_db=True, 
include_examples=False).get_dag(dag_id)
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, bug the scheduler doesn't sleep that
+        # long, instead we hit the clean_tis_without_dag interval instead
+        self.scheduler_job = SchedulerJob(num_runs=2, 
processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        self.scheduler_job.executor = executor
+        processor = mock.MagicMock()
+        processor.done = False
+        self.scheduler_job.processor_agent = processor
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+            {('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
+        ):
+            self.scheduler_job._run_scheduler_loop()

Review comment:
       Not sure we need to run the whole scheduler loop here -- we could just call `self.scheduler_job._clean_tis_without_dag()` directly.
   
   By calling `_run_scheduler_loop()`, the only extra thing we check is that this cleanup has been added to the timer, and that is easy enough to confirm by reading the code.
   
   Not sure, though :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to