Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test e1a2d74c0 -> 14e6d7bf4
[AIRFLOW-1512] Add PythonVirtualenvOperator Closes #2446 from saguziel/aguziel-virtualenv (cherry picked from commit 8e253c750d81da4e472049473767aaea0c504465) Signed-off-by: Alex Guziel <alex.guz...@airbnb.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/14e6d7bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/14e6d7bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/14e6d7bf Branch: refs/heads/v1-9-test Commit: 14e6d7bf4c1af5b38be347cb143af056f7abdd70 Parents: e1a2d74 Author: Alex Guziel <alex.guz...@airbnb.com> Authored: Mon Sep 18 14:17:21 2017 -0700 Committer: Alex Guziel <alex.guz...@airbnb.com> Committed: Mon Sep 18 14:17:45 2017 -0700 ---------------------------------------------------------------------- airflow/operators/python_operator.py | 215 ++++++++++++++++++++++- docs/code.rst | 1 + tests/operators/test_virtualenv_operator.py | 188 ++++++++++++++++++++ 3 files changed, 403 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14e6d7bf/airflow/operators/python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index 552996f..56837ec 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -11,9 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+
+from builtins import str
+import dill
+import inspect
+import os
+import pickle
+import subprocess
+import sys
+import types
+
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, SkipMixin
 from airflow.utils.decorators import apply_defaults
+from airflow.utils.file import TemporaryDirectory
+
+from textwrap import dedent
 
 
 class PythonOperator(BaseOperator):
@@ -73,10 +86,13 @@ class PythonOperator(BaseOperator):
             context['templates_dict'] = self.templates_dict
             self.op_kwargs = context
 
-        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
+        return_value = self.execute_callable()
         self.logger.info("Done. Returned value was: %s", return_value)
         return return_value
 
