This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f73cf9d9938 Move XComOperatorLink to separate module (#59776)
f73cf9d9938 is described below

commit f73cf9d993892d7d90e1773a89b92db2c01edba2
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Dec 24 10:09:23 2025 +0800

    Move XComOperatorLink to separate module (#59776)
---
 airflow-core/src/airflow/models/mappedoperator.py  |  2 +-
 .../serialization/definitions/baseoperator.py      |  2 +-
 .../serialization/definitions/operatorlink.py      | 76 ++++++++++++++++++++++
 .../airflow/serialization/serialized_objects.py    | 46 +------------
 .../unit/serialization/test_dag_serialization.py   |  6 +-
 5 files changed, 82 insertions(+), 50 deletions(-)

diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index 779f07226eb..473ca098613 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -51,7 +51,7 @@ if TYPE_CHECKING:
     from airflow.sdk.definitions._internal.node import DAGNode as TaskSDKDAGNode
     from airflow.sdk.definitions.operator_resources import Resources
     from airflow.serialization.definitions.dag import SerializedDAG
-    from airflow.serialization.serialized_objects import XComOperatorLink
+    from airflow.serialization.definitions.operatorlink import XComOperatorLink
     from airflow.task.trigger_rule import TriggerRule
     from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
     from airflow.triggers.base import StartTriggerArgs
diff --git a/airflow-core/src/airflow/serialization/definitions/baseoperator.py b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
index 3e8293156c0..0f1c1faf980 100644
--- a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
@@ -42,8 +42,8 @@ if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
     from airflow.sdk import Context
     from airflow.serialization.definitions.dag import SerializedDAG
+    from airflow.serialization.definitions.operatorlink import XComOperatorLink
     from airflow.serialization.definitions.taskgroup import SerializedMappedTaskGroup, SerializedTaskGroup
-    from airflow.serialization.serialized_objects import XComOperatorLink
     from airflow.task.trigger_rule import TriggerRule
     from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
     from airflow.triggers.base import StartTriggerArgs
diff --git a/airflow-core/src/airflow/serialization/definitions/operatorlink.py b/airflow-core/src/airflow/serialization/definitions/operatorlink.py
new file mode 100644
index 00000000000..16a39a9a039
--- /dev/null
+++ b/airflow-core/src/airflow/serialization/definitions/operatorlink.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, TypeAlias
+
+import attrs
+
+from airflow.models.xcom import XComModel
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import create_session
+
+if TYPE_CHECKING:
+    from airflow.models.mappedoperator import MappedOperator as SerializedMappedOperator
+    from airflow.models.taskinstancekey import TaskInstanceKey
+    from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
+
+    Operator: TypeAlias = "SerializedMappedOperator | SerializedBaseOperator"
+
+
+@attrs.define()
+class XComOperatorLink(LoggingMixin):
+    """
+    Generic operator link class that can retrieve link only using XCOMs.
+
+    Used while deserializing operators.
+    """
+
+    name: str
+    xcom_key: str
+
+    def get_link(self, operator: Operator, *, ti_key: TaskInstanceKey) -> str:
+        """
+        Retrieve the link from the XComs.
+
+        :param operator: The Airflow operator object this link is associated to.
+        :param ti_key: TaskInstance ID to return link for.
+        :return: link to external system, but by pulling it from XComs
+        """
+        self.log.info(
+            "Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
+        )
+        with create_session() as session:
+            value = session.execute(
+                XComModel.get_many(
+                    key=self.xcom_key,
+                    run_id=ti_key.run_id,
+                    dag_ids=ti_key.dag_id,
+                    task_ids=ti_key.task_id,
+                    map_indexes=ti_key.map_index,
+                ).with_only_columns(XComModel.value)
+            ).first()
+        if not value:
+            self.log.debug(
+                "No link with name: %s present in XCom as key: %s, returning empty link",
+                self.name,
+                self.xcom_key,
+            )
+            return ""
+        return XComModel.deserialize_value(value)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 5bf74f6ee10..c70deab2e0a 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -48,7 +48,6 @@ from airflow.exceptions import AirflowException, DeserializationError, Serializa
 from airflow.models.connection import Connection
 from airflow.models.expandinput import create_expand_input
 from airflow.models.taskinstancekey import TaskInstanceKey
-from airflow.models.xcom import XComModel
 from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
 from airflow.sdk import DAG, Asset, AssetAlias, BaseOperator, XComArg
 from airflow.sdk.bases.operator import OPERATOR_DEFAULTS  # TODO: Copy this into the scheduler?
@@ -76,6 +75,7 @@ from airflow.serialization.definitions.assets import (
 from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
 from airflow.serialization.definitions.dag import SerializedDAG
 from airflow.serialization.definitions.node import DAGNode
+from airflow.serialization.definitions.operatorlink import XComOperatorLink
 from airflow.serialization.definitions.param import SerializedParam, SerializedParamsDict
 from airflow.serialization.definitions.taskgroup import SerializedMappedTaskGroup, SerializedTaskGroup
 from airflow.serialization.encoders import (
@@ -100,8 +100,6 @@ from airflow.triggers.base import BaseTrigger, StartTriggerArgs
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor
 from airflow.utils.db import LazySelectSequence
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
     from inspect import Parameter
@@ -2363,48 +2361,6 @@ class LazyDeserializedDAG(pydantic.BaseModel):
         )
 
 
-@attrs.define()
-class XComOperatorLink(LoggingMixin):
-    """
-    Generic operator link class that can retrieve link only using XCOMs.
-
-    Used while deserializing operators.
-    """
-
-    name: str
-    xcom_key: str
-
-    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
-        """
-        Retrieve the link from the XComs.
-
-        :param operator: The Airflow operator object this link is associated to.
-        :param ti_key: TaskInstance ID to return link for.
-        :return: link to external system, but by pulling it from XComs
-        """
-        self.log.info(
-            "Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
-        )
-        with create_session() as session:
-            value = session.execute(
-                XComModel.get_many(
-                    key=self.xcom_key,
-                    run_id=ti_key.run_id,
-                    dag_ids=ti_key.dag_id,
-                    task_ids=ti_key.task_id,
-                    map_indexes=ti_key.map_index,
-                ).with_only_columns(XComModel.value)
-            ).first()
-        if not value:
-            self.log.debug(
-                "No link with name: %s present in XCom as key: %s, returning empty link",
-                self.name,
-                self.xcom_key,
-            )
-            return ""
-        return XComModel.deserialize_value(value)
-
-
 @overload
 def create_scheduler_operator(op: BaseOperator | SerializedBaseOperator) -> SerializedBaseOperator: ...
 
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index cb9883c2d7f..a59e2efe5f3 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -69,8 +69,11 @@ from airflow.sdk.definitions.operator_resources import Resources
 from airflow.sdk.definitions.param import Param, ParamsDict
 from airflow.security import permissions
 from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
+from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
 from airflow.serialization.definitions.dag import SerializedDAG
 from airflow.serialization.definitions.notset import NOTSET
+from airflow.serialization.definitions.operatorlink import XComOperatorLink
+from airflow.serialization.definitions.param import SerializedParam
 from airflow.serialization.encoders import ensure_serialized_asset
 from airflow.serialization.enums import Encoding
 from airflow.serialization.json_schema import load_dag_schema_dict
@@ -78,9 +81,6 @@ from airflow.serialization.serialized_objects import (
     BaseSerialization,
     DagSerialization,
     OperatorSerialization,
-    SerializedBaseOperator,
-    SerializedParam,
-    XComOperatorLink,
 )
 from airflow.task.priority_strategy import _AbsolutePriorityWeightStrategy, _DownstreamPriorityWeightStrategy
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep

Reply via email to