This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4abb134e48 Resolve deprecations in core sensors tests (#39404)
4abb134e48 is described below
commit 4abb134e482cc248d6f0cfd0b5faaa5144ce1b53
Author: Andrey Anshin <[email protected]>
AuthorDate: Sat May 4 18:53:25 2024 +0400
Resolve deprecations in core sensors tests (#39404)
---
tests/deprecations_ignore.yml | 18 -------
tests/sensors/test_external_task_sensor.py | 60 +++++++++++++--------
tests/sensors/test_timeout_sensor.py | 85 ------------------------------
3 files changed, 37 insertions(+), 126 deletions(-)
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 2887cd13a3..131ea00c67 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -299,24 +299,6 @@
- tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag
- tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname
- tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_log
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_soft_fail_as_skipped
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_not_exists_without_check_existence
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_failed_states
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_success
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_when_there_is_no_TIs
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_failed_states
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_sensor_success
-- tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_sensor_with_task_group
-- tests/sensors/test_external_task_sensor.py::test_clear_multiple_external_task_marker
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_clear_activate
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_deep
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_shallow
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_exception
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_future
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_transitive
-- tests/sensors/test_timeout_sensor.py::TestSensorTimeout::test_timeout
- tests/triggers/test_external_task.py::TestTaskStateTrigger::test_task_state_trigger_success
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 557e4cf00d..28db24305f 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -109,12 +109,13 @@ class TestExternalTaskSensor:
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+ self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE)
def add_time_sensor(self, task_id=TEST_TASK_ID):
op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
- def add_dummy_task_group(self, target_states=None):
+ def add_fake_task_group(self, target_states=None):
target_states = [State.SUCCESS] * 2 if target_states is None else
target_states
with self.dag as dag:
with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:
@@ -122,36 +123,36 @@ class TestExternalTaskSensor:
SerializedDagModel.write_dag(dag)
for idx, task in enumerate(task_group):
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ ti = TaskInstance(task=task, run_id=self.dag_run_id)
ti.run(ignore_ti_state=True, mark_success=True)
ti.set_state(target_states[idx])
- def add_dummy_task_group_with_dynamic_tasks(self,
target_state=State.SUCCESS):
+ def add_fake_task_group_with_dynamic_tasks(self,
target_state=State.SUCCESS):
map_indexes = range(5)
with self.dag as dag:
with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:
@task_deco
- def dummy_task():
+ def fake_task():
pass
@task_deco
- def dummy_mapped_task(x: int):
+ def fake_mapped_task(x: int):
return x
- dummy_task()
- dummy_mapped_task.expand(x=list(map_indexes))
+ fake_task()
+ fake_mapped_task.expand(x=list(map_indexes))
SerializedDagModel.write_dag(dag)
for task in task_group:
- if task.task_id == "dummy_mapped_task":
+ if task.task_id == "fake_mapped_task":
for map_index in map_indexes:
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE,
map_index=map_index)
+ ti = TaskInstance(task=task, run_id=self.dag_run_id,
map_index=map_index)
ti.run(ignore_ti_state=True, mark_success=True)
ti.set_state(target_state)
else:
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ ti = TaskInstance(task=task, run_id=self.dag_run_id)
ti.run(ignore_ti_state=True, mark_success=True)
ti.set_state(target_state)
@@ -178,7 +179,7 @@ class TestExternalTaskSensor:
def test_external_task_sensor_with_task_group(self):
self.add_time_sensor()
- self.add_dummy_task_group()
+ self.add_fake_task_group()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_task_group",
external_dag_id=TEST_DAG_ID,
@@ -236,7 +237,7 @@ class TestExternalTaskSensor:
# this behaviour is similar to external_task_id doesn't exists
def test_external_task_group_not_exists_without_check_existence(self):
self.add_time_sensor()
- self.add_dummy_task_group()
+ self.add_fake_task_group()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -250,7 +251,7 @@ class TestExternalTaskSensor:
def test_external_task_group_sensor_success(self):
self.add_time_sensor()
- self.add_dummy_task_group()
+ self.add_fake_task_group()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -263,7 +264,7 @@ class TestExternalTaskSensor:
def test_external_task_group_sensor_failed_states(self):
ti_states = [State.FAILED, State.FAILED]
self.add_time_sensor()
- self.add_dummy_task_group(ti_states)
+ self.add_fake_task_group(ti_states)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -435,7 +436,11 @@ class TestExternalTaskSensor:
def test_external_dag_sensor(self):
other_dag = DAG("other_dag", default_args=self.args,
end_date=DEFAULT_DATE, schedule="@once")
other_dag.create_dagrun(
- run_id="test", start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE, state=State.SUCCESS
+ run_id="test",
+ start_date=DEFAULT_DATE,
+ execution_date=DEFAULT_DATE,
+ state=State.SUCCESS,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
@@ -448,7 +453,11 @@ class TestExternalTaskSensor:
def test_external_dag_sensor_log(self, caplog):
other_dag = DAG("other_dag", default_args=self.args,
end_date=DEFAULT_DATE, schedule="@once")
other_dag.create_dagrun(
- run_id="test", start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE, state=State.SUCCESS
+ run_id="test",
+ start_date=DEFAULT_DATE,
+ execution_date=DEFAULT_DATE,
+ state=State.SUCCESS,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
@@ -461,7 +470,11 @@ class TestExternalTaskSensor:
def test_external_dag_sensor_soft_fail_as_skipped(self):
other_dag = DAG("other_dag", default_args=self.args,
end_date=DEFAULT_DATE, schedule="@once")
other_dag.create_dagrun(
- run_id="test", start_date=DEFAULT_DATE,
execution_date=DEFAULT_DATE, state=State.SUCCESS
+ run_id="test",
+ start_date=DEFAULT_DATE,
+ execution_date=DEFAULT_DATE,
+ state=State.SUCCESS,
+ data_interval=(DEFAULT_DATE, DEFAULT_DATE),
)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
@@ -795,7 +808,7 @@ exit 0
def test_external_task_group_with_mapped_tasks_sensor_success(self):
self.add_time_sensor()
- self.add_dummy_task_group_with_dynamic_tasks()
+ self.add_fake_task_group_with_dynamic_tasks()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -807,7 +820,7 @@ exit 0
def test_external_task_group_with_mapped_tasks_failed_states(self):
self.add_time_sensor()
- self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+ self.add_fake_task_group_with_dynamic_tasks(State.FAILED)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -824,7 +837,7 @@ exit 0
def test_external_task_group_when_there_is_no_TIs(self):
"""Test that the sensor does not fail when there are no TIs to
check."""
self.add_time_sensor()
- self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+ self.add_fake_task_group_with_dynamic_tasks(State.FAILED)
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
@@ -1232,6 +1245,7 @@ def run_tasks(dag_bag, execution_date=DEFAULT_DATE, session=None):
execution_date=execution_date,
start_date=execution_date,
run_type=DagRunType.MANUAL,
+ data_interval=(execution_date, execution_date),
session=session,
)
# we use sorting by task_id here because for the test DAG structure of
ours
@@ -1615,7 +1629,7 @@ def dag_bag_head_tail_mapped_tasks():
with DAG("head_tail", start_date=DEFAULT_DATE, schedule="@daily") as dag:
@task_deco
- def dummy_task(x: int):
+ def fake_task(x: int):
return x
head = ExternalTaskSensor(
@@ -1626,7 +1640,7 @@ def dag_bag_head_tail_mapped_tasks():
mode="reschedule",
)
- body = dummy_task.expand(x=range(5))
+ body = fake_task.expand(x=range(5))
tail = ExternalTaskMarker(
task_id="tail",
external_dag_id=dag.dag_id,
@@ -1656,7 +1670,7 @@ def test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m
)
session.add(dagrun)
for task in dag.tasks:
- if task.task_id == "dummy_task":
+ if task.task_id == "fake_task":
for map_index in range(5):
ti = TaskInstance(task=task, run_id=dagrun.run_id,
map_index=map_index)
ti.state = TaskInstanceState.SUCCESS
diff --git a/tests/sensors/test_timeout_sensor.py b/tests/sensors/test_timeout_sensor.py
deleted file mode 100644
index 315658a607..0000000000
--- a/tests/sensors/test_timeout_sensor.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import time
-from datetime import timedelta
-from typing import TYPE_CHECKING
-
-import pytest
-
-from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException
-from airflow.models.dag import DAG
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils import timezone
-from airflow.utils.timezone import datetime
-
-pytestmark = pytest.mark.db_test
-
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
-DEFAULT_DATE = datetime(2015, 1, 1)
-TEST_DAG_ID = "unit_test_dag"
-
-
-class TimeoutTestSensor(BaseSensorOperator):
- """
- Sensor that always returns the return_value provided
-
- :param return_value: Set to true to mark the task as SKIPPED on failure
- """
-
- def __init__(self, return_value=False, **kwargs):
- self.return_value = return_value
- super().__init__(**kwargs)
-
- def poke(self, context: Context):
- return self.return_value
-
- def execute(self, context: Context):
- started_at = timezone.utcnow()
- time_jump = self.params["time_jump"]
- while not self.poke(context):
- if time_jump:
- started_at -= time_jump
- if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
- if self.soft_fail:
- raise AirflowSkipException("timeout")
- else:
- raise AirflowSensorTimeout("timeout")
- time.sleep(self.poke_interval)
- self.log.info("Success criteria met. Exiting.")
-
-
-class TestSensorTimeout:
- def setup_method(self):
- args = {"owner": "airflow", "start_date": DEFAULT_DATE}
- self.dag = DAG(TEST_DAG_ID, default_args=args)
-
- def test_timeout(self):
- op = TimeoutTestSensor(
- task_id="test_timeout",
- execution_timeout=timedelta(days=2),
- return_value=False,
- poke_interval=5,
- params={"time_jump": timedelta(days=2, seconds=1)},
- dag=self.dag,
- )
- with pytest.raises(AirflowSensorTimeout):
- op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)