This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b810b89c3 Fix TriggerDagRunOperator Tests for Database Isolation 
Tests (#41298)
6b810b89c3 is described below

commit 6b810b89c3f63dd2d2cf107c568be40ba9da0ba2
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Aug 9 20:54:19 2024 +0200

    Fix TriggerDagRunOperator Tests for Database Isolation Tests (#41298)
    
    * Attempt to fix TriggerDagRunOperator for Database Isolation Tests
    
    * Finalize making tests run for triggerdagrunoperator in db isolation mode
    
    * Adjust query count assert for adjustments to serialization
    
    * Review feedback
---
 airflow/api/common/trigger_dag.py                  |   8 +
 airflow/api_internal/endpoints/rpc_api_endpoint.py |   2 +
 airflow/exceptions.py                              |  22 +
 airflow/models/dag.py                              |   4 +
 airflow/operators/trigger_dagrun.py                |  14 +
 airflow/serialization/serialized_objects.py        |   7 +-
 tests/models/test_dag.py                           |   2 +-
 tests/operators/test_trigger_dagrun.py             | 675 ++++++++++++---------
 8 files changed, 451 insertions(+), 283 deletions(-)

diff --git a/airflow/api/common/trigger_dag.py 
b/airflow/api/common/trigger_dag.py
index 86513f7833..f22755ec64 100644
--- a/airflow/api/common/trigger_dag.py
+++ b/airflow/api/common/trigger_dag.py
@@ -22,15 +22,19 @@ from __future__ import annotations
 import json
 from typing import TYPE_CHECKING
 
+from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.exceptions import DagNotFound, DagRunAlreadyExists
 from airflow.models import DagBag, DagModel, DagRun
 from airflow.utils import timezone
+from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 if TYPE_CHECKING:
     from datetime import datetime
 
+    from sqlalchemy.orm.session import Session
+
 
 def _trigger_dag(
     dag_id: str,
@@ -103,12 +107,15 @@ def _trigger_dag(
     return dag_runs
 
 
+@internal_api_call
+@provide_session
 def trigger_dag(
     dag_id: str,
     run_id: str | None = None,
     conf: dict | str | None = None,
     execution_date: datetime | None = None,
     replace_microseconds: bool = True,
+    session: Session = NEW_SESSION,
 ) -> DagRun | None:
     """
     Triggers execution of DAG specified by dag_id.
@@ -118,6 +125,7 @@ def trigger_dag(
     :param conf: configuration
     :param execution_date: date of execution
     :param replace_microseconds: whether microseconds should be zeroed
+    :param session: Unused. Only added in compatibility with database 
isolation mode
     :return: first dag run triggered - even if more than one Dag Runs were 
triggered or None
     """
     dag_model = DagModel.get_current(dag_id)
diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py 
b/airflow/api_internal/endpoints/rpc_api_endpoint.py
index be4699fa6c..ad65157ef9 100644
--- a/airflow/api_internal/endpoints/rpc_api_endpoint.py
+++ b/airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -53,6 +53,7 @@ log = logging.getLogger(__name__)
 
 @functools.lru_cache
 def initialize_method_map() -> dict[str, Callable]:
+    from airflow.api.common.trigger_dag import trigger_dag
     from airflow.cli.commands.task_command import _get_ti_db_access
     from airflow.dag_processing.manager import DagFileProcessorManager
     from airflow.dag_processing.processor import DagFileProcessor
@@ -92,6 +93,7 @@ def initialize_method_map() -> dict[str, Callable]:
         _add_log,
         _xcom_pull,
         _record_task_map_for_downstreams,
+        trigger_dag,
         DagCode.remove_deleted_code,
         DagModel.deactivate_deleted_dags,
         DagModel.get_paused_dag_ids,
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 40a62ad208..3831d909fc 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -239,6 +239,28 @@ class DagRunAlreadyExists(AirflowBadRequest):
             f"A DAG Run already exists for DAG {dag_run.dag_id} at 
{execution_date} with run id {run_id}"
         )
         self.dag_run = dag_run
+        self.execution_date = execution_date
+        self.run_id = run_id
+
+    def serialize(self):
+        cls = self.__class__
+        # Note the DagRun object will be detached here and fails 
serialization, we need to create a new one
+        from airflow.models import DagRun
+
+        dag_run = DagRun(
+            state=self.dag_run.state,
+            dag_id=self.dag_run.dag_id,
+            run_id=self.dag_run.run_id,
+            external_trigger=self.dag_run.external_trigger,
+            run_type=self.dag_run.run_type,
+            execution_date=self.dag_run.execution_date,
+        )
+        dag_run.id = self.dag_run.id
+        return (
+            f"{cls.__module__}.{cls.__name__}",
+            (),
+            {"dag_run": dag_run, "execution_date": self.execution_date, 
"run_id": self.run_id},
+        )
 
 
 class DagFileExists(AirflowBadRequest):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c9380494a0..1c9d351c1d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -115,6 +115,7 @@ from airflow.models.taskinstance import (
     TaskInstanceKey,
     clear_task_instances,
 )
+from airflow.models.tasklog import LogTemplate
 from airflow.secrets.local_filesystem import LocalFilesystemBackend
 from airflow.security import permissions
 from airflow.settings import json
@@ -338,6 +339,9 @@ def _create_orm_dagrun(
         creating_job_id=creating_job_id,
         data_interval=data_interval,
     )
+    # Load defaults into the following two fields to ensure result can be 
serialized detached
+    run.log_template_id = 
int(session.scalar(select(func.max(LogTemplate.__table__.c.id))))
+    run.consumed_dataset_events = []
     session.add(run)
     session.flush()
     run.dag = dag
diff --git a/airflow/operators/trigger_dagrun.py 
b/airflow/operators/trigger_dagrun.py
index 35d387738a..2521297dcf 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -27,6 +27,7 @@ from sqlalchemy import select
 from sqlalchemy.orm.exc import NoResultFound
 
 from airflow.api.common.trigger_dag import trigger_dag
+from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException,
@@ -83,6 +84,8 @@ class TriggerDagRunOperator(BaseOperator):
     """
     Triggers a DAG run for a specified DAG ID.
 
+    Note that if database isolation mode is enabled, not all features are 
supported.
+
     :param trigger_dag_id: The ``dag_id`` of the DAG to trigger (templated).
     :param trigger_run_id: The run ID to use for the triggered DAG run 
(templated).
         If not provided, a run ID will be automatically generated.
@@ -174,6 +177,14 @@ class TriggerDagRunOperator(BaseOperator):
         self.logical_date = logical_date
 
     def execute(self, context: Context):
+        if InternalApiConfig.get_use_internal_api():
+            if self.reset_dag_run:
+                raise AirflowException("Parameter reset_dag_run=True is broken 
with Database Isolation Mode.")
+            if self.wait_for_completion:
+                raise AirflowException(
+                    "Parameter wait_for_completion=True is broken with 
Database Isolation Mode."
+                )
+
         if isinstance(self.logical_date, datetime.datetime):
             parsed_logical_date = self.logical_date
         elif isinstance(self.logical_date, str):
@@ -210,6 +221,7 @@ class TriggerDagRunOperator(BaseOperator):
                 if dag_model is None:
                     raise DagNotFound(f"Dag id {self.trigger_dag_id} not found 
in DagModel")
 
+                # Note: here execution fails on database isolation mode. Needs 
structural changes for AIP-72
                 dag_bag = DagBag(dag_folder=dag_model.fileloc, 
read_dags_from_db=True)
                 dag = dag_bag.get_dag(self.trigger_dag_id)
                 dag.clear(start_date=dag_run.logical_date, 
end_date=dag_run.logical_date)
@@ -250,6 +262,7 @@ class TriggerDagRunOperator(BaseOperator):
                 )
                 time.sleep(self.poke_interval)
 
+                # Note: here execution fails on database isolation mode. Needs 
structural changes for AIP-72
                 dag_run.refresh_from_db()
                 state = dag_run.state
                 if state in self.failed_states:
@@ -263,6 +276,7 @@ class TriggerDagRunOperator(BaseOperator):
         # This logical_date is parsed from the return trigger event
         provided_logical_date = event[1]["execution_dates"][0]
         try:
+            # Note: here execution fails on database isolation mode. Needs 
structural changes for AIP-72
             dag_run = session.execute(
                 select(DagRun).where(
                     DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == provided_logical_date
diff --git a/airflow/serialization/serialized_objects.py 
b/airflow/serialization/serialized_objects.py
index 94631c993c..d110271c3d 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1447,7 +1447,12 @@ class SerializedBaseOperator(BaseOperator, 
BaseSerialization):
 
     @classmethod
     def _is_excluded(cls, var: Any, attrname: str, op: DAGNode):
-        if var is not None and op.has_dag() and attrname.endswith("_date"):
+        if (
+            var is not None
+            and op.has_dag()
+            and op.dag.__class__ is not AttributeRemoved
+            and attrname.endswith("_date")
+        ):
             # If this date is the same as the matching field in the dag, then
             # don't store it again at the task level.
             dag_date = getattr(op.dag, attrname, None)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 376d5c5beb..3d39a7290d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3293,7 +3293,7 @@ class TestQueries:
         dag = DAG("test_dagrun_query_count", start_date=DEFAULT_DATE)
         for i in range(tasks_count):
             EmptyOperator(task_id=f"dummy_task_{i}", owner="test", dag=dag)
-        with assert_queries_count(2):
+        with assert_queries_count(3):
             dag.create_dagrun(
                 run_id="test_dagrun_query_count",
                 state=State.RUNNING,
diff --git a/tests/operators/test_trigger_dagrun.py 
b/tests/operators/test_trigger_dagrun.py
index 341b34fe46..349bba4638 100644
--- a/tests/operators/test_trigger_dagrun.py
+++ b/tests/operators/test_trigger_dagrun.py
@@ -17,7 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import pathlib
 import tempfile
 from datetime import datetime
 from unittest import mock
@@ -26,13 +25,14 @@ import pendulum
 import pytest
 
 from airflow.exceptions import AirflowException, DagRunAlreadyExists, 
RemovedInAirflow3Warning, TaskDeferred
-from airflow.models.dag import DAG, DagModel
+from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
 from airflow.models.log import Log
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.settings import TracebackSessionForTests
 from airflow.triggers.external_task import DagStateTrigger
 from airflow.utils import timezone
 from airflow.utils.session import create_session
@@ -67,15 +67,18 @@ class TestDagRunOperator:
             self._tmpfile = f.name
             f.write(DAG_SCRIPT)
             f.flush()
+        self.f_name = f.name
 
         with create_session() as session:
             session.add(DagModel(dag_id=TRIGGERED_DAG_ID, 
fileloc=self._tmpfile))
             session.commit()
 
-        self.dag = DAG(TEST_DAG_ID, default_args={"owner": "airflow", 
"start_date": DEFAULT_DATE})
-        dagbag = DagBag(f.name, read_dags_from_db=False, 
include_examples=False)
-        dagbag.bag_dag(self.dag, root_dag=self.dag)
-        dagbag.sync_to_db()
+    def re_sync_triggered_dag_to_db(self, dag, dag_maker):
+        TracebackSessionForTests.set_allow_db_access(dag_maker.session, True)
+        dagbag = DagBag(self.f_name, read_dags_from_db=False, 
include_examples=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db(session=dag_maker.session)
+        TracebackSessionForTests.set_allow_db_access(dag_maker.session, False)
 
     def teardown_method(self):
         """Cleanup state after testing in DB."""
@@ -86,7 +89,7 @@ class TestDagRunOperator:
                     synchronize_session=False
                 )
 
-        pathlib.Path(self._tmpfile).unlink()
+        # pathlib.Path(self._tmpfile).unlink()
 
     def assert_extra_link(self, triggered_dag_run, triggering_task, session):
         """
@@ -115,24 +118,32 @@ class TestDagRunOperator:
         }
         assert expected_args in args
 
-    def test_trigger_dagrun(self):
+    def test_trigger_dagrun(self, dag_maker):
         """Test TriggerDagRunOperator."""
-        task = TriggerDagRunOperator(task_id="test_task", 
trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag)
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(task_id="test_task", 
trigger_dag_id=TRIGGERED_DAG_ID)
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
-        with create_session() as session:
-            dagrun = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
-            assert dagrun.external_trigger
-            assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
dagrun.logical_date)
-            self.assert_extra_link(dagrun, task, session)
+        dagrun = dag_maker.session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).one()
+        assert dagrun.external_trigger
+        assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
dagrun.logical_date)
+        self.assert_extra_link(dagrun, task, dag_maker.session)
 
