This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 18aa7dca79 Fix some mor operators tests for Database Isolation Mode 
(#41290)
18aa7dca79 is described below

commit 18aa7dca799a77d69d2c148a9437ebbc2e6fcc88
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 6 22:44:59 2024 +0200

    Fix some mor operators tests for Database Isolation Mode (#41290)
---
 tests/operators/test_generic_transfer.py     |  69 ++++-----
 tests/operators/test_latest_only_operator.py |  62 ++++----
 tests/operators/test_subdag_operator.py      |  23 +--
 tests/operators/test_weekday.py              | 223 ++++++++++++++-------------
 4 files changed, 193 insertions(+), 184 deletions(-)

diff --git a/tests/operators/test_generic_transfer.py 
b/tests/operators/test_generic_transfer.py
index d10dcf2846..7f9fd07da1 100644
--- a/tests/operators/test_generic_transfer.py
+++ b/tests/operators/test_generic_transfer.py
@@ -102,11 +102,6 @@ class TestMySql:
 
 @pytest.mark.backend("postgres")
 class TestPostgres:
-    def setup_method(self):
-        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
-        dag = DAG(TEST_DAG_ID, default_args=args)
-        self.dag = dag
-
     def teardown_method(self):
         tables_to_drop = ["test_postgres_to_postgres", "test_airflow"]
         with PostgresHook().get_conn() as conn:
@@ -114,42 +109,44 @@ class TestPostgres:
                 for table in tables_to_drop:
                     cur.execute(f"DROP TABLE IF EXISTS {table}")
 
-    def test_postgres_to_postgres(self):
+    def test_postgres_to_postgres(self, dag_maker):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
-        op = GenericTransfer(
-            task_id="test_p2p",
-            preoperator=[
-                "DROP TABLE IF EXISTS test_postgres_to_postgres",
-                "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres (LIKE 
INFORMATION_SCHEMA.TABLES)",
-            ],
-            source_conn_id="postgres_default",
-            destination_conn_id="postgres_default",
-            destination_table="test_postgres_to_postgres",
-            sql=sql,
-            dag=self.dag,
-        )
+        with dag_maker(default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True):
+            op = GenericTransfer(
+                task_id="test_p2p",
+                preoperator=[
+                    "DROP TABLE IF EXISTS test_postgres_to_postgres",
+                    "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres 
(LIKE INFORMATION_SCHEMA.TABLES)",
+                ],
+                source_conn_id="postgres_default",
+                destination_conn_id="postgres_default",
+                destination_table="test_postgres_to_postgres",
+                sql=sql,
+            )
+        dag_maker.create_dagrun()
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
     @mock.patch("airflow.providers.common.sql.hooks.sql.DbApiHook.insert_rows")
-    def test_postgres_to_postgres_replace(self, mock_insert):
+    def test_postgres_to_postgres_replace(self, mock_insert, dag_maker):
         sql = "SELECT id, conn_id, conn_type FROM connection LIMIT 10;"
-        op = GenericTransfer(
-            task_id="test_p2p",
-            preoperator=[
-                "DROP TABLE IF EXISTS test_postgres_to_postgres",
-                "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres (LIKE 
connection INCLUDING INDEXES)",
-            ],
-            source_conn_id="postgres_default",
-            destination_conn_id="postgres_default",
-            destination_table="test_postgres_to_postgres",
-            sql=sql,
-            dag=self.dag,
-            insert_args={
-                "replace": True,
-                "target_fields": ("id", "conn_id", "conn_type"),
-                "replace_index": "id",
-            },
-        )
+        with dag_maker(default_args={"owner": "airflow", "start_date": 
DEFAULT_DATE}, serialized=True):
+            op = GenericTransfer(
+                task_id="test_p2p",
+                preoperator=[
+                    "DROP TABLE IF EXISTS test_postgres_to_postgres",
+                    "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres 
(LIKE connection INCLUDING INDEXES)",
+                ],
+                source_conn_id="postgres_default",
+                destination_conn_id="postgres_default",
+                destination_table="test_postgres_to_postgres",
+                sql=sql,
+                insert_args={
+                    "replace": True,
+                    "target_fields": ("id", "conn_id", "conn_type"),
+                    "replace_index": "id",
+                },
+            )
+        dag_maker.create_dagrun()
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
         assert mock_insert.called
         _, kwargs = mock_insert.call_args
