This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ddf371aa9cc Move test_sensor_helper.py to standard provider tests (#49396)
ddf371aa9cc is described below

commit ddf371aa9ccc3fc426135496aafacd3a732c1af4
Author: GPK <[email protected]>
AuthorDate: Fri Apr 18 10:20:12 2025 +0100

    Move test_sensor_helper.py to standard provider tests (#49396)
    
    * Move test_sensor_helper.py to standard provider tests
    
    * Move test_sensor_helper.py to standard provider tests
    
    * mark it as db test
    
    * Fix tests
    
    * Fix tests
    
    * Fix tests arguments
    
    * Remove not using if check
---
 .../unit}/standard/utils/test_sensor_helper.py     | 84 +++++++++++++++++-----
 1 file changed, 66 insertions(+), 18 deletions(-)

diff --git a/providers/tests/standard/utils/test_sensor_helper.py b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
similarity index 86%
rename from providers/tests/standard/utils/test_sensor_helper.py
rename to providers/standard/tests/unit/standard/utils/test_sensor_helper.py
index 8d5026dcad6..346e956a981 100644
--- a/providers/tests/standard/utils/test_sensor_helper.py
+++ b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
@@ -27,7 +27,7 @@ import pytest
 
 from airflow.models import DAG, TaskInstance
 from airflow.models.dagbag import DagBag
-from airflow.operators.empty import EmptyOperator
+from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.providers.standard.utils.sensor_helper import (
     _count_stmt,
     _get_count,
@@ -41,15 +41,14 @@ from airflow.utils.types import DagRunType
 from tests_common.test_utils import db
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
-if AIRFLOW_V_3_0_PLUS:
-    from airflow.utils.types import DagRunTriggeredByType
-
 if TYPE_CHECKING:
     from sqlalchemy.orm.session import Session
 
 TI = TaskInstance
 
 
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for AF2")
[email protected]_test
 class TestSensorHelper:
     DAG_ID = "test_dag"
     TASK_ID = "test_task"
@@ -80,27 +79,26 @@ class TestSensorHelper:
         dag: DAG,
         *,
         task_states: Mapping[str, TaskInstanceState] | None = None,
-        logical_date: datetime.datetime | None = None,
+        execution_date: datetime.datetime | None = None,
         session: Session,
     ):
         now = timezone.utcnow()
-        logical_date = pendulum.instance(logical_date or now)
-        triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
+        execution_date = pendulum.instance(execution_date or now)
         run_type = DagRunType.MANUAL
-        data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
+        data_interval = dag.timetable.infer_manual_data_interval(run_after=execution_date)
         dag_run = dag.create_dagrun(
             run_id=dag.timetable.generate_run_id(
                 run_type=run_type,
-                run_after=logical_date,
-                data_interval=data_interval,
+                run_after=execution_date,
+                data_interval=None,
+                logical_date=execution_date,
             ),
             run_type=run_type,
-            logical_date=logical_date,
             data_interval=data_interval,
             start_date=now,
             state=DagRunState.SUCCESS,  # not important
             external_trigger=False,
-            **triggered_by_kwargs,  # type: ignore
+            execution_date=execution_date,  # type: ignore[call-arg]
         )
 
         if task_states is not None:
@@ -167,7 +165,7 @@ class TestSensorHelper:
 
         for dttm in self.DTTM_FILTER:
             self.create_dag_run(
-                dag=dag, task_states=dttm_to_task_state[dttm], logical_date=dttm, session=session
+                dag=dag, task_states=dttm_to_task_state[dttm], execution_date=dttm, session=session
             )
 
         dttm_filter = [pendulum.instance(dttm) for dttm in self.DTTM_FILTER]
@@ -194,7 +192,7 @@ class TestSensorHelper:
                     EmptyOperator(task_id=subtask_id)
 
         for dttm in self.DTTM_FILTER:
-            self.create_dag_run(dag=dag, logical_date=dttm, session=session)
+            self.create_dag_run(dag=dag, execution_date=dttm, session=session)
 
         with mock.patch.object(DagBag, "get_dag") as mock_get_dag:
             mock_get_dag.return_value = dag
@@ -224,7 +222,7 @@ class TestSensorHelper:
             EmptyOperator(task_id=self.TASK_ID)
 
         now = timezone.utcnow()
-        self.create_dag_run(dag, task_states={self.TASK_ID: state}, logical_date=now, session=session)
+        self.create_dag_run(dag, task_states={self.TASK_ID: state}, execution_date=now, session=session)
 
         count = _get_count(
             dttm_filter=[now],
@@ -258,7 +256,7 @@ class TestSensorHelper:
 
         for dttm in self.DTTM_FILTER:
             self.create_dag_run(
-                dag=dag, task_states={self.TASK_ID: task_states[dttm]}, logical_date=dttm, session=session
+                dag=dag, task_states={self.TASK_ID: task_states[dttm]}, execution_date=dttm, session=session
             )
 
         dttm_filter = [pendulum.instance(dttm) for dttm in self.DTTM_FILTER]
@@ -314,7 +312,7 @@ class TestSensorHelper:
 
         for dttm in self.DTTM_FILTER:
             self.create_dag_run(
-                dag=dag, task_states=dttm_to_task_state[dttm], logical_date=dttm, session=session
+                dag=dag, task_states=dttm_to_task_state[dttm], execution_date=dttm, session=session
             )
 
         dttm_filter = [pendulum.instance(dttm) for dttm in self.DTTM_FILTER]
@@ -378,7 +376,7 @@ class TestSensorHelper:
 
         for dttm in self.DTTM_FILTER:
             self.create_dag_run(
-                dag=dag, task_states=dttm_to_subtask_state[dttm], logical_date=dttm, session=session
+                dag=dag, task_states=dttm_to_subtask_state[dttm], execution_date=dttm, session=session
             )
 
         with mock.patch.object(DagBag, "get_dag") as mock_get_dag:
@@ -403,3 +401,53 @@ class TestSensorHelper:
 
         allowed_state_count = len(allowed_task_instance_states) / len(self.TASK_ID_LIST)
         assert count == allowed_state_count
+
+
[email protected](
+    "run_id_task_state_map, states, expected_count",
+    [
+        pytest.param(
+            {
+                "run_id_1": {"task_id_1": "success", "task_id_2": "success"},
+            },
+            ["success"],
+            1,
+            id="single_runid_with_success",
+        ),
+        pytest.param(
+            {
+                "run_id_1": {"task_id_1": "success", "task_id_2": "success"},
+                "run_id_2": {"task_id_1": "success", "task_id_2": "success"},
+            },
+            ["success"],
+            2,
+            id="multiple_runid_with_success",
+        ),
+        pytest.param(
+            {
+                "run_id_1": {"task_id_1": "failed", "task_id_2": "failed"},
+            },
+            ["failed"],
+            1,
+            id="single_runid_with_failed",
+        ),
+        pytest.param(
+            {
+                "run_id_1": {"task_id_1": "success", "task_id_2": "failed"},
+            },
+            ["failed"],
+            0,
+            id="single_runid_with_no_success",
+        ),
+    ],
+)
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for AF3")
+def test_get_count_by_matched_states(
+    run_id_task_state_map,
+    states,
+    expected_count,
+):
+    from airflow.providers.standard.utils.sensor_helper import _get_count_by_matched_states
+
+    count = _get_count_by_matched_states(run_id_task_state_map=run_id_task_state_map, states=states)
+    assert count == expected_count

Reply via email to