This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 10f250347d openlineage: make value of slots in attrs.define consistent
across all OL usages (#40992)
10f250347d is described below
commit 10f250347d17e8f1362e8fffc3dddce655a11cf7
Author: Kacper Muda <[email protected]>
AuthorDate: Wed Jul 24 14:55:37 2024 +0200
openlineage: make value of slots in attrs.define consistent across all OL
usages (#40992)
Signed-off-by: Kacper Muda <[email protected]>
---
airflow/providers/openlineage/plugins/facets.py | 14 +++++------
airflow/providers/openlineage/utils/utils.py | 14 ++++++++++-
.../guides/developer.rst | 2 +-
tests/providers/openlineage/plugins/test_utils.py | 28 +++++++++++++++++++---
.../openlineage/utils/custom_facet_fixture.py | 2 +-
5 files changed, 47 insertions(+), 13 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/facets.py
b/airflow/providers/openlineage/plugins/facets.py
index 4c0b99d39c..57d6cb1a15 100644
--- a/airflow/providers/openlineage/plugins/facets.py
+++ b/airflow/providers/openlineage/plugins/facets.py
@@ -28,7 +28,7 @@ from airflow.exceptions import
AirflowProviderDeprecationWarning
reason="To be removed in the next release. Make sure to use information
from AirflowRunFacet instead.",
category=AirflowProviderDeprecationWarning,
)
-@define(slots=False)
+@define
class AirflowMappedTaskRunFacet(RunFacet):
"""Run facet containing information about mapped tasks."""
@@ -47,7 +47,7 @@ class AirflowMappedTaskRunFacet(RunFacet):
)
-@define(slots=True)
+@define
class AirflowJobFacet(JobFacet):
"""
Composite Airflow job facet.
@@ -70,7 +70,7 @@ class AirflowJobFacet(JobFacet):
tasks: dict
-@define(slots=True)
+@define
class AirflowStateRunFacet(RunFacet):
"""
Airflow facet providing state information.
@@ -89,7 +89,7 @@ class AirflowStateRunFacet(RunFacet):
tasksState: dict[str, str]
-@define(slots=False)
+@define
class AirflowRunFacet(RunFacet):
"""Composite Airflow run facet."""
@@ -100,7 +100,7 @@ class AirflowRunFacet(RunFacet):
taskUuid: str
-@define(slots=True)
+@define
class AirflowDagRunFacet(RunFacet):
"""Composite Airflow DAG run facet."""
@@ -108,7 +108,7 @@ class AirflowDagRunFacet(RunFacet):
dagRun: dict
-@define(slots=False)
+@define
class UnknownOperatorInstance(RedactMixin):
"""
Describes an unknown operator.
@@ -127,7 +127,7 @@ class UnknownOperatorInstance(RedactMixin):
reason="To be removed in the next release. Make sure to use information
from AirflowRunFacet instead.",
category=AirflowProviderDeprecationWarning,
)
-@define(slots=False)
+@define
class UnknownOperatorAttributeRunFacet(RunFacet):
"""RunFacet that describes unknown operators in an Airflow DAG."""
diff --git a/airflow/providers/openlineage/utils/utils.py
b/airflow/providers/openlineage/utils/utils.py
index 2e995fb7a8..17eb522a6d 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -219,7 +219,19 @@ class InfoJsonEncodable(dict):
setattr(self, field, getattr(self.obj, field))
self._fields.append(field)
else:
- for field, val in self.obj.__dict__.items():
+ if hasattr(self.obj, "__dict__"):
+ obj_fields = self.obj.__dict__
+ elif attrs.has(self.obj.__class__): # e.g. attrs.define class
with slots=True has no __dict__
+ obj_fields = {
+ field.name: getattr(self.obj, field.name) for field in
attrs.fields(self.obj.__class__)
+ }
+ else:
+ raise ValueError(
+ "Cannot iterate over fields: "
+ f"The object of type {type(self.obj).__name__} neither has
a __dict__ attribute "
+ "nor is defined as an attrs class."
+ )
+ for field, val in obj_fields.items():
if field not in self._fields and field not in self.excludes
and field not in self.renames:
setattr(self, field, val)
self._fields.append(field)
diff --git a/docs/apache-airflow-providers-openlineage/guides/developer.rst
b/docs/apache-airflow-providers-openlineage/guides/developer.rst
index 4e9ada44c2..9b56de3977 100644
--- a/docs/apache-airflow-providers-openlineage/guides/developer.rst
+++ b/docs/apache-airflow-providers-openlineage/guides/developer.rst
@@ -481,7 +481,7 @@ Writing a custom facet function
from airflow.providers.common.compat.openlineage.facet import RunFacet
- @attrs.define(slots=False)
+ @attrs.define
class MyCustomRunFacet(RunFacet):
"""Define a custom facet."""
diff --git a/tests/providers/openlineage/plugins/test_utils.py
b/tests/providers/openlineage/plugins/test_utils.py
index 8ca245d1f3..962429e30e 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -102,6 +102,28 @@ def test_info_json_encodable():
casts = {"iwanttobeint": lambda x: int(x.imastring)}
renames = {"_faulty_name": "goody_name"}
+ @define
+ class Test:
+ exclude_1: str
+ imastring: str
+ _faulty_name: str
+ donotcare: str
+
+ obj = Test("val", "123", "not_funny", "abc")
+
+ assert json.loads(json.dumps(TestInfo(obj))) == {
+ "iwanttobeint": 123,
+ "goody_name": "not_funny",
+ "donotcare": "abc",
+ }
+
+
+def test_info_json_encodable_without_slots():
+ class TestInfo(InfoJsonEncodable):
+ excludes = ["exclude_1", "exclude_2", "imastring"]
+ casts = {"iwanttobeint": lambda x: int(x.imastring)}
+ renames = {"_faulty_name": "goody_name"}
+
@define(slots=False)
class Test:
exclude_1: str
@@ -122,7 +144,7 @@ def test_info_json_encodable_list_does_not_flatten():
class TestInfo(InfoJsonEncodable):
includes = ["alist"]
- @define(slots=False)
+ @define
class Test:
alist: list[str]
@@ -135,7 +157,7 @@ def
test_info_json_encodable_list_does_include_nonexisting():
class TestInfo(InfoJsonEncodable):
includes = ["exists", "doesnotexist"]
- @define(slots=False)
+ @define
class Test:
exists: str
@@ -191,7 +213,7 @@ def test_redact_with_exclusions(monkeypatch):
self.password = "passwd"
self.transparent = "123"
- @define(slots=False)
+ @define
class NestedMixined(RedactMixin):
_skip_redact = ["nested_field"]
password: str
diff --git a/tests/providers/openlineage/utils/custom_facet_fixture.py
b/tests/providers/openlineage/utils/custom_facet_fixture.py
index 6b9d0edcce..040c8c774c 100644
--- a/tests/providers/openlineage/utils/custom_facet_fixture.py
+++ b/tests/providers/openlineage/utils/custom_facet_fixture.py
@@ -26,7 +26,7 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
[email protected](slots=False)
[email protected]
class MyCustomRunFacet(RunFacet):
"""Define a custom run facet."""