This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ab105ab65 Refactor out xcom constants from models (#30180)
1ab105ab65 is described below

commit 1ab105ab6596e1f0a023a307df8ab1f42ade8282
Author: Vladimir Mikhaylov <[email protected]>
AuthorDate: Tue Mar 21 13:01:23 2023 +0000

    Refactor out xcom constants from models (#30180)
---
 airflow/models/__init__.py                    |  4 +---
 airflow/models/baseoperator.py                |  2 +-
 airflow/models/mappedoperator.py              |  3 ++-
 airflow/models/taskinstance.py                |  3 ++-
 airflow/models/xcom.py                        | 12 +++++++-----
 airflow/models/xcom_arg.py                    |  2 +-
 airflow/utils/xcom.py                         | 24 ++++++++++++++++++++++++
 tests/decorators/test_python.py               |  2 +-
 tests/models/test_mappedoperator.py           |  2 +-
 tests/models/test_taskinstance.py             |  5 +++--
 tests/models/test_xcom.py                     |  3 ++-
 tests/serialization/test_dag_serialization.py |  3 ++-
 12 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index c750757cb0..281ffcfd67 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -22,7 +22,6 @@ from __future__ import annotations
 __all__ = [
     "DAG",
     "ID_LEN",
-    "XCOM_RETURN_KEY",
     "Base",
     "BaseOperator",
     "BaseOperatorLink",
@@ -89,7 +88,6 @@ def __getattr__(name):
 __lazy_imports = {
     "DAG": "airflow.models.dag",
     "ID_LEN": "airflow.models.base",
-    "XCOM_RETURN_KEY": "airflow.models.xcom",
     "Base": "airflow.models.base",
     "BaseOperator": "airflow.models.baseoperator",
     "BaseOperatorLink": "airflow.models.baseoperator",
@@ -143,4 +141,4 @@ if TYPE_CHECKING:
     from airflow.models.taskreschedule import TaskReschedule
     from airflow.models.trigger import Trigger
     from airflow.models.variable import Variable
-    from airflow.models.xcom import XCOM_RETURN_KEY, XCom
+    from airflow.models.xcom import XCom
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index a120168685..d78152d990 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -75,7 +75,6 @@ from airflow.models.param import ParamsDict
 from airflow.models.pool import Pool
 from airflow.models.taskinstance import TaskInstance, clear_task_instances
 from airflow.models.taskmixin import DAGNode, DependencyMixin
-from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.serialization.enums import DagAttributeTypes
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
@@ -92,6 +91,7 @@ from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.setup_teardown import SetupTeardownContext
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import WeightRule
+from airflow.utils.xcom import XCOM_RETURN_KEY
 
 if TYPE_CHECKING:
     import jinja2  # Slow import.
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 2cbd46cd57..a10fd10cdc 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -66,6 +66,7 @@ from airflow.utils.helpers import is_container, prevent_duplicates
 from airflow.utils.operator_resources import Resources
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import NOTSET
+from airflow.utils.xcom import XCOM_RETURN_KEY
 
 if TYPE_CHECKING:
     import jinja2  # Slow import.
@@ -111,7 +112,7 @@ def validate_mapping_kwargs(op: type[BaseOperator], func: ValidationSource, valu
 
 
 def ensure_xcomarg_return_value(arg: Any) -> None:
-    from airflow.models.xcom_arg import XCOM_RETURN_KEY, XComArg
+    from airflow.models.xcom_arg import XComArg
 
     if isinstance(arg, XComArg):
         for operator, key in arg.iter_references():
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 0406d2158b..e4dbcb0e5e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -93,7 +93,7 @@ from airflow.models.param import process_params
 from airflow.models.taskfail import TaskFail
 from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
-from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
+from airflow.models.xcom import LazyXComAccess, XCom
 from airflow.plugins_manager import integrate_macros_plugins
 from airflow.sentry import Sentry
 from airflow.stats import Stats
@@ -122,6 +122,7 @@ from airflow.utils.sqlalchemy import (
 )
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.timeout import timeout
+from airflow.utils.xcom import XCOM_RETURN_KEY
 
 TR = TaskReschedule
 
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 62e773c617..476b170deb 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -58,12 +58,14 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
-log = logging.getLogger(__name__)
+# XCom constants below are needed for providers backward compatibility,
+# which should import the constants directly after apache-airflow>=2.6.0
+from airflow.utils.xcom import (
+    MAX_XCOM_SIZE,  # noqa: F401
+    XCOM_RETURN_KEY,
+)
 
-# MAX XCOM Size is 48KB
-# https://github.com/apache/airflow/pull/1618#discussion_r68249677
-MAX_XCOM_SIZE = 49344
-XCOM_RETURN_KEY = "return_value"
+log = logging.getLogger(__name__)
 
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstanceKey
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 133fd4280b..7f65cd3d94 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -28,12 +28,12 @@ from airflow.exceptions import XComNotFound
 from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.mappedoperator import MappedOperator
 from airflow.models.taskmixin import DAGNode, DependencyMixin
-from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.utils.context import Context
 from airflow.utils.edgemodifier import EdgeModifier
 from airflow.utils.mixins import ResolveMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.types import NOTSET, ArgNotSet
+from airflow.utils.xcom import XCOM_RETURN_KEY
 
 if TYPE_CHECKING:
     from airflow.models.dag import DAG
diff --git a/airflow/utils/xcom.py b/airflow/utils/xcom.py
new file mode 100644
index 0000000000..b05b881fa6
--- /dev/null
+++ b/airflow/utils/xcom.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# MAX XCOM Size is 48KB
+# https://github.com/apache/airflow/pull/1618#discussion_r68249677
+from __future__ import annotations
+
+MAX_XCOM_SIZE = 49344
+XCOM_RETURN_KEY = "return_value"
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 354ac5fc2e..163a220e75 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -32,12 +32,12 @@ from airflow.models.expandinput import DictOfListsExpandInput
 from airflow.models.mappedoperator import MappedOperator
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskmap import TaskMap
-from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.models.xcom_arg import PlainXComArg, XComArg
 from airflow.utils import timezone
 from airflow.utils.state import State
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
 from tests.operators.test_python import BasePythonTest
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py
index bdfcf8bc7f..931d262d24 100644
--- a/tests/models/test_mappedoperator.py
+++ b/tests/models/test_mappedoperator.py
@@ -28,10 +28,10 @@ from airflow.models.mappedoperator import MappedOperator
 from airflow.models.param import ParamsDict
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskmap import TaskMap
-from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.models.xcom_arg import XComArg
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.xcom import XCOM_RETURN_KEY
 from tests.models import DEFAULT_DATE
 from tests.test_utils.mapping import expand_mapped_task
 from tests.test_utils.mock_operators import MockOperator, MockOperatorWithNestedFields, NestedFields
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 6c331e5ae7..2d86cdfde5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -63,7 +63,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
 from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.variable import Variable
-from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
+from airflow.models.xcom import LazyXComAccess, XCom
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -84,6 +84,7 @@ from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
 from airflow.version import version
 from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER
 from tests.test_utils import db
@@ -1483,7 +1484,7 @@ class TestTaskInstance:
         ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
         ti.task = task
         ti.run()
-        assert ti.xcom_pull(task_ids=task_id, key=models.XCOM_RETURN_KEY) is None
+        assert ti.xcom_pull(task_ids=task_id, key=XCOM_RETURN_KEY) is None
 
     def test_post_execute_hook(self, dag_maker):
         """
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 5dc153dee1..6abdd344ac 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -28,11 +28,12 @@ from sqlalchemy.orm import Session
 from airflow.configuration import conf
 from airflow.models.dagrun import DagRun, DagRunType
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
-from airflow.models.xcom import XCOM_RETURN_KEY, BaseXCom, XCom, resolve_xcom_backend
+from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
 from airflow.operators.empty import EmptyOperator
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
+from airflow.utils.xcom import XCOM_RETURN_KEY
 from tests.test_utils.config import conf_vars
 
 
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 622cd9f5fc..96cf19aa25 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -47,7 +47,7 @@ from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
 from airflow.models.expandinput import EXPAND_INPUT_EMPTY
 from airflow.models.mappedoperator import MappedOperator
 from airflow.models.param import Param, ParamsDict
-from airflow.models.xcom import XCOM_RETURN_KEY, XCom
+from airflow.models.xcom import XCom
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.security import permissions
@@ -65,6 +65,7 @@ from airflow.utils import timezone
 from airflow.utils.context import Context
 from airflow.utils.operator_resources import Resources
 from airflow.utils.task_group import TaskGroup
+from airflow.utils.xcom import XCOM_RETURN_KEY
 from tests.test_utils.config import conf_vars
 from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator
 from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable

Reply via email to