This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6229c1c98a Fix DateTimeOperator tests in Database Isolation Mode
(#41271)
6229c1c98a is described below
commit 6229c1c98a2a78d00255d6b6c5b70d032cff9b80
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Aug 6 09:17:09 2024 +0200
Fix DateTimeOperator tests in Database Isolation Mode (#41271)
* Fix DateTimeOperator tests in Database Isolation Mode
* Review Feedback
---
tests/operators/test_datetime.py | 41 ++++++++++++++++++++--------------------
1 file changed, 20 insertions(+), 21 deletions(-)
diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py
index 485897acc6..e4294a8856 100644
--- a/tests/operators/test_datetime.py
+++ b/tests/operators/test_datetime.py
@@ -23,7 +23,6 @@ import pytest
import time_machine
from airflow.exceptions import AirflowException
-from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance as TI
from airflow.operators.datetime import BranchDateTimeOperator
@@ -52,30 +51,30 @@ class TestBranchDateTimeOperator:
(datetime.time(10, 0, 0), datetime.datetime(2020, 7, 7, 11, 0, 0)),
]
- def setup_method(self):
- self.dag = DAG(
+ @pytest.fixture(autouse=True)
+ def base_tests_setup(self, dag_maker):
+ with dag_maker(
"branch_datetime_operator_test",
default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
schedule=INTERVAL,
- )
-
- self.branch_1 = EmptyOperator(task_id="branch_1", dag=self.dag)
- self.branch_2 = EmptyOperator(task_id="branch_2", dag=self.dag)
+ serialized=True,
+ ) as dag:
+ self.dag = dag
+ self.branch_1 = EmptyOperator(task_id="branch_1")
+ self.branch_2 = EmptyOperator(task_id="branch_2")
- self.branch_op = BranchDateTimeOperator(
- task_id="datetime_branch",
- follow_task_ids_if_true="branch_1",
- follow_task_ids_if_false="branch_2",
- target_upper=datetime.datetime(2020, 7, 7, 11, 0, 0),
- target_lower=datetime.datetime(2020, 7, 7, 10, 0, 0),
- dag=self.dag,
- )
+ self.branch_op = BranchDateTimeOperator(
+ task_id="datetime_branch",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ target_upper=datetime.datetime(2020, 7, 7, 11, 0, 0),
+ target_lower=datetime.datetime(2020, 7, 7, 10, 0, 0),
+ )
- self.branch_1.set_upstream(self.branch_op)
- self.branch_2.set_upstream(self.branch_op)
- self.dag.clear()
+ self.branch_1.set_upstream(self.branch_op)
+ self.branch_2.set_upstream(self.branch_op)
- self.dr = self.dag.create_dagrun(
+ self.dr = dag_maker.create_dagrun(
run_id="manual__",
start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE,
@@ -231,11 +230,11 @@ class TestBranchDateTimeOperator:
targets,
)
@time_machine.travel("2020-12-01 09:00:00")
- def test_branch_datetime_operator_use_task_logical_date(self,
target_lower, target_upper):
+ def test_branch_datetime_operator_use_task_logical_date(self, dag_maker,
target_lower, target_upper):
"""Check if BranchDateTimeOperator uses task execution date"""
in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
self.branch_op.use_task_logical_date = True
- self.dr = self.dag.create_dagrun(
+ self.dr = dag_maker.create_dagrun(
run_id="manual_exec_date__",
start_date=in_between_date,
execution_date=in_between_date,