This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ab0183cfa Add AirflowRun on COMPLETE/FAIL events (#40996)
4ab0183cfa is described below

commit 4ab0183cfad9a4afc8543970b8910da0ef1f3b19
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Sun Jul 28 07:34:33 2024 +0200

    Add AirflowRun on COMPLETE/FAIL events (#40996)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 airflow/providers/openlineage/plugins/adapter.py     |  9 +++++----
 airflow/providers/openlineage/plugins/listener.py    | 10 ++++++++--
 tests/providers/openlineage/plugins/test_adapter.py  |  5 ++++-
 tests/providers/openlineage/plugins/test_listener.py | 20 ++++++++++++++++----
 4 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/adapter.py 
b/airflow/providers/openlineage/plugins/adapter.py
index 7405556088..e71334184e 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -178,7 +178,7 @@ class OpenLineageAdapter(LoggingMixin):
         nominal_end_time: str | None,
         owners: list[str],
         task: OperatorLineage | None,
-        run_facets: dict[str, RunFacet] | None = None,  # Custom run facets
+        run_facets: dict[str, RunFacet] | None = None,
     ) -> RunEvent:
         """
         Emit openlineage event of type START.
@@ -243,7 +243,7 @@ class OpenLineageAdapter(LoggingMixin):
         parent_run_id: str | None,
         end_time: str,
         task: OperatorLineage,
-        run_facets: dict[str, RunFacet] | None = None,  # Custom run facets
+        run_facets: dict[str, RunFacet] | None = None,
     ) -> RunEvent:
         """
         Emit openlineage event of type COMPLETE.
@@ -255,7 +255,7 @@ class OpenLineageAdapter(LoggingMixin):
         :param parent_run_id: identifier of job spawning this task
         :param end_time: time of task completion
         :param task: metadata container with information extracted from 
operator
-        :param run_facets: custom run facets
+        :param run_facets: additional run facets
         """
         run_facets = run_facets or {}
         if task:
@@ -285,8 +285,8 @@ class OpenLineageAdapter(LoggingMixin):
         parent_run_id: str | None,
         end_time: str,
         task: OperatorLineage,
-        run_facets: dict[str, RunFacet] | None = None,  # Custom run facets
         error: str | BaseException | None = None,
+        run_facets: dict[str, RunFacet] | None = None,
     ) -> RunEvent:
         """
         Emit openlineage event of type FAIL.
@@ -300,6 +300,7 @@ class OpenLineageAdapter(LoggingMixin):
         :param task: metadata container with information extracted from 
operator
         :param run_facets: custom run facets
         :param error: error
+        :param run_facets: additional run facets
         """
         run_facets = run_facets or {}
         if task:
diff --git a/airflow/providers/openlineage/plugins/listener.py 
b/airflow/providers/openlineage/plugins/listener.py
index 58ccdcad24..2f227fa2e6 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -235,7 +235,10 @@ class OpenLineageListener:
                 parent_run_id=parent_run_id,
                 end_time=end_date.isoformat(),
                 task=task_metadata,
-                run_facets=get_user_provided_run_facets(task_instance, 
TaskInstanceState.SUCCESS),
+                run_facets={
+                    **get_user_provided_run_facets(task_instance, 
TaskInstanceState.SUCCESS),
+                    **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
+                },
             )
             Stats.gauge(
                 f"ol.event.size.{event_type}.{operator_name}",
@@ -330,8 +333,11 @@ class OpenLineageListener:
                 parent_run_id=parent_run_id,
                 end_time=end_date.isoformat(),
                 task=task_metadata,
-                run_facets=get_user_provided_run_facets(task_instance, 
TaskInstanceState.FAILED),
                 error=error,
+                run_facets={
+                    **get_user_provided_run_facets(task_instance, 
TaskInstanceState.FAILED),
+                    **get_airflow_run_facet(dagrun, dag, task_instance, task, 
task_uuid),
+                },
             )
             Stats.gauge(
                 f"ol.event.size.{event_type}.{operator_name}",
diff --git a/tests/providers/openlineage/plugins/test_adapter.py 
b/tests/providers/openlineage/plugins/test_adapter.py
index b648bb51d3..3deeac3894 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -45,7 +45,10 @@ from airflow.operators.empty import EmptyOperator
 from airflow.providers.openlineage.conf import namespace
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import _PRODUCER, 
OpenLineageAdapter
-from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, 
AirflowStateRunFacet
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowDagRunFacet,
+    AirflowStateRunFacet,
+)
 from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
 from airflow.utils.task_group import TaskGroup
 from tests.test_utils.config import conf_vars
diff --git a/tests/providers/openlineage/plugins/test_listener.py 
b/tests/providers/openlineage/plugins/test_listener.py
index b05a934e02..3b0c9f0159 100644
--- a/tests/providers/openlineage/plugins/test_listener.py
+++ b/tests/providers/openlineage/plugins/test_listener.py
@@ -264,11 +264,16 @@ def 
test_adapter_start_task_is_called_with_proper_arguments(
 
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
[email protected]("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
 @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute",
 new=regular_call)
 def test_adapter_fail_task_is_called_with_proper_arguments(
-    mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter, 
mock_disabled
+    mock_get_job_name,
+    mock_get_user_provided_run_facets,
+    mock_get_airflow_run_facet,
+    mocked_adapter,
+    mock_disabled,
 ):
     """Tests that the 'fail_task' method of the OpenLineageAdapter is invoked 
with the correct arguments.
 
@@ -289,6 +294,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments(
     mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
+    mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
     mock_disabled.return_value = False
 
     err = ValueError("test")
@@ -305,18 +311,23 @@ def 
test_adapter_fail_task_is_called_with_proper_arguments(
         parent_run_id="execution_date.dag_id",
         run_id="execution_date.dag_id.task_id.1",
         task=listener.extractor_manager.extract_metadata(),
-        run_facets={"custom_user_facet": 2},
+        run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
         **expected_err_kwargs,
     )
 
 
 
@mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageAdapter")
[email protected]("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets")
 @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name")
 
@mock.patch("airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute",
 new=regular_call)
 def test_adapter_complete_task_is_called_with_proper_arguments(
-    mock_get_job_name, mock_get_user_provided_run_facets, mocked_adapter, 
mock_disabled
+    mock_get_job_name,
+    mock_get_user_provided_run_facets,
+    mock_get_airflow_run_facet,
+    mocked_adapter,
+    mock_disabled,
 ):
     """Tests that the 'complete_task' method of the OpenLineageAdapter is 
called with the correct arguments.
 
@@ -338,6 +349,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
     mocked_adapter.build_dag_run_id.side_effect = mock_dag_id
     mocked_adapter.build_task_instance_run_id.side_effect = mock_task_id
     mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2}
+    mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}}
     mock_disabled.return_value = False
 
     listener.on_task_instance_success(None, task_instance, None)
@@ -352,7 +364,7 @@ def 
test_adapter_complete_task_is_called_with_proper_arguments(
         parent_run_id="execution_date.dag_id",
         run_id=f"execution_date.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}",
         task=listener.extractor_manager.extract_metadata(),
-        run_facets={"custom_user_facet": 2},
+        run_facets={"custom_user_facet": 2, "airflow": {"task": "..."}},
     )
 
 

Reply via email to