This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 68fe455b6f Resolve taskinstance deprecations in tests (#39988)
68fe455b6f is described below
commit 68fe455b6fa1fdf855e42d49d324d5bfca999a85
Author: Gopal Dirisala <[email protected]>
AuthorDate: Sat Jun 1 15:27:19 2024 +0530
Resolve taskinstance deprecations in tests (#39988)
---
tests/deprecations_ignore.yml | 8 -------
tests/models/test_taskinstance.py | 46 +++++++++++++++++++++++----------------
2 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 0bff1b3580..4a6766ca8f 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -95,14 +95,6 @@
- tests/models/test_dagbag.py::TestDagBag::test_skip_cycle_dags
-
tests/models/test_mappedoperator.py::test_expand_mapped_task_instance_with_named_index
- tests/models/test_skipmixin.py::TestSkipMixin::test_skip_none_dagrun
--
tests/models/test_taskinstance.py::TestTaskInstance::test_context_triggering_dataset_events
--
tests/models/test_taskinstance.py::TestTaskInstance::test_get_num_running_task_instances
--
tests/models/test_taskinstance.py::TestTaskInstance::test_get_previous_start_date_none
-- tests/models/test_taskinstance.py::TestTaskInstance::test_handle_failure
--
tests/models/test_taskinstance.py::TestTaskInstance::test_handle_failure_fail_stop
--
tests/models/test_taskinstance.py::TestTaskInstance::test_template_with_custom_timetable_deprecated_context
-- tests/models/test_taskinstance.py::TestTaskInstance::test_xcom_pull
--
tests/models/test_taskinstance.py::TestTaskInstance::test_xcom_pull_different_execution_date
- tests/models/test_xcom.py::TestXCom::test_set_serialize_call_old_signature
-
tests/ti_deps/deps/test_prev_dagrun_dep.py::TestPrevDagrunDep::test_first_task_run_of_new_task
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index fecde3bcb2..6bac956e93 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -47,6 +47,7 @@ from airflow.exceptions import (
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
+ RemovedInAirflow3Warning,
UnmappableXComLengthPushed,
UnmappableXComTypePushed,
XComForMappingNotPushed,
@@ -1580,13 +1581,7 @@ class TestTaskInstance:
ti1.xcom_push(key="foo", value="bar")
# Push another value with the same key (but by a different task)
- XCom.set(
- key="foo",
- value="baz",
- task_id=task_2.task_id,
- dag_id=dag.dag_id,
- execution_date=dagrun.execution_date,
- )
+ XCom.set(key="foo", value="baz", task_id=task_2.task_id,
dag_id=dag.dag_id, run_id=dagrun.run_id)
# Pull with no arguments
result = ti1.xcom_pull()
@@ -1706,7 +1701,7 @@ class TestTaskInstance:
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
ti.run()
exec_date += datetime.timedelta(days=1)
- dr = ti.task.dag.create_dagrun(run_id="test2",
execution_date=exec_date, state=None)
+ dr = ti.task.dag.create_dagrun(run_id="test2",
data_interval=(exec_date, exec_date), state=None)
ti = TI(task=ti.task, run_id=dr.run_id)
ti.run()
# We have set a new execution date (and did not pass in
@@ -1939,11 +1934,13 @@ class TestTaskInstance:
dag_id="test_get_num_running_task_instances", task_id="task1",
session=session
)
+ execution_date = DEFAULT_DATE + datetime.timedelta(days=1)
dr = ti1.task.dag.create_dagrun(
- execution_date=DEFAULT_DATE + datetime.timedelta(days=1),
+ execution_date=execution_date,
state=None,
run_id="2",
session=session,
+ data_interval=(execution_date, execution_date),
)
assert ti1 in session
ti2 = dr.task_instances[0]
@@ -2717,6 +2714,7 @@ class TestTaskInstance:
execution_date=day_2,
state=State.RUNNING,
run_type=DagRunType.MANUAL,
+ data_interval=(day_1, day_2),
)
ti_1 = dagrun_1.get_task_instance(task.task_id)
@@ -2742,6 +2740,7 @@ class TestTaskInstance:
session.add_all([ds1, ds2])
session.commit()
+ execution_date = timezone.utcnow()
# it's easier to fake a manual run here
dag, task1 = create_dummy_dag(
dag_id="test_triggering_dataset_events",
@@ -2754,9 +2753,10 @@ class TestTaskInstance:
dr = dag.create_dagrun(
run_id="test2",
run_type=DagRunType.DATASET_TRIGGERED,
- execution_date=timezone.utcnow(),
+ execution_date=execution_date,
state=None,
session=session,
+ data_interval=(execution_date, execution_date),
)
ds1_event = DatasetEvent(dataset_id=1)
ds2_event_1 = DatasetEvent(dataset_id=2)
@@ -2939,13 +2939,17 @@ class TestTaskInstance:
assert recorded_message[0].startswith(message_beginning)
def test_template_with_custom_timetable_deprecated_context(self,
create_task_instance):
- ti = create_task_instance(
- start_date=DEFAULT_DATE,
- timetable=AfterWorkdayTimetable(),
- run_type=DagRunType.SCHEDULED,
- execution_date=timezone.datetime(2021, 9, 6),
- data_interval=(timezone.datetime(2021, 9, 6),
timezone.datetime(2021, 9, 7)),
- )
+ with pytest.warns(
+ RemovedInAirflow3Warning,
+ match="Param `timetable` is deprecated and will be removed in a
future release. Please use `schedule` instead.",
+ ):
+ ti = create_task_instance(
+ start_date=DEFAULT_DATE,
+ timetable=AfterWorkdayTimetable(),
+ run_type=DagRunType.SCHEDULED,
+ execution_date=timezone.datetime(2021, 9, 6),
+ data_interval=(timezone.datetime(2021, 9, 6),
timezone.datetime(2021, 9, 7)),
+ )
context = ti.get_template_context()
with pytest.deprecated_call():
assert context["execution_date"] == pendulum.DateTime(2021, 9, 6,
tzinfo=TIMEZONE)
@@ -3030,12 +3034,14 @@ class TestTaskInstance:
on_retry_callback=mock_on_retry_1,
session=session,
)
+ execution_date = timezone.utcnow()
dr = dag.create_dagrun(
run_id="test2",
run_type=DagRunType.MANUAL,
- execution_date=timezone.utcnow(),
+ execution_date=execution_date,
state=None,
session=session,
+ data_interval=(execution_date, execution_date),
)
ti1 = dr.get_task_instance(task1.task_id, session=session)
ti1.task = task1
@@ -3173,12 +3179,14 @@ class TestTaskInstance:
session=session,
fail_stop=True,
)
+ execution_date = timezone.utcnow()
dr = dag.create_dagrun(
run_id="test_ff",
run_type=DagRunType.MANUAL,
- execution_date=timezone.utcnow(),
+ execution_date=execution_date,
state=None,
session=session,
+ data_interval=(execution_date, execution_date),
)
ti1 = dr.get_task_instance(task1.task_id, session=session)