+    def execute_callable(self):
+        return self.python_callable(*self.op_args, **self.op_kwargs)
+
 
 class BranchPythonOperator(PythonOperator, SkipMixin):
     """
@@ -141,3 +157,200 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
             self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
 
         self.logger.info("Done.")
+
+class PythonVirtualenvOperator(PythonOperator):
+    """
+    Allows one to run a function in a virtualenv that is created and destroyed
+    automatically (with certain caveats).
+
+    The function must be defined using def, and not be part of a class. All imports
+    must happen inside the function and no variables outside of the scope may be referenced.
+    A global scope variable named virtualenv_string_args will be available (populated by
+    string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
+    can use a return value.
+
+    Note that if your virtualenv runs in a different Python major version than Airflow,
+    you cannot use return values, op_args, or op_kwargs. You can use string_args though.
+
+    :param python_callable: A python function with no references to outside variables,
+        defined with def, which will be run in a virtualenv
+    :type python_callable: function
+    :param requirements: A list of requirements as specified in a pip install command
+    :type requirements: list(str)
+    :param python_version: The Python version to run the virtualenv with. Note that
+        both 2 and 2.7 are acceptable forms.
+    :type python_version: str
+    :param use_dill: Whether to use dill to serialize the args and result (pickle is default).
+        This allows more complex types but requires dill to be installed in the virtualenv.
+    :type use_dill: bool
+    :param system_site_packages: Whether to include system_site_packages in your virtualenv.
+        See virtualenv documentation for more information.
+    :type system_site_packages: bool
+    :param op_args: A list of positional arguments to pass to python_callable.
+    :type op_args: list
+    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
+    :type op_kwargs: dict
+    :param string_args: Strings that are present in the global var virtualenv_string_args,
+        available to python_callable at runtime as a list(str). Note that args are split
+        by newline and stripped of surrounding whitespace.
+    :type string_args: list(str)
+
+    """
+    def __init__(self, python_callable, requirements=None, python_version=None, use_dill=False,
+                 system_site_packages=True, op_args=None, op_kwargs=None, string_args=None,
+                 *args, **kwargs):
+        super(PythonVirtualenvOperator, self).__init__(
+            python_callable=python_callable,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            *args,
+            **kwargs)
+        self.requirements = requirements or []
+        self.string_args = string_args or []
+        self.python_version = python_version
+        self.use_dill = use_dill
+        self.system_site_packages = system_site_packages
+        # check that dill is present if needed
+        dill_in_requirements = map(lambda x: x.lower().startswith('dill'), self.requirements)
+        if (not system_site_packages) and use_dill and not any(dill_in_requirements):
+            raise AirflowException('If using dill, dill must be in the environment ' +
+                                   'either via system_site_packages or requirements')
+        # check that a function is passed, and that it is not a lambda
+        if (not isinstance(self.python_callable, types.FunctionType)
+                or self.python_callable.__name__ == (lambda x: 0).__name__):
+            raise AirflowException('{} only supports functions for python_callable arg'.format(
+                self.__class__.__name__))
+        # op_args/op_kwargs are pickled, so reject them if the venv's major version differs
+        if (python_version is not None
+                and str(python_version)[0] != str(sys.version_info[0])
+                and self._pass_op_args()):
+            raise AirflowException("Passing op_args or op_kwargs is not supported across "
+                                   "different Python major versions "
+                                   "for PythonVirtualenvOperator. Please use string_args.")
+
+    def execute_callable(self):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            # generate filenames
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            output_filename = os.path.join(tmp_dir, 'script.out')
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+
+            # set up virtualenv
+            self._execute_in_subprocess(self._generate_virtualenv_cmd(tmp_dir))
+            cmd = self._generate_pip_install_cmd(tmp_dir)
+            if cmd:
+                self._execute_in_subprocess(cmd)
+
+            self._write_args(input_filename)
+            self._write_script(script_filename)
+            self._write_string_args(string_args_filename)
+
+            # execute command in virtualenv
+            self._execute_in_subprocess(
+                self._generate_python_cmd(tmp_dir,
+                                          script_filename,
+                                          input_filename,
+                                          output_filename,
+                                          string_args_filename))
+            return self._read_result(output_filename)
+
+    def _pass_op_args(self):
+        # we should only pass op_args if any are given to us
+        return len(self.op_args) + len(self.op_kwargs) > 0
+
+    def _execute_in_subprocess(self, cmd):
+        try:
+            self.logger.info("Executing cmd\n{}".format(cmd))
+            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+            if output:
+                self.logger.info("Got output\n{}".format(output))
+        except subprocess.CalledProcessError as e:
+            self.logger.info("Got error output\n{}".format(e.output))
+            raise
+
+    def _write_string_args(self, filename):
+        # writes string_args to a file, which are read line by line
+        with open(filename, 'w') as f:
+            f.write('\n'.join(map(str, self.string_args)))
+
+    def _write_args(self, input_filename):
+        # serialize args to file
+        if self._pass_op_args():
+            with open(input_filename, 'wb') as f:
+                arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
+                if self.use_dill:
+                    dill.dump(arg_dict, f)
+                else:
+                    pickle.dump(arg_dict, f)
+
+    def _read_result(self, output_filename):
+        if os.stat(output_filename).st_size == 0:
+            return None
+        with open(output_filename, 'rb') as f:
+            try:
+                if self.use_dill:
+                    return dill.load(f)
+                else:
+                    return pickle.load(f)
+            except ValueError:
+                self.logger.error("Error deserializing result. Note that result deserialization "
+                                  "is not supported across major Python versions.")
+                raise
+
+    def _write_script(self, script_filename):
+        with open(script_filename, 'w') as f:
+            python_code = self._generate_python_code()
+            self.logger.debug('Writing code to file\n{}'.format(python_code))
+            f.write(python_code)
+
+    def _generate_virtualenv_cmd(self, tmp_dir):
+        cmd = ['virtualenv', tmp_dir]
+        if self.system_site_packages:
+            cmd.append('--system-site-packages')
+        if self.python_version is not None:
+            cmd.append('--python=python{}'.format(self.python_version))
+        return cmd
+
+    def _generate_pip_install_cmd(self, tmp_dir):
+        if len(self.requirements) == 0:
+            return []
+        else:
+            # direct path alleviates need to activate
+            cmd = ['{}/bin/pip'.format(tmp_dir), 'install']
+            return cmd + self.requirements
+
+    def _generate_python_cmd(self, tmp_dir, script_filename, input_filename, output_filename, string_args_filename):
+        # direct path alleviates need to activate
+        return ['{}/bin/python'.format(tmp_dir), script_filename, input_filename, output_filename, string_args_filename]
+
+    def _generate_python_code(self):
+        if self.use_dill:
+            pickling_library = 'dill'
+        else:
+            pickling_library = 'pickle'
+        fn = self.python_callable
+        # don't try to read a pickle if we didn't pass any args
+        if self._pass_op_args():
+            load_args_line = 'with open(sys.argv[1], "rb") as f: arg_dict = {}.load(f)'.format(pickling_library)
+        else:
+            load_args_line = 'arg_dict = {"args": [], "kwargs": {}}'
+
+        # the template is dedented before .format() and the callable's source separately, so any
+        # indent style works; the script loads args, calls the function, serializes the result
+        return dedent("""\
+        import {pickling_library}
+        import sys
+        {load_args_code}
+        args = arg_dict["args"]
+        kwargs = arg_dict["kwargs"]
+        with open(sys.argv[3], 'r') as f: virtualenv_string_args = list(map(lambda x: x.strip(), list(f)))
+        {python_callable_lines}
+        res = {python_callable_name}(*args, **kwargs)
+        with open(sys.argv[2], 'wb') as f: res is not None and {pickling_library}.dump(res, f)
+        """).format(
+            load_args_code=load_args_line,
+            python_callable_lines=dedent(inspect.getsource(fn)),
+            python_callable_name=fn.__name__,
+            pickling_library=pickling_library)
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14e6d7bf/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index a1980f2..4a6718f 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -72,6 +72,7 @@ Operator API
         PrestoIntervalCheckOperator,
         PrestoValueCheckOperator,
         PythonOperator,
+        PythonVirtualenvOperator,
         S3KeySensor,
         S3ToHiveTransfer,
         ShortCircuitOperator,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14e6d7bf/tests/operators/test_virtualenv_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_virtualenv_operator.py b/tests/operators/test_virtualenv_operator.py
new file mode 100644
index 0000000..9231d39
--- /dev/null
+++ b/tests/operators/test_virtualenv_operator.py
@@ -0,0 +1,188 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function, unicode_literals
+
+import datetime
+import funcsigs
+import sys
+import unittest
+
+from subprocess import CalledProcessError
+
+from airflow import configuration, DAG
+from airflow.models import TaskInstance
+from airflow.operators.python_operator import PythonVirtualenvOperator
+from airflow.settings import Session
+from airflow.utils.state import State
+
+from airflow.exceptions import AirflowException
+import logging
+
+DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+END_DATE = datetime.datetime(2016, 1, 2)
+INTERVAL = datetime.timedelta(hours=12)
+FROZEN_NOW = datetime.datetime(2016, 1, 2, 12, 1, 1)
+
+
+class TestPythonVirtualenvOperator(unittest.TestCase):
+
+    def setUp(self):
+        super(TestPythonVirtualenvOperator, self).setUp()
+        configuration.load_test_config()
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE},
+            schedule_interval=INTERVAL)
+        self.addCleanup(self.dag.clear)
+
+    def _run_as_operator(self, fn, **kwargs):
+        task = PythonVirtualenvOperator(
+            python_callable=fn,
+            task_id='task',
+            dag=self.dag,
+            **kwargs)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_dill_warning(self):
+        def f():
+            pass
+        with self.assertRaises(AirflowException):
+            PythonVirtualenvOperator(
+                python_callable=f,
+                task_id='task',
+                dag=self.dag,
+                use_dill=True,
+                system_site_packages=False)
+
+    def test_no_requirements(self):
+        """Tests that the python callable is invoked on task run."""
+        def f():
+            pass
+        self._run_as_operator(f)
+
+    def test_no_system_site_packages(self):
+        def f():
+            try:
+                import funcsigs
+            except ImportError:
+                return True
+            raise Exception
+        self._run_as_operator(f, system_site_packages=False, requirements=['dill'])
+
+    def test_system_site_packages(self):
+        def f():
+            import funcsigs
+        self._run_as_operator(f, requirements=['funcsigs'], system_site_packages=True)
+
+    def test_with_requirements_pinned(self):
+        self.assertNotEqual('0.4', funcsigs.__version__, 'Please update this string if this fails')
+        def f():
+            import funcsigs
+            if funcsigs.__version__ != '0.4':
+                raise Exception
+        self._run_as_operator(f, requirements=['funcsigs==0.4'])
+
+    def test_unpinned_requirements(self):
+        def f():
+            import funcsigs
+        self._run_as_operator(f, requirements=['funcsigs', 'dill'], system_site_packages=False)
+
+    def test_range_requirements(self):
+        def f():
+            import funcsigs
+        self._run_as_operator(f, requirements=['funcsigs>1.0', 'dill'], system_site_packages=False)
+
+    def test_fail(self):
+        def f():
+            raise Exception
+        with self.assertRaises(CalledProcessError):
+            self._run_as_operator(f)
+
+    def test_python_2(self):
+        def f():
+            {}.iteritems()
+        self._run_as_operator(f, python_version=2, requirements=['dill'])
+
+    def test_python_2_7(self):
+        def f():
+            {}.iteritems()
+            return True
+        self._run_as_operator(f, python_version='2.7', requirements=['dill'])
+
+    def test_python_3(self):
+        def f():
+            import sys
+            print(sys.version)
+            try:
+                {}.iteritems()
+            except AttributeError:
+                return
+            raise Exception
+        self._run_as_operator(f, python_version=3, use_dill=False, requirements=['dill'])
+
+    def _invert_python_major_version(self):
+        if sys.version_info[0] == 2:
+            return 3
+        else:
+            return 2
+
+    def test_wrong_python_op_args(self):
+        # op_args are pickled, so the operator must reject them across major versions
+        version = self._invert_python_major_version()
+
+        def f():
+            pass
+
+        with self.assertRaises(AirflowException):
+            self._run_as_operator(f, python_version=version, op_args=[1])
+
+    def test_without_dill(self):
+        def f(a):
+            return a
+        self._run_as_operator(f, system_site_packages=False, use_dill=False, op_args=[4])
+
+    def test_string_args(self):
+        def f():
+            print(virtualenv_string_args)
+            if virtualenv_string_args[0] != virtualenv_string_args[2]:
+                raise Exception
+        self._run_as_operator(f, python_version=self._invert_python_major_version(), string_args=[1, 2, 1])
+
+    def test_with_args(self):
+        def f(a, b, c=False, d=False):
+            if a == 0 and b == 1 and c and not d:
+                return True
+            else:
+                raise Exception
+        self._run_as_operator(f, op_args=[0, 1], op_kwargs={'c': True})
+
+    def test_return_none(self):
+        def f():
+            return None
+        self._run_as_operator(f)
+
+    def test_lambda(self):
+        with self.assertRaises(AirflowException):
+            PythonVirtualenvOperator(
+                python_callable=lambda x: 4,
+                task_id='task',
+                dag=self.dag)
+
+    def test_nonimported_as_arg(self):
+        def f(a):
+            return None
+        self._run_as_operator(f, op_args=[datetime.datetime.now()])