-    def test_trigger_dagrun_custom_run_id(self):
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            trigger_run_id="custom_run_id",
-            dag=self.dag,
-        )
+    def test_trigger_dagrun_custom_run_id(self, dag_maker):
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id="custom_run_id",
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
         with create_session() as session:
@@ -140,15 +151,19 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             assert dagruns[0].run_id == "custom_run_id"
 
-    def test_trigger_dagrun_with_logical_date(self):
+    def test_trigger_dagrun_with_logical_date(self, dag_maker):
         """Test TriggerDagRunOperator with custom logical_date."""
         custom_logical_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_logical_date",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=custom_logical_date,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_logical_date",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=custom_logical_date,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
         with create_session() as session:
@@ -158,78 +173,91 @@ class TestDagRunOperator:
             assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
custom_logical_date)
             self.assert_extra_link(dagrun, task, session)
 
-    def test_trigger_dagrun_twice(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_trigger_dagrun_twice(self, dag_maker):
         """Test TriggerDagRunOperator with custom logical_date."""
         utc_now = timezone.utcnow()
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_logical_date",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=utc_now,
-            dag=self.dag,
-            poke_interval=1,
-            reset_dag_run=True,
-            wait_for_completion=True,
-        )
         run_id = f"manual__{utc_now.isoformat()}"