diff --git a/tests/operators/test_latest_only_operator.py 
b/tests/operators/test_latest_only_operator.py
index e67463aea0..53a12aca18 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/tests/operators/test_latest_only_operator.py
@@ -24,7 +24,6 @@ import time_machine
 
 from airflow import settings
 from airflow.models import DagRun, TaskInstance
-from airflow.models.dag import DAG
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.latest_only import LatestOnlyOperator
 from airflow.utils import timezone
@@ -63,11 +62,6 @@ class TestLatestOnlyOperator:
         self.clean_db()
 
     def setup_method(self):
-        self.dag = DAG(
-            "test_dag",
-            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
-            schedule=INTERVAL,
-        )
         self.freezer = time_machine.travel(FROZEN_NOW, tick=False)
         self.freezer.start()
 
@@ -75,23 +69,28 @@ class TestLatestOnlyOperator:
         self.freezer.stop()
         self.clean_db()
 
-    def test_run(self):
-        task = LatestOnlyOperator(task_id="latest", dag=self.dag)
+    def test_run(self, dag_maker):
+        with dag_maker(
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, 
schedule=INTERVAL, serialized=True
+        ):
+            task = LatestOnlyOperator(task_id="latest")
+        dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    def test_skipping_non_latest(self):
-        latest_task = LatestOnlyOperator(task_id="latest", dag=self.dag)
-        downstream_task = EmptyOperator(task_id="downstream", dag=self.dag)
-        downstream_task2 = EmptyOperator(task_id="downstream_2", dag=self.dag)
-        downstream_task3 = EmptyOperator(
-            task_id="downstream_3", trigger_rule=TriggerRule.NONE_FAILED, 
dag=self.dag
-        )
+    def test_skipping_non_latest(self, dag_maker):
+        with dag_maker(
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, 
schedule=INTERVAL, serialized=True
+        ):
+            latest_task = LatestOnlyOperator(task_id="latest")
+            downstream_task = EmptyOperator(task_id="downstream")
+            downstream_task2 = EmptyOperator(task_id="downstream_2")
+            downstream_task3 = EmptyOperator(task_id="downstream_3", 
trigger_rule=TriggerRule.NONE_FAILED)
 
-        downstream_task.set_upstream(latest_task)
-        downstream_task2.set_upstream(downstream_task)
-        downstream_task3.set_upstream(downstream_task)
+            downstream_task.set_upstream(latest_task)
+            downstream_task2.set_upstream(downstream_task)
+            downstream_task3.set_upstream(downstream_task)
 
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
@@ -99,7 +98,7 @@ class TestLatestOnlyOperator:
             data_interval=(DEFAULT_DATE, DEFAULT_DATE),
         )
 
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             start_date=timezone.utcnow(),
             execution_date=timezone.datetime(2016, 1, 1, 12),
@@ -107,7 +106,7 @@ class TestLatestOnlyOperator:
             data_interval=(timezone.datetime(2016, 1, 1, 12), 
timezone.datetime(2016, 1, 1, 12) + INTERVAL),
         )
 
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             start_date=timezone.utcnow(),
             execution_date=END_DATE,
@@ -152,15 +151,18 @@ class TestLatestOnlyOperator:
             timezone.datetime(2016, 1, 2): "success",
         } == exec_date_to_downstream_state
 
