This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eca05550d3 Set `slots` to True for facets used in DagRun (#40972)
eca05550d3 is described below

commit eca05550d39ad41dce4949101afdc8b578cffdc9
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Wed Jul 24 11:18:34 2024 +0200

    Set `slots` to True for facets used in DagRun (#40972)
    
    state change level OpenLineage listener hooks.
    
    Signed-off-by: Jakub Dardzinski <[email protected]>
---
 airflow/providers/openlineage/plugins/facets.py    |  6 +-
 tests/providers/openlineage/plugins/test_facets.py | 66 +++++++++++++++++++++-
 2 files changed, 68 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/openlineage/plugins/facets.py 
b/airflow/providers/openlineage/plugins/facets.py
index 24f411477a..4c0b99d39c 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -47,7 +47,7 @@ class AirflowMappedTaskRunFacet(RunFacet):
         )
 
 
-@define(slots=False)
+@define(slots=True)
 class AirflowJobFacet(JobFacet):
     """
     Composite Airflow job facet.
@@ -70,7 +70,7 @@ class AirflowJobFacet(JobFacet):
     tasks: dict
 
 
-@define(slots=False)
+@define(slots=True)
 class AirflowStateRunFacet(RunFacet):
     """
     Airflow facet providing state information.
@@ -100,7 +100,7 @@ class AirflowRunFacet(RunFacet):
     taskUuid: str
 
 
-@define(slots=False)
+@define(slots=True)
 class AirflowDagRunFacet(RunFacet):
     """Composite Airflow DAG run facet."""
 
diff --git a/tests/providers/openlineage/plugins/test_facets.py 
b/tests/providers/openlineage/plugins/test_facets.py
index 73eaebd0c0..0ed5b4bf7c 100644
--- a/tests/providers/openlineage/plugins/test_facets.py
+++ b/tests/providers/openlineage/plugins/test_facets.py
@@ -16,7 +16,17 @@
 # under the License.
 from __future__ import annotations
 
-from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, 
AirflowRunFacet
+import pickle
+
+import attrs
+import pytest
+
+from airflow.providers.openlineage.plugins.facets import (
+    AirflowDagRunFacet,
+    AirflowJobFacet,
+    AirflowRunFacet,
+    AirflowStateRunFacet,
+)
 
 
 def test_airflow_run_facet():
@@ -52,3 +62,57 @@ def test_airflow_dag_run_facet():
 
     assert airflow_dag_run_facet.dag == dag
     assert airflow_dag_run_facet.dagRun == dag_run
+
+
[email protected](
+    "instance",
+    [
+        pytest.param(
+            AirflowJobFacet(
+                taskTree={"task_0": {"section_1.task_3": {}}},
+                taskGroups={
+                    "section_1": {
+                        "parent_group": None,
+                        "tooltip": "",
+                        "ui_color": "CornflowerBlue",
+                        "ui_fgcolor": "#000",
+                        "ui_label": "section_1",
+                    }
+                },
+                tasks={
+                    "task_0": {
+                        "operator": "airflow.operators.bash.BashOperator",
+                        "task_group": None,
+                        "emits_ol_events": True,
+                        "ui_color": "#f0ede4",
+                        "ui_fgcolor": "#000",
+                        "ui_label": "task_0",
+                        "is_setup": False,
+                        "is_teardown": False,
+                    }
+                },
+            ),
+            id="AirflowJobFacet",
+        ),
+        pytest.param(
+            AirflowStateRunFacet(dagRunState="SUCCESS", tasksState={"task_0": 
"SKIPPED"}),
+            id="AirflowStateRunFacet",
+        ),
+        pytest.param(
+            AirflowDagRunFacet(
+                dag={
+                    "timetable": {"delta": 86400.0},
+                    "owner": "airflow",
+                    "start_date": "2024-06-01T00:00:00+00:00",
+                },
+                dagRun={"conf": {}, "dag_id": "dag_id"},
+            ),
+            id="AirflowDagRunFacet",
+        ),
+    ],
+)
+def test_facets_are_pickled_correctly(instance):
+    cls = instance.__class__
+    instance = pickle.loads(pickle.dumps(instance))
+    for field in attrs.fields(cls):
+        getattr(instance, field.name)

Reply via email to