This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 18aa7dca79 Fix some more operators tests for Database Isolation Mode
(#41290)
18aa7dca79 is described below
commit 18aa7dca799a77d69d2c148a9437ebbc2e6fcc88
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 6 22:44:59 2024 +0200
Fix some more operators tests for Database Isolation Mode (#41290)
---
tests/operators/test_generic_transfer.py | 69 ++++-----
tests/operators/test_latest_only_operator.py | 62 ++++----
tests/operators/test_subdag_operator.py | 23 +--
tests/operators/test_weekday.py | 223 ++++++++++++++-------------
4 files changed, 193 insertions(+), 184 deletions(-)
diff --git a/tests/operators/test_generic_transfer.py
b/tests/operators/test_generic_transfer.py
index d10dcf2846..7f9fd07da1 100644
--- a/tests/operators/test_generic_transfer.py
+++ b/tests/operators/test_generic_transfer.py
@@ -102,11 +102,6 @@ class TestMySql:
@pytest.mark.backend("postgres")
class TestPostgres:
- def setup_method(self):
- args = {"owner": "airflow", "start_date": DEFAULT_DATE}
- dag = DAG(TEST_DAG_ID, default_args=args)
- self.dag = dag
-
def teardown_method(self):
tables_to_drop = ["test_postgres_to_postgres", "test_airflow"]
with PostgresHook().get_conn() as conn:
@@ -114,42 +109,44 @@ class TestPostgres:
for table in tables_to_drop:
cur.execute(f"DROP TABLE IF EXISTS {table}")
- def test_postgres_to_postgres(self):
+ def test_postgres_to_postgres(self, dag_maker):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
- op = GenericTransfer(
- task_id="test_p2p",
- preoperator=[
- "DROP TABLE IF EXISTS test_postgres_to_postgres",
- "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres (LIKE
INFORMATION_SCHEMA.TABLES)",
- ],
- source_conn_id="postgres_default",
- destination_conn_id="postgres_default",
- destination_table="test_postgres_to_postgres",
- sql=sql,
- dag=self.dag,
- )
+ with dag_maker(default_args={"owner": "airflow", "start_date":
DEFAULT_DATE}, serialized=True):
+ op = GenericTransfer(
+ task_id="test_p2p",
+ preoperator=[
+ "DROP TABLE IF EXISTS test_postgres_to_postgres",
+ "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres
(LIKE INFORMATION_SCHEMA.TABLES)",
+ ],
+ source_conn_id="postgres_default",
+ destination_conn_id="postgres_default",
+ destination_table="test_postgres_to_postgres",
+ sql=sql,
+ )
+ dag_maker.create_dagrun()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
@mock.patch("airflow.providers.common.sql.hooks.sql.DbApiHook.insert_rows")
- def test_postgres_to_postgres_replace(self, mock_insert):
+ def test_postgres_to_postgres_replace(self, mock_insert, dag_maker):
sql = "SELECT id, conn_id, conn_type FROM connection LIMIT 10;"
- op = GenericTransfer(
- task_id="test_p2p",
- preoperator=[
- "DROP TABLE IF EXISTS test_postgres_to_postgres",
- "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres (LIKE
connection INCLUDING INDEXES)",
- ],
- source_conn_id="postgres_default",
- destination_conn_id="postgres_default",
- destination_table="test_postgres_to_postgres",
- sql=sql,
- dag=self.dag,
- insert_args={
- "replace": True,
- "target_fields": ("id", "conn_id", "conn_type"),
- "replace_index": "id",
- },
- )
+ with dag_maker(default_args={"owner": "airflow", "start_date":
DEFAULT_DATE}, serialized=True):
+ op = GenericTransfer(
+ task_id="test_p2p",
+ preoperator=[
+ "DROP TABLE IF EXISTS test_postgres_to_postgres",
+ "CREATE TABLE IF NOT EXISTS test_postgres_to_postgres
(LIKE connection INCLUDING INDEXES)",
+ ],
+ source_conn_id="postgres_default",
+ destination_conn_id="postgres_default",
+ destination_table="test_postgres_to_postgres",
+ sql=sql,
+ insert_args={
+ "replace": True,
+ "target_fields": ("id", "conn_id", "conn_type"),
+ "replace_index": "id",
+ },
+ )
+ dag_maker.create_dagrun()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
assert mock_insert.called
_, kwargs = mock_insert.call_args
diff --git a/tests/operators/test_latest_only_operator.py
b/tests/operators/test_latest_only_operator.py
index e67463aea0..53a12aca18 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/tests/operators/test_latest_only_operator.py
@@ -24,7 +24,6 @@ import time_machine
from airflow import settings
from airflow.models import DagRun, TaskInstance
-from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils import timezone
@@ -63,11 +62,6 @@ class TestLatestOnlyOperator:
self.clean_db()
def setup_method(self):
- self.dag = DAG(
- "test_dag",
- default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
- schedule=INTERVAL,
- )
self.freezer = time_machine.travel(FROZEN_NOW, tick=False)
self.freezer.start()
@@ -75,23 +69,28 @@ class TestLatestOnlyOperator:
self.freezer.stop()
self.clean_db()
- def test_run(self):
- task = LatestOnlyOperator(task_id="latest", dag=self.dag)
+ def test_run(self, dag_maker):
+ with dag_maker(
+ default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
schedule=INTERVAL, serialized=True
+ ):
+ task = LatestOnlyOperator(task_id="latest")
+ dag_maker.create_dagrun()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
- def test_skipping_non_latest(self):
- latest_task = LatestOnlyOperator(task_id="latest", dag=self.dag)
- downstream_task = EmptyOperator(task_id="downstream", dag=self.dag)
- downstream_task2 = EmptyOperator(task_id="downstream_2", dag=self.dag)
- downstream_task3 = EmptyOperator(
- task_id="downstream_3", trigger_rule=TriggerRule.NONE_FAILED,
dag=self.dag
- )
+ def test_skipping_non_latest(self, dag_maker):
+ with dag_maker(
+ default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
schedule=INTERVAL, serialized=True
+ ):
+ latest_task = LatestOnlyOperator(task_id="latest")
+ downstream_task = EmptyOperator(task_id="downstream")
+ downstream_task2 = EmptyOperator(task_id="downstream_2")
+ downstream_task3 = EmptyOperator(task_id="downstream_3",
trigger_rule=TriggerRule.NONE_FAILED)
- downstream_task.set_upstream(latest_task)
- downstream_task2.set_upstream(downstream_task)
- downstream_task3.set_upstream(downstream_task)
+ downstream_task.set_upstream(latest_task)
+ downstream_task2.set_upstream(downstream_task)
+ downstream_task3.set_upstream(downstream_task)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
@@ -99,7 +98,7 @@ class TestLatestOnlyOperator:
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
start_date=timezone.utcnow(),
execution_date=timezone.datetime(2016, 1, 1, 12),
@@ -107,7 +106,7 @@ class TestLatestOnlyOperator:
data_interval=(timezone.datetime(2016, 1, 1, 12),
timezone.datetime(2016, 1, 1, 12) + INTERVAL),
)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
start_date=timezone.utcnow(),
execution_date=END_DATE,
@@ -152,15 +151,18 @@ class TestLatestOnlyOperator:
timezone.datetime(2016, 1, 2): "success",
} == exec_date_to_downstream_state
- def test_not_skipping_external(self):
- latest_task = LatestOnlyOperator(task_id="latest", dag=self.dag)
- downstream_task = EmptyOperator(task_id="downstream", dag=self.dag)
- downstream_task2 = EmptyOperator(task_id="downstream_2", dag=self.dag)
+ def test_not_skipping_external(self, dag_maker):
+ with dag_maker(
+ default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
schedule=INTERVAL, serialized=True
+ ):
+ latest_task = LatestOnlyOperator(task_id="latest")
+ downstream_task = EmptyOperator(task_id="downstream")
+ downstream_task2 = EmptyOperator(task_id="downstream_2")
- downstream_task.set_upstream(latest_task)
- downstream_task2.set_upstream(downstream_task)
+ downstream_task.set_upstream(latest_task)
+ downstream_task2.set_upstream(downstream_task)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
@@ -170,7 +172,7 @@ class TestLatestOnlyOperator:
)
execution_date = timezone.datetime(2016, 1, 1, 12)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
execution_date=execution_date,
@@ -179,7 +181,7 @@ class TestLatestOnlyOperator:
data_interval=(execution_date, execution_date),
)
- self.dag.create_dagrun(
+ dag_maker.create_dagrun(
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
execution_date=END_DATE,
diff --git a/tests/operators/test_subdag_operator.py
b/tests/operators/test_subdag_operator.py
index 3437560da4..ca669a9e4e 100644
--- a/tests/operators/test_subdag_operator.py
+++ b/tests/operators/test_subdag_operator.py
@@ -88,21 +88,21 @@ class TestSubDagOperator:
assert op.dag == dag
assert op.subdag == subdag
- def test_subdag_pools(self):
+ def test_subdag_pools(self, dag_maker):
"""
Subdags and subdag tasks can't both have a pool with 1 slot
"""
- dag = DAG("parent", default_args=default_args)
- subdag = DAG("parent.child", default_args=default_args)
+ with dag_maker("parent", default_args=default_args, serialized=True)
as dag:
+ pass
- session = airflow.settings.Session()
pool_1 = airflow.models.Pool(pool="test_pool_1", slots=1,
include_deferred=False)
pool_10 = airflow.models.Pool(pool="test_pool_10", slots=10,
include_deferred=False)
- session.add(pool_1)
- session.add(pool_10)
- session.commit()
+ dag_maker.session.add(pool_1)
+ dag_maker.session.add(pool_10)
+ dag_maker.session.commit()
- EmptyOperator(task_id="dummy", dag=subdag, pool="test_pool_1")
+ with dag_maker("parent.child", default_args=default_args,
serialized=True) as subdag:
+ EmptyOperator(task_id="dummy", pool="test_pool_1")
with pytest.raises(AirflowException):
SubDagOperator(task_id="child", dag=dag, subdag=subdag,
pool="test_pool_1")
@@ -112,9 +112,9 @@ class TestSubDagOperator:
with pytest.warns(RemovedInAirflow3Warning, match=WARNING_MESSAGE):
SubDagOperator(task_id="child", dag=dag, subdag=subdag,
pool="test_pool_10")
- session.delete(pool_1)
- session.delete(pool_10)
- session.commit()
+ dag_maker.session.delete(pool_1)
+ dag_maker.session.delete(pool_10)
+ dag_maker.session.commit()
def test_subdag_pools_no_possible_conflict(self):
"""
@@ -269,6 +269,7 @@ class TestSubDagOperator:
subdag.create_dagrun.assert_not_called()
assert 3 == subdag_task._get_dagrun.call_count
+ @pytest.mark.skip_if_database_isolation_mode # this uses functions which
operate directly on DB
def test_rerun_failed_subdag(self, dag_maker):
"""
When there is an existing DagRun with failed state, reset the DagRun
and the
diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py
index d4b2c938d9..9030942c65 100644
--- a/tests/operators/test_weekday.py
+++ b/tests/operators/test_weekday.py
@@ -23,7 +23,6 @@ import pytest
import time_machine
from airflow.exceptions import AirflowException
-from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.xcom import XCom
@@ -67,22 +66,6 @@ class TestBranchDayOfWeekOperator:
session.query(TI).delete()
session.query(XCom).delete()
- def setup_method(self):
- self.dag = DAG(
- "branch_day_of_week_operator_test",
- start_date=DEFAULT_DATE,
- schedule=INTERVAL,
- )
- self.branch_1 = EmptyOperator(task_id="branch_1", dag=self.dag)
- self.branch_2 = EmptyOperator(task_id="branch_2", dag=self.dag)
- self.branch_3 = None
-
- def teardown_method(self):
- with create_session() as session:
- session.query(DagRun).delete()
- session.query(TI).delete()
- session.query(XCom).delete()
-
def _assert_task_ids_match_states(self, dr, task_ids_to_states):
"""Helper that asserts task instances with a given id are in a given
state"""
tis = dr.get_task_instances()
@@ -99,23 +82,25 @@ class TestBranchDayOfWeekOperator:
"weekday", TEST_CASE_BRANCH_FOLLOW_TRUE.values(),
ids=TEST_CASE_BRANCH_FOLLOW_TRUE.keys()
)
@time_machine.travel("2021-01-25") # Monday
- def test_branch_follow_true(self, weekday):
+ def test_branch_follow_true(self, weekday, dag_maker):
"""Checks if BranchDayOfWeekOperator follows true branch"""
- branch_op = BranchDayOfWeekOperator(
- task_id="make_choice",
- follow_task_ids_if_true=["branch_1", "branch_2"],
- follow_task_ids_if_false="branch_3",
- week_day=weekday,
- dag=self.dag,
- )
-
- self.branch_1.set_upstream(branch_op)
- self.branch_2.set_upstream(branch_op)
- self.branch_3 = EmptyOperator(task_id="branch_3", dag=self.dag)
- self.branch_3.set_upstream(branch_op)
- self.dag.clear()
-
- dr = self.dag.create_dagrun(
+ with dag_maker(
+ "branch_day_of_week_operator_test", start_date=DEFAULT_DATE,
schedule=INTERVAL, serialized=True
+ ):
+ branch_op = BranchDayOfWeekOperator(
+ task_id="make_choice",
+ follow_task_ids_if_true=["branch_1", "branch_2"],
+ follow_task_ids_if_false="branch_3",
+ week_day=weekday,
+ )
+ branch_1 = EmptyOperator(task_id="branch_1")
+ branch_2 = EmptyOperator(task_id="branch_2")
+ branch_3 = EmptyOperator(task_id="branch_3")
+ branch_1.set_upstream(branch_op)
+ branch_2.set_upstream(branch_op)
+ branch_3.set_upstream(branch_op)
+
+ dr = dag_maker.create_dagrun(
run_id="manual__",
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
@@ -136,23 +121,24 @@ class TestBranchDayOfWeekOperator:
)
@time_machine.travel("2021-01-25") # Monday
- def test_branch_follow_true_with_execution_date(self):
+ def test_branch_follow_true_with_execution_date(self, dag_maker):
"""Checks if BranchDayOfWeekOperator follows true branch when set
use_task_logical_date"""
+ with dag_maker(
+ "branch_day_of_week_operator_test", start_date=DEFAULT_DATE,
schedule=INTERVAL, serialized=True
+ ):
+ branch_op = BranchDayOfWeekOperator(
+ task_id="make_choice",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ week_day="Wednesday",
+ use_task_logical_date=True, # We compare to DEFAULT_DATE
which is Wednesday
+ )
+ branch_1 = EmptyOperator(task_id="branch_1")
+ branch_2 = EmptyOperator(task_id="branch_2")
+ branch_1.set_upstream(branch_op)
+ branch_2.set_upstream(branch_op)
- branch_op = BranchDayOfWeekOperator(
- task_id="make_choice",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- week_day="Wednesday",
- use_task_logical_date=True, # We compare to DEFAULT_DATE which is
Wednesday
- dag=self.dag,
- )
-
- self.branch_1.set_upstream(branch_op)
- self.branch_2.set_upstream(branch_op)
- self.dag.clear()
-
- dr = self.dag.create_dagrun(
+ dr = dag_maker.create_dagrun(
run_id="manual__",
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
@@ -172,22 +158,23 @@ class TestBranchDayOfWeekOperator:
)
@time_machine.travel("2021-01-25") # Monday
- def test_branch_follow_false(self):
+ def test_branch_follow_false(self, dag_maker):
"""Checks if BranchDayOfWeekOperator follow false branch"""
+ with dag_maker(
+ "branch_day_of_week_operator_test", start_date=DEFAULT_DATE,
schedule=INTERVAL, serialized=True
+ ):
+ branch_op = BranchDayOfWeekOperator(
+ task_id="make_choice",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ week_day="Sunday",
+ )
+ branch_1 = EmptyOperator(task_id="branch_1")
+ branch_2 = EmptyOperator(task_id="branch_2")
+ branch_1.set_upstream(branch_op)
+ branch_2.set_upstream(branch_op)
- branch_op = BranchDayOfWeekOperator(
- task_id="make_choice",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- week_day="Sunday",
- dag=self.dag,
- )
-
- self.branch_1.set_upstream(branch_op)
- self.branch_2.set_upstream(branch_op)
- self.dag.clear()
-
- dr = self.dag.create_dagrun(
+ dr = dag_maker.create_dagrun(
run_id="manual__",
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
@@ -206,17 +193,22 @@ class TestBranchDayOfWeekOperator:
},
)
- def test_branch_with_no_weekday(self):
+ def test_branch_with_no_weekday(self, dag_maker):
"""Check if BranchDayOfWeekOperator raises exception on missing
weekday"""
with pytest.raises(AirflowException):
- BranchDayOfWeekOperator(
- task_id="make_choice",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- dag=self.dag,
- )
-
- def test_branch_with_invalid_type(self):
+ with dag_maker(
+ "branch_day_of_week_operator_test",
+ start_date=DEFAULT_DATE,
+ schedule=INTERVAL,
+ serialized=True,
+ ):
+ BranchDayOfWeekOperator(
+ task_id="make_choice",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ )
+
+ def test_branch_with_invalid_type(self, dag_maker):
"""Check if BranchDayOfWeekOperator raises exception on unsupported
weekday type"""
invalid_week_day = 5
with pytest.raises(
@@ -225,13 +217,18 @@ class TestBranchDayOfWeekOperator:
"Input should be iterable type:"
"str, set, list, dict or Weekday enum type",
):
- BranchDayOfWeekOperator(
- task_id="make_choice",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- week_day=invalid_week_day,
- dag=self.dag,
- )
+ with dag_maker(
+ "branch_day_of_week_operator_test",
+ start_date=DEFAULT_DATE,
+ schedule=INTERVAL,
+ serialized=True,
+ ):
+ BranchDayOfWeekOperator(
+ task_id="make_choice",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ week_day=invalid_week_day,
+ )
@pytest.mark.parametrize(
"_,week_day,fail_msg",
@@ -241,33 +238,40 @@ class TestBranchDayOfWeekOperator:
("set", {WeekDay.MONDAY, "Thsday"}, "Thsday"),
],
)
- def test_weekday_branch_invalid_weekday_value(self, _, week_day, fail_msg):
+ def test_weekday_branch_invalid_weekday_value(self, _, week_day, fail_msg,
dag_maker):
"""Check if BranchDayOfWeekOperator raises exception on wrong value of
weekday"""
with pytest.raises(AttributeError, match=f'Invalid Week Day passed:
"{fail_msg}"'):
- BranchDayOfWeekOperator(
+ with dag_maker(
+ "branch_day_of_week_operator_test",
+ start_date=DEFAULT_DATE,
+ schedule=INTERVAL,
+ serialized=True,
+ ):
+ BranchDayOfWeekOperator(
+ task_id="make_choice",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ week_day=week_day,
+ )
+
+ @time_machine.travel("2021-01-25") # Monday
+ def test_branch_xcom_push_true_branch(self, dag_maker):
+ """Check if BranchDayOfWeekOperator push to xcom value of
follow_task_ids_if_true"""
+ with dag_maker(
+ "branch_day_of_week_operator_test", start_date=DEFAULT_DATE,
schedule=INTERVAL, serialized=True
+ ):
+ branch_op = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
- week_day=week_day,
- dag=self.dag,
+ week_day="Monday",
)
+ branch_1 = EmptyOperator(task_id="branch_1")
+ branch_2 = EmptyOperator(task_id="branch_2")
+ branch_1.set_upstream(branch_op)
+ branch_2.set_upstream(branch_op)
- @time_machine.travel("2021-01-25") # Monday
- def test_branch_xcom_push_true_branch(self):
- """Check if BranchDayOfWeekOperator push to xcom value of
follow_task_ids_if_true"""
- branch_op = BranchDayOfWeekOperator(
- task_id="make_choice",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- week_day="Monday",
- dag=self.dag,
- )
-
- self.branch_1.set_upstream(branch_op)
- self.branch_2.set_upstream(branch_op)
- self.dag.clear()
-
- dr = self.dag.create_dagrun(
+ dr = dag_maker.create_dagrun(
run_id="manual__",
start_date=timezone.utcnow(),
execution_date=DEFAULT_DATE,
@@ -282,17 +286,22 @@ class TestBranchDayOfWeekOperator:
if ti.task_id == "make_choice":
assert ti.xcom_pull(task_ids="make_choice") == "branch_1"
- def test_deprecation_warning(self):
+ def test_deprecation_warning(self, dag_maker):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use
``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
- BranchDayOfWeekOperator(
- task_id="week_day_warn",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- week_day="Monday",
- use_task_execution_day=True,
- dag=self.dag,
- )
+ with dag_maker(
+ "branch_day_of_week_operator_test",
+ start_date=DEFAULT_DATE,
+ schedule=INTERVAL,
+ serialized=True,
+ ):
+ BranchDayOfWeekOperator(
+ task_id="week_day_warn",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ week_day="Monday",
+ use_task_execution_day=True,
+ )
assert warning_message == str(warnings[0].message)