kyungjunleeme commented on code in PR #53455:
URL: https://github.com/apache/airflow/pull/53455#discussion_r2213565230
##########
providers/standard/tests/unit/standard/utils/test_skipmixin.py:
##########
@@ -359,3 +359,84 @@ def
test_raise_exception_on_not_valid_branch_task_ids(self, dag_maker, branch_ta
error_message = r"'branch_task_ids' must contain only valid task_ids.
Invalid tasks found: .*"
with pytest.raises(AirflowException, match=error_message):
SkipMixin().skip_all_except(ti=ti1,
branch_task_ids=branch_task_ids)
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Issue only exists in
Airflow 3.x")
+ def test_ensure_tasks_includes_sensors_airflow_3x(self, dag_maker):
+ """Test that sensors (inheriting from airflow.sdk.BaseOperator) are
properly handled by _ensure_tasks."""
+ from airflow.providers.standard.utils.skipmixin import _ensure_tasks
+ from airflow.sdk import BaseOperator as SDKBaseOperator
+ from airflow.sdk.bases.sensor import BaseSensorOperator
+
+ class DummySensor(BaseSensorOperator):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.timeout = 0
+ self.poke_interval = 0
+
+ def poke(self, context):
+ return True
+
+ with dag_maker("dag_test_sensor_skipping", serialized=True) as dag:
+ regular_task = EmptyOperator(task_id="regular_task")
+ sensor_task = DummySensor(task_id="sensor_task")
+ downstream_task = EmptyOperator(task_id="downstream_task")
+
+ regular_task >> [sensor_task, downstream_task]
+
+ dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID)
+
+ downstream_nodes = dag.get_task("regular_task").downstream_list
+ task_list = _ensure_tasks(downstream_nodes)
+
+ # Verify both the regular operator and sensor are included
+ task_ids = [t.task_id for t in task_list]
+ assert "sensor_task" in task_ids, "Sensor should be included in task
list"
+ assert "downstream_task" in task_ids, "Regular task should be included
in task list"
+ assert len(task_list) == 2, "Both tasks should be included"
+
+ # Also verify that the sensor is actually an instance of the correct
BaseOperator
+ sensor_in_list = next((t for t in task_list if t.task_id ==
"sensor_task"), None)
+ assert sensor_in_list is not None, "Sensor task should be found in
list"
+ assert isinstance(sensor_in_list, SDKBaseOperator), "Sensor should be
instance of SDK BaseOperator"
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Integration test for
Airflow 3.x sensor skipping")
+ def test_skip_sensor_in_branching_scenario(self, dag_maker):
+ """Integration test: verify sensors are properly skipped by branching
operators in Airflow 3.x."""
+ from airflow.sdk.bases.sensor import BaseSensorOperator
+
+ # Create a dummy sensor for testing
+ class DummySensor(BaseSensorOperator):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+ self.timeout = 0
+ self.poke_interval = 0
+
+ def poke(self, context):
+ return True
+
+ with dag_maker("dag_test_branch_sensor_skipping", serialized=True):
Review Comment:
```suggestion
with dag_maker("dag_test_branch_sensor_skipping"):
```
You have to remove `serialized=True`, because `_ensure_tasks()` expects
instances of `BaseOperator`, so it does not work properly with serialized DAGs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]