This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f60886cf368 add ProcessingEngineRunFacet to OL DAG Start event (#43213)
f60886cf368 is described below
commit f60886cf368b943120af20889b83704ccdbb8c91
Author: Maciej Obuchowski <[email protected]>
AuthorDate: Thu Nov 14 15:57:27 2024 +0100
add ProcessingEngineRunFacet to OL DAG Start event (#43213)
Signed-off-by: Maciej Obuchowski <[email protected]>
---
.../airflow/providers/openlineage/plugins/adapter.py | 14 +++-----------
.../src/airflow/providers/openlineage/utils/utils.py | 16 ++++++++++++++--
providers/tests/openlineage/plugins/test_adapter.py | 3 +++
providers/tests/openlineage/plugins/test_utils.py | 17 +++++++++++++++++
4 files changed, 37 insertions(+), 13 deletions(-)
diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py
index fb58cc5dc02..199df880e79 100644
--- a/providers/src/airflow/providers/openlineage/plugins/adapter.py
+++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py
@@ -32,7 +32,6 @@ from openlineage.client.facet_v2 import (
nominal_time_run,
ownership_job,
parent_run,
- processing_engine_run,
source_code_location_job,
)
from openlineage.client.uuid import generate_static_uuid
@@ -42,6 +41,7 @@ from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
get_airflow_debug_facet,
get_airflow_state_run_facet,
+ get_processing_engine_facet,
)
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -195,18 +195,10 @@ class OpenLineageAdapter(LoggingMixin):
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
"""
- from airflow.version import version as AIRFLOW_VERSION
-
- processing_engine_version_facet = processing_engine_run.ProcessingEngineRunFacet(
- version=AIRFLOW_VERSION,
- name="Airflow",
- openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
- )
-
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
- run_facets["processing_engine"] = processing_engine_version_facet # type: ignore
+ run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore
event = RunEvent(
eventType=RunState.START,
eventTime=event_time,
@@ -362,7 +354,7 @@ class OpenLineageAdapter(LoggingMixin):
job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
- run_facets={**run_facets, **get_airflow_debug_facet()},
+ run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()},
),
inputs=[],
outputs=[],
diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py
index 8c67c32f95b..99faa3c4d5c 100644
--- a/providers/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/src/airflow/providers/openlineage/utils/utils.py
@@ -38,7 +38,7 @@ from airflow.exceptions import (
# TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.providers.common.compat.assets import Asset
-from airflow.providers.openlineage import conf
+from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowDebugRunFacet,
@@ -65,7 +65,7 @@ from airflow.utils.module_loading import import_string
if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
- from openlineage.client.facet_v2 import RunFacet
+ from openlineage.client.facet_v2 import RunFacet, processing_engine_run
from airflow.models import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
@@ -428,6 +428,18 @@ def _get_all_packages_installed() -> dict[str, str]:
return {dist.metadata["Name"]: dist.version for dist in metadata.distributions()}
+def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:
+ from openlineage.client.facet_v2 import processing_engine_run
+
+ return {
+ "processing_engine": processing_engine_run.ProcessingEngineRunFacet(
+ version=AIRFLOW_VERSION,
+ name="Airflow",
+ openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
+ )
+ }
+
+
def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
if not conf.debug_mode():
return {}
diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py
index f0928dd70db..73145f9e4b1 100644
--- a/providers/tests/openlineage/plugins/test_adapter.py
+++ b/providers/tests/openlineage/plugins/test_adapter.py
@@ -606,6 +606,9 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
nominalStartTime=event_time.isoformat(),
nominalEndTime=event_time.isoformat(),
),
+ "processing_engine": processing_engine_run.ProcessingEngineRunFacet(
+ version=ANY, name="Airflow", openlineageAdapterVersion=ANY
+ ),
"airflowDagRun": AirflowDagRunFacet(
dag=expected_dag_info,
dagRun={
diff --git a/providers/tests/openlineage/plugins/test_utils.py b/providers/tests/openlineage/plugins/test_utils.py
index 22b80120bb6..e84fac11865 100644
--- a/providers/tests/openlineage/plugins/test_utils.py
+++ b/providers/tests/openlineage/plugins/test_utils.py
@@ -40,6 +40,7 @@ from airflow.providers.openlineage.utils.utils import (
get_airflow_debug_facet,
get_airflow_run_facet,
get_fully_qualified_class_name,
+ get_processing_engine_facet,
is_operator_disabled,
)
from airflow.serialization.enums import DagAttributeTypes
@@ -438,3 +439,19 @@ def test_serialize_timetable_2_8():
],
}
}
+
+
+@pytest.mark.parametrize(
+ ("airflow_version", "ol_version"),
+ [
+ ("2.9.3", "1.12.2"),
+ ("2.10.1", "1.13.0"),
+ ("3.0.0", "1.14.0"),
+ ],
+)
+def test_get_processing_engine_facet(airflow_version, ol_version):
+ with patch("airflow.providers.openlineage.utils.utils.AIRFLOW_VERSION", airflow_version):
+ with patch("airflow.providers.openlineage.utils.utils.OPENLINEAGE_PROVIDER_VERSION", ol_version):
+ result = get_processing_engine_facet()
+ assert result["processing_engine"].version == airflow_version
+ assert result["processing_engine"].openlineageAdapterVersion == ol_version