This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f73cf9d9938 Move XComOperatorLink to separate module (#59776)
f73cf9d9938 is described below
commit f73cf9d993892d7d90e1773a89b92db2c01edba2
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Dec 24 10:09:23 2025 +0800
Move XComOperatorLink to separate module (#59776)
---
airflow-core/src/airflow/models/mappedoperator.py | 2 +-
.../serialization/definitions/baseoperator.py | 2 +-
.../serialization/definitions/operatorlink.py | 76 ++++++++++++++++++++++
.../airflow/serialization/serialized_objects.py | 46 +------------
.../unit/serialization/test_dag_serialization.py | 6 +-
5 files changed, 82 insertions(+), 50 deletions(-)
diff --git a/airflow-core/src/airflow/models/mappedoperator.py b/airflow-core/src/airflow/models/mappedoperator.py
index 779f07226eb..473ca098613 100644
--- a/airflow-core/src/airflow/models/mappedoperator.py
+++ b/airflow-core/src/airflow/models/mappedoperator.py
@@ -51,7 +51,7 @@ if TYPE_CHECKING:
from airflow.sdk.definitions._internal.node import DAGNode as TaskSDKDAGNode
from airflow.sdk.definitions.operator_resources import Resources
from airflow.serialization.definitions.dag import SerializedDAG
- from airflow.serialization.serialized_objects import XComOperatorLink
+ from airflow.serialization.definitions.operatorlink import XComOperatorLink
from airflow.task.trigger_rule import TriggerRule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
diff --git a/airflow-core/src/airflow/serialization/definitions/baseoperator.py b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
index 3e8293156c0..0f1c1faf980 100644
--- a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
@@ -42,8 +42,8 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
from airflow.sdk import Context
from airflow.serialization.definitions.dag import SerializedDAG
+ from airflow.serialization.definitions.operatorlink import XComOperatorLink
from airflow.serialization.definitions.taskgroup import SerializedMappedTaskGroup, SerializedTaskGroup
- from airflow.serialization.serialized_objects import XComOperatorLink
from airflow.task.trigger_rule import TriggerRule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.triggers.base import StartTriggerArgs
diff --git a/airflow-core/src/airflow/serialization/definitions/operatorlink.py b/airflow-core/src/airflow/serialization/definitions/operatorlink.py
new file mode 100644
index 00000000000..16a39a9a039
--- /dev/null
+++ b/airflow-core/src/airflow/serialization/definitions/operatorlink.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, TypeAlias
+
+import attrs
+
+from airflow.models.xcom import XComModel
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.session import create_session
+
+if TYPE_CHECKING:
+ from airflow.models.mappedoperator import MappedOperator as SerializedMappedOperator
+ from airflow.models.taskinstancekey import TaskInstanceKey
+ from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
+
+ Operator: TypeAlias = "SerializedMappedOperator | SerializedBaseOperator"
+
+
+@attrs.define()
+class XComOperatorLink(LoggingMixin):
+ """
+ Generic operator link class that can retrieve link only using XCOMs.
+
+ Used while deserializing operators.
+ """
+
+ name: str
+ xcom_key: str
+
+ def get_link(self, operator: Operator, *, ti_key: TaskInstanceKey) -> str:
+ """
+ Retrieve the link from the XComs.
+
+ :param operator: The Airflow operator object this link is associated to.
+ :param ti_key: TaskInstance ID to return link for.
+ :return: link to external system, but by pulling it from XComs
+ """
+ self.log.info(
+ "Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
+ )
+ with create_session() as session:
+ value = session.execute(
+ XComModel.get_many(
+ key=self.xcom_key,
+ run_id=ti_key.run_id,
+ dag_ids=ti_key.dag_id,
+ task_ids=ti_key.task_id,
+ map_indexes=ti_key.map_index,
+ ).with_only_columns(XComModel.value)
+ ).first()
+ if not value:
+ self.log.debug(
+ "No link with name: %s present in XCom as key: %s, returning empty link",
+ self.name,
+ self.xcom_key,
+ )
+ return ""
+ return XComModel.deserialize_value(value)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 5bf74f6ee10..c70deab2e0a 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -48,7 +48,6 @@ from airflow.exceptions import AirflowException, DeserializationError, Serializa
from airflow.models.connection import Connection
from airflow.models.expandinput import create_expand_input
from airflow.models.taskinstancekey import TaskInstanceKey
-from airflow.models.xcom import XComModel
from airflow.models.xcom_arg import SchedulerXComArg, deserialize_xcom_arg
from airflow.sdk import DAG, Asset, AssetAlias, BaseOperator, XComArg
from airflow.sdk.bases.operator import OPERATOR_DEFAULTS # TODO: Copy this into the scheduler?
@@ -76,6 +75,7 @@ from airflow.serialization.definitions.assets import (
from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.serialization.definitions.node import DAGNode
+from airflow.serialization.definitions.operatorlink import XComOperatorLink
from airflow.serialization.definitions.param import SerializedParam, SerializedParamsDict
from airflow.serialization.definitions.taskgroup import SerializedMappedTaskGroup, SerializedTaskGroup
from airflow.serialization.encoders import (
@@ -100,8 +100,6 @@ from airflow.triggers.base import BaseTrigger, StartTriggerArgs
from airflow.utils.code_utils import get_python_source
from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor
from airflow.utils.db import LazySelectSequence
-from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import create_session
if TYPE_CHECKING:
from inspect import Parameter
@@ -2363,48 +2361,6 @@ class LazyDeserializedDAG(pydantic.BaseModel):
)
-@attrs.define()
-class XComOperatorLink(LoggingMixin):
- """
- Generic operator link class that can retrieve link only using XCOMs.
-
- Used while deserializing operators.
- """
-
- name: str
- xcom_key: str
-
- def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
- """
- Retrieve the link from the XComs.
-
- :param operator: The Airflow operator object this link is associated to.
- :param ti_key: TaskInstance ID to return link for.
- :return: link to external system, but by pulling it from XComs
- """
- self.log.info(
- "Attempting to retrieve link from XComs with key: %s for task id: %s", self.xcom_key, ti_key
- )
- with create_session() as session:
- value = session.execute(
- XComModel.get_many(
- key=self.xcom_key,
- run_id=ti_key.run_id,
- dag_ids=ti_key.dag_id,
- task_ids=ti_key.task_id,
- map_indexes=ti_key.map_index,
- ).with_only_columns(XComModel.value)
- ).first()
- if not value:
- self.log.debug(
- "No link with name: %s present in XCom as key: %s, returning empty link",
- self.name,
- self.xcom_key,
- )
- return ""
- return XComModel.deserialize_value(value)
-
-
@overload
def create_scheduler_operator(op: BaseOperator | SerializedBaseOperator) -> SerializedBaseOperator: ...
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index cb9883c2d7f..a59e2efe5f3 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -69,8 +69,11 @@ from airflow.sdk.definitions.operator_resources import Resources
from airflow.sdk.definitions.param import Param, ParamsDict
from airflow.security import permissions
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
+from airflow.serialization.definitions.baseoperator import SerializedBaseOperator
from airflow.serialization.definitions.dag import SerializedDAG
from airflow.serialization.definitions.notset import NOTSET
+from airflow.serialization.definitions.operatorlink import XComOperatorLink
+from airflow.serialization.definitions.param import SerializedParam
from airflow.serialization.encoders import ensure_serialized_asset
from airflow.serialization.enums import Encoding
from airflow.serialization.json_schema import load_dag_schema_dict
@@ -78,9 +81,6 @@ from airflow.serialization.serialized_objects import (
BaseSerialization,
DagSerialization,
OperatorSerialization,
- SerializedBaseOperator,
- SerializedParam,
- XComOperatorLink,
)
from airflow.task.priority_strategy import _AbsolutePriorityWeightStrategy, _DownstreamPriorityWeightStrategy
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep