This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fc5410c27d9 Fix sensor skipping in Airflow 3.x branching operators 
(#53455)
fc5410c27d9 is described below

commit fc5410c27d9c5979d46b1c0d13a364096bbc00ab
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Jul 18 00:03:00 2025 +0530

    Fix sensor skipping in Airflow 3.x branching operators (#53455)
    
    In Airflow 3.x, sensors inherit from airflow.sdk.BaseOperator instead of
    airflow.models.BaseOperator. The _ensure_tasks function in SkipMixin was
    only checking for the models BaseOperator, causing sensors to be filtered
    out and not properly skipped by branching operators like BranchSQLOperator.
    
    Updated the import logic to use the correct SDK BaseOperator for Airflow 3.x
    and added comprehensive tests to verify sensors are properly included in
    branching skip operations.
    
    Fixes #52219
---
 .../airflow/providers/standard/utils/skipmixin.py  |  4 +-
 .../tests/unit/standard/operators/test_python.py   | 48 +++++++++++++
 .../tests/unit/standard/utils/test_skipmixin.py    | 81 ++++++++++++++++++++++
 3 files changed, 131 insertions(+), 2 deletions(-)

diff --git 
a/providers/standard/src/airflow/providers/standard/utils/skipmixin.py 
b/providers/standard/src/airflow/providers/standard/utils/skipmixin.py
index 6be0533fcbb..56295bae64d 100644
--- a/providers/standard/src/airflow/providers/standard/utils/skipmixin.py
+++ b/providers/standard/src/airflow/providers/standard/utils/skipmixin.py
@@ -22,7 +22,7 @@ from types import GeneratorType
 from typing import TYPE_CHECKING
 
 from airflow.exceptions import AirflowException
-from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
@@ -40,7 +40,7 @@ XCOM_SKIPMIXIN_FOLLOWED = "followed"
 
 
 def _ensure_tasks(nodes: Iterable[DAGNode]) -> Sequence[Operator]:
-    if AIRFLOW_V_3_1_PLUS:
+    if AIRFLOW_V_3_0_PLUS:
         from airflow.sdk import BaseOperator
         from airflow.sdk.definitions.mappedoperator import MappedOperator
     else:
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py 
b/providers/standard/tests/unit/standard/operators/test_python.py
index e9cecea81c3..7a166559a72 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -845,6 +845,54 @@ class TestShortCircuitOperator(BasePythonTest):
             "skipped": ["empty_task"]
         }
 
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Airflow 2 
implementation is different")
+    def test_short_circuit_operator_skips_sensors(self):
+        """Test that ShortCircuitOperator properly skips sensors in Airflow 
3.x."""
+        from airflow.sdk.bases.sensor import BaseSensorOperator
+
+        # Create a sensor similar to S3FileSensor to reproduce the issue
+        class CustomS3Sensor(BaseSensorOperator):
+            def __init__(self, bucket_name: str, object_key: str, **kwargs):
+                super().__init__(**kwargs)
+                self.bucket_name = bucket_name
+                self.object_key = object_key
+                self.timeout = 0
+                self.poke_interval = 0
+
+            def poke(self, context):
+                # Simulate sensor logic
+                return True
+
+        with self.dag_maker(self.dag_id):
+            # ShortCircuit that evaluates to False (should skip all downstream)
+            short_circuit = ShortCircuitOperator(
+                task_id="check_dis_is_mon_to_fri_not_holiday",
+                python_callable=lambda: False,  # This causes skipping
+            )
+
+            sensor_task = CustomS3Sensor(
+                task_id="wait_for_ticker_to_secid_lookup_s3_file",
+                bucket_name="test-bucket",
+                object_key="ticker_to_secid_lookup.csv",
+            )
+
+            short_circuit >> sensor_task
+
+        dr = self.dag_maker.create_dagrun()
+
+        self.dag_maker.run_ti("check_dis_is_mon_to_fri_not_holiday", dr)
+
+        # Verify the sensor is included in the skip list by checking XCom
+        # (this was the bug - sensors were not being included in skip list)
+        tis = dr.get_task_instances()
+        xcom_data = 
tis[0].xcom_pull(task_ids="check_dis_is_mon_to_fri_not_holiday", 
key="skipmixin_key")
+
+        assert xcom_data is not None, "XCom data should exist"
+        skipped_task_ids = set(xcom_data.get("skipped", []))
+        assert "wait_for_ticker_to_secid_lookup_s3_file" in skipped_task_ids, (
+            "Sensor should be skipped by ShortCircuitOperator"
+        )
+
 
 virtualenv_string_args: list[str] = []
 
diff --git a/providers/standard/tests/unit/standard/utils/test_skipmixin.py 
b/providers/standard/tests/unit/standard/utils/test_skipmixin.py
index 30b26b2624f..7d4c9875efb 100644
--- a/providers/standard/tests/unit/standard/utils/test_skipmixin.py
+++ b/providers/standard/tests/unit/standard/utils/test_skipmixin.py
@@ -359,3 +359,84 @@ class TestSkipMixin:
         error_message = r"'branch_task_ids' must contain only valid task_ids. 
Invalid tasks found: .*"
         with pytest.raises(AirflowException, match=error_message):
             SkipMixin().skip_all_except(ti=ti1, 
branch_task_ids=branch_task_ids)
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Issue only exists in 
Airflow 3.x")
+    def test_ensure_tasks_includes_sensors_airflow_3x(self, dag_maker):
+        """Test that sensors (inheriting from airflow.sdk.BaseOperator) are 
properly handled by _ensure_tasks."""
+        from airflow.providers.standard.utils.skipmixin import _ensure_tasks
+        from airflow.sdk import BaseOperator as SDKBaseOperator
+        from airflow.sdk.bases.sensor import BaseSensorOperator
+
+        class DummySensor(BaseSensorOperator):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)
+                self.timeout = 0
+                self.poke_interval = 0
+
+            def poke(self, context):
+                return True
+
+        with dag_maker("dag_test_sensor_skipping") as dag:
+            regular_task = EmptyOperator(task_id="regular_task")
+            sensor_task = DummySensor(task_id="sensor_task")
+            downstream_task = EmptyOperator(task_id="downstream_task")
+
+            regular_task >> [sensor_task, downstream_task]
+
+        dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID)
+
+        downstream_nodes = dag.get_task("regular_task").downstream_list
+        task_list = _ensure_tasks(downstream_nodes)
+
+        # Verify both the regular operator and sensor are included
+        task_ids = [t.task_id for t in task_list]
+        assert "sensor_task" in task_ids, "Sensor should be included in task 
list"
+        assert "downstream_task" in task_ids, "Regular task should be included 
in task list"
+        assert len(task_list) == 2, "Both tasks should be included"
+
+        # Also verify that the sensor is actually an instance of the correct 
BaseOperator
+        sensor_in_list = next((t for t in task_list if t.task_id == 
"sensor_task"), None)
+        assert sensor_in_list is not None, "Sensor task should be found in 
list"
+        assert isinstance(sensor_in_list, SDKBaseOperator), "Sensor should be 
instance of SDK BaseOperator"
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Integration test for 
Airflow 3.x sensor skipping")
+    def test_skip_sensor_in_branching_scenario(self, dag_maker):
+        """Integration test: verify sensors are properly skipped by branching 
operators in Airflow 3.x."""
+        from airflow.sdk.bases.sensor import BaseSensorOperator
+
+        # Create a dummy sensor for testing
+        class DummySensor(BaseSensorOperator):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)
+                self.timeout = 0
+                self.poke_interval = 0
+
+            def poke(self, context):
+                return True
+
+        with dag_maker("dag_test_branch_sensor_skipping"):
+            branch_task = EmptyOperator(task_id="branch_task")
+            regular_task = EmptyOperator(task_id="regular_task")
+            sensor_task = DummySensor(task_id="sensor_task")
+            branch_task >> [regular_task, sensor_task]
+
+        dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID)
+
+        dag_version = DagVersion.get_latest_version(branch_task.dag_id)
+        ti_branch = TI(branch_task, run_id=DEFAULT_DAG_RUN_ID, 
dag_version_id=dag_version.id)
+
+        # Test skipping the sensor (follow regular_task branch)
+        with pytest.raises(DownstreamTasksSkipped) as exc_info:
+            SkipMixin().skip_all_except(ti=ti_branch, 
branch_task_ids="regular_task")
+
+        # Verify that the sensor task is properly marked for skipping
+        skipped_tasks = set(exc_info.value.tasks)
+        assert ("sensor_task", -1) in skipped_tasks, "Sensor task should be 
marked for skipping"
+
+        # Test skipping the regular task (follow sensor_task branch)
+        with pytest.raises(DownstreamTasksSkipped) as exc_info:
+            SkipMixin().skip_all_except(ti=ti_branch, 
branch_task_ids="sensor_task")
+
+        # Verify that the regular task is properly marked for skipping
+        skipped_tasks = set(exc_info.value.tasks)
+        assert ("regular_task", -1) in skipped_tasks, "Regular task should be 
marked for skipping"

Reply via email to