-    def test_not_skipping_external(self):
-        latest_task = LatestOnlyOperator(task_id="latest", dag=self.dag)
-        downstream_task = EmptyOperator(task_id="downstream", dag=self.dag)
-        downstream_task2 = EmptyOperator(task_id="downstream_2", dag=self.dag)
+    def test_not_skipping_external(self, dag_maker):
+        with dag_maker(
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, 
schedule=INTERVAL, serialized=True
+        ):
+            latest_task = LatestOnlyOperator(task_id="latest")
+            downstream_task = EmptyOperator(task_id="downstream")
+            downstream_task2 = EmptyOperator(task_id="downstream_2")
 
-        downstream_task.set_upstream(latest_task)
-        downstream_task2.set_upstream(downstream_task)
+            downstream_task.set_upstream(latest_task)
+            downstream_task2.set_upstream(downstream_task)
 
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
@@ -170,7 +172,7 @@ class TestLatestOnlyOperator:
         )
 
         execution_date = timezone.datetime(2016, 1, 1, 12)
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             execution_date=execution_date,
@@ -179,7 +181,7 @@ class TestLatestOnlyOperator:
             data_interval=(execution_date, execution_date),
         )
 
-        self.dag.create_dagrun(
+        dag_maker.create_dagrun(
             run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             execution_date=END_DATE,
diff --git a/tests/operators/test_subdag_operator.py 
b/tests/operators/test_subdag_operator.py
index 3437560da4..ca669a9e4e 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -88,21 +88,21 @@ class TestSubDagOperator:
             assert op.dag == dag
             assert op.subdag == subdag
 
-    def test_subdag_pools(self):
+    def test_subdag_pools(self, dag_maker):
         """
         Subdags and subdag tasks can't both have a pool with 1 slot
         """
-        dag = DAG("parent", default_args=default_args)
-        subdag = DAG("parent.child", default_args=default_args)
+        with dag_maker("parent", default_args=default_args, serialized=True) 
as dag:
+            pass
 
-        session = airflow.settings.Session()
         pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1, 
include_deferred=False)
         pool_10 = airflow.models.Pool(pool="test_pool_10", slots=10, 
include_deferred=False)
-        session.add(pool_1)
-        session.add(pool_10)
-        session.commit()
+        dag_maker.session.add(pool_1)
+        dag_maker.session.add(pool_10)
+        dag_maker.session.commit()
 
-        EmptyOperator(task_id="dummy", dag=subdag, pool="test_pool_1")
+        with dag_maker("parent.child", default_args=default_args, 
serialized=True) as subdag:
+            EmptyOperator(task_id="dummy", pool="test_pool_1")
 
         with pytest.raises(AirflowException):
             SubDagOperator(task_id="child", dag=dag, subdag=subdag, 
pool="test_pool_1")
@@ -112,9 +112,9 @@ class TestSubDagOperator:
         with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
             SubDagOperator(task_id="child", dag=dag, subdag=subdag, 
pool="test_pool_10")
 
-        session.delete(pool_1)
-        session.delete(pool_10)
-        session.commit()
+        dag_maker.session.delete(pool_1)
+        dag_maker.session.delete(pool_10)
+        dag_maker.session.commit()
 
     def test_subdag_pools_no_possible_conflict(self):
         """
@@ -269,6 +269,7 @@ class TestSubDagOperator:
         subdag.create_dagrun.assert_not_called()
         assert 3 == subdag_task._get_dagrun.call_count
 
+    @pytest.mark.skip_if_database_isolation_mode  # this uses functions which 
operate directly on DB
     def test_rerun_failed_subdag(self, dag_maker):
         """
         When there is an existing DagRun with failed state, reset the DagRun 
and the
diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py
index d4b2c938d9..9030942c65 100644
--- a/tests/operators/test_weekday.py
+++ b/tests/operators/test_weekday.py
@@ -23,7 +23,6 @@ import pytest
 import time_machine
 
 from airflow.exceptions import AirflowException
-from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.models.xcom import XCom
@@ -67,22 +66,6 @@ class TestBranchDayOfWeekOperator:
             session.query(TI).delete()
             session.query(XCom).delete()
 
-    def setup_method(self):
-        self.dag = DAG(
-            "branch_day_of_week_operator_test",
-            start_date=DEFAULT_DATE,
-            schedule=INTERVAL,
-        )
-        self.branch_1 = EmptyOperator(task_id="branch_1", dag=self.dag)
-        self.branch_2 = EmptyOperator(task_id="branch_2", dag=self.dag)
-        self.branch_3 = None
-
-    def teardown_method(self):
-        with create_session() as session:
-            session.query(DagRun).delete()
-            session.query(TI).delete()
-            session.query(XCom).delete()
-
     def _assert_task_ids_match_states(self, dr, task_ids_to_states):
         """Helper that asserts task instances with a given id are in a given 
state"""
         tis = dr.get_task_instances()
@@ -99,23 +82,25 @@ class TestBranchDayOfWeekOperator:
         "weekday", TEST_CASE_BRANCH_FOLLOW_TRUE.values(), 
ids=TEST_CASE_BRANCH_FOLLOW_TRUE.keys()
     )
     @time_machine.travel("2021-01-25")  # Monday
