turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444027117
##########
File path: airflow/operators/python.py
##########
@@ -269,140 +281,132 @@ def __init__( # pylint: disable=too-many-arguments
templates_exts=templates_exts,
*args,
**kwargs)
- self.requirements = requirements or []
+ self.requirements = list(requirements or [])
self.string_args = string_args or []
self.python_version = python_version
self.use_dill = use_dill
self.system_site_packages = system_site_packages
- # check that dill is present if needed
- dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
- self.requirements)
- if (not system_site_packages) and use_dill and not
any(dill_in_requirements):
- raise AirflowException('If using dill, dill must be in the
environment ' +
- 'either via system_site_packages or
requirements')
- # check that a function is passed, and that it is not a lambda
- if (not isinstance(self.python_callable,
- types.FunctionType) or
(self.python_callable.__name__ ==
- (lambda x: 0).__name__)):
- raise AirflowException('{} only supports functions for
python_callable arg'.format(
- self.__class__.__name__))
- # check that args are passed iff python major version matches
- if (python_version is not None and
- str(python_version)[0] != str(sys.version_info[0]) and
- self._pass_op_args()):
- raise AirflowException("Passing op_args or op_kwargs is not
supported across "
- "different Python major versions "
- "for PythonVirtualenvOperator. "
- "Please use string_args.")
+ if not self.system_site_packages and self.use_dill and 'dill' not in
self.requirements:
+ self.requirements.append('dill')
+ self.pickling_library = dill if self.use_dill else pickle
def execute_callable(self):
with TemporaryDirectory(prefix='venv') as tmp_dir:
if self.templates_dict:
self.op_kwargs['templates_dict'] = self.templates_dict
- # generate filenames
+
input_filename = os.path.join(tmp_dir, 'script.in')
output_filename = os.path.join(tmp_dir, 'script.out')
string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
script_filename = os.path.join(tmp_dir, 'script.py')
- # set up virtualenv
- python_bin = 'python' + str(self.python_version) if
self.python_version else None
prepare_virtualenv(
venv_directory=tmp_dir,
- python_bin=python_bin,
+ python_bin=f'python{self.python_version}' if
self.python_version else None,
system_site_packages=self.system_site_packages,
- requirements=self.requirements,
+ requirements=self.requirements
)
self._write_args(input_filename)
- self._write_script(script_filename)
self._write_string_args(string_args_filename)
+ self._write_script(script_filename)
+
+ execute_in_subprocess(cmd=[
+ f'{tmp_dir}/bin/python',
+ script_filename,
+ input_filename,
+ output_filename,
+ string_args_filename
+ ])
- # execute command in virtualenv
- execute_in_subprocess(
- self._generate_python_cmd(tmp_dir,
- script_filename,
- input_filename,
- output_filename,
- string_args_filename))
return self._read_result(output_filename)
- def _pass_op_args(self):
- # we should only pass op_args if any are given to us
- return len(self.op_args) + len(self.op_kwargs) > 0
+ def _write_args(self, filename):
+ if self.op_args or self.op_kwargs:
+ if self.op_kwargs:
+ # some items from context can't be loaded in virtual env
+ self._keep_serializable_op_kwargs()
+ print(self.op_kwargs)
+ with open(filename, 'wb') as file:
+ self.pickling_library.dump({'args': self.op_args, 'kwargs':
self.op_kwargs}, file)
+
+ def _keep_serializable_op_kwargs(self):
+ # Remove unserializable objects
+ # otherwise "KeyError: 'Variable __getstate__ does not exist'" would
be raised.
+ self.op_kwargs.pop('var', None)
+ # otherwise "TypeError: cannot serialize '_io.FileIO' object" would be
raised.
+ self.op_kwargs.pop('task_instance', None)
+ self.op_kwargs.pop('ti', None)
+
+ if self.system_site_packages or 'apache-airflow' in self.requirements:
+ # All can be serialized expecting it to run in an airflow env.
+ return
+
+ # Not access to host packages and no apache-airflow installed.
+ # Remove airflow specific context
+ # otherwise "ModuleNotFoundError: No module named 'airflow'" would be
raised.
+ self.op_kwargs.pop('macros', None)
+ self.op_kwargs.pop('conf', None)
+ self.op_kwargs.pop('dag', None)
+ self.op_kwargs.pop('dag_run', None)
+ self.op_kwargs.pop('task', None)
+
+ if 'pendulum' in self.requirements and 'lazy_object_proxy' in
self.requirements:
+ # ..but pendulum is installed so keep pendulum date objects
+ # Note: 'lazy_object_proxy' is needed to work.
+ return
+
+ # No pendulum is installed either. So remove pendulum specific context.
+ # otherwise "ModuleNotFoundError: No module named 'pendulum'" would be
raised.
+ self.op_kwargs.pop('execution_date', None)
+ self.op_kwargs.pop('next_execution_date', None)
+ self.op_kwargs.pop('prev_execution_date', None)
+ self.op_kwargs.pop('prev_execution_date_success', None)
+ self.op_kwargs.pop('prev_start_date_success', None)
def _write_string_args(self, filename):
- # writes string_args to a file, which are read line by line
with open(filename, 'w') as file:
file.write('\n'.join(map(str, self.string_args)))
- def _write_args(self, input_filename):
- # serialize args to file
- if self._pass_op_args():
- with open(input_filename, 'wb') as file:
- arg_dict = ({'args': self.op_args, 'kwargs': self.op_kwargs})
- if self.use_dill:
- dill.dump(arg_dict, file)
- else:
- pickle.dump(arg_dict, file)
-
- def _read_result(self, output_filename):
- if os.stat(output_filename).st_size == 0:
+ def _code__imports(self):
+ return f'import {self.pickling_library.__name__}\n' \
+ 'import sys\n'
+
+ def _code__read_args(self, filename):
+ if self.op_args or self.op_kwargs:
+ return f'with open({filename}, "rb") as file:\n' \
+ f' arg_dict =
{self.pickling_library.__name__}.load(file)\n'
+ return 'arg_dict = {"args": [], "kwargs": {}}\n'
+
+ def _code__read_string_args(self, filename):
+ return f'with open({filename}, "r") as file:\n' \
+ ' virtualenv_string_args = list(map(lambda x: x.strip(),
list(file)))\n'
+
+ def _code__write_output(self, filename):
+ return f'with open({filename}, "wb") as file:\n' \
+ f' if res: {self.pickling_library.__name__}.dump(res,
file)\n'
+
+ def _code__call_script(self, arg_dict):
+ return f'{dedent(inspect.getsource(self.python_callable))}\n' \
+ f'res = {self.python_callable.__name__}(*{arg_dict}["args"],
**{arg_dict}["kwargs"])\n'
+
+ def _write_script(self, filename):
+ with open(filename, 'w') as file:
+ python_code = f"{self._code__imports()}" \
+ f"{self._code__read_args(filename='sys.argv[1]')}" \
+
f"{self._code__read_string_args(filename='sys.argv[3]')}" \
+ f"{self._code__call_script(arg_dict='arg_dict')}" \
+ f"{self._code__write_output(filename='sys.argv[2]')}"
+ self.log.debug('Writing code to file\n %s', python_code)
+ file.write(python_code)
Review comment:
@feluelle what would you say to using a Jinja template? I think it would make
it easier to understand what the generated `python_code` looks like. We used a
similar approach in the operator generator:
https://github.com/PolideaInternal/airflow-munchkin/blob/master/airflow_munchkin/template/discovery/operator_class.tpl
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]