This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ec9eb79a0 openlineage: Add AirflowRunFacet for dag runEvents (#40854)
9ec9eb79a0 is described below

commit 9ec9eb79a0cc845d86e7380c73269d2ee1d3c210
Author: Maxim Martynov <[email protected]>
AuthorDate: Tue Jul 23 15:43:34 2024 +0300

    openlineage: Add AirflowRunFacet for dag runEvents (#40854)
---
 .../openlineage/facets/AirflowDagRunFacet.json     | 105 +++++++++++++++++++++
 airflow/providers/openlineage/plugins/adapter.py   |   5 +-
 airflow/providers/openlineage/plugins/facets.py    |  10 +-
 airflow/providers/openlineage/plugins/listener.py  |   2 +-
 airflow/providers/openlineage/utils/utils.py       |  12 +++
 .../providers/openlineage/plugins/test_adapter.py  |  32 +++++--
 tests/providers/openlineage/plugins/test_facets.py |  21 ++++-
 tests/providers/openlineage/utils/test_utils.py    |  58 +++++++++++-
 8 files changed, 231 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/openlineage/facets/AirflowDagRunFacet.json b/airflow/providers/openlineage/facets/AirflowDagRunFacet.json
new file mode 100644
index 0000000000..165a8e6a59
--- /dev/null
+++ b/airflow/providers/openlineage/facets/AirflowDagRunFacet.json
@@ -0,0 +1,105 @@
+{
+  "$schema": "https://json-schema.org/draft/2020-12/schema",
+  "$defs": {
+    "AirflowDagRunFacet": {
+      "allOf": [
+        {
+          "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
+        },
+        {
+          "type": "object",
+          "properties": {
+            "dag": {
+              "$ref": "#/$defs/DAG"
+            },
+            "dagRun": {
+              "$ref": "#/$defs/DagRun"
+            }
+          },
+          "required": [
+            "dag",
+            "dagRun"
+          ]
+        }
+      ]
+    },
+    "DAG": {
+      "type": "object",
+      "properties": {
+        "dag_id": {
+          "type": "string"
+        },
+        "description": {
+          "type": "string"
+        },
+        "owner": {
+          "type": "string"
+        },
+        "schedule_interval": {
+          "type": "string"
+        },
+        "start_date": {
+          "type": "string",
+          "format": "date-time"
+        },
+        "tags": {
+          "type": "string"
+        },
+        "timetable": {
+          "description": "Describes timetable (successor of schedule_interval)",
+          "type": "object",
+          "additionalProperties": true
+        }
+      },
+      "additionalProperties": true,
+      "required": [
+        "dag_id",
+        "start_date"
+      ]
+    },
+    "DagRun": {
+      "type": "object",
+      "properties": {
+        "conf": {
+          "type": "object",
+          "additionalProperties": true
+        },
+        "dag_id": {
+          "type": "string"
+        },
+        "data_interval_start": {
+          "type": "string",
+          "format": "date-time"
+        },
+        "data_interval_end": {
+          "type": "string",
+          "format": "date-time"
+        },
+        "external_trigger": {
+          "type": "boolean"
+        },
+        "run_id": {
+          "type": "string"
+        },
+        "run_type": {
+          "type": "string"
+        },
+        "start_date": {
+          "type": "string",
+          "format": "date-time"
+        }
+      },
+      "additionalProperties": true,
+      "required": [
+        "dag_id",
+        "run_id"
+      ]
+    }
+  },
+  "type": "object",
+  "properties": {
+    "airflowDagRun": {
+      "$ref": "#/$defs/AirflowDagRunFacet"
+    }
+  }
+}
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py
index 9e2613d8db..8e1d924bb9 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -40,6 +40,7 @@ from openlineage.client.uuid import generate_static_uuid
 from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
 from airflow.providers.openlineage.utils.utils import (
     OpenLineageRedactor,
+    get_airflow_dag_run_facet,
     get_airflow_state_run_facet,
 )
 from airflow.stats import Stats
@@ -334,6 +335,7 @@ class OpenLineageAdapter(LoggingMixin):
         job_facets: dict[str, BaseFacet] | None = None,  # Custom job facets
     ):
         try:
+            owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None
             event = RunEvent(
                 eventType=RunState.START,
                 eventTime=dag_run.start_date.isoformat(),
@@ -341,7 +343,7 @@ class OpenLineageAdapter(LoggingMixin):
                     job_name=dag_run.dag_id,
                     job_type=_JOB_TYPE_DAG,
                     job_description=dag_run.dag.description if dag_run.dag else None,
-                    owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
+                    owners=owner,
                     job_facets=job_facets,
                 ),
                 run=self._build_run(
@@ -352,6 +354,7 @@ class OpenLineageAdapter(LoggingMixin):
                     job_name=dag_run.dag_id,
                     nominal_start_time=nominal_start_time,
                     nominal_end_time=nominal_end_time,
+                    run_facets=get_airflow_dag_run_facet(dag_run),
                 ),
                 inputs=[],
                 outputs=[],
diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py
index fb642579fc..d282c72ac8 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -91,7 +91,7 @@ class AirflowStateRunFacet(BaseFacet):
 
 @define(slots=False)
 class AirflowRunFacet(BaseFacet):
-    """Composite Airflow run facet."""
+    """Composite Airflow task run facet."""
 
     dag: dict
     dagRun: dict
@@ -100,6 +100,14 @@ class AirflowRunFacet(BaseFacet):
     taskUuid: str
 
 
+@define(slots=False)
+class AirflowDagRunFacet(BaseFacet):
+    """Composite Airflow DAG run facet."""
+
+    dag: dict
+    dagRun: dict
+
+
 @define(slots=False)
 class UnknownOperatorInstance(RedactMixin):
     """
diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py
index 8798c542c1..a552cb283b 100644
--- a/airflow/providers/openlineage/plugins/listener.py
+++ b/airflow/providers/openlineage/plugins/listener.py
@@ -420,7 +420,7 @@ class OpenLineageListener:
             nominal_end_time=data_interval_end,
             # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
             # as it causes lack of some TaskGroup attributes and crashes event emission.
-            job_facets={**get_airflow_job_facet(dag_run=dag_run)},
+            job_facets=get_airflow_job_facet(dag_run=dag_run),
         )
 
     @hookimpl
diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index a36f44b3d5..171f35a775 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -37,6 +37,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning  # TODO: move t
 from airflow.models import DAG, BaseOperator, MappedOperator
 from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.plugins.facets import (
+    AirflowDagRunFacet,
     AirflowJobFacet,
     AirflowMappedTaskRunFacet,
     AirflowRunFacet,
@@ -345,6 +346,17 @@ class TaskGroupInfo(InfoJsonEncodable):
     ]
 
 
+def get_airflow_dag_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]:
+    if not dag_run.dag:
+        return {}
+    return {
+        "airflowDagRun": AirflowDagRunFacet(
+            dag=DagInfo(dag_run.dag),
+            dagRun=DagRunInfo(dag_run),
+        )
+    }
+
+
 def get_airflow_run_facet(
     dag_run: DagRun,
     dag: DAG,
diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py
index e588b25dcc..fb60b5cc8c 100644
--- a/tests/providers/openlineage/plugins/test_adapter.py
+++ b/tests/providers/openlineage/plugins/test_adapter.py
@@ -43,14 +43,10 @@ from airflow.models.dagrun import DagRun, DagRunState
 from airflow.models.taskinstance import TaskInstance, TaskInstanceState
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
-from airflow.providers.openlineage.conf import (
-    namespace,
-)
+from airflow.providers.openlineage.conf import namespace
 from airflow.providers.openlineage.extractors import OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
-from airflow.providers.openlineage.plugins.facets import (
-    AirflowStateRunFacet,
-)
+from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowStateRunFacet
 from airflow.providers.openlineage.utils.utils import get_airflow_job_facet
 from airflow.utils.task_group import TaskGroup
 from tests.test_utils.config import conf_vars
@@ -518,6 +514,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
         run_id=run_id,
         start_date=event_time,
         execution_date=event_time,
+        data_interval=(event_time, event_time),
     )
     dag_run.dag = dag
     generate_static_uuid.return_value = random_uuid
@@ -544,7 +541,28 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
                         "nominalTime": NominalTimeRunFacet(
                             nominalStartTime=event_time.isoformat(),
                             nominalEndTime=event_time.isoformat(),
-                        )
+                        ),
+                        "airflowDagRun": AirflowDagRunFacet(
+                            dag={
+                                "timetable": {"delta": 86400.0},
+                                "dag_id": dag_id,
+                                "description": "dag desc",
+                                "owner": "airflow",
+                                "schedule_interval": "86400.0 seconds",
+                                "start_date": "2024-06-01T00:00:00+00:00",
+                                "tags": [],
+                            },
+                            dagRun={
+                                "conf": {},
+                                "dag_id": "dag_id",
+                                "data_interval_start": event_time.isoformat(),
+                                "data_interval_end": event_time.isoformat(),
+                                "external_trigger": None,
+                                "run_id": run_id,
+                                "run_type": None,
+                                "start_date": event_time.isoformat(),
+                            },
+                        ),
                     },
                 ),
                 job=Job(
diff --git a/tests/providers/openlineage/plugins/test_facets.py b/tests/providers/openlineage/plugins/test_facets.py
index dd4e5851f2..73eaebd0c0 100644
--- a/tests/providers/openlineage/plugins/test_facets.py
+++ b/tests/providers/openlineage/plugins/test_facets.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from airflow.providers.openlineage.plugins.facets import AirflowRunFacet
+from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowRunFacet
 
 
 def test_airflow_run_facet():
@@ -27,7 +27,11 @@ def test_airflow_run_facet():
     task_uuid = "XXX"
 
     airflow_run_facet = AirflowRunFacet(
-        dag=dag, dagRun=dag_run, task=task, taskInstance=task_instance, taskUuid=task_uuid
+        dag=dag,
+        dagRun=dag_run,
+        task=task,
+        taskInstance=task_instance,
+        taskUuid=task_uuid,
     )
 
     assert airflow_run_facet.dag == dag
@@ -35,3 +39,16 @@ def test_airflow_run_facet():
     assert airflow_run_facet.task == task
     assert airflow_run_facet.taskInstance == task_instance
     assert airflow_run_facet.taskUuid == task_uuid
+
+
+def test_airflow_dag_run_facet():
+    dag = {"dag_id": "123"}
+    dag_run = {"dag_run_id": "456"}
+
+    airflow_dag_run_facet = AirflowDagRunFacet(
+        dag=dag,
+        dagRun=dag_run,
+    )
+
+    assert airflow_dag_run_facet.dag == dag
+    assert airflow_dag_run_facet.dagRun == dag_run
diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py
index d3a9d89445..6f6fc104b3 100644
--- a/tests/providers/openlineage/utils/test_utils.py
+++ b/tests/providers/openlineage/utils/test_utils.py
@@ -23,17 +23,19 @@ from unittest.mock import ANY, MagicMock, patch
 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.baseoperator import BaseOperator
+from airflow.models.dagrun import DagRun
 from airflow.models.mappedoperator import MappedOperator
 from airflow.models.taskinstance import TaskInstance
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
-from airflow.providers.openlineage.plugins.facets import AirflowJobFacet
+from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet
 from airflow.providers.openlineage.utils.utils import (
     _get_parsed_dag_tree,
     _get_task_groups_details,
     _get_tasks_details,
     _safe_get_dag_tree_view,
+    get_airflow_dag_run_facet,
     get_airflow_job_facet,
     get_custom_facets,
     get_fully_qualified_class_name,
@@ -42,6 +44,7 @@ from airflow.providers.openlineage.utils.utils import (
 )
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.task_group import TaskGroup
+from airflow.utils.types import DagRunType
 from tests.test_utils.mock_operators import MockOperator
 
 
@@ -62,7 +65,7 @@ def test_get_airflow_job_facet():
 
         task_0 >> task_10
 
-    dagrun_mock = MagicMock()
+    dagrun_mock = MagicMock(DagRun)
     dagrun_mock.dag = dag
 
     result = get_airflow_job_facet(dagrun_mock)
@@ -104,6 +107,57 @@ def test_get_airflow_job_facet():
     }
 
 
+def test_get_airflow_dag_run_facet():
+    with DAG(
+        dag_id="dag",
+        schedule="@once",
+        start_date=datetime.datetime(2024, 6, 1),
+        tags=["test"],
+    ) as dag:
+        task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
+
+        with TaskGroup("section_1", prefix_group_id=True):
+            task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1)
+
+        task_0 >> task_10
+
+    dagrun_mock = MagicMock(DagRun)
+    dagrun_mock.dag = dag
+    dagrun_mock.conf = {}
+    dagrun_mock.dag_id = dag.dag_id
+    dagrun_mock.data_interval_start = datetime.datetime(2024, 6, 1, 1, 2, 3, tzinfo=datetime.timezone.utc)
+    dagrun_mock.data_interval_end = datetime.datetime(2024, 6, 1, 2, 3, 4, tzinfo=datetime.timezone.utc)
+    dagrun_mock.external_trigger = True
+    dagrun_mock.run_id = "manual_2024-06-01T00:00:00+00:00"
+    dagrun_mock.run_type = DagRunType.MANUAL
+    dagrun_mock.start_date = datetime.datetime(2024, 6, 1, 1, 2, 4, tzinfo=datetime.timezone.utc)
+
+    result = get_airflow_dag_run_facet(dagrun_mock)
+    assert result == {
+        "airflowDagRun": AirflowDagRunFacet(
+            dag={
+                "dag_id": "dag",
+                "description": None,
+                "owner": "airflow",
+                "timetable": {},
+                "schedule_interval": "@once",
+                "start_date": "2024-06-01T00:00:00+00:00",
+                "tags": ["test"],
+            },
+            dagRun={
+                "conf": {},
+                "dag_id": "dag",
+                "data_interval_start": "2024-06-01T01:02:03+00:00",
+                "data_interval_end": "2024-06-01T02:03:04+00:00",
+                "external_trigger": True,
+                "run_id": "manual_2024-06-01T00:00:00+00:00",
+                "run_type": "manual",
+                "start_date": "2024-06-01T01:02:04+00:00",
+            },
+        )
+    }
+
+
 def test_get_fully_qualified_class_name_serialized_operator():
     op_module_path = "airflow.operators.bash"
     op_name = "BashOperator"

Reply via email to