This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 05fc4b24482b9127f54925b859de02b9c533b780 Author: Tomek Urbaszek <[email protected]> AuthorDate: Tue Apr 28 16:55:05 2020 +0200 Allow to define custom XCom class (#8560) * Allow to define custom XCom class closes: #8059 (cherry picked from commit 6c6d6611d2aa112a947a9ebc7200446f51d0ac4c) --- airflow/config_templates/config.yml | 7 ++++ airflow/config_templates/default_airflow.cfg | 4 +++ airflow/configuration.py | 21 ++++++++++++ airflow/models/xcom.py | 19 ++++++++++- docs/concepts.rst | 9 +++++ tests/models/test_xcom.py | 50 ++++++++++++++++++++++++++++ 6 files changed, 109 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d1c2c90..f54255e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -476,6 +476,13 @@ type: string example: ~ default: "True" + - name: xcom_backend + description: | + Path to custom XCom class that will be used to store and resolve operators results + version_added: 1.10.12 + type: string + example: "path.to.CustomXCom" + default: "airflow.models.xcom.BaseXCom" - name: secrets description: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bf83b34..e18e538 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -252,6 +252,10 @@ max_num_rendered_ti_fields_per_task = 30 # On each dagrun check against defined SLAs check_slas = True +# Path to custom XCom class that will be used to store and resolve operators results +# Example: xcom_backend = path.to.CustomXCom +xcom_backend = airflow.models.xcom.BaseXCom + [secrets] # Full class name of secrets backend to enable (will precede env vars and metastore in search path) # Example: backend = airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend diff --git a/airflow/configuration.py b/airflow/configuration.py index d912898..8720d77 100644 --- 
a/airflow/configuration.py +++ b/airflow/configuration.py @@ -42,6 +42,7 @@ import yaml from zope.deprecation import deprecated from airflow.exceptions import AirflowConfigException +from airflow.utils.module_loading import import_string standard_library.install_aliases() @@ -342,6 +343,26 @@ class AirflowConfigParser(ConfigParser): "section/key [{section}/{key}] not found " "in config".format(section=section, key=key)) + def getimport(self, section, key, **kwargs): + """ + Reads options, imports the fully qualified name, and returns the object. + In case of failure, it throws an exception with a clear message containing the key and the section names + :return: The object or None, if the option is empty + """ + full_qualified_path = conf.get(section=section, key=key, **kwargs) + if not full_qualified_path: + return None + + try: + return import_string(full_qualified_path) + except ImportError as e: + log.error(e) + raise AirflowConfigException( + 'The object could not be loaded. Please check "{key}" key in "{section}" section. ' + 'Current value: "{full_qualified_path}".'.format( + key=key, section=section, full_qualified_path=full_qualified_path) + ) + def getboolean(self, section, key, **kwargs): val = str(self.get(section, key, **kwargs)).lower().strip() if '#' in val: diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py index f4522b5..ea902be 100644 --- a/airflow/models/xcom.py +++ b/airflow/models/xcom.py @@ -40,7 +40,7 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -class XCom(Base, LoggingMixin): +class BaseXCom(Base, LoggingMixin): """ Base class for XCom objects.
""" @@ -232,3 +232,20 @@ class XCom(Base, LoggingMixin): "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise + + +def resolve_xcom_backend(): + """Resolves custom XCom class""" + clazz = conf.getimport("core", "xcom_backend", fallback="airflow.models.xcom.{}" + .format(BaseXCom.__name__)) + if clazz: + if not issubclass(clazz, BaseXCom): + raise TypeError( + "Your custom XCom class `{class_name}` is not a subclass of `{base_name}`." + .format(class_name=clazz.__name__, base_name=BaseXCom.__name__) + ) + return clazz + return BaseXCom + + +XCom = resolve_xcom_backend() diff --git a/docs/concepts.rst b/docs/concepts.rst index e85c5b3..dd48003 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -660,6 +660,15 @@ of what this may look like: Note that XComs are similar to `Variables`_, but are specifically designed for inter-task communication rather than global settings. +Custom XCom backend +''''''''''''''''''' + +It is possible to change the ``XCom`` behaviour for serialization and deserialization of tasks' results. +To do this, one has to change the ``xcom_backend`` parameter in the Airflow config. The provided value should point +to a class that is a subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialization / +deserialization mechanism, the custom class should override the ``serialize_value`` and ``deserialize_value`` +methods. + .. _concepts:variables: Variables diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py new file mode 100644 index 0000000..206b074 --- /dev/null +++ b/tests/models/test_xcom.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from airflow.configuration import conf +from airflow.models.xcom import BaseXCom, resolve_xcom_backend +from tests.test_utils.config import conf_vars + + +class CustomXCom(BaseXCom): + @staticmethod + def serialize_value(_): + return "custom_value" + + +class TestXCom: + @conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"}) + def test_resolve_xcom_class(self): + cls = resolve_xcom_backend() + assert issubclass(cls, CustomXCom) + assert cls().serialize_value(None) == "custom_value" + + @conf_vars( + {("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"} + ) + def test_resolve_xcom_class_fallback_to_basexcom(self): + cls = resolve_xcom_backend() + assert issubclass(cls, BaseXCom) + assert cls().serialize_value([1]) == b"[1]" + + @conf_vars({("core", "enable_xcom_pickling"): "False"}) + def test_resolve_xcom_class_fallback_to_basexcom_no_config(self): + init = conf.get("core", "xcom_backend") + conf.remove_option("core", "xcom_backend") + cls = resolve_xcom_backend() + assert issubclass(cls, BaseXCom) + assert cls().serialize_value([1]) == b"[1]" + conf.set("core", "xcom_backend", init)