-        with create_session() as session:
-            dag_run = DagRun(
-                dag_id=TRIGGERED_DAG_ID,
-                execution_date=utc_now,
-                state=State.SUCCESS,
-                run_type="manual",
-                run_id=run_id,
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_logical_date",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id=run_id,
+                logical_date=utc_now,
+                poke_interval=1,
+                reset_dag_run=True,
+                wait_for_completion=True,
             )
-            session.add(dag_run)
-            session.commit()
-            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
+        dag_run = DagRun(
+            dag_id=TRIGGERED_DAG_ID,
+            execution_date=utc_now,
+            state=State.SUCCESS,
+            run_type="manual",
+            run_id=run_id,
+        )
+        dag_maker.session.add(dag_run)
+        dag_maker.session.commit()
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.logical_date == utc_now
-            self.assert_extra_link(triggered_dag_run, task, session)
+        dagruns = dag_maker.session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
+        assert len(dagruns) == 1
+        triggered_dag_run = dagruns[0]
+        assert triggered_dag_run.external_trigger
+        assert triggered_dag_run.logical_date == utc_now
+        self.assert_extra_link(triggered_dag_run, task, dag_maker.session)
 
-    def test_trigger_dagrun_with_scheduled_dag_run(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_trigger_dagrun_with_scheduled_dag_run(self, dag_maker):
         """Test TriggerDagRunOperator with custom logical_date and scheduled 
dag_run."""
         utc_now = timezone.utcnow()
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_logical_date",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=utc_now,
-            dag=self.dag,
-            poke_interval=1,
-            reset_dag_run=True,
-            wait_for_completion=True,
-        )
-        run_id = f"scheduled__{utc_now.isoformat()}"
-        with create_session() as session:
-            dag_run = DagRun(
-                dag_id=TRIGGERED_DAG_ID,
-                execution_date=utc_now,
-                state=State.SUCCESS,
-                run_type="scheduled",
-                run_id=run_id,
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_logical_date",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=utc_now,
+                poke_interval=1,
+                reset_dag_run=True,
+                wait_for_completion=True,
             )
