This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1ab105ab65 Refactor out xcom constants from models (#30180)
1ab105ab65 is described below
commit 1ab105ab6596e1f0a023a307df8ab1f42ade8282
Author: Vladimir Mikhaylov <[email protected]>
AuthorDate: Tue Mar 21 13:01:23 2023 +0000
Refactor out xcom constants from models (#30180)
---
airflow/models/__init__.py | 4 +---
airflow/models/baseoperator.py | 2 +-
airflow/models/mappedoperator.py | 3 ++-
airflow/models/taskinstance.py | 3 ++-
airflow/models/xcom.py | 12 +++++++-----
airflow/models/xcom_arg.py | 2 +-
airflow/utils/xcom.py | 24 ++++++++++++++++++++++++
tests/decorators/test_python.py | 2 +-
tests/models/test_mappedoperator.py | 2 +-
tests/models/test_taskinstance.py | 5 +++--
tests/models/test_xcom.py | 3 ++-
tests/serialization/test_dag_serialization.py | 3 ++-
12 files changed, 47 insertions(+), 18 deletions(-)
diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py
index c750757cb0..281ffcfd67 100644
--- a/airflow/models/__init__.py
+++ b/airflow/models/__init__.py
@@ -22,7 +22,6 @@ from __future__ import annotations
__all__ = [
"DAG",
"ID_LEN",
- "XCOM_RETURN_KEY",
"Base",
"BaseOperator",
"BaseOperatorLink",
@@ -89,7 +88,6 @@ def __getattr__(name):
__lazy_imports = {
"DAG": "airflow.models.dag",
"ID_LEN": "airflow.models.base",
- "XCOM_RETURN_KEY": "airflow.models.xcom",
"Base": "airflow.models.base",
"BaseOperator": "airflow.models.baseoperator",
"BaseOperatorLink": "airflow.models.baseoperator",
@@ -143,4 +141,4 @@ if TYPE_CHECKING:
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.trigger import Trigger
from airflow.models.variable import Variable
- from airflow.models.xcom import XCOM_RETURN_KEY, XCom
+ from airflow.models.xcom import XCom
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index a120168685..d78152d990 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -75,7 +75,6 @@ from airflow.models.param import ParamsDict
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.models.taskmixin import DAGNode, DependencyMixin
-from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.serialization.enums import DagAttributeTypes
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
@@ -92,6 +91,7 @@ from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.setup_teardown import SetupTeardownContext
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weight_rule import WeightRule
+from airflow.utils.xcom import XCOM_RETURN_KEY
if TYPE_CHECKING:
import jinja2 # Slow import.
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 2cbd46cd57..a10fd10cdc 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -66,6 +66,7 @@ from airflow.utils.helpers import is_container, prevent_duplicates
from airflow.utils.operator_resources import Resources
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET
+from airflow.utils.xcom import XCOM_RETURN_KEY
if TYPE_CHECKING:
import jinja2 # Slow import.
@@ -111,7 +112,7 @@ def validate_mapping_kwargs(op: type[BaseOperator], func: ValidationSource, valu
def ensure_xcomarg_return_value(arg: Any) -> None:
- from airflow.models.xcom_arg import XCOM_RETURN_KEY, XComArg
+ from airflow.models.xcom_arg import XComArg
if isinstance(arg, XComArg):
for operator, key in arg.iter_references():
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 0406d2158b..e4dbcb0e5e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -93,7 +93,7 @@ from airflow.models.param import process_params
from airflow.models.taskfail import TaskFail
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
-from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
+from airflow.models.xcom import LazyXComAccess, XCom
from airflow.plugins_manager import integrate_macros_plugins
from airflow.sentry import Sentry
from airflow.stats import Stats
@@ -122,6 +122,7 @@ from airflow.utils.sqlalchemy import (
)
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.timeout import timeout
+from airflow.utils.xcom import XCOM_RETURN_KEY
TR = TaskReschedule
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 62e773c617..476b170deb 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -58,12 +58,14 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime
-log = logging.getLogger(__name__)
+# XCom constants below are needed for providers backward compatibility,
+# which should import the constants directly after apache-airflow>=2.6.0
+from airflow.utils.xcom import (
+ MAX_XCOM_SIZE, # noqa: F401
+ XCOM_RETURN_KEY,
+)
-# MAX XCOM Size is 48KB
-# https://github.com/apache/airflow/pull/1618#discussion_r68249677
-MAX_XCOM_SIZE = 49344
-XCOM_RETURN_KEY = "return_value"
+log = logging.getLogger(__name__)
if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstanceKey
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 133fd4280b..7f65cd3d94 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -28,12 +28,12 @@ from airflow.exceptions import XComNotFound
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskmixin import DAGNode, DependencyMixin
-from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.utils.context import Context
from airflow.utils.edgemodifier import EdgeModifier
from airflow.utils.mixins import ResolveMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.types import NOTSET, ArgNotSet
+from airflow.utils.xcom import XCOM_RETURN_KEY
if TYPE_CHECKING:
from airflow.models.dag import DAG
diff --git a/airflow/utils/xcom.py b/airflow/utils/xcom.py
new file mode 100644
index 0000000000..b05b881fa6
--- /dev/null
+++ b/airflow/utils/xcom.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# MAX XCOM Size is 48KB
+# https://github.com/apache/airflow/pull/1618#discussion_r68249677
+from __future__ import annotations
+
+MAX_XCOM_SIZE = 49344
+XCOM_RETURN_KEY = "return_value"
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 354ac5fc2e..163a220e75 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -32,12 +32,12 @@ from airflow.models.expandinput import DictOfListsExpandInput
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskmap import TaskMap
-from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.models.xcom_arg import PlainXComArg, XComArg
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.operators.test_python import BasePythonTest
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py
index bdfcf8bc7f..931d262d24 100644
--- a/tests/models/test_mappedoperator.py
+++ b/tests/models/test_mappedoperator.py
@@ -28,10 +28,10 @@ from airflow.models.mappedoperator import MappedOperator
from airflow.models.param import ParamsDict
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskmap import TaskMap
-from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.models.xcom_arg import XComArg
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.models import DEFAULT_DATE
from tests.test_utils.mapping import expand_mapped_task
from tests.test_utils.mock_operators import MockOperator, MockOperatorWithNestedFields, NestedFields
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 6c331e5ae7..2d86cdfde5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -63,7 +63,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.variable import Variable
-from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComAccess, XCom
+from airflow.models.xcom import LazyXComAccess, XCom
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
@@ -84,6 +84,7 @@ from airflow.utils.session import create_session, provide_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
from airflow.version import version
from tests.models import DEFAULT_DATE, TEST_DAGS_FOLDER
from tests.test_utils import db
@@ -1483,7 +1484,7 @@ class TestTaskInstance:
ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
ti.task = task
ti.run()
- assert ti.xcom_pull(task_ids=task_id, key=models.XCOM_RETURN_KEY) is None
+ assert ti.xcom_pull(task_ids=task_id, key=XCOM_RETURN_KEY) is None
def test_post_execute_hook(self, dag_maker):
"""
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 5dc153dee1..6abdd344ac 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -28,11 +28,12 @@ from sqlalchemy.orm import Session
from airflow.configuration import conf
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
-from airflow.models.xcom import XCOM_RETURN_KEY, BaseXCom, XCom, resolve_xcom_backend
+from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
+from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils.config import conf_vars
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 622cd9f5fc..96cf19aa25 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -47,7 +47,7 @@ from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.models.expandinput import EXPAND_INPUT_EMPTY
from airflow.models.mappedoperator import MappedOperator
from airflow.models.param import Param, ParamsDict
-from airflow.models.xcom import XCOM_RETURN_KEY, XCom
+from airflow.models.xcom import XCom
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
@@ -65,6 +65,7 @@ from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import TaskGroup
+from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator
from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable