This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3318212482 Fix `operator_extra_links` property serialization in mapped tasks (#31904)
3318212482 is described below

commit 3318212482c6e11ac5c2e2828f7e467bca5b7245
Author: Hussein Awala <[email protected]>
AuthorDate: Thu Jul 6 07:50:06 2023 +0200

    Fix `operator_extra_links` property serialization in mapped tasks (#31904)
    
    * reproduce the problem in a unit test
    
    * Fix operator_extra_links serialization
    
    * replace fget by __get__
---
 airflow/serialization/serialized_objects.py   |  6 ++--
 tests/serialization/test_dag_serialization.py | 40 ++++++++++++++++++++++++++-
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 4f5c61943e..60e2d4b02d 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -287,7 +287,7 @@ class BaseSerialization:
     _datetime_types = (datetime.datetime,)
 
     # Object types that are always excluded in serialization.
-    _excluded_types = (logging.Logger, Connection, type)
+    _excluded_types = (logging.Logger, Connection, type, property)
 
     _json_schema: Validator | None = None
 
@@ -822,7 +822,9 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
 
         if op.operator_extra_links:
             serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links(
-                op.operator_extra_links
+                op.operator_extra_links.__get__(op)
+                if isinstance(op.operator_extra_links, property)
+                else op.operator_extra_links
             )
 
         if include_deps:
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 69a2c9df22..c0c0f2f0cd 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -68,7 +68,7 @@ from airflow.utils.operator_resources import Resources
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.xcom import XCOM_RETURN_KEY
 from tests.test_utils.config import conf_vars
-from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator
+from tests.test_utils.mock_operators import AirflowLink2, CustomOperator, GoogleLink, MockOperator
 from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable
 
 repo_root = Path(airflow.__file__).parent.parent
@@ -2479,3 +2479,41 @@ def test_mapped_task_group_serde():
     serde_tg = serde_dag.task_group.children["tg"]
     assert isinstance(serde_tg, MappedTaskGroup)
     assert serde_tg._expand_input == DictOfListsExpandInput({"a": [".", ".."]})
+
+
+def test_mapped_task_with_operator_extra_links_property():
+    class _DummyOperator(BaseOperator):
+        def __init__(self, inputs, **kwargs):
+            super().__init__(**kwargs)
+            self.inputs = inputs
+
+        @property
+        def operator_extra_links(self):
+            return (AirflowLink2(),)
+
+    with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag:
+        _DummyOperator.partial(task_id="task").expand(inputs=[1, 2, 3])
+    serialized_dag = SerializedBaseOperator.serialize(dag)
+    assert serialized_dag["tasks"][0] == {
+        "task_id": "task",
+        "expand_input": {
+            "type": "dict-of-lists",
+            "value": {"__type": "dict", "__var": {"inputs": [1, 2, 3]}},
+        },
+        "partial_kwargs": {},
+        "_disallow_kwargs_override": False,
+        "_expand_input_attr": "expand_input",
+        "downstream_task_ids": [],
+        "_operator_extra_links": [{"tests.test_utils.mock_operators.AirflowLink2": {}}],
+        "ui_color": "#fff",
+        "ui_fgcolor": "#000",
+        "template_ext": [],
+        "template_fields": [],
+        "template_fields_renderers": {},
+        "_task_type": "_DummyOperator",
+        "_task_module": "tests.serialization.test_dag_serialization",
+        "_is_empty": False,
+        "_is_mapped": True,
+    }
+    deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag)
+    assert deserialized_dag.task_dict["task"].operator_extra_links == [AirflowLink2()]

Reply via email to