This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e1aa7d9272 Move tests under correct test class (#22768)
e1aa7d9272 is described below
commit e1aa7d92721fd09c1188a04f546711c69fbb45ed
Author: Ping Zhang <[email protected]>
AuthorDate: Tue Apr 5 20:22:50 2022 -0700
Move tests under correct test class (#22768)
---
tests/jobs/test_scheduler_job.py | 244 +++++++++++++++++++--------------------
1 file changed, 122 insertions(+), 122 deletions(-)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9570c29fd5..f0f8c52b07 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3854,6 +3854,128 @@ class TestSchedulerJob:
         dr.refresh_from_db(session)
         assert dr.state == DagRunState.SUCCESS
+    def test_should_mark_dummy_task_as_success(self):
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
+        )
+
+        # Write DAGs to dag and serialized_dag table
+        dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
+        dagbag.sync_to_db()
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
+
+        # Create DagRun
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
+
+        drs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        # Schedule TaskInstances
+        self.scheduler_job._schedule_dag_run(dr, session)
+        with create_session() as session:
+            tis = session.query(TaskInstance).all()
+
+        dags = self.scheduler_job.dagbag.dags.values()
+        assert ['test_only_dummy_tasks'] == [dag.dag_id for dag in dags]
+        assert 5 == len(tis)
+        assert {
+            ('test_task_a', 'success'),
+            ('test_task_b', None),
+            ('test_task_c', 'success'),
+            ('test_task_on_execute', 'scheduled'),
+            ('test_task_on_success', 'scheduled'),
+        } == {(ti.task_id, ti.state) for ti in tis}
+        for state, start_date, end_date, duration in [
+            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
+        ]:
+            if state == 'success':
+                assert start_date is not None
+                assert end_date is not None
+                assert 0.0 == duration
+            else:
+                assert start_date is None
+                assert end_date is None
+                assert duration is None
+
+        self.scheduler_job._schedule_dag_run(dr, session)
+        with create_session() as session:
+            tis = session.query(TaskInstance).all()
+
+        assert 5 == len(tis)
+        assert {
+            ('test_task_a', 'success'),
+            ('test_task_b', 'success'),
+            ('test_task_c', 'success'),
+            ('test_task_on_execute', 'scheduled'),
+            ('test_task_on_success', 'scheduled'),
+        } == {(ti.task_id, ti.state) for ti in tis}
+        for state, start_date, end_date, duration in [
+            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
+        ]:
+            if state == 'success':
+                assert start_date is not None
+                assert end_date is not None
+                assert 0.0 == duration
+            else:
+                assert start_date is None
+                assert end_date is None
+                assert duration is None
+
+    @pytest.mark.need_serialized_dag
+    def test_catchup_works_correctly(self, dag_maker):
+        """Test that catchup works correctly"""
+        session = settings.Session()
+        with dag_maker(
+            dag_id='test_catchup_schedule_dag',
+            schedule_interval=timedelta(days=1),
+            start_date=DEFAULT_DATE,
+            catchup=True,
+            max_active_runs=1,
+            session=session,
+        ) as dag:
+            DummyOperator(task_id='dummy')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
+        self.scheduler_job._start_queued_dagruns(session)
+        # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
+        dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
+        ti = dr.get_task_instance(task_id='dummy')
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.flush()
+
+        self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+
+        # Run the second time so _update_dag_next_dagrun will run
+        self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+
+        dag.catchup = False
+        dag.sync_to_db()
+        assert not dag.catchup
+
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        self.scheduler_job._create_dag_runs([dm], session)
+
+        # Check catchup worked correctly by ensuring execution_date is quite new
+        # Our dag is a daily dag
+        assert (
+            session.query(DagRun.execution_date)
+            .filter(DagRun.execution_date != DEFAULT_DATE)  # exclude the first run
+            .scalar()
+        ) > (timezone.utcnow() - timedelta(days=2))
+
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
@@ -4065,125 +4187,3 @@ class TestSchedulerJobQueriesCount:
         prefix = "Collected database query count mismatches:"
         joined = "\n\n".join(failures)
         raise AssertionError(f"{prefix}\n\n{joined}")
-
-    def test_should_mark_dummy_task_as_success(self):
-        dag_file = os.path.join(
-            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
-        )
-
-        # Write DAGs to dag and serialized_dag table
-        dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
-        dagbag.sync_to_db()
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
-
-        # Create DagRun
-        session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        # Schedule TaskInstances
-        self.scheduler_job._schedule_dag_run(dr, session)
-        with create_session() as session:
-            tis = session.query(TaskInstance).all()
-
-        dags = self.scheduler_job.dagbag.dags.values()
-        assert ['test_only_dummy_tasks'] == [dag.dag_id for dag in dags]
-        assert 5 == len(tis)
-        assert {
-            ('test_task_a', 'success'),
-            ('test_task_b', None),
-            ('test_task_c', 'success'),
-            ('test_task_on_execute', 'scheduled'),
-            ('test_task_on_success', 'scheduled'),
-        } == {(ti.task_id, ti.state) for ti in tis}
-        for state, start_date, end_date, duration in [
-            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
-        ]:
-            if state == 'success':
-                assert start_date is not None
-                assert end_date is not None
-                assert 0.0 == duration
-            else:
-                assert start_date is None
-                assert end_date is None
-                assert duration is None
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        with create_session() as session:
-            tis = session.query(TaskInstance).all()
-
-        assert 5 == len(tis)
-        assert {
-            ('test_task_a', 'success'),
-            ('test_task_b', 'success'),
-            ('test_task_c', 'success'),
-            ('test_task_on_execute', 'scheduled'),
-            ('test_task_on_success', 'scheduled'),
-        } == {(ti.task_id, ti.state) for ti in tis}
-        for state, start_date, end_date, duration in [
-            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
-        ]:
-            if state == 'success':
-                assert start_date is not None
-                assert end_date is not None
-                assert 0.0 == duration
-            else:
-                assert start_date is None
-                assert end_date is None
-                assert duration is None
-
-    @pytest.mark.need_serialized_dag
-    def test_catchup_works_correctly(self, dag_maker):
-        """Test that catchup works correctly"""
-        session = settings.Session()
-        with dag_maker(
-            dag_id='test_catchup_schedule_dag',
-            schedule_interval=timedelta(days=1),
-            start_date=DEFAULT_DATE,
-            catchup=True,
-            max_active_runs=1,
-            session=session,
-        ) as dag:
-            DummyOperator(task_id='dummy')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.executor = MockExecutor()
-        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-
-        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
-        self.scheduler_job._start_queued_dagruns(session)
-        # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
-        dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
-        ti = dr.get_task_instance(task_id='dummy')
-        ti.state = State.SUCCESS
-        session.merge(ti)
-        session.flush()
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        session.flush()
-
-        # Run the second time so _update_dag_next_dagrun will run
-        self.scheduler_job._schedule_dag_run(dr, session)
-        session.flush()
-
-        dag.catchup = False
-        dag.sync_to_db()
-        assert not dag.catchup
-
-        dm = DagModel.get_dagmodel(dag.dag_id)
-        self.scheduler_job._create_dag_runs([dm], session)
-
-        # Check catchup worked correctly by ensuring execution_date is quite new
-        # Our dag is a daily dag
-        assert (
-            session.query(DagRun.execution_date)
-            .filter(DagRun.execution_date != DEFAULT_DATE)  # exclude the first run
-            .scalar()
-        ) > (timezone.utcnow() - timedelta(days=2))