This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 37b03c1d541 Move XComOperatorLink to serialized_objects as only API 
server uses it (#47674)
37b03c1d541 is described below

commit 37b03c1d541a26bc90f9e6ed5ac3868867369824
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Mar 12 21:47:22 2025 +0530

    Move XComOperatorLink to serialized_objects as only API server uses it 
(#47674)
---
 airflow/serialization/serialized_objects.py        | 38 +++++++++++++++++++++-
 .../airflow/sdk/definitions/baseoperatorlink.py    | 38 ----------------------
 tests/serialization/test_dag_serialization.py      |  2 +-
 3 files changed, 38 insertions(+), 40 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 0bffc2e831b..aa8791cc67a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -48,6 +48,7 @@ from airflow.models.expandinput import (
 )
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.models.taskinstancekey import TaskInstanceKey
+from airflow.models.xcom import BaseXCom
 from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
 from airflow.providers_manager import ProvidersManager
 from airflow.sdk.definitions.asset import (
@@ -63,7 +64,6 @@ from airflow.sdk.definitions.asset import (
     BaseAsset,
 )
 from airflow.sdk.definitions.baseoperator import BaseOperator as TaskSDKBaseOperator
-from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
 from airflow.sdk.definitions.mappedoperator import MappedOperator
 from airflow.sdk.definitions.param import Param, ParamsDict
 from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
@@ -88,6 +88,7 @@ from airflow.utils.context import (
 )
 from airflow.utils.db import LazySelectSequence
 from airflow.utils.docs import get_docs_url
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string, qualname
 from airflow.utils.operator_resources import Resources
 from airflow.utils.timezone import from_timestamp, parse_timezone
@@ -1991,3 +1992,38 @@ class LazyDeserializedDAG(pydantic.BaseModel):
         access_control: Mapping[str, Mapping[str, Collection[str]] | Collection[str]] | None = pydantic.Field(
             init=False, default=None
         )
+
+
+@attrs.define()
+class XComOperatorLink(LoggingMixin):
+    """A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""
+
+    name: str
+    xcom_key: str
+
+    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
+        """
+        Retrieve the link from the XComs.
+
+        :param operator: The Airflow operator object this link is associated to.
+        :param ti_key: TaskInstance ID to return link for.
+        :return: link to external system, but by pulling it from XComs
+        """
+        self.log.info(
+            "Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
+        )
+        value = BaseXCom.get_one(
+            key=self.xcom_key,
+            run_id=ti_key.run_id,
+            dag_id=ti_key.dag_id,
+            task_id=ti_key.task_id,
+            map_index=ti_key.map_index,
+        )
+        if not value:
+            self.log.debug(
+                "No link with name: %s present in XCom as key: %s, returning empty link",
+                self.name,
+                self.xcom_key,
+            )
+            return ""
+        return value
diff --git a/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py b/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
index 0da1d3f2b6d..06f5e4d98e7 100644
--- a/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
+++ b/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
@@ -22,49 +22,11 @@ from typing import TYPE_CHECKING, ClassVar
 
 import attrs
 
-from airflow.models.xcom import BaseXCom
-from airflow.utils.log.logging_mixin import LoggingMixin
-
 if TYPE_CHECKING:
     from airflow.models.baseoperator import BaseOperator
     from airflow.models.taskinstancekey import TaskInstanceKey
 
 
-@attrs.define()
-class XComOperatorLink(LoggingMixin):
-    """A generic operator link class that can retrieve link only using XCOMs. Used while deserializing operators."""
-
-    name: str
-    xcom_key: str
-
-    def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
-        """
-        Retrieve the link from the XComs.
-
-        :param operator: The Airflow operator object this link is associated to.
-        :param ti_key: TaskInstance ID to return link for.
-        :return: link to external system, but by pulling it from XComs
-        """
-        self.log.info(
-            "Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
-        )
-        value = BaseXCom.get_one(
-            key=self.xcom_key,
-            run_id=ti_key.run_id,
-            dag_id=ti_key.dag_id,
-            task_id=ti_key.task_id,
-            map_index=ti_key.map_index,
-        )
-        if not value:
-            self.log.debug(
-                "No link with name: %s present in XCom as key: %s, returning empty link",
-                self.name,
-                self.xcom_key,
-            )
-            return ""
-        return value
-
-
 @attrs.define()
 class BaseOperatorLink(metaclass=ABCMeta):
     """Abstract base class that defines how we get an operator link."""
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index d4f3d0140a8..7cf606b4058 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -67,7 +67,6 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.providers.standard.operators.bash import BashOperator
 from airflow.providers.standard.sensors.bash import BashSensor
 from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
 from airflow.sdk.definitions.param import Param, ParamsDict
 from airflow.security import permissions
 from airflow.serialization.enums import Encoding
@@ -76,6 +75,7 @@ from airflow.serialization.serialized_objects import (
     BaseSerialization,
     SerializedBaseOperator,
     SerializedDAG,
+    XComOperatorLink,
 )
 from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
 from airflow.timetables.simple import NullTimetable, OnceTimetable

Reply via email to