This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4abb134e48 Resolve deprecations in core sensors tests (#39404)
4abb134e48 is described below

commit 4abb134e482cc248d6f0cfd0b5faaa5144ce1b53
Author: Andrey Anshin <[email protected]>
AuthorDate: Sat May 4 18:53:25 2024 +0400

    Resolve deprecations in core sensors tests (#39404)
---
 tests/deprecations_ignore.yml              | 18 -------
 tests/sensors/test_external_task_sensor.py | 60 +++++++++++++--------
 tests/sensors/test_timeout_sensor.py       | 85 ------------------------------
 3 files changed, 37 insertions(+), 126 deletions(-)

diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 2887cd13a3..131ea00c67 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -299,24 +299,6 @@
 - 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_execute_on_failure_callbacks_without_dag
 - 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_failure_callbacks_should_not_drop_hostname
 - 
tests/dag_processing/test_processor.py::TestDagFileProcessor::test_process_file_should_failure_callback
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_log
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_dag_sensor_soft_fail_as_skipped
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_not_exists_without_check_existence
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_failed_states
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_sensor_success
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_when_there_is_no_TIs
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_failed_states
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_group_with_mapped_tasks_sensor_success
-- 
tests/sensors/test_external_task_sensor.py::TestExternalTaskSensor::test_external_task_sensor_with_task_group
-- 
tests/sensors/test_external_task_sensor.py::test_clear_multiple_external_task_marker
-- 
tests/sensors/test_external_task_sensor.py::test_external_task_marker_clear_activate
-- 
tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_deep
-- 
tests/sensors/test_external_task_sensor.py::test_external_task_marker_cyclic_shallow
-- 
tests/sensors/test_external_task_sensor.py::test_external_task_marker_exception
-- tests/sensors/test_external_task_sensor.py::test_external_task_marker_future
-- 
tests/sensors/test_external_task_sensor.py::test_external_task_marker_transitive
-- tests/sensors/test_timeout_sensor.py::TestSensorTimeout::test_timeout
 - 
tests/triggers/test_external_task.py::TestTaskStateTrigger::test_task_state_trigger_success
 
 
diff --git a/tests/sensors/test_external_task_sensor.py 
b/tests/sensors/test_external_task_sensor.py
index 557e4cf00d..28db24305f 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -109,12 +109,13 @@ class TestExternalTaskSensor:
         self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
         self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
         self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+        self.dag_run_id = DagRunType.MANUAL.generate_run_id(DEFAULT_DATE)
 
     def add_time_sensor(self, task_id=TEST_TASK_ID):
         op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag)
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
 
-    def add_dummy_task_group(self, target_states=None):
+    def add_fake_task_group(self, target_states=None):
         target_states = [State.SUCCESS] * 2 if target_states is None else 
target_states
         with self.dag as dag:
             with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:
@@ -122,36 +123,36 @@ class TestExternalTaskSensor:
             SerializedDagModel.write_dag(dag)
 
         for idx, task in enumerate(task_group):
-            ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+            ti = TaskInstance(task=task, run_id=self.dag_run_id)
             ti.run(ignore_ti_state=True, mark_success=True)
             ti.set_state(target_states[idx])
 
-    def add_dummy_task_group_with_dynamic_tasks(self, 
target_state=State.SUCCESS):
+    def add_fake_task_group_with_dynamic_tasks(self, 
target_state=State.SUCCESS):
         map_indexes = range(5)
         with self.dag as dag:
             with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:
 
                 @task_deco
-                def dummy_task():
+                def fake_task():
                     pass
 
                 @task_deco
-                def dummy_mapped_task(x: int):
+                def fake_mapped_task(x: int):
                     return x
 
-                dummy_task()
-                dummy_mapped_task.expand(x=list(map_indexes))
+                fake_task()
+                fake_mapped_task.expand(x=list(map_indexes))
 
         SerializedDagModel.write_dag(dag)
 
         for task in task_group:
-            if task.task_id == "dummy_mapped_task":
+            if task.task_id == "fake_mapped_task":
                 for map_index in map_indexes:
-                    ti = TaskInstance(task=task, execution_date=DEFAULT_DATE, 
map_index=map_index)
+                    ti = TaskInstance(task=task, run_id=self.dag_run_id, 
map_index=map_index)
                     ti.run(ignore_ti_state=True, mark_success=True)
                     ti.set_state(target_state)
             else:
-                ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+                ti = TaskInstance(task=task, run_id=self.dag_run_id)
                 ti.run(ignore_ti_state=True, mark_success=True)
                 ti.set_state(target_state)
 
@@ -178,7 +179,7 @@ class TestExternalTaskSensor:
 
     def test_external_task_sensor_with_task_group(self):
         self.add_time_sensor()
-        self.add_dummy_task_group()
+        self.add_fake_task_group()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_task_group",
             external_dag_id=TEST_DAG_ID,
@@ -236,7 +237,7 @@ class TestExternalTaskSensor:
     # this behaviour is similar to external_task_id doesn't exists
     def test_external_task_group_not_exists_without_check_existence(self):
         self.add_time_sensor()
-        self.add_dummy_task_group()
+        self.add_fake_task_group()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
             external_dag_id=TEST_DAG_ID,
@@ -250,7 +251,7 @@ class TestExternalTaskSensor:
 
     def test_external_task_group_sensor_success(self):
         self.add_time_sensor()
-        self.add_dummy_task_group()
+        self.add_fake_task_group()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
             external_dag_id=TEST_DAG_ID,
@@ -263,7 +264,7 @@ class TestExternalTaskSensor:
     def test_external_task_group_sensor_failed_states(self):
         ti_states = [State.FAILED, State.FAILED]
         self.add_time_sensor()
-        self.add_dummy_task_group(ti_states)
+        self.add_fake_task_group(ti_states)
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
             external_dag_id=TEST_DAG_ID,
@@ -435,7 +436,11 @@ class TestExternalTaskSensor:
     def test_external_dag_sensor(self):
         other_dag = DAG("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once")
         other_dag.create_dagrun(
-            run_id="test", start_date=DEFAULT_DATE, 
execution_date=DEFAULT_DATE, state=State.SUCCESS
+            run_id="test",
+            start_date=DEFAULT_DATE,
+            execution_date=DEFAULT_DATE,
+            state=State.SUCCESS,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
         )
         op = ExternalTaskSensor(
             task_id="test_external_dag_sensor_check",
@@ -448,7 +453,11 @@ class TestExternalTaskSensor:
     def test_external_dag_sensor_log(self, caplog):
         other_dag = DAG("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once")
         other_dag.create_dagrun(
-            run_id="test", start_date=DEFAULT_DATE, 
execution_date=DEFAULT_DATE, state=State.SUCCESS
+            run_id="test",
+            start_date=DEFAULT_DATE,
+            execution_date=DEFAULT_DATE,
+            state=State.SUCCESS,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
         )
         op = ExternalTaskSensor(
             task_id="test_external_dag_sensor_check",
@@ -461,7 +470,11 @@ class TestExternalTaskSensor:
     def test_external_dag_sensor_soft_fail_as_skipped(self):
         other_dag = DAG("other_dag", default_args=self.args, 
end_date=DEFAULT_DATE, schedule="@once")
         other_dag.create_dagrun(
-            run_id="test", start_date=DEFAULT_DATE, 
execution_date=DEFAULT_DATE, state=State.SUCCESS
+            run_id="test",
+            start_date=DEFAULT_DATE,
+            execution_date=DEFAULT_DATE,
+            state=State.SUCCESS,
+            data_interval=(DEFAULT_DATE, DEFAULT_DATE),
         )
         op = ExternalTaskSensor(
             task_id="test_external_dag_sensor_check",
@@ -795,7 +808,7 @@ exit 0
 
     def test_external_task_group_with_mapped_tasks_sensor_success(self):
         self.add_time_sensor()
-        self.add_dummy_task_group_with_dynamic_tasks()
+        self.add_fake_task_group_with_dynamic_tasks()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
             external_dag_id=TEST_DAG_ID,
@@ -807,7 +820,7 @@ exit 0
 
     def test_external_task_group_with_mapped_tasks_failed_states(self):
         self.add_time_sensor()
-        self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+        self.add_fake_task_group_with_dynamic_tasks(State.FAILED)
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
             external_dag_id=TEST_DAG_ID,
@@ -824,7 +837,7 @@ exit 0
     def test_external_task_group_when_there_is_no_TIs(self):
         """Test that the sensor does not fail when there are no TIs to 
check."""
         self.add_time_sensor()
-        self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+        self.add_fake_task_group_with_dynamic_tasks(State.FAILED)
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
             external_dag_id=TEST_DAG_ID,
@@ -1232,6 +1245,7 @@ def run_tasks(dag_bag, execution_date=DEFAULT_DATE, 
session=None):
             execution_date=execution_date,
             start_date=execution_date,
             run_type=DagRunType.MANUAL,
+            data_interval=(execution_date, execution_date),
             session=session,
         )
         # we use sorting by task_id here because for the test DAG structure of 
ours
@@ -1615,7 +1629,7 @@ def dag_bag_head_tail_mapped_tasks():
     with DAG("head_tail", start_date=DEFAULT_DATE, schedule="@daily") as dag:
 
         @task_deco
-        def dummy_task(x: int):
+        def fake_task(x: int):
             return x
 
         head = ExternalTaskSensor(
@@ -1626,7 +1640,7 @@ def dag_bag_head_tail_mapped_tasks():
             mode="reschedule",
         )
 
-        body = dummy_task.expand(x=range(5))
+        body = fake_task.expand(x=range(5))
         tail = ExternalTaskMarker(
             task_id="tail",
             external_dag_id=dag.dag_id,
@@ -1656,7 +1670,7 @@ def 
test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m
         )
         session.add(dagrun)
         for task in dag.tasks:
-            if task.task_id == "dummy_task":
+            if task.task_id == "fake_task":
                 for map_index in range(5):
                     ti = TaskInstance(task=task, run_id=dagrun.run_id, 
map_index=map_index)
                     ti.state = TaskInstanceState.SUCCESS
diff --git a/tests/sensors/test_timeout_sensor.py 
b/tests/sensors/test_timeout_sensor.py
deleted file mode 100644
index 315658a607..0000000000
--- a/tests/sensors/test_timeout_sensor.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import time
-from datetime import timedelta
-from typing import TYPE_CHECKING
-
-import pytest
-
-from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException
-from airflow.models.dag import DAG
-from airflow.sensors.base import BaseSensorOperator
-from airflow.utils import timezone
-from airflow.utils.timezone import datetime
-
-pytestmark = pytest.mark.db_test
-
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-DEFAULT_DATE = datetime(2015, 1, 1)
-TEST_DAG_ID = "unit_test_dag"
-
-
-class TimeoutTestSensor(BaseSensorOperator):
-    """
-    Sensor that always returns the return_value provided
-
-    :param return_value: Set to true to mark the task as SKIPPED on failure
-    """
-
-    def __init__(self, return_value=False, **kwargs):
-        self.return_value = return_value
-        super().__init__(**kwargs)
-
-    def poke(self, context: Context):
-        return self.return_value
-
-    def execute(self, context: Context):
-        started_at = timezone.utcnow()
-        time_jump = self.params["time_jump"]
-        while not self.poke(context):
-            if time_jump:
-                started_at -= time_jump
-            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
-                if self.soft_fail:
-                    raise AirflowSkipException("timeout")
-                else:
-                    raise AirflowSensorTimeout("timeout")
-            time.sleep(self.poke_interval)
-        self.log.info("Success criteria met. Exiting.")
-
-
-class TestSensorTimeout:
-    def setup_method(self):
-        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
-        self.dag = DAG(TEST_DAG_ID, default_args=args)
-
-    def test_timeout(self):
-        op = TimeoutTestSensor(
-            task_id="test_timeout",
-            execution_timeout=timedelta(days=2),
-            return_value=False,
-            poke_interval=5,
-            params={"time_jump": timedelta(days=2, seconds=1)},
-            dag=self.dag,
-        )
-        with pytest.raises(AirflowSensorTimeout):
-            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)

Reply via email to