This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ddf371aa9cc Move test_sensor_helper.py to standard provider tests
(#49396)
ddf371aa9cc is described below
commit ddf371aa9ccc3fc426135496aafacd3a732c1af4
Author: GPK <[email protected]>
AuthorDate: Fri Apr 18 10:20:12 2025 +0100
Move test_sensor_helper.py to standard provider tests (#49396)
* Move test_sensor_helper.py to standard provider tests
* Move test_sensor_helper.py to standard provider tests
* mark it as db test
* Fix tests
* Fix tests
* Fix tests arguments
* Remove not using if check
---
.../unit}/standard/utils/test_sensor_helper.py | 84 +++++++++++++++++-----
1 file changed, 66 insertions(+), 18 deletions(-)
diff --git a/providers/tests/standard/utils/test_sensor_helper.py
b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
similarity index 86%
rename from providers/tests/standard/utils/test_sensor_helper.py
rename to providers/standard/tests/unit/standard/utils/test_sensor_helper.py
index 8d5026dcad6..346e956a981 100644
--- a/providers/tests/standard/utils/test_sensor_helper.py
+++ b/providers/standard/tests/unit/standard/utils/test_sensor_helper.py
@@ -27,7 +27,7 @@ import pytest
from airflow.models import DAG, TaskInstance
from airflow.models.dagbag import DagBag
-from airflow.operators.empty import EmptyOperator
+from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.utils.sensor_helper import (
_count_stmt,
_get_count,
@@ -41,15 +41,14 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils import db
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-if AIRFLOW_V_3_0_PLUS:
- from airflow.utils.types import DagRunTriggeredByType
-
if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
TI = TaskInstance
[email protected](AIRFLOW_V_3_0_PLUS, reason="Test only for AF2")
[email protected]_test
class TestSensorHelper:
DAG_ID = "test_dag"
TASK_ID = "test_task"
@@ -80,27 +79,26 @@ class TestSensorHelper:
dag: DAG,
*,
task_states: Mapping[str, TaskInstanceState] | None = None,
- logical_date: datetime.datetime | None = None,
+ execution_date: datetime.datetime | None = None,
session: Session,
):
now = timezone.utcnow()
- logical_date = pendulum.instance(logical_date or now)
- triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if
AIRFLOW_V_3_0_PLUS else {}
+ execution_date = pendulum.instance(execution_date or now)
run_type = DagRunType.MANUAL
- data_interval =
dag.timetable.infer_manual_data_interval(run_after=logical_date)
+ data_interval =
dag.timetable.infer_manual_data_interval(run_after=execution_date)
dag_run = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=run_type,
- run_after=logical_date,
- data_interval=data_interval,
+ run_after=execution_date,
+ data_interval=None,
+ logical_date=execution_date,
),
run_type=run_type,
- logical_date=logical_date,
data_interval=data_interval,
start_date=now,
state=DagRunState.SUCCESS, # not important
external_trigger=False,
- **triggered_by_kwargs, # type: ignore
+ execution_date=execution_date, # type: ignore[call-arg]
)
if task_states is not None:
@@ -167,7 +165,7 @@ class TestSensorHelper:
for dttm in self.DTTM_FILTER:
self.create_dag_run(
- dag=dag, task_states=dttm_to_task_state[dttm],
logical_date=dttm, session=session
+ dag=dag, task_states=dttm_to_task_state[dttm],
execution_date=dttm, session=session
)
dttm_filter = [pendulum.instance(dttm) for dttm in self.DTTM_FILTER]
@@ -194,7 +192,7 @@ class TestSensorHelper:
EmptyOperator(task_id=subtask_id)
for dttm in self.DTTM_FILTER:
- self.create_dag_run(dag=dag, logical_date=dttm, session=session)
+ self.create_dag_run(dag=dag, execution_date=dttm, session=session)
with mock.patch.object(DagBag, "get_dag") as mock_get_dag:
mock_get_dag.return_value = dag
@@ -224,7 +222,7 @@ class TestSensorHelper:
EmptyOperator(task_id=self.TASK_ID)
now = timezone.utcnow()
- self.create_dag_run(dag, task_states={self.TASK_ID: state},
logical_date=now, session=session)
+ self.create_dag_run(dag, task_states={self.TASK_ID: state},
execution_date=now, session=session)
count = _get_count(
dttm_filter=[now],
@@ -258,7 +256,7 @@ class TestSensorHelper:
for dttm in self.DTTM_FILTER:
self.create_dag_run(
- dag=dag, task_states={self.TASK_ID: task_states[dttm]},
logical_date=dttm, session=session
+ dag=dag, task_states={self.TASK_ID: task_states[dttm]},
execution_date=dttm, session=session
)
dttm_filter = [pendulum.instance(dttm) for dttm in self.DTTM_FILTER]
@@ -314,7 +312,7 @@ class TestSensorHelper:
for dttm in self.DTTM_FILTER:
self.create_dag_run(
- dag=dag, task_states=dttm_to_task_state[dttm],
logical_date=dttm, session=session
+ dag=dag, task_states=dttm_to_task_state[dttm],
execution_date=dttm, session=session
)
dttm_filter = [pendulum.instance(dttm) for dttm in self.DTTM_FILTER]
@@ -378,7 +376,7 @@ class TestSensorHelper:
for dttm in self.DTTM_FILTER:
self.create_dag_run(
- dag=dag, task_states=dttm_to_subtask_state[dttm],
logical_date=dttm, session=session
+ dag=dag, task_states=dttm_to_subtask_state[dttm],
execution_date=dttm, session=session
)
with mock.patch.object(DagBag, "get_dag") as mock_get_dag:
@@ -403,3 +401,53 @@ class TestSensorHelper:
allowed_state_count = len(allowed_task_instance_states) /
len(self.TASK_ID_LIST)
assert count == allowed_state_count
+
+
[email protected](
+ "run_id_task_state_map, states, expected_count",
+ [
+ pytest.param(
+ {
+ "run_id_1": {"task_id_1": "success", "task_id_2": "success"},
+ },
+ ["success"],
+ 1,
+ id="single_runid_with_success",
+ ),
+ pytest.param(
+ {
+ "run_id_1": {"task_id_1": "success", "task_id_2": "success"},
+ "run_id_2": {"task_id_1": "success", "task_id_2": "success"},
+ },
+ ["success"],
+ 2,
+ id="multiple_runid_with_success",
+ ),
+ pytest.param(
+ {
+ "run_id_1": {"task_id_1": "failed", "task_id_2": "failed"},
+ },
+ ["failed"],
+ 1,
+ id="single_runid_with_failed",
+ ),
+ pytest.param(
+ {
+ "run_id_1": {"task_id_1": "success", "task_id_2": "failed"},
+ },
+ ["failed"],
+ 0,
+ id="single_runid_with_no_success",
+ ),
+ ],
+)
[email protected](not AIRFLOW_V_3_0_PLUS, reason="Test only for AF3")
+def test_get_count_by_matched_states(
+ run_id_task_state_map,
+ states,
+ expected_count,
+):
+ from airflow.providers.standard.utils.sensor_helper import
_get_count_by_matched_states
+
+ count =
_get_count_by_matched_states(run_id_task_state_map=run_id_task_state_map,
states=states)
+ assert count == expected_count