-            session.add(dag_run)
-            session.commit()
-            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
+        run_id = f"scheduled__{utc_now.isoformat()}"
+        dag_run = DagRun(
+            dag_id=TRIGGERED_DAG_ID,
+            execution_date=utc_now,
+            state=State.SUCCESS,
+            run_type="scheduled",
+            run_id=run_id,
+        )
+        dag_maker.session.add(dag_run)
+        dag_maker.session.commit()
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
-            dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
-            assert len(dagruns) == 1
-            triggered_dag_run = dagruns[0]
-            assert triggered_dag_run.external_trigger
-            assert triggered_dag_run.logical_date == utc_now
-            self.assert_extra_link(triggered_dag_run, task, session)
+        dagruns = dag_maker.session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
+        assert len(dagruns) == 1
+        triggered_dag_run = dagruns[0]
+        assert triggered_dag_run.external_trigger
+        assert triggered_dag_run.logical_date == utc_now
+        self.assert_extra_link(triggered_dag_run, task, dag_maker.session)
 
-    def test_trigger_dagrun_with_templated_logical_date(self):
+    def test_trigger_dagrun_with_templated_logical_date(self, dag_maker):
         """Test TriggerDagRunOperator with templated logical_date."""
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_str_logical_date",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date="{{ logical_date }}",
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_str_logical_date",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date="{{ logical_date }}",
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
         with create_session() as session:
