This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eca05550d3 Set `slots` to True for facets used in DagRun (#40972)
eca05550d3 is described below
commit eca05550d39ad41dce4949101afdc8b578cffdc9
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Wed Jul 24 11:18:34 2024 +0200
Set `slots` to True for facets used in DagRun (#40972)
state change level OpenLineage listener hooks.
Signed-off-by: Jakub Dardzinski <[email protected]>
---
airflow/providers/openlineage/plugins/facets.py | 6 +-
tests/providers/openlineage/plugins/test_facets.py | 66 +++++++++++++++++++++-
2 files changed, 68 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py
index 24f411477a..4c0b99d39c 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -47,7 +47,7 @@ class AirflowMappedTaskRunFacet(RunFacet):
)
-@define(slots=False)
+@define(slots=True)
class AirflowJobFacet(JobFacet):
"""
Composite Airflow job facet.
@@ -70,7 +70,7 @@ class AirflowJobFacet(JobFacet):
tasks: dict
-@define(slots=False)
+@define(slots=True)
class AirflowStateRunFacet(RunFacet):
"""
Airflow facet providing state information.
@@ -100,7 +100,7 @@ class AirflowRunFacet(RunFacet):
taskUuid: str
-@define(slots=False)
+@define(slots=True)
class AirflowDagRunFacet(RunFacet):
"""Composite Airflow DAG run facet."""
diff --git a/tests/providers/openlineage/plugins/test_facets.py b/tests/providers/openlineage/plugins/test_facets.py
index 73eaebd0c0..0ed5b4bf7c 100644
--- a/tests/providers/openlineage/plugins/test_facets.py
+++ b/tests/providers/openlineage/plugins/test_facets.py
@@ -16,7 +16,17 @@
# under the License.
from __future__ import annotations
-from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowRunFacet
+import pickle
+
+import attrs
+import pytest
+
+from airflow.providers.openlineage.plugins.facets import (
+ AirflowDagRunFacet,
+ AirflowJobFacet,
+ AirflowRunFacet,
+ AirflowStateRunFacet,
+)
def test_airflow_run_facet():
@@ -52,3 +62,57 @@ def test_airflow_dag_run_facet():
assert airflow_dag_run_facet.dag == dag
assert airflow_dag_run_facet.dagRun == dag_run
+
+
+@pytest.mark.parametrize(
+ "instance",
+ [
+ pytest.param(
+ AirflowJobFacet(
+ taskTree={"task_0": {"section_1.task_3": {}}},
+ taskGroups={
+ "section_1": {
+ "parent_group": None,
+ "tooltip": "",
+ "ui_color": "CornflowerBlue",
+ "ui_fgcolor": "#000",
+ "ui_label": "section_1",
+ }
+ },
+ tasks={
+ "task_0": {
+ "operator": "airflow.operators.bash.BashOperator",
+ "task_group": None,
+ "emits_ol_events": True,
+ "ui_color": "#f0ede4",
+ "ui_fgcolor": "#000",
+ "ui_label": "task_0",
+ "is_setup": False,
+ "is_teardown": False,
+ }
+ },
+ ),
+ id="AirflowJobFacet",
+ ),
+ pytest.param(
+ AirflowStateRunFacet(dagRunState="SUCCESS", tasksState={"task_0": "SKIPPED"}),
+ id="AirflowStateRunFacet",
+ ),
+ pytest.param(
+ AirflowDagRunFacet(
+ dag={
+ "timetable": {"delta": 86400.0},
+ "owner": "airflow",
+ "start_date": "2024-06-01T00:00:00+00:00",
+ },
+ dagRun={"conf": {}, "dag_id": "dag_id"},
+ ),
+ id="AirflowDagRunFacet",
+ ),
+ ],
+)
+def test_facets_are_pickled_correctly(instance):
+ cls = instance.__class__
+ instance = pickle.loads(pickle.dumps(instance))
+ for field in attrs.fields(cls):
+ getattr(instance, field.name)