turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452831967
##########
File path: airflow/operators/python.py
##########
@@ -413,140 +463,89 @@ def __init__( # pylint: disable=too-many-arguments
templates_exts=templates_exts,
*args,
**kwargs)
- self.requirements = requirements or []
+ self.requirements = list(requirements or [])
self.string_args = string_args or []
self.python_version = python_version
self.use_dill = use_dill
self.system_site_packages = system_site_packages
- # check that dill is present if needed
- dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
- self.requirements)
- if (not system_site_packages) and use_dill and not any(dill_in_requirements):
- raise AirflowException('If using dill, dill must be in the environment ' +
- 'either via system_site_packages or requirements')
- # check that a function is passed, and that it is not a lambda
- if (not isinstance(self.python_callable,
- types.FunctionType) or (self.python_callable.__name__ ==
- (lambda x: 0).__name__)):
- raise AirflowException('{} only supports functions for python_callable arg'.format(
- self.__class__.__name__))
- # check that args are passed iff python major version matches
- if (python_version is not None and
- str(python_version)[0] != str(sys.version_info[0]) and
- self._pass_op_args()):
- raise AirflowException("Passing op_args or op_kwargs is not supported across "
- "different Python major versions "
- "for PythonVirtualenvOperator. "
- "Please use string_args.")
+ if not self.system_site_packages and self.use_dill and 'dill' not in self.requirements:
+ self.requirements.append('dill')
+ self.pickling_library = dill if self.use_dill else pickle
+
+ def execute(self, context: Dict):
+ serializable_context = {key: context[key] for key in self._get_serializable_context_keys()}
+ super().execute(context=serializable_context)
def execute_callable(self):
with TemporaryDirectory(prefix='venv') as tmp_dir:
if self.templates_dict:
self.op_kwargs['templates_dict'] = self.templates_dict
- # generate filenames
+
input_filename = os.path.join(tmp_dir, 'script.in')
output_filename = os.path.join(tmp_dir, 'script.out')
string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
script_filename = os.path.join(tmp_dir, 'script.py')
- # set up virtualenv
- python_bin = 'python' + str(self.python_version) if self.python_version else None
prepare_virtualenv(
venv_directory=tmp_dir,
- python_bin=python_bin,
+ python_bin=f'python{self.python_version}' if self.python_version else None,
system_site_packages=self.system_site_packages,
- requirements=self.requirements,
+ requirements=self.requirements
)
self._write_args(input_filename)
- self._write_script(script_filename)
self._write_string_args(string_args_filename)
+ write_python_script(
+ jinja_context=dict(
+ op_args=self.op_args,
+ op_kwargs=self.op_kwargs,
+ pickling_library=self.pickling_library.__name__,
+ python_callable=self.python_callable.__name__,
+ python_callable_source=dedent(inspect.getsource(self.python_callable))
+ ),
+ filename=script_filename
+ )
+
+ execute_in_subprocess(cmd=[
+ f'{tmp_dir}/bin/python',
+ script_filename,
+ input_filename,
+ output_filename,
+ string_args_filename
+ ])
- # execute command in virtualenv
- execute_in_subprocess(
- self._generate_python_cmd(tmp_dir,
- script_filename,
- input_filename,
- output_filename,
- string_args_filename))
return self._read_result(output_filename)
- def _pass_op_args(self):
- # we should only pass op_args if any are given to us
- return len(self.op_args) + len(self.op_kwargs) > 0
+ def _write_args(self, filename):
+ if self.op_args or self.op_kwargs:
+ with open(filename, 'wb') as file:
+ self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+
+ def _get_serializable_context_keys(self):
+ def _is_airflow_env():
+ return self.system_site_packages or 'apache-airflow' in self.requirements
+
+ def _is_pendulum_env():
+ return 'pendulum' in self.requirements and 'lazy_object_proxy' in self.requirements
+
+ serializable_context_keys = self.BASE_SERIALIZABLE_CONTEXT_KEYS.copy()
+ if _is_airflow_env():
+ serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+ if _is_pendulum_env() or _is_airflow_env():
+ serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
Review comment:
```suggestion
if _is_pendulum_env() or _is_airflow_env():
    serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
    serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
```
Wouldn't this give the same result?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org