@@ -240,14 +268,18 @@ class TestDagRunOperator:
             assert triggered_dag_run.logical_date == DEFAULT_DATE
             self.assert_extra_link(triggered_dag_run, task, session)
 
-    def test_trigger_dagrun_operator_conf(self):
+    def test_trigger_dagrun_operator_conf(self, dag_maker):
         """Test passing conf to the triggered DagRun."""
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_str_logical_date",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            conf={"foo": "bar"},
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_str_logical_date",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                conf={"foo": "bar"},
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
         with create_session() as session:
@@ -255,25 +287,33 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             assert dagruns[0].conf == {"foo": "bar"}
 
-    def test_trigger_dagrun_operator_templated_invalid_conf(self):
+    def test_trigger_dagrun_operator_templated_invalid_conf(self, dag_maker):
         """Test passing a conf that is not JSON Serializable raise error."""
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_invalid_conf",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            conf={"foo": "{{ dag.dag_id }}", "datetime": timezone.utcnow()},
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_invalid_conf",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                conf={"foo": "{{ dag.dag_id }}", "datetime": 
timezone.utcnow()},
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         with pytest.raises(AirflowException, match="^conf parameter should be 
JSON Serializable$"):
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_trigger_dagrun_operator_templated_conf(self):
+    def test_trigger_dagrun_operator_templated_conf(self, dag_maker):
         """Test passing a templated conf to the triggered DagRun."""
-        task = TriggerDagRunOperator(
-            task_id="test_trigger_dagrun_with_str_logical_date",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            conf={"foo": "{{ dag.dag_id }}"},
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_trigger_dagrun_with_str_logical_date",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                conf={"foo": "{{ dag.dag_id }}"},
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
         with create_session() as session:
@@ -281,17 +321,21 @@ class TestDagRunOperator:
             assert len(dagruns) == 1
             assert dagruns[0].conf == {"foo": TEST_DAG_ID}
 
-    def test_trigger_dagrun_with_reset_dag_run_false(self):
+    def test_trigger_dagrun_with_reset_dag_run_false(self, dag_maker):
         """Test TriggerDagRunOperator without reset_dag_run."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            trigger_run_id=None,
-            logical_date=None,
-            reset_dag_run=False,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id=None,
+                logical_date=None,
+                reset_dag_run=False,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
         task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
@@ -307,39 +351,50 @@ class TestDagRunOperator:
             ("dummy_run_id", DEFAULT_DATE),
         ],
     )
-    def test_trigger_dagrun_with_reset_dag_run_false_fail(self, 
trigger_run_id, trigger_logical_date):
+    def test_trigger_dagrun_with_reset_dag_run_false_fail(
+        self, trigger_run_id, trigger_logical_date, dag_maker
+    ):
         """Test TriggerDagRunOperator without reset_dag_run but triggered dag 
fails."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            trigger_run_id=trigger_run_id,
-            logical_date=trigger_logical_date,
-            reset_dag_run=False,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id=trigger_run_id,
+                logical_date=trigger_logical_date,
+                reset_dag_run=False,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
         with pytest.raises(DagRunAlreadyExists):
             task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
-    def test_trigger_dagrun_with_skip_when_already_exists(self):
+    def test_trigger_dagrun_with_skip_when_already_exists(self, dag_maker):
         """Test TriggerDagRunOperator with skip_when_already_exists."""
         execution_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            trigger_run_id="dummy_run_id",
-            execution_date=None,
-            reset_dag_run=False,
-            skip_when_already_exists=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id="dummy_run_id",
+                execution_date=None,
+                reset_dag_run=False,
+                skip_when_already_exists=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dr: DagRun = dag_maker.create_dagrun()
         task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
-        assert task.get_task_instances()[0].state == TaskInstanceState.SUCCESS
+        assert dr.get_task_instance("test_task").state == 
TaskInstanceState.SUCCESS
         task.run(start_date=execution_date, end_date=execution_date, 
ignore_ti_state=True)
-        assert task.get_task_instances()[0].state == TaskInstanceState.SKIPPED
+        assert dr.get_task_instance("test_task").state == 
TaskInstanceState.SKIPPED
 
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
     @pytest.mark.parametrize(
         "trigger_run_id, trigger_logical_date, expected_dagruns_count",
         [
@@ -350,18 +405,22 @@ class TestDagRunOperator:
         ],
     )
     def test_trigger_dagrun_with_reset_dag_run_true(
-        self, trigger_run_id, trigger_logical_date, expected_dagruns_count
+        self, trigger_run_id, trigger_logical_date, expected_dagruns_count, 
dag_maker
     ):
         """Test TriggerDagRunOperator with reset_dag_run."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            trigger_run_id=trigger_run_id,
-            logical_date=trigger_logical_date,
-            reset_dag_run=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id=trigger_run_id,
+                logical_date=trigger_logical_date,
+                reset_dag_run=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
         task.run(start_date=logical_date, end_date=logical_date, 
ignore_ti_state=True)
 
@@ -370,106 +429,132 @@ class TestDagRunOperator:
             assert len(dag_runs) == expected_dagruns_count
             assert dag_runs[0].external_trigger
 
-    def test_trigger_dagrun_with_wait_for_completion_true(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_trigger_dagrun_with_wait_for_completion_true(self, dag_maker):
         """Test TriggerDagRunOperator with wait_for_completion."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            allowed_states=[State.QUEUED],
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                allowed_states=[State.QUEUED],
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
 
-    def test_trigger_dagrun_with_wait_for_completion_true_fail(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_trigger_dagrun_with_wait_for_completion_true_fail(self, 
dag_maker):
         """Test TriggerDagRunOperator with wait_for_completion but triggered 
dag fails."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            failed_states=[State.QUEUED],
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                failed_states=[State.QUEUED],
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         with pytest.raises(AirflowException):
             task.run(start_date=logical_date, end_date=logical_date)
 
-    def test_trigger_dagrun_triggering_itself(self):
+    def test_trigger_dagrun_triggering_itself(self, dag_maker):
         """Test TriggerDagRunOperator that triggers itself"""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=self.dag.dag_id,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TEST_DAG_ID,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=logical_date, end_date=logical_date)
 
-        with create_session() as session:
-            dagruns = (
-                session.query(DagRun)
-                .filter(DagRun.dag_id == self.dag.dag_id)
-                .order_by(DagRun.execution_date)
-                .all()
-            )
-            assert len(dagruns) == 2
-            triggered_dag_run = dagruns[1]
-            assert triggered_dag_run.state == State.QUEUED
-            self.assert_extra_link(triggered_dag_run, task, session)
+        dagruns = (
+            dag_maker.session.query(DagRun)
+            .filter(DagRun.dag_id == TEST_DAG_ID)
+            .order_by(DagRun.execution_date)
+            .all()
+        )
+        assert len(dagruns) == 2
+        triggered_dag_run = dagruns[1]
+        assert triggered_dag_run.state == State.QUEUED
 
-    def test_trigger_dagrun_triggering_itself_with_logical_date(self):
+    def test_trigger_dagrun_triggering_itself_with_logical_date(self, 
dag_maker):
         """Test TriggerDagRunOperator that triggers itself with logical date,
         fails with DagRunAlreadyExists"""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=self.dag.dag_id,
-            logical_date=logical_date,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TEST_DAG_ID,
+                logical_date=logical_date,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         with pytest.raises(DagRunAlreadyExists):
             task.run(start_date=logical_date, end_date=logical_date)
 
-    def test_trigger_dagrun_with_wait_for_completion_true_defer_false(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_trigger_dagrun_with_wait_for_completion_true_defer_false(self, 
dag_maker):
         """Test TriggerDagRunOperator with wait_for_completion."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            allowed_states=[State.QUEUED],
-            deferrable=False,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                allowed_states=[State.QUEUED],
+                deferrable=False,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=logical_date, end_date=logical_date)
 
         with create_session() as session:
             dagruns = session.query(DagRun).filter(DagRun.dag_id == 
TRIGGERED_DAG_ID).all()
             assert len(dagruns) == 1
 
-    def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self, 
dag_maker):
         """Test TriggerDagRunOperator with wait_for_completion."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            allowed_states=[State.QUEUED],
-            deferrable=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                allowed_states=[State.QUEUED],
+                deferrable=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
 
         task.run(start_date=logical_date, end_date=logical_date)
 
@@ -485,19 +570,24 @@ class TestDagRunOperator:
 
         task.execute_complete(context={}, event=trigger.serialize())
 
-    def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self, 
dag_maker):
         """Test TriggerDagRunOperator wait_for_completion dag run in non 
defined state."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            allowed_states=[State.SUCCESS],
-            deferrable=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                allowed_states=[State.SUCCESS],
+                deferrable=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
 
         task.run(start_date=logical_date, end_date=logical_date)
 
@@ -517,20 +607,25 @@ class TestDagRunOperator:
                 event=trigger.serialize(),
             )
 
