ashb commented on code in PR #25780: URL: https://github.com/apache/airflow/pull/25780#discussion_r962836873
########## tests/decorators/test_external_python.py: ########## @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import datetime +import sys +from datetime import timedelta +from subprocess import CalledProcessError + +import pytest + +from airflow.decorators import task +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2016, 1, 1) +END_DATE = timezone.datetime(2016, 1, 2) +INTERVAL = timedelta(hours=12) +FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1) + +TI_CONTEXT_ENV_VARS = [ + 'AIRFLOW_CTX_DAG_ID', + 'AIRFLOW_CTX_TASK_ID', + 'AIRFLOW_CTX_EXECUTION_DATE', + 'AIRFLOW_CTX_DAG_RUN_ID', +] + + +PYTHON_VERSION = sys.version_info[0] + +# Technically Not a separate virtualenv but should be good enough for unit tests +PYTHON = sys.executable + + +class TestExternalPythonDecorator: + def test_add_dill(self, dag_maker): + @task.external_python(python=PYTHON, use_dill=True) + def f(): + """Ensure dill is correctly installed.""" + import dill # noqa: F401 Review Comment: Does this test actually cover anything? For creating a venv I can see this makes sense but not sure about for this operator. 
(At least the comment here is incorrect anyway) ########## airflow/operators/python.py: ########## @@ -501,27 +555,152 @@ def _iter_serializable_context_keys(self): elif 'pendulum' in self.requirements: yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS - def _write_string_args(self, filename): - with open(filename, 'w') as file: - file.write('\n'.join(map(str, self.string_args))) - def _read_result(self, filename): - if os.stat(filename).st_size == 0: - return None - with open(filename, 'rb') as file: - try: - return self.pickling_library.load(file) - except ValueError: - self.log.error( - "Error deserializing result. Note that result deserialization " - "is not supported across major Python versions." +class ExternalPythonOperator(_BasePythonVirtualenvOperator): + """ + Allows one to run a function in a virtualenv that is not re-created but used as is + without the overhead of creating the virtualenv (with certain caveats). + + The function must be defined using def, and not be + part of a class. All imports must happen inside the function + and no variables outside the scope may be referenced. A global scope + variable named virtualenv_string_args will be available (populated by + string_args). In addition, one can pass stuff through op_args and op_kwargs, and one + can use a return value. + Note that if your virtualenv runs in a different Python major version than Airflow, + you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to + Airflow through plugins. You can use string_args though. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ExternalPythonOperator` + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). 
+ :param python_callable: A python function with no references to outside variables, + defined with def, which will be run in a virtualenv + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but if dill is not preinstalled in your venv, the task will fail with use_dill enabled. + :param op_args: A list of positional arguments to pass to python_callable. + :param op_kwargs: A dict of keyword arguments to pass to python_callable. + :param string_args: Strings that are present in the global var virtualenv_string_args, + available to python_callable at runtime as a list[str]. Note that args are split + by newline. + :param templates_dict: a dictionary where the values are templates that + will get templated by the Airflow engine sometime between + ``__init__`` and ``execute`` takes place and are made available + in your callable's context after the template has been applied + :param templates_exts: a list of file extensions to resolve while + processing templated fields, for examples ``['.sql', '.hql']`` + """ + + template_fields: Sequence[str] = tuple({'python_path'} | set(PythonOperator.template_fields)) + + def __init__( + self, + *, + python: str, + python_callable: Callable, + use_dill: bool = False, + op_args: Optional[Collection[Any]] = None, + op_kwargs: Optional[Mapping[str, Any]] = None, + string_args: Optional[Iterable[str]] = None, + templates_dict: Optional[Dict] = None, + templates_exts: Optional[List[str]] = None, + **kwargs, + ): + if not python: + raise ValueError("Python Path must be defined in ExternalPythonOperator") + self.python = python + super().__init__( + python_callable=python_callable, + use_dill=use_dill, + op_args=op_args, + op_kwargs=op_kwargs, + string_args=string_args, + templates_dict=templates_dict, + templates_exts=templates_exts, + **kwargs, + ) + + def execute_callable(self): + python_path = Path(self.python) + if not python_path.exists(): + raise 
ValueError(f"Python Path '{python_path}' must exists") + if not python_path.is_file(): + raise ValueError(f"Python Path '{python_path}' must be a file") + if not python_path.is_absolute(): + raise ValueError(f"Python Path '{python_path}' must be an absolute path.") + python_version_as_list_of_strings = self._get_python_version_from_venv() + if ( + python_version_as_list_of_strings + and str(python_version_as_list_of_strings[0]) != str(sys.version_info.major) + and (self.op_args or self.op_kwargs) + ): + raise AirflowException( + "Passing op_args or op_kwargs is not supported across different Python " + "major versions for ExternalPythonOperator. Please use string_args." + f"Sys version: {sys.version_info}. Venv version: {python_version_as_list_of_strings}" + ) + with TemporaryDirectory(prefix='tmd') as tmp_dir: + tmp_path = Path(tmp_dir) + return self._execute_python_callable_in_subprocess(python_path, tmp_path) + + def _get_virtualenv_path(self) -> Path: + return Path(self.python).parents[1] + + def _get_python_version_from_venv(self) -> List[str]: + try: + result = subprocess.check_output([self.python, "--version"], text=True) + return result.strip().split(" ")[-1].split(".") + except Exception as e: + raise ValueError(f"Error while executing {self.python}: {e}") + + def _get_airflow_version_from_venv(self) -> Optional[str]: + try: + result = subprocess.check_output( + [self.python, "-c", "from airflow import version; print(version.version)"], text=True + ) + venv_airflow_version = result.strip() + if venv_airflow_version != airflow_version: + raise AirflowConfigException( + f"The version of Airflow installed in the virtualenv {self._get_virtualenv_path()}: " + f"{venv_airflow_version} is different than the runtime Airflow version: " + f"{airflow_version}. Make sure your environment has the same Airflow version " + f"installed as the Airflow runtime." 
) - raise + return venv_airflow_version + except Exception as e: + self.log.info("When checking for Airflow installed in venv got %s", e) + self.log.info( + f"This means that Airflow is not properly installed in the virtualenv " + f"{self._get_virtualenv_path()}. Airflow context keys will not be available. " + f"Please Install Airflow {airflow_version} in your venv to access them." + ) Review Comment: One thought here: Part of the primary reason for using an external Python is to be able to run a task which has conflicts with Core Airflow (for instance some dbt-core libs are tricky to import with airflow right now I think) so is it worth being able to silence this warning somehow? ########## airflow/operators/python.py: ########## @@ -501,27 +555,152 @@ def _iter_serializable_context_keys(self): elif 'pendulum' in self.requirements: yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS - def _write_string_args(self, filename): - with open(filename, 'w') as file: - file.write('\n'.join(map(str, self.string_args))) - def _read_result(self, filename): - if os.stat(filename).st_size == 0: - return None - with open(filename, 'rb') as file: - try: - return self.pickling_library.load(file) - except ValueError: - self.log.error( - "Error deserializing result. Note that result deserialization " - "is not supported across major Python versions." +class ExternalPythonOperator(_BasePythonVirtualenvOperator): + """ + Allows one to run a function in a virtualenv that is not re-created but used as is + without the overhead of creating the virtualenv (with certain caveats). + + The function must be defined using def, and not be + part of a class. All imports must happen inside the function + and no variables outside the scope may be referenced. A global scope + variable named virtualenv_string_args will be available (populated by + string_args). In addition, one can pass stuff through op_args and op_kwargs, and one + can use a return value. 
+ Note that if your virtualenv runs in a different Python major version than Airflow, + you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to + Airflow through plugins. You can use string_args though. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ExternalPythonOperator` + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). + :param python_callable: A python function with no references to outside variables, + defined with def, which will be run in a virtualenv + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but if dill is not preinstalled in your venv, the task will fail with use_dill enabled. + :param op_args: A list of positional arguments to pass to python_callable. + :param op_kwargs: A dict of keyword arguments to pass to python_callable. + :param string_args: Strings that are present in the global var virtualenv_string_args, + available to python_callable at runtime as a list[str]. Note that args are split + by newline. Review Comment: This whole string_args thing was always a bit odd/confusing to me. I wonder if we should not add it to this new operator? ########## airflow/example_dags/example_python_operator.py: ########## @@ -32,6 +35,14 @@ log = logging.getLogger(__name__) +PYTHON = sys.executable + +BASE_DIR = tempfile.gettempdir() +EXTERNAL_PYTHON_ENV = Path(BASE_DIR, "venv-for-system-tests") Review Comment: Nit(ish): The parameterization of this probably should be in tests/dags, not `airflow/example_dags` Reason: example_dags often show up for new users, and seeing this env var here would be a bit confusing to them Edit: Is this even used anywhere? 
########## tests/decorators/test_external_python.py: ########## @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import datetime +import sys +from datetime import timedelta +from subprocess import CalledProcessError + +import pytest + +from airflow.decorators import task +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2016, 1, 1) +END_DATE = timezone.datetime(2016, 1, 2) +INTERVAL = timedelta(hours=12) +FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1) + +TI_CONTEXT_ENV_VARS = [ + 'AIRFLOW_CTX_DAG_ID', + 'AIRFLOW_CTX_TASK_ID', + 'AIRFLOW_CTX_EXECUTION_DATE', + 'AIRFLOW_CTX_DAG_RUN_ID', +] + + +PYTHON_VERSION = sys.version_info[0] + +# Technically Not a separate virtualenv but should be good enough for unit tests +PYTHON = sys.executable + + +class TestExternalPythonDecorator: + def test_add_dill(self, dag_maker): + @task.external_python(python=PYTHON, use_dill=True) + def f(): + """Ensure dill is correctly installed.""" + import dill # noqa: F401 + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_fail(self, dag_maker): + @task.external_python(python=PYTHON) + def f(): + raise Exception + + with dag_maker(): + ret = f() + 
+ with pytest.raises(CalledProcessError): + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_with_args(self, dag_maker): + @task.external_python(python=PYTHON) + def f(a, b, c=False, d=False): + if a == 0 and b == 1 and c and not d: + return True + else: + raise Exception + + with dag_maker(): + ret = f(0, 1, c=True) + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_return_none(self, dag_maker): + @task.external_python(python=PYTHON) + def f(): + return None + + with dag_maker(): + ret = f() + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_nonimported_as_arg(self, dag_maker): Review Comment: I don't think this case is needed either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
