This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ec9eb79a0 openlineage: Add AirflowRunFacet for dag runEvents (#40854)
9ec9eb79a0 is described below
commit 9ec9eb79a0cc845d86e7380c73269d2ee1d3c210
Author: Maxim Martynov <[email protected]>
AuthorDate: Tue Jul 23 15:43:34 2024 +0300
openlineage: Add AirflowRunFacet for dag runEvents (#40854)
---
.../openlineage/facets/AirflowDagRunFacet.json | 105 +++++++++++++++++++++
airflow/providers/openlineage/plugins/adapter.py | 5 +-
airflow/providers/openlineage/plugins/facets.py | 10 +-
airflow/providers/openlineage/plugins/listener.py | 2 +-
airflow/providers/openlineage/utils/utils.py | 12 +++
.../providers/openlineage/plugins/test_adapter.py | 32 +++++--
tests/providers/openlineage/plugins/test_facets.py | 21 ++++-
tests/providers/openlineage/utils/test_utils.py | 58 +++++++++++-
8 files changed, 231 insertions(+), 14 deletions(-)
diff --git a/airflow/providers/openlineage/facets/AirflowDagRunFacet.json b/airflow/providers/openlineage/facets/AirflowDagRunFacet.json
new file mode 100644
index 0000000000..165a8e6a59
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowDagRunFacet.json
@@ -0,0 +1,105 @@
+{
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$defs": {
+ "AirflowDagRunFacet": {
+ "allOf": [
+ {
+ "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+ },
+ {
+ "type": "object",
+ "properties": {
+ "dag": {
+ "$ref": "#/$defs/DAG"
+ },
+ "dagRun": {
+ "$ref": "#/$defs/DagRun"
+ }
+ },
+ "required": [
+ "dag",
+ "dagRun"
+ ]
+ }
+ ]
+ },
+ "DAG": {
+ "type": "object",
+ "properties": {
+ "dag_id": {
+ "type": "string"
+ },
+ "description": {
+ "type": "string"
+ },
+ "owner": {
+ "type": "string"
+ },
+ "schedule_interval": {
+ "type": "string"
+ },
+ "start_date": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "tags": {
+ "type": "string"
+ },
+ "timetable": {
+ "description": "Describes timetable (successor of schedule_interval)",
+ "type": "object",
+ "additionalProperties": true
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "dag_id",
+ "start_date"
+ ]
+ },
+ "DagRun": {
+ "type": "object",
+ "properties": {
+ "conf": {
+ "type": "object",
+ "additionalProperties": true
+ },
+ "dag_id": {
+ "type": "string"
+ },
+ "data_interval_start": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "data_interval_end": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "external_trigger": {
+ "type": "boolean"
+ },
+ "run_id": {
+ "type": "string"
+ },
+ "run_type": {
+ "type": "string"
+ },
+ "start_date": {
+ "type": "string",
+ "format": "date-time"
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "dag_id",
+ "run_id"
+ ]
+ }
+ },
+ "type": "object",
+ "properties": {
+ "airflowDagRun": {
+ "$ref": "#/$defs/AirflowDagRunFacet"
+ }
+ }
+}
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 9e2613d8db..8e1d924bb9 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -40,6 +40,7 @@ from openlineage.client.uuid import generate_static_uuid
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
+ get_airflow_dag_run_facet,
get_airflow_state_run_facet,
)
from airflow.stats import Stats
@@ -334,6 +335,7 @@ class OpenLineageAdapter(LoggingMixin):
job_facets: dict[str, BaseFacet] | None = None, # Custom job facets
):
try:
+ owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
@@ -341,7 +343,7 @@ class OpenLineageAdapter(LoggingMixin):
job_name=dag_run.dag_id,
job_type=_JOB_TYPE_DAG,
job_description=dag_run.dag.description if dag_run.dag else None,
- owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
+ owners=owner,
job_facets=job_facets,
),
run=self._build_run(
@@ -352,6 +354,7 @@ class OpenLineageAdapter(LoggingMixin):
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
+ run_facets=get_airflow_dag_run_facet(dag_run),
),
inputs=[],
outputs=[],
diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py
index fb642579fc..d282c72ac8 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -91,7 +91,7 @@ class AirflowStateRunFacet(BaseFacet):
@define(slots=False)
class AirflowRunFacet(BaseFacet):
- """Composite Airflow run facet."""
+ """Composite Airflow task run facet."""
dag: dict
dagRun: dict
@@ -100,6 +100,14 @@ class AirflowRunFacet(BaseFacet):
taskUuid: str
+@define(slots=False)
+class AirflowDagRunFacet(BaseFacet):
+ """Composite Airflow DAG run facet."""
+
+ dag: dict
+ dagRun: dict
+
+
@define(slots=False)
class UnknownOperatorInstance(RedactMixin):
"""
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 8798c542c1..a552cb283b 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -420,7 +420,7 @@ class OpenLineageListener:
nominal_end_time=data_interval_end,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
- job_facets={**get_airflow_job_facet(dag_run=dag_run)},
+ job_facets=get_airflow_job_facet(dag_run=dag_run),
)
@hookimpl
diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index a36f44b3d5..171f35a775 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -37,6 +37,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning # TODO: move t
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
+ AirflowDagRunFacet,
AirflowJobFacet,
AirflowMappedTaskRunFacet,
AirflowRunFacet,
@@ -345,6 +346,17 @@ class TaskGroupInfo(InfoJsonEncodable):
]
+def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+ if not dag_run.dag:
+ return {}
+ return {
+ "airflowDagRun": AirflowDagRunFacet(
+ dag=DagInfo(dag_run.dag),
+ dagRun=DagRunInfo(dag_run),
+ )
+ }
+
+
def get_airflow_run_facet(
dag_run: DagRun,
dag: DAG,
diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py
index e588b25dcc..fb60b5cc8c 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -43,14 +43,10 @@ from airflow.models.dagrun import DagRun, DagRunState
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
-from airflow.providers.openlineage.conf import (
- namespace,
-)
+from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
-from airflow.providers.openlineage.plugins.facets import (
- AirflowStateRunFacet,
-)
+from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowStateRunFacet
from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
from airflow.utils.task_group import TaskGroup
from tests.test_utils.config import conf_vars
@@ -518,6 +514,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
run_id=run_id,
start_date=event_time,
execution_date=event_time,
+ data_interval=(event_time, event_time),
)
dag_run.dag = dag
generate_static_uuid.return_value = random_uuid
@@ -544,7 +541,28 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
"nominalTime": NominalTimeRunFacet(
nominalStartTime=event_time.isoformat(),
nominalEndTime=event_time.isoformat(),
- )
+ ),
+ "airflowDagRun": AirflowDagRunFacet(
+ dag={
+ "timetable": {"delta": 86400.0},
+ "dag_id": dag_id,
+ "description": "dag desc",
+ "owner": "airflow",
+ "schedule_interval": "86400.0 seconds",
+ "start_date": "2024-06-01T00:00:00+00:00",
+ "tags": [],
+ },
+ dagRun={
+ "conf": {},
+ "dag_id": "dag_id",
+ "data_interval_start": event_time.isoformat(),
+ "data_interval_end": event_time.isoformat(),
+ "external_trigger": None,
+ "run_id": run_id,
+ "run_type": None,
+ "start_date": event_time.isoformat(),
+ },
+ ),
},
),
job=Job(
diff --git a/tests/providers/openlineage/plugins/test_facets.py b/tests/providers/openlineage/plugins/test_facets.py
index dd4e5851f2..73eaebd0c0 100644
--- a/tests/providers/openlineage/plugins/test_facets.py
+++ b/tests/providers/openlineage/plugins/test_facets.py
@@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations
-from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
+from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowRunFacet
def test_airflow_run_facet():
@@ -27,7 +27,11 @@ def test_airflow_run_facet():
task_uuid = "XXX"
airflow_run_facet = AirflowRunFacet(
- dag=dag, dagRun=dag_run, task=task, taskInstance=task_instance, taskUuid=task_uuid
+ dag=dag,
+ dagRun=dag_run,
+ task=task,
+ taskInstance=task_instance,
+ taskUuid=task_uuid,
)
assert airflow_run_facet.dag == dag
@@ -35,3 +39,16 @@ def test_airflow_run_facet():
assert airflow_run_facet.task == task
assert airflow_run_facet.taskInstance == task_instance
assert airflow_run_facet.taskUuid == task_uuid
+
+
+def test_airflow_dag_run_facet():
+ dag = {"dag_id": "123"}
+ dag_run = {"dag_run_id": "456"}
+
+ airflow_dag_run_facet = AirflowDagRunFacet(
+ dag=dag,
+ dagRun=dag_run,
+ )
+
+ assert airflow_dag_run_facet.dag == dag
+ assert airflow_dag_run_facet.dagRun == dag_run
diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py
index d3a9d89445..6f6fc104b3 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -23,17 +23,19 @@ from unittest.mock import ANY, MagicMock, patch
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import BaseOperator
+from airflow.models.dagrun import DagRun
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
-from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
+from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet
from airflow.providers.openlineage.utils.utils import (
_get_parsed_dag_tree,
_get_task_groups_details,
_get_tasks_details,
_safe_get_dag_tree_view,
+ get_airflow_dag_run_facet,
get_airflow_job_facet,
get_custom_facets,
get_fully_qualified_class_name,
@@ -42,6 +44,7 @@ from airflow.providers.openlineage.utils.utils import (
)
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.task_group import TaskGroup
+from airflow.utils.types import DagRunType
from tests.test_utils.mock_operators import MockOperator
@@ -62,7 +65,7 @@ def test_get_airflow_job_facet():
task_0 >> task_10
- dagrun_mock = MagicMock()
+ dagrun_mock = MagicMock(DagRun)
dagrun_mock.dag = dag
result = get_airflow_job_facet(dagrun_mock)
@@ -104,6 +107,57 @@ def test_get_airflow_job_facet():
}
+def test_get_airflow_dag_run_facet():
+ with DAG(
+ dag_id="dag",
+ schedule="@once",
+ start_date=datetime.datetime(2024, 6, 1),
+ tags=["test"],
+ ) as dag:
+ task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+
+ with TaskGroup("section_1", prefix_group_id=True):
+ task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1)
+
+ task_0 >> task_10
+
+ dagrun_mock = MagicMock(DagRun)
+ dagrun_mock.dag = dag
+ dagrun_mock.conf = {}
+ dagrun_mock.dag_id = dag.dag_id
+ dagrun_mock.data_interval_start = datetime.datetime(2024, 6, 1, 1, 2, 3, tzinfo=datetime.timezone.utc)
+ dagrun_mock.data_interval_end = datetime.datetime(2024, 6, 1, 2, 3, 4, tzinfo=datetime.timezone.utc)
+ dagrun_mock.external_trigger = True
+ dagrun_mock.run_id = "manual_2024-06-01T00:00:00+00:00"
+ dagrun_mock.run_type = DagRunType.MANUAL
+ dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc)
+
+ result = get_airflow_dag_run_facet(dagrun_mock)
+ assert result == {
+ "airflowDagRun": AirflowDagRunFacet(
+ dag={
+ "dag_id": "dag",
+ "description": None,
+ "owner": "airflow",
+ "timetable": {},
+ "schedule_interval": "@once",
+ "start_date": "2024-06-01T00:00:00+00:00",
+ "tags": ["test"],
+ },
+ dagRun={
+ "conf": {},
+ "dag_id": "dag",
+ "data_interval_start": "2024-06-01T01:02:03+00:00",
+ "data_interval_end": "2024-06-01T02:03:04+00:00",
+ "external_trigger": True,
+ "run_id": "manual_2024-06-01T00:00:00+00:00",
+ "run_type": "manual",
+ "start_date": "2024-06-01T01:02:04+00:00",
+ },
+ )
+ }
+
+
def test_get_fully_qualified_class_name_serialized_operator():
op_module_path = "airflow.operators.bash"
op_name = "BashOperator"