This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4ff925761d3 feat: add few attrs from external_task sensor to
OpenLineage events (#58719)
4ff925761d3 is described below
commit 4ff925761d3923775c8abbd37e88abe170b70fec
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Nov 26 15:10:20 2025 +0100
feat: add few attrs from external_task sensor to OpenLineage events (#58719)
---
.../airflow/providers/openlineage/utils/utils.py | 15 +++++--
.../tests/unit/openlineage/utils/test_utils.py | 29 ++++++++++++++
.../providers/standard/sensors/external_task.py | 12 +++++-
.../standard/sensors/test_external_task_sensor.py | 46 ++++++++++++++++++----
4 files changed, 90 insertions(+), 12 deletions(-)
diff --git
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 477d088d3bf..62604042e74 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -712,6 +712,7 @@ class DagRunInfo(InfoJsonEncodable):
"data_interval_start",
"data_interval_end",
"external_trigger", # Removed in Airflow 3, use run_type instead
+ "execution_date", # Airflow 2
"logical_date", # Airflow 3
"run_after", # Airflow 3
"run_id",
@@ -802,14 +803,20 @@ class TaskInfo(InfoJsonEncodable):
"run_as_user",
"sla",
"task_id",
- "trigger_dag_id",
- "trigger_run_id",
- "external_dag_id",
- "external_task_id",
"trigger_rule",
"upstream_task_ids",
"wait_for_downstream",
"wait_for_past_depends_before_skipping",
+ # Operator-specific useful attributes
+ "trigger_dag_id", # TriggerDagRunOperator
+ "trigger_run_id", # TriggerDagRunOperator
+ "external_dag_id", # ExternalTaskSensor and ExternalTaskMarker (if
run, as it's EmptyOperator)
+ "external_task_id", # ExternalTaskSensor and ExternalTaskMarker (if
run, as it's EmptyOperator)
+ "external_task_ids", # ExternalTaskSensor
+ "external_task_group_id", # ExternalTaskSensor
+ "external_dates_filter", # ExternalTaskSensor
+ "logical_date", # AF 3 ExternalTaskMarker (if run, as it's
EmptyOperator)
+ "execution_date", # AF 2 ExternalTaskMarker (if run, as it's
EmptyOperator)
]
casts = {
"operator_class": lambda task: task.task_type,
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index d6c99d525de..8c14987db7d 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -163,6 +163,7 @@ def test_get_airflow_dag_run_facet():
dagrun_mock.external_trigger = True
dagrun_mock.run_id = "manual_2024-06-01T00:00:00+00:00"
dagrun_mock.run_type = DagRunType.MANUAL
+ dagrun_mock.execution_date = datetime.datetime(2024, 6, 1, 1, 2, 4,
tzinfo=datetime.timezone.utc)
dagrun_mock.logical_date = datetime.datetime(2024, 6, 1, 1, 2, 4,
tzinfo=datetime.timezone.utc)
dagrun_mock.run_after = datetime.datetime(2024, 6, 1, 1, 2, 4,
tzinfo=datetime.timezone.utc)
dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4,
tzinfo=datetime.timezone.utc)
@@ -205,6 +206,7 @@ def test_get_airflow_dag_run_facet():
"start_date": "2024-06-01T01:02:04+00:00",
"end_date": "2024-06-01T01:02:14.034172+00:00",
"duration": 10.034172,
+ "execution_date": "2024-06-01T01:02:04+00:00",
"logical_date": "2024-06-01T01:02:04+00:00",
"run_after": "2024-06-01T01:02:04+00:00",
"dag_bundle_name": "bundle_name",
@@ -2345,6 +2347,7 @@ def test_dagrun_info_af2():
"run_type": DagRunType.MANUAL,
"external_trigger": False,
"start_date": "2024-06-01T00:00:00+00:00",
+ "execution_date": "2024-06-01T00:00:00+00:00",
"logical_date": "2024-06-01T00:00:00+00:00",
"dag_bundle_name": None,
"dag_bundle_version": None,
@@ -2424,10 +2427,17 @@ def test_taskinstance_info_af2():
def test_task_info_af3():
class CustomOperator(PythonOperator):
def __init__(self, *args, **kwargs):
+ # Mock some specific attributes from different operators
self.deferrable = True
self.trigger_dag_id = "trigger_dag_id"
+ self.trigger_run_id = "trigger_run_id"
self.external_dag_id = "external_dag_id"
self.external_task_id = "external_task_id"
+ self.external_task_ids = "external_task_ids"
+ self.external_task_group_id = "external_task_group_id"
+ self.external_dates_filter = "external_dates_filter"
+ self.logical_date = "logical_date"
+ self.execution_date = "execution_date"
super().__init__(*args, **kwargs)
with DAG(
@@ -2464,12 +2474,17 @@ def test_task_info_af3():
"deferrable": True,
"depends_on_past": False,
"downstream_task_ids": "['task_1']",
+ "execution_date": "execution_date",
"execution_timeout": None,
"executor_config": {},
"external_dag_id": "external_dag_id",
+ "external_dates_filter": "external_dates_filter",
"external_task_id": "external_task_id",
+ "external_task_ids": "external_task_ids",
+ "external_task_group_id": "external_task_group_id",
"ignore_first_depends_on_past": False,
"inlets": "[{'uri': 'uri1', 'extra': {'a': 1}}]",
+ "logical_date": "logical_date",
"mapped": False,
"max_active_tis_per_dag": None,
"max_active_tis_per_dagrun": None,
@@ -2488,6 +2503,7 @@ def test_task_info_af3():
"task_group": tg_info,
"task_id": "section_1.task_3",
"trigger_dag_id": "trigger_dag_id",
+ "trigger_run_id": "trigger_run_id",
"trigger_rule": "all_success",
"upstream_task_ids": "['task_0']",
"wait_for_downstream": False,
@@ -2499,10 +2515,17 @@ def test_task_info_af3():
def test_task_info_af2():
class CustomOperator(PythonOperator):
def __init__(self, *args, **kwargs):
+ # Mock some specific attributes from different operators
self.deferrable = True
self.trigger_dag_id = "trigger_dag_id"
+ self.trigger_run_id = "trigger_run_id"
self.external_dag_id = "external_dag_id"
self.external_task_id = "external_task_id"
+ self.external_task_ids = "external_task_ids"
+ self.external_task_group_id = "external_task_group_id"
+ self.external_dates_filter = "external_dates_filter"
+ self.logical_date = "logical_date"
+ self.execution_date = "execution_date"
super().__init__(*args, **kwargs)
with DAG(
@@ -2539,15 +2562,20 @@ def test_task_info_af2():
"deferrable": True,
"depends_on_past": False,
"downstream_task_ids": "['task_1']",
+ "execution_date": "execution_date",
"execution_timeout": None,
"executor_config": {},
"external_dag_id": "external_dag_id",
+ "external_dates_filter": "external_dates_filter",
"external_task_id": "external_task_id",
+ "external_task_ids": "external_task_ids",
+ "external_task_group_id": "external_task_group_id",
"ignore_first_depends_on_past": True,
"is_setup": False,
"is_teardown": False,
"sla": None,
"inlets": "[{'uri': 'uri1', 'extra': {'a': 1}}]",
+ "logical_date": "logical_date",
"mapped": False,
"max_active_tis_per_dag": None,
"max_active_tis_per_dagrun": None,
@@ -2566,6 +2594,7 @@ def test_task_info_af2():
"task_group": tg_info,
"task_id": "section_1.task_3",
"trigger_dag_id": "trigger_dag_id",
+ "trigger_run_id": "trigger_run_id",
"trigger_rule": "all_success",
"upstream_task_ids": "['task_0']",
"wait_for_downstream": False,
diff --git
a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index 28c4492ef71..5c44503372c 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -250,6 +250,7 @@ class ExternalTaskSensor(BaseSensorOperator):
self._has_checked_existence = False
self.deferrable = deferrable
self.poll_interval = poll_interval
+ self.external_dates_filter: str | None = None
def _get_dttm_filter(self, context: Context) ->
Sequence[datetime.datetime]:
logical_date = self._get_logical_date(context)
@@ -261,13 +262,19 @@ class ExternalTaskSensor(BaseSensorOperator):
return result if isinstance(result, list) else [result]
return [logical_date]
+ @staticmethod
+ def _serialize_dttm_filter(dttm_filter: Sequence[datetime.datetime]) -> str:
+ return ",".join(dt.isoformat() for dt in dttm_filter)
+
+
def poke(self, context: Context) -> bool:
# delay check to poke rather than __init__ in case it was supplied as
XComArgs
if self.external_task_ids and len(self.external_task_ids) >
len(set(self.external_task_ids)):
raise ValueError("Duplicate task_ids passed in external_task_ids
parameter")
dttm_filter = self._get_dttm_filter(context)
- serialized_dttm_filter = ",".join(dt.isoformat() for dt in dttm_filter)
+ serialized_dttm_filter = self._serialize_dttm_filter(dttm_filter)
+ # Save as attribute - to be used by listeners
+ self.external_dates_filter = serialized_dttm_filter
if self.external_task_ids:
self.log.info(
@@ -456,6 +463,9 @@ class ExternalTaskSensor(BaseSensorOperator):
if event is None:
raise ExternalTaskNotFoundError("No event received from trigger")
+ # Re-set as attribute after coming back from deferral - to be used by listeners
+ self.external_dates_filter = self._serialize_dttm_filter(self._get_dttm_filter(context))
+
+
if event["status"] == "success":
self.log.info("External tasks %s has executed successfully.",
self.external_task_ids)
elif event["status"] == "skipped":
diff --git
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index e4597c982e5..c95c61b6cd3 100644
---
a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++
b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -901,7 +901,7 @@ exit 0
def test_fail_poke(
self, _get_dttm_filter, get_count, soft_fail, expected_exception,
kwargs, expected_message
):
- _get_dttm_filter.return_value = []
+ _get_dttm_filter.return_value = [DEFAULT_DATE]
get_count.return_value = 1
op = ExternalTaskSensor(
task_id="test_external_task_duplicate_task_ids",
@@ -912,6 +912,7 @@ exit 0
deferrable=False,
**kwargs,
)
+ assert op.external_dates_filter is None
# We need to handle the specific exception types based on kwargs
if not soft_fail:
@@ -931,6 +932,8 @@ exit 0
with pytest.raises(expected_exception, match=expected_message):
op.execute(context={})
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
+
@pytest.mark.parametrize(
("response_get_current", "response_exists", "kwargs",
"expected_message"),
(
@@ -1105,6 +1108,7 @@ class TestExternalTaskSensorV3:
states=["success"],
task_ids=["test_external_task_sensor_success"],
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_failure(self, dag_maker):
@@ -1128,6 +1132,7 @@ class TestExternalTaskSensorV3:
states=[State.FAILED],
task_ids=["test_external_task_sensor_failure"],
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_soft_fail(self, dag_maker):
@@ -1152,6 +1157,7 @@ class TestExternalTaskSensorV3:
states=[State.FAILED],
task_ids=["test_external_task_sensor_soft_fail"],
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_multiple_task_ids(self, dag_maker):
@@ -1172,6 +1178,7 @@ class TestExternalTaskSensorV3:
states=["success"],
task_ids=["task1", "task2"],
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_skipped_states(self, dag_maker):
@@ -1193,6 +1200,7 @@ class TestExternalTaskSensorV3:
states=[State.SKIPPED],
task_ids=["test_external_task_sensor_skipped_states"],
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
def test_external_task_sensor_invalid_combination(self, dag_maker):
"""Test that the sensor raises an error with invalid parameter
combinations."""
@@ -1239,6 +1247,7 @@ class TestExternalTaskSensorV3:
logical_dates=[DEFAULT_DATE],
task_group_id="test_group",
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_execution_date_fn(self, dag_maker):
@@ -1264,6 +1273,7 @@ class TestExternalTaskSensorV3:
states=["success"],
task_ids=["test_task"],
)
+ assert op.external_dates_filter == expected_date.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_execution_delta(self, dag_maker):
@@ -1286,6 +1296,7 @@ class TestExternalTaskSensorV3:
states=["success"],
task_ids=["test_task"],
)
+ assert op.external_dates_filter == expected_date.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_duplicate_task_ids(self, dag_maker):
@@ -1338,6 +1349,7 @@ class TestExternalTaskSensorV3:
logical_dates=[DEFAULT_DATE],
states=["success"],
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
@pytest.mark.execution_timeout(10)
def test_external_task_sensor_task_group_failed_states(self, dag_maker):
@@ -1359,6 +1371,7 @@ class TestExternalTaskSensorV3:
logical_dates=[DEFAULT_DATE],
task_group_id="test_group",
)
+ assert op.external_dates_filter == DEFAULT_DATE.isoformat()
def test_get_logical_date(self):
"""For AF 3, we check for logical date or dag_run.run_after in
context."""
@@ -1416,7 +1429,7 @@ class TestExternalTaskAsyncSensor:
deferrable=True,
)
- context = {"execution_date": datetime(2025, 1, 1), "logical_date":
datetime(2025, 1, 1)}
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
with pytest.raises(TaskDeferred) as exc:
sensor.execute(context=context)
@@ -1431,9 +1444,10 @@ class TestExternalTaskAsyncSensor:
deferrable=True,
)
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
with pytest.raises(ExternalTaskNotFoundError):
sensor.execute_complete(
- context=mock.MagicMock(), event={"status": "error", "message":
"test failure message"}
+ context=context, event={"status": "error", "message": "test
failure message"}
)
def test_defer_and_fire_timeout_state_trigger(self):
@@ -1445,9 +1459,10 @@ class TestExternalTaskAsyncSensor:
deferrable=True,
)
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
with pytest.raises(ExternalTaskNotFoundError):
sensor.execute_complete(
- context=mock.MagicMock(),
+ context=context,
event={"status": "timeout", "message": "Dag was not started
within 1 minute, assuming fail."},
)
@@ -1460,9 +1475,10 @@ class TestExternalTaskAsyncSensor:
deferrable=True,
)
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
with mock.patch.object(sensor.log, "info") as mock_log_info:
sensor.execute_complete(
- context=mock.MagicMock(),
+ context=context,
event={"status": "success"},
)
mock_log_info.assert_called_with("External tasks %s has executed
successfully.", [EXTERNAL_TASK_ID])
@@ -1476,9 +1492,10 @@ class TestExternalTaskAsyncSensor:
deferrable=True,
)
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
with pytest.raises(ExternalDagFailedError, match="External job has
failed."):
sensor.execute_complete(
- context=mock.MagicMock(),
+ context=context,
event={"status": "failed"},
)
@@ -1492,9 +1509,10 @@ class TestExternalTaskAsyncSensor:
soft_fail=True,
)
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
with pytest.raises(AirflowSkipException, match="External job has
failed skipping."):
sensor.execute_complete(
- context=mock.MagicMock(),
+ context=context,
event={"status": "failed"},
)
@@ -1517,6 +1535,20 @@ class TestExternalTaskAsyncSensor:
assert isinstance(trigger, WorkflowTrigger), "Trigger is not a
WorkflowTrigger"
assert trigger.failed_states == failed_states, "failed_states not
properly passed to WorkflowTrigger"
+ def test_defer_execute_complete_re_sets_external_dates_filter_attr(self):
+ sensor = ExternalTaskSensor(
+ task_id=TASK_ID,
+ external_task_id=EXTERNAL_TASK_ID,
+ external_dag_id=EXTERNAL_DAG_ID,
+ deferrable=True,
+ )
+ assert sensor.external_dates_filter is None
+
+ context = {"execution_date": DEFAULT_DATE, "logical_date":
DEFAULT_DATE}
+ sensor.execute_complete(context=context, event={"status": "success"})
+
+ assert sensor.external_dates_filter == DEFAULT_DATE.isoformat()
+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Needs Flask app context
fixture for AF 2")
@pytest.mark.parametrize(