This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f60886cf368 add ProcessingEngineRunFacet to OL DAG Start event (#43213)
f60886cf368 is described below

commit f60886cf368b943120af20889b83704ccdbb8c91
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Thu Nov 14 15:57:27 2024 +0100

    add ProcessingEngineRunFacet to OL DAG Start event (#43213)
    
    Signed-off-by: Maciej Obuchowski <[email protected]>
---
 .../airflow/providers/openlineage/plugins/adapter.py    | 14 +++-----------
 .../src/airflow/providers/openlineage/utils/utils.py    | 16 ++++++++++++++--
 providers/tests/openlineage/plugins/test_adapter.py     |  3 +++
 providers/tests/openlineage/plugins/test_utils.py       | 17 +++++++++++++++++
 4 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index fb58cc5dc02..199df880e79 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -32,7 +32,6 @@ from openlineage.client.facet_v2 import (
     nominal_time_run,
     ownership_job,
     parent_run,
-    processing_engine_run,
     source_code_location_job,
 )
 from openlineage.client.uuid import generate_static_uuid
@@ -42,6 +41,7 @@ from airflow.providers.openlineage.utils.utils import (
     OpenLineageRedactor,
     get_airflow_debug_facet,
     get_airflow_state_run_facet,
+    get_processing_engine_facet,
 )
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -195,18 +195,10 @@ class OpenLineageAdapter(LoggingMixin):
         :param task: metadata container with information extracted from operator
         :param run_facets: custom run facets
         """
-        from airflow.version import version as AIRFLOW_VERSION
-
-        processing_engine_version_facet = processing_engine_run.ProcessingEngineRunFacet(
-            version=AIRFLOW_VERSION,
-            name="Airflow",
-            openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
-        )
-
         run_facets = run_facets or {}
         if task:
             run_facets = {**task.run_facets, **run_facets}
-        run_facets["processing_engine"] = processing_engine_version_facet  # type: ignore
+        run_facets = {**run_facets, **get_processing_engine_facet()}  # type: ignore
         event = RunEvent(
             eventType=RunState.START,
             eventTime=event_time,
@@ -362,7 +354,7 @@ class OpenLineageAdapter(LoggingMixin):
                     job_name=dag_id,
                     nominal_start_time=nominal_start_time,
                     nominal_end_time=nominal_end_time,
-                    run_facets={**run_facets, **get_airflow_debug_facet()},
+                    run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()},
                 ),
                 inputs=[],
                 outputs=[],
diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py
index 8c67c32f95b..99faa3c4d5c 100644
--- a/providers/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/src/airflow/providers/openlineage/utils/utils.py
@@ -38,7 +38,7 @@ from airflow.exceptions import (
 # TODO: move this maybe to Airflow's logic?
 from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
 from airflow.providers.common.compat.assets import Asset
-from airflow.providers.openlineage import conf
+from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.plugins.facets import (
     AirflowDagRunFacet,
     AirflowDebugRunFacet,
@@ -65,7 +65,7 @@ from airflow.utils.module_loading import import_string
 
 if TYPE_CHECKING:
     from openlineage.client.event_v2 import Dataset as OpenLineageDataset
-    from openlineage.client.facet_v2 import RunFacet
+    from openlineage.client.facet_v2 import RunFacet, processing_engine_run
 
     from airflow.models import TaskInstance
     from airflow.utils.state import DagRunState, TaskInstanceState
@@ -428,6 +428,18 @@ def _get_all_packages_installed() -> dict[str, str]:
     return {dist.metadata["Name"]: dist.version for dist in metadata.distributions()}
 
 
+def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:
+    from openlineage.client.facet_v2 import processing_engine_run
+
+    return {
+        "processing_engine": processing_engine_run.ProcessingEngineRunFacet(
+            version=AIRFLOW_VERSION,
+            name="Airflow",
+            openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
+        )
+    }
+
+
 def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
     if not conf.debug_mode():
         return {}
diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index f0928dd70db..73145f9e4b1 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -606,6 +606,9 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
                         nominalStartTime=event_time.isoformat(),
                         nominalEndTime=event_time.isoformat(),
                     ),
+                    "processing_engine": processing_engine_run.ProcessingEngineRunFacet(
+                        version=ANY, name="Airflow", openlineageAdapterVersion=ANY
+                    ),
                     "airflowDagRun": AirflowDagRunFacet(
                         dag=expected_dag_info,
                         dagRun={
diff --git a/providers/tests/openlineage/plugins/test_utils.py b/providers/tests/openlineage/plugins/test_utils.py
index 22b80120bb6..e84fac11865 100644
--- a/providers/tests/openlineage/plugins/test_utils.py
+++ b/providers/tests/openlineage/plugins/test_utils.py
@@ -40,6 +40,7 @@ from airflow.providers.openlineage.utils.utils import (
     get_airflow_debug_facet,
     get_airflow_run_facet,
     get_fully_qualified_class_name,
+    get_processing_engine_facet,
     is_operator_disabled,
 )
 from airflow.serialization.enums import DagAttributeTypes
@@ -438,3 +439,19 @@ def test_serialize_timetable_2_8():
             ],
         }
     }
+
+
+@pytest.mark.parametrize(
+    ("airflow_version", "ol_version"),
+    [
+        ("2.9.3", "1.12.2"),
+        ("2.10.1", "1.13.0"),
+        ("3.0.0", "1.14.0"),
+    ],
+)
+def test_get_processing_engine_facet(airflow_version, ol_version):
+    with patch("airflow.providers.openlineage.utils.utils.AIRFLOW_VERSION", airflow_version):
+        with patch("airflow.providers.openlineage.utils.utils.OPENLINEAGE_PROVIDER_VERSION", ol_version):
+            result = get_processing_engine_facet()
+            assert result["processing_engine"].version == airflow_version
+            assert result["processing_engine"].openlineageAdapterVersion == ol_version

Reply via email to