This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4ab0183cfa Add AirflowRun on COMPLETE/FAIL events (#40996)
4ab0183cfa is described below
commit 4ab0183cfad9a4afc8543970b8910da0ef1f3b19
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Jul 28 07:34:33 2024 +0200
Add AirflowRun on COMPLETE/FAIL events (#40996)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
airflow/providers/openlineage/plugins/adapter.py | 9 +++++----
airflow/providers/openlineage/plugins/listener.py | 10 ++++++++--
tests/providers/openlineage/plugins/test_adapter.py | 5 ++++-
tests/providers/openlineage/plugins/test_listener.py | 20 ++++++++++++++++----
4 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index 7405556088..e71334184e 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -178,7 +178,7 @@ class OpenLineageAdapter(LoggingMixin):
nominal_end_time: str | None,
owners: list[str],
task: OperatorLineage | None,
- run_facets: dict[str, RunFacet] | None = None, # Custom run facets
+ run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Emit openlineage event of type START.
@@ -243,7 +243,7 @@ class OpenLineageAdapter(LoggingMixin):
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
- run_facets: dict[str, RunFacet] | None = None, # Custom run facets
+ run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Emit openlineage event of type COMPLETE.
@@ -255,7 +255,7 @@ class OpenLineageAdapter(LoggingMixin):
:param parent_run_id: identifier of job spawning this task
:param end_time: time of task completion
:param task: metadata container with information extracted from
operator
- :param run_facets: custom run facets
+ :param run_facets: additional run facets
"""
run_facets = run_facets or {}
if task:
@@ -285,8 +285,8 @@ class OpenLineageAdapter(LoggingMixin):
parent_run_id: str | None,
end_time: str,
task: OperatorLineage,
- run_facets: dict[str, RunFacet] | None = None, # Custom run facets
error: str | BaseException | None = None,
+ run_facets: dict[str, RunFacet] | None = None,
) -> RunEvent:
"""
Emit openlineage event of type FAIL.
@@ -300,6 +300,7 @@ class OpenLineageAdapter(LoggingMixin):
:param task: metadata container with information extracted from
operator
:param run_facets: custom run facets
:param error: error
+ :param run_facets: additional run facets
"""
run_facets = run_facets or {}
if task:
diff --git a/airflow/providers/openlineage/plugins/listener.py
b/airflow/providers/openlineage/plugins/listener.py
index 58ccdcad24..2f227fa2e6 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -235,7 +235,10 @@ class OpenLineageListener:
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
- run_facets=get_user_provided_run_facets(task_instance,
TaskInstanceState.SUCCESS),
+ run_facets={
+ **get_user_provided_run_facets(task_instance,
TaskInstanceState.SUCCESS),
+ **get_airflow_run_facet(dagrun, dag, task_instance, task,
task_uuid),
+ },
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
@@ -330,8 +333,11 @@ class OpenLineageListener:
parent_run_id=parent_run_id,
end_time=end_date.isoformat(),
task=task_metadata,
- run_facets=get_user_provided_run_facets(task_instance,
TaskInstanceState.FAILED),
error=error,
+ run_facets={
+ **get_user_provided_run_facets(task_instance,
TaskInstanceState.FAILED),
+ **get_airflow_run_facet(dagrun, dag, task_instance, task,
task_uuid),
+ },
)
Stats.gauge(
f"ol.event.size.{event_type}.{operator_name}",
diff --git a/tests/providers/openlineage/plugins/test_adapter.py
b/tests/providers/openlineage/plugins/test_adapter.py
index b648bb51d3..3deeac3894 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -45,7 +45,10 @@ from airflow.operators.empty import EmptyOperator
from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _PRODUCER,
OpenLineageAdapter
-from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet,
AirflowStateRunFacet
+from airflow.providers.openlineage.plugins.facets import (
+ AirflowDagRunFacet,
+ AirflowStateRunFacet,
+)
from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
from airflow.utils.task_group import TaskGroup
from tests.test_utils.config import conf_vars
diff --git a/tests/providers/openlineage/plugins/test_listener.py
b/tests/providers/openlineage/plugins/test_listener.py
index b05a934e02..3b0c9f0159 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -264,11 +264,16 @@ def
test_adapter_start_task_is_called_with_proper_arguments(
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
+@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute",
new=regular_call)
def test_adapter_fail_task_is_called_with_proper_arguments(
- mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter,
mock_disabled
+ mock_get_job_name,
+ mock_get_user_provided_run_facets,
+ mock_get_airflow_run_facet,
+ mocked_adapter,
+ mock_disabled,
):
"""Tests that the 'fail_task' method of the OpenLineageAdapter is invoked
with the correct arguments.
@@ -289,6 +294,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
+ mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_disabled.return_value = False
err = ValueError("test")
@@ -305,18 +311,23 @@ def
test_adapter_fail_task_is_called_with_proper_arguments(
parent_run_id="execution_date.dag_id",
run_id="execution_date.dag_id.task_id.1",
task=listener.extractor_manager.extract_metadata(),
- run_facets={"custom_user_facet": 2},
+ run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
**expected_err_kwargs,
)
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
+@mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
@mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute",
new=regular_call)
def test_adapter_complete_task_is_called_with_proper_arguments(
- mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter,
mock_disabled
+ mock_get_job_name,
+ mock_get_user_provided_run_facets,
+ mock_get_airflow_run_facet,
+ mocked_adapter,
+ mock_disabled,
):
"""Tests that the 'complete_task' method of the OpenLineageAdapter is
called with the correct arguments.
@@ -338,6 +349,7 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
+ mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
mock_disabled.return_value = False
listener.on_task_instance_success(None, task_instance, None)
@@ -352,7 +364,7 @@ def
test_adapter_complete_task_is_called_with_proper_arguments(
parent_run_id="execution_date.dag_id",
run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
task=listener.extractor_manager.extract_metadata(),
- run_facets={"custom_user_facet": 2},
+ run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
)