-    def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def 
test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self, 
dag_maker):
         """Test TriggerDagRunOperator  wait_for_completion dag run in failed 
state."""
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            allowed_states=[State.SUCCESS],
-            failed_states=[State.QUEUED],
-            deferrable=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                allowed_states=[State.SUCCESS],
+                failed_states=[State.QUEUED],
+                deferrable=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
 
         task.run(start_date=logical_date, end_date=logical_date)
 
@@ -548,19 +643,23 @@ class TestDagRunOperator:
         with pytest.raises(AirflowException, match="failed with failed state"):
             task.execute_complete(context={}, event=trigger.serialize())
 
-    def test_trigger_dagrun_with_execution_date(self):
+    def test_trigger_dagrun_with_execution_date(self, dag_maker):
         """Test TriggerDagRunOperator with custom execution_date (deprecated 
parameter)"""
         custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5)
-        with pytest.warns(
-            RemovedInAirflow3Warning,
-            match="Parameter 'execution_date' is deprecated. Use 
'logical_date' instead.",
-        ):
-            task = TriggerDagRunOperator(
-                task_id="test_trigger_dagrun_with_execution_date",
-                trigger_dag_id=TRIGGERED_DAG_ID,
-                execution_date=custom_execution_date,
-                dag=self.dag,
-            )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            with pytest.warns(
+                RemovedInAirflow3Warning,
+                match="Parameter 'execution_date' is deprecated. Use 
'logical_date' instead.",
+            ):
+                task = TriggerDagRunOperator(
+                    task_id="test_trigger_dagrun_with_execution_date",
+                    trigger_dag_id=TRIGGERED_DAG_ID,
+                    execution_date=custom_execution_date,
+                )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
         with create_session() as session:
