ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663765528
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1475,6 +1475,55 @@ def
test_scheduler_loop_should_change_state_for_tis_without_dagrun(
assert ti.start_date == ti.end_date
assert ti.duration is not None
+ def
test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+ """This tests that if dags are missing in dagbag, then it should be
failed"""
+ session = settings.Session()
+ dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+ dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner':
'owner1'})
+
+ with dag:
+ op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False,
read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
+
+ # Mock dagbag.has_dag
+ dagbag.has_dag = mock.MagicMock(return_value=False)
+
+ dag = DagBag(read_dags_from_db=True,
include_examples=False).get_dag(dag_id)
+ dr = dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE + timedelta(days=1),
+ start_date=DEFAULT_DATE + timedelta(days=1),
+ session=session,
+ )
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.state = State.RUNNING
+ session.commit()
+
+ # This poll interval is large, but the scheduler doesn't sleep that
+ # long; we hit the clean_tis_without_dag interval instead
+ self.scheduler_job = SchedulerJob(num_runs=2,
processor_poll_interval=30)
+ self.scheduler_job.dagbag = dagbag
+ executor = MockExecutor(do_update=False)
+ self.scheduler_job.executor = executor
+ processor = mock.MagicMock()
+ processor.done = False
+ self.scheduler_job.processor_agent = processor
+
+ with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+ {('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
+ ):
+ self.scheduler_job._run_scheduler_loop()
Review comment:
I'm not sure we need to run the whole scheduler loop here -- we could just
call `self.scheduler_job._clean_tis_without_dag()` directly.
The only extra thing we check by calling `_run_scheduler_loop` is that we've
added this to the timer, but we can see that pretty easily.
Dunno :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]