This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 37b03c1d541 Move XComOperatorLink to serialized_objects as only API
server uses it (#47674)
37b03c1d541 is described below
commit 37b03c1d541a26bc90f9e6ed5ac3868867369824
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Mar 12 21:47:22 2025 +0530
Move XComOperatorLink to serialized_objects as only API server uses it
(#47674)
---
airflow/serialization/serialized_objects.py | 38 +++++++++++++++++++++-
.../airflow/sdk/definitions/baseoperatorlink.py | 38 ----------------------
tests/serialization/test_dag_serialization.py | 2 +-
3 files changed, 38 insertions(+), 40 deletions(-)
diff --git a/airflow/serialization/serialized_objects.py
b/airflow/serialization/serialized_objects.py
index 0bffc2e831b..aa8791cc67a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -48,6 +48,7 @@ from airflow.models.expandinput import (
)
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
+from airflow.models.xcom import BaseXCom
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
from airflow.providers_manager import ProvidersManager
from airflow.sdk.definitions.asset import (
@@ -63,7 +64,6 @@ from airflow.sdk.definitions.asset import (
BaseAsset,
)
from airflow.sdk.definitions.baseoperator import BaseOperator as
TaskSDKBaseOperator
-from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.sdk.definitions.taskgroup import MappedTaskGroup, TaskGroup
@@ -88,6 +88,7 @@ from airflow.utils.context import (
)
from airflow.utils.db import LazySelectSequence
from airflow.utils.docs import get_docs_url
+from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.timezone import from_timestamp, parse_timezone
@@ -1991,3 +1992,38 @@ class LazyDeserializedDAG(pydantic.BaseModel):
access_control: Mapping[str, Mapping[str, Collection[str]] |
Collection[str]] | None = pydantic.Field(
init=False, default=None
)
+
+
+@attrs.define()
+class XComOperatorLink(LoggingMixin):
+ """A generic operator link class that can retrieve link only using XCOMs.
Used while deserializing operators."""
+
+ name: str
+ xcom_key: str
+
+ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
+ """
+ Retrieve the link from the XComs.
+
+ :param operator: The Airflow operator object this link is associated
to.
+ :param ti_key: TaskInstance ID to return link for.
+ :return: link to external system, but by pulling it from XComs
+ """
+ self.log.info(
+ "Attempting to retrieve link from XComs with key: %s for task id:
%s", self.xcom_key, ti_key
+ )
+ value = BaseXCom.get_one(
+ key=self.xcom_key,
+ run_id=ti_key.run_id,
+ dag_id=ti_key.dag_id,
+ task_id=ti_key.task_id,
+ map_index=ti_key.map_index,
+ )
+ if not value:
+ self.log.debug(
+ "No link with name: %s present in XCom as key: %s, returning
empty link",
+ self.name,
+ self.xcom_key,
+ )
+ return ""
+ return value
diff --git a/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
b/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
index 0da1d3f2b6d..06f5e4d98e7 100644
--- a/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
+++ b/task-sdk/src/airflow/sdk/definitions/baseoperatorlink.py
@@ -22,49 +22,11 @@ from typing import TYPE_CHECKING, ClassVar
import attrs
-from airflow.models.xcom import BaseXCom
-from airflow.utils.log.logging_mixin import LoggingMixin
-
if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstancekey import TaskInstanceKey
-@attrs.define()
-class XComOperatorLink(LoggingMixin):
- """A generic operator link class that can retrieve link only using XCOMs.
Used while deserializing operators."""
-
- name: str
- xcom_key: str
-
- def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) ->
str:
- """
- Retrieve the link from the XComs.
-
- :param operator: The Airflow operator object this link is associated
to.
- :param ti_key: TaskInstance ID to return link for.
- :return: link to external system, but by pulling it from XComs
- """
- self.log.info(
- "Attempting to retrieve link from XComs with key: %s for task id:
%s", self.xcom_key, ti_key
- )
- value = BaseXCom.get_one(
- key=self.xcom_key,
- run_id=ti_key.run_id,
- dag_id=ti_key.dag_id,
- task_id=ti_key.task_id,
- map_index=ti_key.map_index,
- )
- if not value:
- self.log.debug(
- "No link with name: %s present in XCom as key: %s, returning
empty link",
- self.name,
- self.xcom_key,
- )
- return ""
- return value
-
-
@attrs.define()
class BaseOperatorLink(metaclass=ABCMeta):
"""Abstract base class that defines how we get an operator link."""
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index d4f3d0140a8..7cf606b4058 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -67,7 +67,6 @@ from airflow.providers.cncf.kubernetes.pod_generator import
PodGenerator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.baseoperatorlink import XComOperatorLink
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.security import permissions
from airflow.serialization.enums import Encoding
@@ -76,6 +75,7 @@ from airflow.serialization.serialized_objects import (
BaseSerialization,
SerializedBaseOperator,
SerializedDAG,
+ XComOperatorLink,
)
from airflow.task.priority_strategy import _DownstreamPriorityWeightStrategy
from airflow.timetables.simple import NullTimetable, OnceTimetable