@@ -570,6 +669,7 @@ class TestDagRunOperator:
             assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, 
custom_execution_date)
             self.assert_extra_link(dagrun, task, session)
 
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
     @pytest.mark.parametrize(
         argnames=["trigger_logical_date"],
         argvalues=[
@@ -577,18 +677,22 @@ class TestDagRunOperator:
             pytest.param(None, id="logical_date=None"),
         ],
     )
-    def test_dagstatetrigger_execution_dates(self, trigger_logical_date):
+    def test_dagstatetrigger_execution_dates(self, trigger_logical_date, 
dag_maker):
         """Ensure that the DagStateTrigger is called with the triggered DAG's 
logical date."""
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=trigger_logical_date,
-            wait_for_completion=True,
-            poke_interval=5,
-            allowed_states=[DagRunState.QUEUED],
-            deferrable=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=trigger_logical_date,
+                wait_for_completion=True,
+                poke_interval=5,
+                allowed_states=[DagRunState.QUEUED],
+                deferrable=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
 
         mock_task_defer = mock.MagicMock(side_effect=task.defer)
         with mock.patch.object(TriggerDagRunOperator, "defer", 
mock_task_defer), pytest.raises(TaskDeferred):
@@ -602,19 +706,24 @@ class TestDagRunOperator:
             pendulum.instance(dagruns[0].logical_date)
         ]
 