-    def test_branch_follow_true(self, weekday):
+    def test_branch_follow_true(self, weekday, dag_maker):
         """Checks if BranchDayOfWeekOperator follows true branch"""
-        branch_op = BranchDayOfWeekOperator(
-            task_id="make_choice",
-            follow_task_ids_if_true=["branch_1", "branch_2"],
-            follow_task_ids_if_false="branch_3",
-            week_day=weekday,
-            dag=self.dag,
-        )
-
-        self.branch_1.set_upstream(branch_op)
-        self.branch_2.set_upstream(branch_op)
-        self.branch_3 = EmptyOperator(task_id="branch_3", dag=self.dag)
-        self.branch_3.set_upstream(branch_op)
-        self.dag.clear()
-
-        dr = self.dag.create_dagrun(
+        with dag_maker(
+            "branch_day_of_week_operator_test", start_date=DEFAULT_DATE, 
schedule=INTERVAL, serialized=True
+        ):
+            branch_op = BranchDayOfWeekOperator(
+                task_id="make_choice",
+                follow_task_ids_if_true=["branch_1", "branch_2"],
+                follow_task_ids_if_false="branch_3",
+                week_day=weekday,
+            )
+            branch_1 = EmptyOperator(task_id="branch_1")
+            branch_2 = EmptyOperator(task_id="branch_2")
+            branch_3 = EmptyOperator(task_id="branch_3")
+            branch_1.set_upstream(branch_op)
+            branch_2.set_upstream(branch_op)
+            branch_3.set_upstream(branch_op)
+
+        dr = dag_maker.create_dagrun(
             run_id="manual__",
             start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
@@ -136,23 +121,24 @@ class TestBranchDayOfWeekOperator:
         )
 
     @time_machine.travel("2021-01-25")  # Monday
-    def test_branch_follow_true_with_execution_date(self):
+    def test_branch_follow_true_with_execution_date(self, dag_maker):
         """Checks if BranchDayOfWeekOperator follows true branch when set 
use_task_logical_date"""
+        with dag_maker(
+            "branch_day_of_week_operator_test", start_date=DEFAULT_DATE, 
schedule=INTERVAL, serialized=True
+        ):
+            branch_op = BranchDayOfWeekOperator(
+                task_id="make_choice",
+                follow_task_ids_if_true="branch_1",
+                follow_task_ids_if_false="branch_2",
+                week_day="Wednesday",
+                use_task_logical_date=True,  # We compare to DEFAULT_DATE 
which is Wednesday
+            )
+            branch_1 = EmptyOperator(task_id="branch_1")
+            branch_2 = EmptyOperator(task_id="branch_2")
+            branch_1.set_upstream(branch_op)
+            branch_2.set_upstream(branch_op)
 
-        branch_op = BranchDayOfWeekOperator(
-            task_id="make_choice",
-            follow_task_ids_if_true="branch_1",
-            follow_task_ids_if_false="branch_2",
-            week_day="Wednesday",
-            use_task_logical_date=True,  # We compare to DEFAULT_DATE which is 
Wednesday
-            dag=self.dag,
-        )
-
-        self.branch_1.set_upstream(branch_op)
-        self.branch_2.set_upstream(branch_op)
-        self.dag.clear()
-
-        dr = self.dag.create_dagrun(
+        dr = dag_maker.create_dagrun(
             run_id="manual__",
             start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
@@ -172,22 +158,23 @@ class TestBranchDayOfWeekOperator:
         )
 
     @time_machine.travel("2021-01-25")  # Monday
-    def test_branch_follow_false(self):
+    def test_branch_follow_false(self, dag_maker):
         """Checks if BranchDayOfWeekOperator follow false branch"""
+        with dag_maker(
+            "branch_day_of_week_operator_test", start_date=DEFAULT_DATE, 
schedule=INTERVAL, serialized=True
+        ):
+            branch_op = BranchDayOfWeekOperator(
+                task_id="make_choice",
+                follow_task_ids_if_true="branch_1",
+                follow_task_ids_if_false="branch_2",
+                week_day="Sunday",
+            )
+            branch_1 = EmptyOperator(task_id="branch_1")
+            branch_2 = EmptyOperator(task_id="branch_2")
+            branch_1.set_upstream(branch_op)
+            branch_2.set_upstream(branch_op)
 
-        branch_op = BranchDayOfWeekOperator(
-            task_id="make_choice",
-            follow_task_ids_if_true="branch_1",
-            follow_task_ids_if_false="branch_2",
-            week_day="Sunday",
-            dag=self.dag,
-        )
-
-        self.branch_1.set_upstream(branch_op)
-        self.branch_2.set_upstream(branch_op)
-        self.dag.clear()
-
-        dr = self.dag.create_dagrun(
+        dr = dag_maker.create_dagrun(
             run_id="manual__",
             start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
@@ -206,17 +193,22 @@ class TestBranchDayOfWeekOperator:
             },
         )
 
-    def test_branch_with_no_weekday(self):
+    def test_branch_with_no_weekday(self, dag_maker):
         """Check if BranchDayOfWeekOperator raises exception on missing 
weekday"""
         with pytest.raises(AirflowException):
-            BranchDayOfWeekOperator(
-                task_id="make_choice",
-                follow_task_ids_if_true="branch_1",
-                follow_task_ids_if_false="branch_2",
-                dag=self.dag,
-            )
-
-    def test_branch_with_invalid_type(self):
+            with dag_maker(
+                "branch_day_of_week_operator_test",
+                start_date=DEFAULT_DATE,
+                schedule=INTERVAL,
+                serialized=True,
+            ):
+                BranchDayOfWeekOperator(
+                    task_id="make_choice",
+                    follow_task_ids_if_true="branch_1",
+                    follow_task_ids_if_false="branch_2",
+                )
+
+    def test_branch_with_invalid_type(self, dag_maker):
         """Check if BranchDayOfWeekOperator raises exception on unsupported 
weekday type"""
         invalid_week_day = 5
         with pytest.raises(
@@ -225,13 +217,18 @@ class TestBranchDayOfWeekOperator:
             "Input should be iterable type:"
             "str, set, list, dict or Weekday enum type",
         ):
-            BranchDayOfWeekOperator(
-                task_id="make_choice",
-                follow_task_ids_if_true="branch_1",
-                follow_task_ids_if_false="branch_2",
-                week_day=invalid_week_day,
-                dag=self.dag,
-            )
+            with dag_maker(
+                "branch_day_of_week_operator_test",
+                start_date=DEFAULT_DATE,
+                schedule=INTERVAL,
+                serialized=True,
+            ):
+                BranchDayOfWeekOperator(
+                    task_id="make_choice",
+                    follow_task_ids_if_true="branch_1",
+                    follow_task_ids_if_false="branch_2",
+                    week_day=invalid_week_day,
+                )
 
     @pytest.mark.parametrize(
         "_,week_day,fail_msg",
@@ -241,33 +238,40 @@ class TestBranchDayOfWeekOperator:
             ("set", {WeekDay.MONDAY, "Thsday"}, "Thsday"),
         ],
     )
-    def test_weekday_branch_invalid_weekday_value(self, _, week_day, fail_msg):
+    def test_weekday_branch_invalid_weekday_value(self, _, week_day, fail_msg, 
dag_maker):
         """Check if BranchDayOfWeekOperator raises exception on wrong value of 
weekday"""
         with pytest.raises(AttributeError, match=f'Invalid Week Day passed: 
"{fail_msg}"'):
-            BranchDayOfWeekOperator(
+            with dag_maker(
+                "branch_day_of_week_operator_test",
+                start_date=DEFAULT_DATE,
+                schedule=INTERVAL,
+                serialized=True,
+            ):
+                BranchDayOfWeekOperator(
+                    task_id="make_choice",
+                    follow_task_ids_if_true="branch_1",
+                    follow_task_ids_if_false="branch_2",
+                    week_day=week_day,
+                )
+
+    @time_machine.travel("2021-01-25")  # Monday
+    def test_branch_xcom_push_true_branch(self, dag_maker):
+        """Check if BranchDayOfWeekOperator push to xcom value of 
follow_task_ids_if_true"""
+        with dag_maker(
+            "branch_day_of_week_operator_test", start_date=DEFAULT_DATE, 
schedule=INTERVAL, serialized=True
+        ):
+            branch_op = BranchDayOfWeekOperator(
                 task_id="make_choice",
                 follow_task_ids_if_true="branch_1",
                 follow_task_ids_if_false="branch_2",
-                week_day=week_day,
-                dag=self.dag,
+                week_day="Monday",
             )
+            branch_1 = EmptyOperator(task_id="branch_1")
+            branch_2 = EmptyOperator(task_id="branch_2")
+            branch_1.set_upstream(branch_op)
+            branch_2.set_upstream(branch_op)
 
-    @time_machine.travel("2021-01-25")  # Monday
-    def test_branch_xcom_push_true_branch(self):
-        """Check if BranchDayOfWeekOperator push to xcom value of 
follow_task_ids_if_true"""
-        branch_op = BranchDayOfWeekOperator(
-            task_id="make_choice",
-            follow_task_ids_if_true="branch_1",
-            follow_task_ids_if_false="branch_2",
-            week_day="Monday",
-            dag=self.dag,
-        )
-
-        self.branch_1.set_upstream(branch_op)
-        self.branch_2.set_upstream(branch_op)
-        self.dag.clear()
-
-        dr = self.dag.create_dagrun(
+        dr = dag_maker.create_dagrun(
             run_id="manual__",
             start_date=timezone.utcnow(),
             execution_date=DEFAULT_DATE,
@@ -282,17 +286,22 @@ class TestBranchDayOfWeekOperator:
             if ti.task_id == "make_choice":
                 assert ti.xcom_pull(task_ids="make_choice") == "branch_1"
 
-    def test_deprecation_warning(self):
+    def test_deprecation_warning(self, dag_maker):
         warning_message = (
             """Parameter ``use_task_execution_day`` is deprecated. Use 
``use_task_logical_date``."""
         )
         with pytest.warns(DeprecationWarning) as warnings:
-            BranchDayOfWeekOperator(
-                task_id="week_day_warn",
-                follow_task_ids_if_true="branch_1",
-                follow_task_ids_if_false="branch_2",
-                week_day="Monday",
-                use_task_execution_day=True,
-                dag=self.dag,
-            )
+            with dag_maker(
+                "branch_day_of_week_operator_test",
+                start_date=DEFAULT_DATE,
+                schedule=INTERVAL,
+                serialized=True,
+            ):
+                BranchDayOfWeekOperator(
+                    task_id="week_day_warn",
+                    follow_task_ids_if_true="branch_1",
+                    follow_task_ids_if_false="branch_2",
+                    week_day="Monday",
+                    use_task_execution_day=True,
+                )
         assert warning_message == str(warnings[0].message)

Reply via email to