This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d8c0cfe  Have the dag_maker fixture (optionally) give SerializedDAGs (#17577)
d8c0cfe is described below

commit d8c0cfea5ff679dc2de55220f8fc500fadef1093
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Aug 18 12:46:49 2021 +0100

    Have the dag_maker fixture (optionally) give SerializedDAGs (#17577)
    
    All but one test in test_scheduler_job.py wants to operate on serialized
    DAGs, so it makes sense to have the dag_maker do this for us, making each
    test "smaller".
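
    A minimal sketch of the intended usage, based on the fixture docstring
    added below (the `need_serialized_dag` marker and `dag_maker` fixture are
    from this commit; the test body itself is only illustrative):

        @pytest.mark.need_serialized_dag
        def test_example(dag_maker):
            # `dag` here is a lazy proxy that resolves to the SerializedDAG
            with dag_maker(dag_id='example_dag') as dag:
                DummyOperator(task_id='dummy')
            dr = dag_maker.create_dagrun()
            assert dr.dag_id == dag.dag_id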
---
 pytest.ini                       |   2 +
 tests/conftest.py                | 113 +++++++++--
 tests/jobs/test_scheduler_job.py | 420 ++++++++-------------------------------
 3 files changed, 175 insertions(+), 360 deletions(-)

diff --git a/pytest.ini b/pytest.ini
index a7fd317..7f30753 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -32,3 +32,5 @@ faulthandler_timeout = 480
 log_level = INFO
 filterwarnings =
     error::pytest.PytestCollectionWarning
+markers =
+    need_serialized_dag
diff --git a/tests/conftest.py b/tests/conftest.py
index c7685d4..81d1d37 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -429,7 +429,7 @@ def app():
 @pytest.fixture
 def dag_maker(request):
     """
-    The dag_maker helps us to create DAG & DagModel automatically.
+    The dag_maker helps us to create DAG, DagModel, and SerializedDAG automatically.
 
     You have to use the dag_maker as a context manager and it takes
     the same arguments as DAG::
@@ -451,49 +451,89 @@ def dag_maker(request):
 
     The dag_maker.create_dagrun takes the same arguments as dag.create_dagrun
 
+    If you want to operate on serialized DAGs, then either pass ``serialized=True`` to the ``dag_maker()``
+    call, or you can mark your test/class/file with ``@pytest.mark.need_serialized_dag(True)``. In both of
+    these cases the ``dag`` returned by the context manager will be a lazily-evaluated proxy object to the
+    SerializedDAG.
     """
-    from airflow.models import DAG, DagModel
-    from airflow.utils import timezone
-    from airflow.utils.session import provide_session
-    from airflow.utils.state import State
+    import lazy_object_proxy
 
-    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+    # IMPORTANT: Delay _all_ imports from `airflow.*` to _inside a method_.
+    # This fixture is "called" early on in the pytest collection process, and
+    # if we import airflow.* here the wrong (non-test) config will be loaded
+    # and "baked" into various constants
+
+    want_serialized = False
+
+    # Allow changing default serialized behaviour with `@pytest.mark.need_serialized_dag` or
+    # `@pytest.mark.need_serialized_dag(False)`
+    serialized_marker = request.node.get_closest_marker("need_serialized_dag")
+    if serialized_marker:
+        (want_serialized,) = serialized_marker.args or (True,)
 
     class DagFactory:
+        def __init__(self):
+            from airflow.models import DagBag
+
+            # Keep all the serialized dags we've created in this test
+            self.dagbag = DagBag(os.devnull, include_examples=False, read_dags_from_db=False)
+
         def __enter__(self):
             self.dag.__enter__()
+            if self.want_serialized:
+                return lazy_object_proxy.Proxy(self._serialized_dag)
             return self.dag
 
+        def _serialized_dag(self):
+            return self.serialized_model.dag
+
         def __exit__(self, type, value, traceback):
+            from airflow.models import DagModel
+            from airflow.models.serialized_dag import SerializedDagModel
+
             dag = self.dag
             dag.__exit__(type, value, traceback)
-            if type is None:
-                dag.clear()
-                self.dag_model = DagModel(
-                    dag_id=dag.dag_id,
-                    next_dagrun=dag.start_date,
-                    is_active=True,
-                    is_paused=False,
-                    max_active_tasks=dag.max_active_tasks,
-                    has_task_concurrency_limits=False,
-                )
-                self.session.add(self.dag_model)
+            if type is not None:
+                return
+
+            dag.clear()
+            dag.sync_to_db(self.session)
+            self.dag_model = self.session.query(DagModel).get(dag.dag_id)
+
+            if self.want_serialized:
+                self.serialized_model = SerializedDagModel(dag)
+                self.session.merge(self.serialized_model)
+                serialized_dag = self._serialized_dag()
+                self.dagbag.bag_dag(serialized_dag, root_dag=serialized_dag)
                 self.session.flush()
+            else:
+                self.dagbag.bag_dag(self.dag, self.dag)
 
         def create_dagrun(self, **kwargs):
+            from airflow.utils.state import State
+
             dag = self.dag
             defaults = dict(
                 run_id='test',
                 state=State.RUNNING,
                 execution_date=self.start_date,
                 start_date=self.start_date,
+                session=self.session,
             )
             kwargs = {**defaults, **kwargs}
             self.dag_run = dag.create_dagrun(**kwargs)
             return self.dag_run
 
-        @provide_session
-        def __call__(self, dag_id='test_dag', session=None, **kwargs):
+        def __call__(
+            self, dag_id='test_dag', serialized=want_serialized, fileloc=None, session=None, **kwargs
+        ):
+            from airflow import settings
+            from airflow.models import DAG
+            from airflow.utils import timezone
+
+            if session is None:
+                session = settings.Session()
+
             self.kwargs = kwargs
             self.session = session
             self.start_date = self.kwargs.get('start_date', None)
@@ -506,13 +546,44 @@ def dag_maker(request):
                 if hasattr(request.module, 'DEFAULT_DATE'):
                     self.start_date = getattr(request.module, 'DEFAULT_DATE')
                 else:
+                    DEFAULT_DATE = timezone.datetime(2016, 1, 1)
                     self.start_date = DEFAULT_DATE
             self.kwargs['start_date'] = self.start_date
             self.dag = DAG(dag_id, **self.kwargs)
-            self.dag.fileloc = request.module.__file__
+            self.dag.fileloc = fileloc or request.module.__file__
+            self.want_serialized = serialized
+
             return self
 
-    return DagFactory()
+        def cleanup(self):
+            from airflow.models import DagModel, DagRun, TaskInstance
+            from airflow.models.serialized_dag import SerializedDagModel
+
+            dag_ids = list(self.dagbag.dag_ids)
+            if not dag_ids:
+                return
+            # To isolate problems here from problems elsewhere on the session object
+            self.session.flush()
+
+            self.session.query(SerializedDagModel).filter(SerializedDagModel.dag_id.in_(dag_ids)).delete(
+                synchronize_session=False
+            )
+            self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(synchronize_session=False)
+            self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
+                synchronize_session=False
+            )
+            self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
+                synchronize_session=False
+            )
+            self.session.commit()
+
+    factory = DagFactory()
+
+    try:
+        yield factory
+    finally:
+        factory.cleanup()
+        del factory.session
 
 
 @pytest.fixture
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9030005..f8e8e53 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -96,6 +96,7 @@ def dagbag():
 
 
 @pytest.mark.usefixtures("disable_load_example")
+@pytest.mark.need_serialized_dag
 class TestSchedulerJob:
     @staticmethod
     def clean_db():
@@ -189,12 +190,11 @@ class TestSchedulerJob:
         dag_id2 = "test_process_executor_events_2"
         task_id_1 = 'dummy_task'
 
-        with dag_maker(dag_id=dag_id, full_filepath="/test_path1/") as dag:
+        with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
             task1 = DummyOperator(task_id=task_id_1)
-        with dag_maker(dag_id=dag_id2, full_filepath="/test_path1/") as dag2:
+        with dag_maker(dag_id=dag_id2, fileloc='/test_path1/'):
             DummyOperator(task_id=task_id_1)
-        dag.fileloc = "/test_path1/"
-        dag2.fileloc = "/test_path1/"
+
         mock_stats_incr.reset_mock()
         executor = MockExecutor(do_update=False)
         task_callback = mock.MagicMock()
@@ -203,8 +203,6 @@ class TestSchedulerJob:
         self.scheduler_job.processor_agent = mock.MagicMock()
 
         session = settings.Session()
-        dag.sync_to_db(session=session)
-        dag2.sync_to_db(session=session)
 
         ti1 = TaskInstance(task1, DEFAULT_DATE)
         ti1.state = State.QUEUED
@@ -270,7 +268,7 @@ class TestSchedulerJob:
 
         with dag_maker(dag_id=dag_id) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        assert isinstance(dag, SerializedDAG)
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -295,9 +293,8 @@ class TestSchedulerJob:
         dag_id = 'SchedulerJobTest.test_execute_task_instances_no_dagrun_task_will_execute'
         task_id_1 = 'dummy_task'
 
-        with dag_maker(dag_id=dag_id) as dag:
+        with dag_maker(dag_id=dag_id):
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -321,9 +318,8 @@ class TestSchedulerJob:
         dag_id = 'SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute'
         task_id_1 = 'dummy_task'
 
-        with dag_maker(dag_id=dag_id) as dag:
+        with dag_maker(dag_id=dag_id):
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -350,7 +346,6 @@ class TestSchedulerJob:
         task_id_1 = 'dummy'
         with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -390,7 +385,6 @@ class TestSchedulerJob:
         with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
             task1 = DummyOperator(task_id=task_id_1, pool='a')
             task2 = DummyOperator(task_id=task_id_2, pool='b')
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -437,17 +431,14 @@ class TestSchedulerJob:
         dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
         dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
         task_id = 'task-a'
-        with dag_maker(dag_id=dag_id_1, max_active_tasks=16) as dag_1:
+        with dag_maker(dag_id=dag_id_1, max_active_tasks=16):
             dag1_task = DummyOperator(task_id=task_id)
         dr1 = dag_maker.create_dagrun(execution_date=DEFAULT_DATE + timedelta(hours=1))
 
-        with dag_maker(dag_id=dag_id_2, max_active_tasks=16) as dag_2:
+        with dag_maker(dag_id=dag_id_2, max_active_tasks=16):
             dag2_task = DummyOperator(task_id=task_id)
         dr2 = dag_maker.create_dagrun()
 
-        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
-        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
@@ -469,17 +460,14 @@ class TestSchedulerJob:
         dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
         dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
         task_id = 'task-a'
-        with dag_maker(dag_id=dag_id_1, max_active_tasks=16) as dag_1:
+        with dag_maker(dag_id=dag_id_1, max_active_tasks=16):
             dag1_task = DummyOperator(task_id=task_id, priority_weight=1)
         dr1 = dag_maker.create_dagrun()
 
-        with dag_maker(dag_id=dag_id_2, max_active_tasks=16) as dag_2:
+        with dag_maker(dag_id=dag_id_2, max_active_tasks=16):
             dag2_task = DummyOperator(task_id=task_id, priority_weight=4)
         dr2 = dag_maker.create_dagrun()
 
-        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
-        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
 
@@ -501,15 +489,13 @@ class TestSchedulerJob:
         dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
         dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
         task_id = 'task-a'
-        with dag_maker(dag_id=dag_id_1, max_active_tasks=16) as dag_1:
+        with dag_maker(dag_id=dag_id_1, max_active_tasks=16):
             dag1_task = DummyOperator(task_id=task_id, priority_weight=1)
         dr1 = dag_maker.create_dagrun()
 
-        with dag_maker(dag_id=dag_id_2, max_active_tasks=16) as dag_2:
+        with dag_maker(dag_id=dag_id_2, max_active_tasks=16):
             dag2_task = DummyOperator(task_id=task_id, priority_weight=4)
         dr2 = dag_maker.create_dagrun(execution_date=DEFAULT_DATE + timedelta(hours=1))
-        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
-        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -535,7 +521,6 @@ class TestSchedulerJob:
         with dag_maker(dag_id=dag_id) as dag:
             op1 = DummyOperator(task_id='dummy1')
             op2 = DummyOperator(task_id='dummy2')
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         executor = MockExecutor(do_update=True)
         self.scheduler_job = SchedulerJob(executor=executor)
@@ -575,9 +560,8 @@ class TestSchedulerJob:
     def test_nonexistent_pool(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_nonexistent_pool'
         task_id = 'dummy_wrong_pool'
-        with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
+        with dag_maker(dag_id=dag_id, max_active_tasks=16):
             task = DummyOperator(task_id=task_id, pool="this_pool_doesnt_exist")
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -597,9 +581,8 @@ class TestSchedulerJob:
     def test_infinite_pool(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_infinite_pool'
         task_id = 'dummy'
-        with dag_maker(dag_id=dag_id, concurrency=16) as dag:
+        with dag_maker(dag_id=dag_id, concurrency=16):
             task = DummyOperator(task_id=task_id, pool="infinite_pool")
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -620,9 +603,8 @@ class TestSchedulerJob:
     def test_find_executable_task_instances_none(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
         task_id_1 = 'dummy'
-        with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
+        with dag_maker(dag_id=dag_id, max_active_tasks=16):
             DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -635,7 +617,6 @@ class TestSchedulerJob:
         task_id_1 = 'dummy'
         with dag_maker(dag_id=dag_id, max_active_tasks=2) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -681,11 +662,10 @@ class TestSchedulerJob:
 
     def test_find_executable_task_instances_concurrency_queued(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency_queued'
-        with dag_maker(dag_id=dag_id, max_active_tasks=3) as dag:
+        with dag_maker(dag_id=dag_id, max_active_tasks=3):
             task1 = DummyOperator(task_id='dummy1')
             task2 = DummyOperator(task_id='dummy2')
             task3 = DummyOperator(task_id='dummy3')
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -724,9 +704,6 @@ class TestSchedulerJob:
         self.scheduler_job = SchedulerJob(executor=executor)
         session = settings.Session()
 
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-        self.scheduler_job.dagbag.sync_to_db(session=session)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
         dr1 = dag_maker.create_dagrun()
         dr2 = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -806,7 +783,6 @@ class TestSchedulerJob:
         task_id_1 = 'dummy'
         with dag_maker(dag_id=dag_id, max_active_tasks=2) as dag:
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -846,9 +822,8 @@ class TestSchedulerJob:
     def test_enqueue_task_instances_with_queued_state(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_enqueue_task_instances_with_queued_state'
         task_id_1 = 'dummy'
-        with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE) as dag:
+        with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE):
             task1 = DummyOperator(task_id=task_id_1)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -877,7 +852,6 @@ class TestSchedulerJob:
         with dag_maker(dag_id=dag_id, max_active_tasks=3) as dag:
             task1 = DummyOperator(task_id=task_id_1)
             task2 = DummyOperator(task_id=task_id_2)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -944,7 +918,6 @@ class TestSchedulerJob:
         with dag_maker(dag_id=dag_id, max_active_tasks=16) as dag:
             task1 = DummyOperator(task_id=task_id_1)
             task2 = DummyOperator(task_id=task_id_2)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -998,7 +971,6 @@ class TestSchedulerJob:
         with dag_maker(dag_id=dag_id, max_active_tasks=1024) as dag:
             task1 = DummyOperator(task_id=task_id_1)
             task2 = DummyOperator(task_id=task_id_2)
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         session = settings.Session()
@@ -1032,12 +1004,12 @@ class TestSchedulerJob:
         session.rollback()
 
     def test_change_state_for_tis_without_dagrun(self, dag_maker):
-        with dag_maker(dag_id='test_change_state_for_tis_without_dagrun') as dag1:
+        with dag_maker(dag_id='test_change_state_for_tis_without_dagrun'):
             DummyOperator(task_id='dummy')
             DummyOperator(task_id='dummy_b')
         dr1 = dag_maker.create_dagrun()
 
-        with dag_maker(dag_id='test_change_state_for_tis_without_dagrun_dont_change') as dag2:
+        with dag_maker(dag_id='test_change_state_for_tis_without_dagrun_dont_change'):
             DummyOperator(task_id='dummy')
         dr2 = dag_maker.create_dagrun()
 
@@ -1062,13 +1034,6 @@ class TestSchedulerJob:
         session.merge(ti3)
         session.commit()
 
-        dagbag = DagBag("/dev/null", include_examples=False, read_dags_from_db=False)
-        dagbag.bag_dag(dag1, root_dag=dag1)
-        dagbag.bag_dag(dag2, root_dag=dag2)
-        dagbag.bag_dag(dag3, root_dag=dag3)
-        dagbag.sync_to_db(session)
-        session.commit()
-
         self.scheduler_job = SchedulerJob(num_runs=0)
         self.scheduler_job.dagbag.collect_dags_from_db()
 
@@ -1122,7 +1087,6 @@ class TestSchedulerJob:
         session = settings.Session()
         with dag_maker('test_execute_helper_reset_orphaned_tasks') as dag:
             op1 = DummyOperator(task_id='op1')
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
 
         dr = dag_maker.create_dagrun()
         dr2 = dag.create_dagrun(
@@ -1168,15 +1132,9 @@ class TestSchedulerJob:
         with dag_maker(
             dag_id,
             start_date=DEFAULT_DATE + timedelta(days=1),
-        ) as dag:
+        ):
             op1 = DummyOperator(task_id='op1')
 
-        # Write Dag to DB
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
-        dagbag.bag_dag(dag, root_dag=dag)
-        dagbag.sync_to_db()
-
-        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
         # Create DAG run with FAILED state
         dr = dag_maker.create_dagrun(
             state=State.FAILED,
@@ -1190,7 +1148,7 @@ class TestSchedulerJob:
         # This poll interval is large, but the scheduler doesn't sleep that
         # long; instead we hit the clean_tis_without_dagrun interval
         self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
-        self.scheduler_job.dagbag = dagbag
+        self.scheduler_job.dagbag = dag_maker.dagbag
         executor = MockExecutor(do_update=False)
         executor.queued_tasks
         self.scheduler_job.executor = executor
@@ -1253,22 +1211,18 @@ class TestSchedulerJob:
         with dag_maker(
             dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout',
             start_date=DEFAULT_DATE,
+            max_active_runs=1,
+            dagrun_timeout=datetime.timedelta(seconds=60),
         ) as dag:
             DummyOperator(task_id='dummy')
-        dag.max_active_runs = 1
-        dag.dagrun_timeout = datetime.timedelta(seconds=60)
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-
-        self.scheduler_job.dagbag.sync_to_db()
+        self.scheduler_job.dagbag = dag_maker.dagbag
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
         assert orm_dag is not None
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
         self.scheduler_job._create_dag_runs([orm_dag], session)
         self.scheduler_job._start_queued_dagruns(session)
 
@@ -1321,30 +1275,18 @@ class TestSchedulerJob:
         """
         Test if a dagrun will be set to failed on timeout, even without max_active_runs
         """
-        with dag_maker(dag_id='test_scheduler_fail_dagrun_timeout') as dag:
-            DummyOperator(task_id='dummy')
-        dag.dagrun_timeout = datetime.timedelta(seconds=60)
-
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-        self.scheduler_job.dagbag.sync_to_db()
-
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
-        assert orm_dag is not None
+        with dag_maker(
+            dag_id='test_scheduler_fail_dagrun_timeout',
+            dagrun_timeout=datetime.timedelta(seconds=60),
+            session=session,
+        ):
+            DummyOperator(task_id='dummy')
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        dr = dag_maker.create_dagrun(start_date=timezone.utcnow() - datetime.timedelta(days=1))
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
-        # Should be scheduled as dagrun_timeout has passed
-        dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
-        session.flush()
+        self.scheduler_job.dagbag = dag_maker.dagbag
 
         # Mock that processor_agent is started
         self.scheduler_job.processor_agent = mock.Mock()
@@ -1386,25 +1328,13 @@ class TestSchedulerJob:
             DummyOperator(task_id='dummy')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag = dag_maker.dagbag
         self.scheduler_job.processor_agent = mock.Mock()
         self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
         self.scheduler_job._send_sla_callbacks_to_processor = mock.Mock()
 
-        # Sync DAG into DB
-        with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-            self.scheduler_job.dagbag.sync_to_db()
-
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
-        assert orm_dag is not None
-
-        # Create DagRun
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
+        dr = dag_maker.create_dagrun()
 
         ti = dr.get_task_instance('dummy')
         ti.set_state(state, session)
@@ -1424,7 +1354,9 @@ class TestSchedulerJob:
         self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once_with(expected_callback)
         # This is already tested separately
         # In this test we just want to verify that this function is called
-        self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(dag)
+        # `dag` is a lazy-object-proxy -- we need to resolve it
+        real_dag = self.scheduler_job.dagbag.get_dag(dag.dag_id)
+        self.scheduler_job._send_sla_callbacks_to_processor.assert_called_once_with(real_dag)
 
         session.rollback()
         session.close()
@@ -1434,7 +1366,7 @@ class TestSchedulerJob:
         Tests that before any callbacks are sent to the processor, the session is committed. This ensures
         that the dagrun details are up to date when the callbacks are run.
         """
-        with dag_maker(dag_id='test_dagrun_callbacks_commited_before_sent') as dag:
+        with dag_maker(dag_id='test_dagrun_callbacks_commited_before_sent'):
             DummyOperator(task_id='dummy')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -1442,21 +1374,8 @@ class TestSchedulerJob:
         self.scheduler_job._send_dag_callbacks_to_processor = mock.Mock()
         self.scheduler_job._schedule_dag_run = mock.Mock()
 
-        # Sync DAG into DB
-        with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-            self.scheduler_job.dagbag.sync_to_db()
-
+        dr = dag_maker.create_dagrun()
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
-        assert orm_dag is not None
-
-        # Create DagRun
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
 
         ti = dr.get_task_instance('dummy')
         ti.set_state(State.SUCCESS, session)
@@ -1494,7 +1413,7 @@ class TestSchedulerJob:
         """
         with dag_maker(
             dag_id='test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined',
-        ) as dag:
+        ):
             BashOperator(task_id='test_task', bash_command='echo hi')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -1502,22 +1421,8 @@ class TestSchedulerJob:
         self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
         self.scheduler_job._send_dag_callbacks_to_processor = mock.Mock()
 
-        # Sync DAG into DB
-        with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-            self.scheduler_job.dagbag.sync_to_db()
-
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
-        assert orm_dag is not None
-
-        # Create DagRun
-        self.scheduler_job._create_dag_runs([orm_dag], session)
-
-        drs = DagRun.find(dag_id=dag.dag_id, session=session)
-        assert len(drs) == 1
-        dr = drs[0]
-
+        dr = dag_maker.create_dagrun()
         ti = dr.get_task_instance('test_task')
         ti.set_state(state, session)
 
@@ -1540,8 +1445,6 @@ class TestSchedulerJob:
 
         session = settings.Session()
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
         dr = dag_maker.create_dagrun()
         assert dr is not None
 
@@ -1551,7 +1454,7 @@ class TestSchedulerJob:
         with dag_maker(
             dag_id='test_scheduler_do_not_schedule_removed_task',
             start_date=dag.following_schedule(DEFAULT_DATE),
-        ) as dag:
+        ):
             pass
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -1879,22 +1782,11 @@ class TestSchedulerJob:
                 bash_command='echo hi',
             )
 
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-
         session = settings.Session()
         pool = Pool(pool='test_scheduler_verify_pool_full', slots=1)
         session.add(pool)
         session.flush()
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-        SerializedDagModel.write_dag(dag)
-
         self.scheduler_job = SchedulerJob(executor=self.null_exec)
         self.scheduler_job.processor_agent = mock.MagicMock()
 
@@ -1929,21 +1821,10 @@ class TestSchedulerJob:
                 bash_command='echo hi',
             )
 
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-
         session = settings.Session()
         pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6)
         session.add(pool)
-        session.commit()
-        SerializedDagModel.write_dag(dag)
-
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        session.flush()
 
         self.scheduler_job = SchedulerJob(executor=self.null_exec)
         self.scheduler_job.processor_agent = mock.MagicMock()
@@ -1984,23 +1865,13 @@ class TestSchedulerJob:
                 pool='test_scheduler_keeps_scheduling_pool_full_p2',
                 bash_command='echo hi',
             )
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
-        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
-        dagbag.sync_to_db()
 
         session = settings.Session()
         pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
         pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
         session.add(pool_p1)
         session.add(pool_p2)
-        session.commit()
-
-        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+        session.flush()
 
         scheduler = SchedulerJob(executor=self.null_exec)
         scheduler.processor_agent = mock.MagicMock()
@@ -2045,7 +1916,7 @@ class TestSchedulerJob:
 
         Though tasks with lower priority might be executed.
         """
-        with dag_maker(dag_id='test_scheduler_verify_priority_and_slots') as dag:
+        with dag_maker(dag_id='test_scheduler_verify_priority_and_slots'):
             # Medium priority, not enough slots
             BashOperator(
                 task_id='test_scheduler_verify_priority_and_slots_t0',
@@ -2071,21 +1942,10 @@ class TestSchedulerJob:
                 bash_command='echo hi',
             )
 
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-
         session = settings.Session()
         pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2)
         session.add(pool)
-        session.commit()
-
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-        SerializedDagModel.write_dag(dag)
+        session.flush()
 
         self.scheduler_job = SchedulerJob(executor=self.null_exec)
         self.scheduler_job.processor_agent = mock.MagicMock()
@@ -2132,11 +1992,9 @@ class TestSchedulerJob:
             BashOperator(task_id='dummy', bash_command='echo hi')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-        self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
+        orm_dag = dag_maker.dag_model
         assert orm_dag is not None
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -2184,11 +2042,9 @@ class TestSchedulerJob:
             BashOperator(task_id='dummy', bash_command='echo hi')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-        self.scheduler_job.dagbag.sync_to_db()
 
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
+        orm_dag = dag_maker.dag_model
         assert orm_dag is not None
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
@@ -2255,16 +2111,12 @@ class TestSchedulerJob:
                 bash_command='exit 1',
                 retries=1,
             )
-        dag.is_subdag = False
 
         with create_session() as session:
             orm_dag = DagModel(dag_id=dag.dag_id)
             orm_dag.is_paused = False
             session.merge(orm_dag)
 
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-
         @mock.patch('airflow.dag_processing.processor.DagBag', return_value=dagbag)
         def do_schedule(mock_dagbag):
             # Use an empty file since the above mock will return the
@@ -2628,9 +2480,6 @@ class TestSchedulerJob:
         with dag_maker(dag_id=dag_id, schedule_interval='@daily') as dag:
             DummyOperator(task_id='task1', sla=timedelta(seconds=60))
 
-        # Used Serialized DAG as Serialized DAG is used in Scheduler
-        dag = SerializedDAG.from_json(SerializedDAG.to_json(dag))
-
         with patch.object(settings, "CHECK_SLAS", True):
             self.scheduler_job = SchedulerJob(subdir=os.devnull)
             mock_agent = mock.MagicMock()
@@ -2653,14 +2502,7 @@ class TestSchedulerJob:
         with dag_maker(dag_id='test_create_dag_runs') as dag:
             DummyOperator(task_id='dummy')
 
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-        dag_model = DagModel.get_dagmodel(dag.dag_id)
+        dag_model = dag_maker.dag_model
 
         self.scheduler_job = SchedulerJob(executor=self.null_exec)
         self.scheduler_job.processor_agent = mock.MagicMock()
@@ -2689,14 +2531,7 @@ class TestSchedulerJob:
                 task_id='dummy',
             )
 
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-        dag_model = DagModel.get_dagmodel(dag.dag_id)
+        dag_model = dag_maker.dag_model
 
         self.scheduler_job = SchedulerJob(executor=self.null_exec)
         self.scheduler_job.processor_agent = mock.MagicMock()
@@ -2726,17 +2561,7 @@ class TestSchedulerJob:
             # This CustomOperator has Extra Operator Links registered via plugins
             _ = CustomOperator(task_id='custom_task')
 
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db()
-
-        # Get serialized dag
-        s_dag_1 = dagbag.get_dag(dag.dag_id)
-        custom_task = s_dag_1.task_dict['custom_task']
+        custom_task = dag.task_dict['custom_task']
         # Test that custom_task has >= 1 Operator Links (after de-serialization)
         assert custom_task.operator_extra_links
 
@@ -2755,21 +2580,11 @@ class TestSchedulerJob:
         Test that scheduler._create_dag_runs does not raise an error when the DAG does not exist
         in serialized_dag table
         """
-        with dag_maker(dag_id='test_scheduler_create_dag_runs_does_not_raise_error') as dag:
+        with dag_maker(dag_id='test_scheduler_create_dag_runs_does_not_raise_error', serialized=False):
             DummyOperator(
                 task_id='dummy',
             )
 
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=False,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        # Only write to dag table and not serialized_dag table
-        DAG.bulk_write_to_db(dagbag.dags.values())
-        dag_model = DagModel.get_dagmodel(dag.dag_id)
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull, executor=self.null_exec)
         self.scheduler_job.processor_agent = mock.MagicMock()
 
@@ -2779,7 +2594,7 @@ class TestSchedulerJob:
             'ERROR',
             logger='airflow.jobs.scheduler_job',
         ):
-            self.scheduler_job._create_dag_runs([dag_model], session)
+            self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
             assert caplog.messages == [
                 "DAG 'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table",
             ]
@@ -2798,18 +2613,9 @@ class TestSchedulerJob:
             DummyOperator(task_id='dummy')
 
         session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        # Write to dag and serialized_dag table
-        dagbag.sync_to_db(session)
-        dag = dagbag.get_dag(dag.dag_id)
 
         # Verify that dag_model.next_dagrun is equal to next execution_date
-        dag_model = session.query(DagModel).get(dag.dag_id)
+        dag_model = dag_maker.dag_model
         assert dag_model.next_dagrun == DEFAULT_DATE
         assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE
         assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=1)
@@ -2870,22 +2676,11 @@ class TestSchedulerJob:
         session = settings.Session()
         assert dag.get_last_dagrun(session) is None
 
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=False,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-
-        # Create DagModel
-        DAG.bulk_write_to_db(dagbag.dags.values())
-        dag_model = DagModel.get_dagmodel(dag.dag_id)
+        dag_model = dag_maker.dag_model
 
         # Assert dag_model.next_dagrun is set correctly
         assert dag_model.next_dagrun == DEFAULT_DATE
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
         dagrun = dag_maker.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=dag_model.next_dagrun,
@@ -2900,7 +2695,6 @@ class TestSchedulerJob:
         assert dag.get_last_dagrun(session) == dagrun
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull, executor=self.null_exec)
-        self.scheduler_job.dagbag = dagbag
         self.scheduler_job.processor_agent = mock.MagicMock()
 
         # Test that this does not raise any error
@@ -2929,14 +2723,6 @@ class TestSchedulerJob:
             )
 
         session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db(session=session)
 
         run1 = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -2988,15 +2774,6 @@ class TestSchedulerJob:
             task1 = BashOperator(task_id='dummy1', bash_command='true')
 
         session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db(session=session)
-
         session.add(TaskInstance(task1, DEFAULT_DATE, State.REMOVED))
         session.flush()
 
@@ -3039,15 +2816,6 @@ class TestSchedulerJob:
             BashOperator(task_id='dummy3', bash_command='true')
 
         session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db(session=session)
-
         dag_run = dag_maker.create_dagrun(
             state=State.QUEUED,
             session=session,
@@ -3106,14 +2874,11 @@ class TestSchedulerJob:
         Test if _process_task_instances puts the right task instances into the
         mock_list.
         """
-        with dag_maker(dag_id='test_scheduler_process_execute_task') as dag:
+        with dag_maker(dag_id='test_scheduler_process_execute_task'):
             BashOperator(task_id='dummy', bash_command='echo hi')
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.processor_agent = mock.MagicMock()
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
 
         dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
         assert dr is not None
@@ -3153,14 +2918,11 @@ class TestSchedulerJob:
         Test if _process_task_instances puts the right task instances into the
         mock_list.
         """
-        with dag_maker(dag_id='test_scheduler_process_execute_task_with_task_concurrency') as dag:
+        with dag_maker(dag_id='test_scheduler_process_execute_task_with_task_concurrency'):
             BashOperator(task_id='dummy', task_concurrency=2, bash_command='echo Hi')
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.processor_agent = mock.MagicMock()
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
 
         dr = dag_maker.create_dagrun(
             run_type=DagRunType.SCHEDULED,
@@ -3207,15 +2969,12 @@ class TestSchedulerJob:
             default_args={
                 'depends_on_past': True,
             },
-        ) as dag:
+        ):
             BashOperator(task_id='dummy1', bash_command='echo hi')
             BashOperator(task_id='dummy2', bash_command='echo hi')
 
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.processor_agent = mock.MagicMock()
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
         dr = dag_maker.create_dagrun(
             run_type=DagRunType.SCHEDULED,
         )
@@ -3244,14 +3003,10 @@ class TestSchedulerJob:
             BashOperator(task_id='dummy', bash_command='echo test')
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
-
-        # Since we don't want to store the code for the DAG defined in this file
-        with mock.patch.object(settings, "STORE_DAG_CODE", False):
-            self.scheduler_job.dagbag.sync_to_db()
+        self.scheduler_job.dagbag = dag_maker.dagbag
 
         session = settings.Session()
-        orm_dag = session.query(DagModel).get(dag.dag_id)
+        orm_dag = dag_maker.dag_model
         assert orm_dag is not None
 
         if self.scheduler_job.processor_agent:
@@ -3286,22 +3041,13 @@ class TestSchedulerJob:
         """
         Test dag after dag.clear, max_active_runs is respected
         """
-        with dag_maker(dag_id='test_scheduler_max_active_runs_respected_after_clear') as dag:
+        with dag_maker(
+            dag_id='test_scheduler_max_active_runs_respected_after_clear', max_active_runs=1
+        ) as dag:
             BashOperator(task_id='dummy', bash_command='echo Hi')
-        dag.max_active_runs = 1
-
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
-        # Write Dag to DB
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
-        dagbag.bag_dag(dag, root_dag=dag)
-        dagbag.sync_to_db()
-
-        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag.dag_id)
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.processor_agent = mock.MagicMock()
-        self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
 
         session = settings.Session()
         date = DEFAULT_DATE
@@ -3327,47 +3073,43 @@ class TestSchedulerJob:
 
         session = settings.Session()
         self.scheduler_job._start_queued_dagruns(session)
-        session.commit()
+        session.flush()
         # Assert that only 1 dagrun is active
         assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
         # Assert that the other two are queued
         assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
 
-    def test_timeout_triggers(self):
+    def test_timeout_triggers(self, dag_maker):
         """
         Tests that tasks in the deferred state, but whose trigger timeout
         has expired, are correctly failed.
 
         """
 
+        session = settings.Session()
         # Create the test DAG and task
-        with DAG(
+        with dag_maker(
             dag_id='test_timeout_triggers',
             start_date=DEFAULT_DATE,
             schedule_interval='@once',
             max_active_runs=1,
-        ) as dag:
-            task1 = DummyOperator(task_id='dummy1')
-
-        # Load it into the DagBag
-        session = settings.Session()
-        dagbag = DagBag(
-            dag_folder=os.devnull,
-            include_examples=False,
-            read_dags_from_db=True,
-        )
-        dagbag.bag_dag(dag=dag, root_dag=dag)
-        dagbag.sync_to_db(session=session)
+            session=session,
+        ):
+            DummyOperator(task_id='dummy1')
 
         # Create a Task Instance for the task that is allegedly deferred
         # but past its timeout, and one that is still good.
         # We don't actually need a linked trigger here; the code doesn't check.
-        ti1 = TaskInstance(task1, DEFAULT_DATE, State.DEFERRED)
-        ti2 = TaskInstance(task1, DEFAULT_DATE + datetime.timedelta(seconds=1), State.DEFERRED)
+        dr1 = dag_maker.create_dagrun()
+        dr2 = dag_maker.create_dagrun(
+            run_id="test2", execution_date=DEFAULT_DATE + datetime.timedelta(seconds=1)
+        )
+        ti1 = dr1.get_task_instance('dummy1', session)
+        ti2 = dr2.get_task_instance('dummy1', session)
+        ti1.state = State.DEFERRED
         ti1.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
+        ti2.state = State.DEFERRED
         ti2.trigger_timeout = timezone.utcnow() + datetime.timedelta(seconds=60)
-        session.add(ti1)
-        session.add(ti2)
         session.flush()
 
         # Boot up the scheduler and make it check timeouts
@@ -3375,8 +3117,8 @@ class TestSchedulerJob:
         self.scheduler_job.check_trigger_timeouts(session=session)
 
         # Make sure that TI1 is now scheduled to fail, and 2 wasn't touched
-        ti1.refresh_from_db()
-        ti2.refresh_from_db()
+        session.refresh(ti1)
+        session.refresh(ti2)
         assert ti1.state == State.SCHEDULED
         assert ti1.next_method == "__fail__"
         assert ti2.state == State.DEFERRED
