This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3318212482 Fix `operator_extra_links` property serialization in mapped tasks (#31904)
3318212482 is described below
commit 3318212482c6e11ac5c2e2828f7e467bca5b7245
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Jul 6 07:50:06 2023 +0200
Fix `operator_extra_links` property serialization in mapped tasks (#31904)
* reproduce the problem in a unit test
* Fix operator_extra_links serialization
* replace fget by __get__
---
airflow/serialization/serialized_objects.py | 6 ++--
tests/serialization/test_dag_serialization.py | 40 ++++++++++++++++++++++++++-
2 files changed, 43 insertions(+), 3 deletions(-)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 4f5c61943e..60e2d4b02d 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -287,7 +287,7 @@ class BaseSerialization:
_datetime_types = (datetime.datetime,)
# Object types that are always excluded in serialization.
- _excluded_types = (logging.Logger, Connection, type)
+ _excluded_types = (logging.Logger, Connection, type, property)
_json_schema: Validator | None = None
@@ -822,7 +822,9 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
if op.operator_extra_links:
serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links(
- op.operator_extra_links
+ op.operator_extra_links.__get__(op)
+ if isinstance(op.operator_extra_links, property)
+ else op.operator_extra_links
)
if include_deps:
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 69a2c9df22..c0c0f2f0cd 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -68,7 +68,7 @@ from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import TaskGroup
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils.config import conf_vars
-from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator
+from tests.test_utils.mock_operators import AirflowLink2, CustomOperator, GoogleLink, MockOperator
from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable
repo_root = Path(airflow.__file__).parent.parent
@@ -2479,3 +2479,41 @@ def test_mapped_task_group_serde():
serde_tg = serde_dag.task_group.children["tg"]
assert isinstance(serde_tg, MappedTaskGroup)
assert serde_tg._expand_input == DictOfListsExpandInput({"a": [".", ".."]})
+
+
+def test_mapped_task_with_operator_extra_links_property():
+ class _DummyOperator(BaseOperator):
+ def __init__(self, inputs, **kwargs):
+ super().__init__(**kwargs)
+ self.inputs = inputs
+
+ @property
+ def operator_extra_links(self):
+ return (AirflowLink2(),)
+
+ with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
+ _DummyOperator.partial(task_id="task").expand(inputs=[1, 2, 3])
+ serialized_dag = SerializedBaseOperator.serialize(dag)
+ assert serialized_dag["tasks"][0] == {
+ "task_id": "task",
+ "expand_input": {
+ "type": "dict-of-lists",
+ "value": {"__type": "dict", "__var": {"inputs": [1, 2, 3]}},
+ },
+ "partial_kwargs": {},
+ "_disallow_kwargs_override": False,
+ "_expand_input_attr": "expand_input",
+ "downstream_task_ids": [],
+ "_operator_extra_links": [{"tests.test_utils.mock_operators.AirflowLink2": {}}],
+ "ui_color": "#fff",
+ "ui_fgcolor": "#000",
+ "template_ext": [],
+ "template_fields": [],
+ "template_fields_renderers": {},
+ "_task_type": "_DummyOperator",
+ "_task_module": "tests.serialization.test_dag_serialization",
+ "_is_empty": False,
+ "_is_mapped": True,
+ }
+ deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag)
+ assert deserialized_dag.task_dict["task"].operator_extra_links == [AirflowLink2()]