This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ce88001bb21 fix: ExternalTaskSensor check_existence ignored in 
deferrable mode (Airflow < 3.0) (#64394)
ce88001bb21 is described below

commit ce88001bb21ba2c895088a8762c66dd18a4b7e4f
Author: dpan <[email protected]>
AuthorDate: Wed Apr 1 20:23:24 2026 +0100

    fix: ExternalTaskSensor check_existence ignored in deferrable mode (Airflow 
< 3.0) (#64394)
    
    * fix: ExternalTaskSensor check_existence ignored in deferrable mode <3.0
    
    Closes #40745
    
    * test: implement test for ExternalTaskSensor when deferred and 
check_existence=True
    
    * fix: change deferrable sensor raising ExternalDagNotFound test description
    
    Co-authored-by: Wei Lee <[email protected]>
    
    * docs: add TODO for removal when Airflow 2 support is dropped
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
---
 .../providers/standard/sensors/external_task.py    |  7 +++++++
 .../standard/sensors/test_external_task_sensor.py  | 22 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git 
a/providers/standard/src/airflow/providers/standard/sensors/external_task.py 
b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index ff266fc9010..e511d71b309 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -457,6 +457,13 @@ class ExternalTaskSensor(BaseSensorOperator):
                     method_name="execute_complete",
                 )
             else:
+                # TODO: Remove this block when Airflow 2 support is dropped
+                if self.check_existence and not self._has_checked_existence:
+                    from airflow.utils.session import create_session
+
+                    with create_session() as session:
+                        self._check_for_existence(session=session)
+
                 self.defer(
                     timeout=datetime.timedelta(seconds=timeout_value) if 
timeout_value else None,
                     trigger=WorkflowTrigger(
diff --git 
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py 
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index f48189f07ad..fde38e4e236 100644
--- 
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ 
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -1120,6 +1120,28 @@ exit 0
             op.execute(context=context)
         assert exc.value.timeout == timedelta(seconds=90)
 
+    @pytest.mark.execution_timeout(10)
+    def 
test_external_task_sensor_deferrable_check_existence_dag_not_found(self, 
dag_maker):
+        """Test that deferrable sensor raises ExternalDagNotFoundError when 
Dag doesn't exist."""
+        context = {"execution_date": DEFAULT_DATE}
+        with dag_maker() as dag:
+            op = ExternalTaskSensor(
+                task_id="test_external_task_sensor_check",
+                external_dag_id="non_existing_dag",
+                external_task_id=None,
+                deferrable=True,
+                check_existence=True,
+            )
+            dag.create_dagrun(
+                run_id="test_run",
+                run_type=DagRunType.MANUAL,
+                state=None,
+            )
+            context.update(logical_date=DEFAULT_DATE)
+
+        with pytest.raises(ExternalDagNotFoundError):
+            op.execute(context=context)
+
     def test_get_logical_date(self):
         """For AF 2, we check for execution_date in context."""
         context = {"execution_date": DEFAULT_DATE}

Reply via email to