-    def test_dagstatetrigger_execution_dates_with_clear_and_reset(self):
+    @pytest.mark.skip_if_database_isolation_mode  # Known to be broken in db 
isolation mode
+    def test_dagstatetrigger_execution_dates_with_clear_and_reset(self, 
dag_maker):
         """Check DagStateTrigger is called with the triggered DAG's logical 
date on subsequent defers."""
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            trigger_run_id="custom_run_id",
-            wait_for_completion=True,
-            poke_interval=5,
-            allowed_states=[DagRunState.QUEUED],
-            deferrable=True,
-            reset_dag_run=True,
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                trigger_run_id="custom_run_id",
+                wait_for_completion=True,
+                poke_interval=5,
+                allowed_states=[DagRunState.QUEUED],
+                deferrable=True,
+                reset_dag_run=True,
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
 
         mock_task_defer = mock.MagicMock(side_effect=task.defer)
         with mock.patch.object(TriggerDagRunOperator, "defer", 
mock_task_defer), pytest.raises(TaskDeferred):
@@ -647,16 +756,20 @@ class TestDagRunOperator:
             pendulum.instance(triggered_logical_date)
         ]
 
-    def test_trigger_dagrun_with_no_failed_state(self):
+    def test_trigger_dagrun_with_no_failed_state(self, dag_maker):
         logical_date = DEFAULT_DATE
-        task = TriggerDagRunOperator(
-            task_id="test_task",
-            trigger_dag_id=TRIGGERED_DAG_ID,
-            logical_date=logical_date,
-            wait_for_completion=True,
-            poke_interval=10,
-            failed_states=[],
-            dag=self.dag,
-        )
+        with dag_maker(
+            TEST_DAG_ID, default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True
+        ) as dag:
+            task = TriggerDagRunOperator(
+                task_id="test_task",
+                trigger_dag_id=TRIGGERED_DAG_ID,
+                logical_date=logical_date,
+                wait_for_completion=True,
+                poke_interval=10,
+                failed_states=[],
+            )
+        self.re_sync_triggered_dag_to_db(dag, dag_maker)
+        dag_maker.create_dagrun()
 
         assert task.failed_states == []

Reply via email to