This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ce88001bb21 fix: ExternalTaskSensor check_existence ignored in
deferrable mode (Airflow < 3.0) (#64394)
ce88001bb21 is described below
commit ce88001bb21ba2c895088a8762c66dd18a4b7e4f
Author: dpan <[email protected]>
AuthorDate: Wed Apr 1 20:23:24 2026 +0100
fix: ExternalTaskSensor check_existence ignored in deferrable mode (Airflow
< 3.0) (#64394)
* fix: ExternalTaskSensor check_existence ignored in deferrable mode <3.0
Closes #40745
* test: implement test for ExternalTaskSensor when deferred and
check_existence=True
* fix: change deferrable sensor raising ExternalDagNotFound test description
Co-authored-by: Wei Lee <[email protected]>
* docs: add TODO for removal when Airflow 2 support is dropped
---------
Co-authored-by: Wei Lee <[email protected]>
---
.../providers/standard/sensors/external_task.py | 7 +++++++
.../standard/sensors/test_external_task_sensor.py | 22 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git
a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index ff266fc9010..e511d71b309 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -457,6 +457,13 @@ class ExternalTaskSensor(BaseSensorOperator):
method_name="execute_complete",
)
else:
+ # TODO: Remove this block when Airflow 2 support is dropped
+ if self.check_existence and not self._has_checked_existence:
+ from airflow.utils.session import create_session
+
+ with create_session() as session:
+ self._check_for_existence(session=session)
+
self.defer(
timeout=datetime.timedelta(seconds=timeout_value) if
timeout_value else None,
trigger=WorkflowTrigger(
diff --git
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index f48189f07ad..fde38e4e236 100644
---
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -1120,6 +1120,28 @@ exit 0
op.execute(context=context)
assert exc.value.timeout == timedelta(seconds=90)
+ @pytest.mark.execution_timeout(10)
+ def
test_external_task_sensor_deferrable_check_existence_dag_not_found(self,
dag_maker):
+ """Test that deferrable sensor raises ExternalDagNotFoundError when
Dag doesn't exist."""
+ context = {"execution_date": DEFAULT_DATE}
+ with dag_maker() as dag:
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id="non_existing_dag",
+ external_task_id=None,
+ deferrable=True,
+ check_existence=True,
+ )
+ dag.create_dagrun(
+ run_id="test_run",
+ run_type=DagRunType.MANUAL,
+ state=None,
+ )
+ context.update(logical_date=DEFAULT_DATE)
+
+ with pytest.raises(ExternalDagNotFoundError):
+ op.execute(context=context)
+
def test_get_logical_date(self):
"""For AF 2, we check for execution_date in context."""
context = {"execution_date": DEFAULT_DATE}