This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e1aa7d9272 Move tests under correct test class (#22768)
e1aa7d9272 is described below

commit e1aa7d92721fd09c1188a04f546711c69fbb45ed
Author: Ping Zhang <[email protected]>
AuthorDate: Tue Apr 5 20:22:50 2022 -0700

    Move tests under correct test class (#22768)
---
 tests/jobs/test_scheduler_job.py | 244 +++++++++++++++++++--------------------
 1 file changed, 122 insertions(+), 122 deletions(-)

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9570c29fd5..f0f8c52b07 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3854,6 +3854,128 @@ class TestSchedulerJob:
         dr.refresh_from_db(session)
         assert dr.state == DagRunState.SUCCESS
 
+    def test_should_mark_dummy_task_as_success(self):
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
+        )
+
+        # Write DAGs to dag and serialized_dag table
+        dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
+        dagbag.sync_to_db()
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
+
+        # Create DagRun
+        session = settings.Session()
+        orm_dag = session.query(DagModel).get(dag.dag_id)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
+
+        drs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        # Schedule TaskInstances
+        self.scheduler_job._schedule_dag_run(dr, session)
+        with create_session() as session:
+            tis = session.query(TaskInstance).all()
+
+        dags = self.scheduler_job.dagbag.dags.values()
+        assert ['test_only_dummy_tasks'] == [dag.dag_id for dag in dags]
+        assert 5 == len(tis)
+        assert {
+            ('test_task_a', 'success'),
+            ('test_task_b', None),
+            ('test_task_c', 'success'),
+            ('test_task_on_execute', 'scheduled'),
+            ('test_task_on_success', 'scheduled'),
+        } == {(ti.task_id, ti.state) for ti in tis}
+        for state, start_date, end_date, duration in [
+            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
+        ]:
+            if state == 'success':
+                assert start_date is not None
+                assert end_date is not None
+                assert 0.0 == duration
+            else:
+                assert start_date is None
+                assert end_date is None
+                assert duration is None
+
+        self.scheduler_job._schedule_dag_run(dr, session)
+        with create_session() as session:
+            tis = session.query(TaskInstance).all()
+
+        assert 5 == len(tis)
+        assert {
+            ('test_task_a', 'success'),
+            ('test_task_b', 'success'),
+            ('test_task_c', 'success'),
+            ('test_task_on_execute', 'scheduled'),
+            ('test_task_on_success', 'scheduled'),
+        } == {(ti.task_id, ti.state) for ti in tis}
+        for state, start_date, end_date, duration in [
+            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
+        ]:
+            if state == 'success':
+                assert start_date is not None
+                assert end_date is not None
+                assert 0.0 == duration
+            else:
+                assert start_date is None
+                assert end_date is None
+                assert duration is None
+
+    @pytest.mark.need_serialized_dag
+    def test_catchup_works_correctly(self, dag_maker):
+        """Test that catchup works correctly"""
+        session = settings.Session()
+        with dag_maker(
+            dag_id='test_catchup_schedule_dag',
+            schedule_interval=timedelta(days=1),
+            start_date=DEFAULT_DATE,
+            catchup=True,
+            max_active_runs=1,
+            session=session,
+        ) as dag:
+            DummyOperator(task_id='dummy')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
+        self.scheduler_job._start_queued_dagruns(session)
+        # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
+        dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
+        ti = dr.get_task_instance(task_id='dummy')
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.flush()
+
+        self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+
+        # Run the second time so _update_dag_next_dagrun will run
+        self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+
+        dag.catchup = False
+        dag.sync_to_db()
+        assert not dag.catchup
+
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        self.scheduler_job._create_dag_runs([dm], session)
+
+        # Check catchup worked correctly by ensuring execution_date is quite new
+        # Our dag is a daily dag
+        assert (
+            session.query(DagRun.execution_date)
+            .filter(DagRun.execution_date != DEFAULT_DATE)  # exclude the first run
+            .scalar()
+        ) > (timezone.utcnow() - timedelta(days=2))
+
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():
@@ -4065,125 +4187,3 @@ class TestSchedulerJobQueriesCount:
                 prefix = "Collected database query count mismatches:"
                 joined = "\n\n".join(failures)
                 raise AssertionError(f"{prefix}\n\n{joined}")
-
-    def test_should_mark_dummy_task_as_success(self):
-        dag_file = os.path.join(
-            os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
-        )
-
-        # Write DAGs to dag and serialized_dag table
-        dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
-        dagbag.sync_to_db()
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.processor_agent = mock.MagicMock()
-        dag = self.scheduler_job.dagbag.get_dag("test_only_dummy_tasks")
-
-        # Create DagRun
-        session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        # Schedule TaskInstances
-        self.scheduler_job._schedule_dag_run(dr, session)
-        with create_session() as session:
-            tis = session.query(TaskInstance).all()
-
-        dags = self.scheduler_job.dagbag.dags.values()
-        assert ['test_only_dummy_tasks'] == [dag.dag_id for dag in dags]
-        assert 5 == len(tis)
-        assert {
-            ('test_task_a', 'success'),
-            ('test_task_b', None),
-            ('test_task_c', 'success'),
-            ('test_task_on_execute', 'scheduled'),
-            ('test_task_on_success', 'scheduled'),
-        } == {(ti.task_id, ti.state) for ti in tis}
-        for state, start_date, end_date, duration in [
-            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
-        ]:
-            if state == 'success':
-                assert start_date is not None
-                assert end_date is not None
-                assert 0.0 == duration
-            else:
-                assert start_date is None
-                assert end_date is None
-                assert duration is None
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        with create_session() as session:
-            tis = session.query(TaskInstance).all()
-
-        assert 5 == len(tis)
-        assert {
-            ('test_task_a', 'success'),
-            ('test_task_b', 'success'),
-            ('test_task_c', 'success'),
-            ('test_task_on_execute', 'scheduled'),
-            ('test_task_on_success', 'scheduled'),
-        } == {(ti.task_id, ti.state) for ti in tis}
-        for state, start_date, end_date, duration in [
-            (ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
-        ]:
-            if state == 'success':
-                assert start_date is not None
-                assert end_date is not None
-                assert 0.0 == duration
-            else:
-                assert start_date is None
-                assert end_date is None
-                assert duration is None
-
-    @pytest.mark.need_serialized_dag
-    def test_catchup_works_correctly(self, dag_maker):
-        """Test that catchup works correctly"""
-        session = settings.Session()
-        with dag_maker(
-            dag_id='test_catchup_schedule_dag',
-            schedule_interval=timedelta(days=1),
-            start_date=DEFAULT_DATE,
-            catchup=True,
-            max_active_runs=1,
-            session=session,
-        ) as dag:
-            DummyOperator(task_id='dummy')
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.executor = MockExecutor()
-        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-
-        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
-        self.scheduler_job._start_queued_dagruns(session)
-        # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
-        dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
-        ti = dr.get_task_instance(task_id='dummy')
-        ti.state = State.SUCCESS
-        session.merge(ti)
-        session.flush()
-
-        self.scheduler_job._schedule_dag_run(dr, session)
-        session.flush()
-
-        # Run the second time so _update_dag_next_dagrun will run
-        self.scheduler_job._schedule_dag_run(dr, session)
-        session.flush()
-
-        dag.catchup = False
-        dag.sync_to_db()
-        assert not dag.catchup
-
-        dm = DagModel.get_dagmodel(dag.dag_id)
-        self.scheduler_job._create_dag_runs([dm], session)
-
-        # Check catchup worked correctly by ensuring execution_date is quite new
-        # Our dag is a daily dag
-        assert (
-            session.query(DagRun.execution_date)
-            .filter(DagRun.execution_date != DEFAULT_DATE)  # exclude the first run
-            .scalar()
-        ) > (timezone.utcnow() - timedelta(days=2))

Reply via email to