turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r452896934



##########
File path: airflow/operators/python.py
##########
@@ -413,140 +463,89 @@ def __init__(  # pylint: disable=too-many-arguments
             templates_exts=templates_exts,
             *args,
             **kwargs)
-        self.requirements = requirements or []
+        self.requirements = list(requirements or [])
         self.string_args = string_args or []
         self.python_version = python_version
         self.use_dill = use_dill
         self.system_site_packages = system_site_packages
-        # check that dill is present if needed
-        dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-                                   self.requirements)
-        if (not system_site_packages) and use_dill and not 
any(dill_in_requirements):
-            raise AirflowException('If using dill, dill must be in the 
environment ' +
-                                   'either via system_site_packages or 
requirements')
-        # check that a function is passed, and that it is not a lambda
-        if (not isinstance(self.python_callable,
-                           types.FunctionType) or 
(self.python_callable.__name__ ==
-                                                   (lambda x: 0).__name__)):
-            raise AirflowException('{} only supports functions for 
python_callable arg'.format(
-                self.__class__.__name__))
-        # check that args are passed iff python major version matches
-        if (python_version is not None and
-           str(python_version)[0] != str(sys.version_info[0]) and
-           self._pass_op_args()):
-            raise AirflowException("Passing op_args or op_kwargs is not 
supported across "
-                                   "different Python major versions "
-                                   "for PythonVirtualenvOperator. "
-                                   "Please use string_args.")
+        if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
+            self.requirements.append('dill')
+        self.pickling_library = dill if self.use_dill else pickle
+
+    def execute(self, context: Dict):
+        serializable_context = {key: context[key] for key in 
self._get_serializable_context_keys()}
+        super().execute(context=serializable_context)
 
     def execute_callable(self):
         with TemporaryDirectory(prefix='venv') as tmp_dir:
             if self.templates_dict:
                 self.op_kwargs['templates_dict'] = self.templates_dict
-            # generate filenames
+
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
             string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
-            # set up virtualenv
-            python_bin = 'python' + str(self.python_version) if 
self.python_version else None
             prepare_virtualenv(
                 venv_directory=tmp_dir,
-                python_bin=python_bin,
+                python_bin=f'python{self.python_version}' if 
self.python_version else None,
                 system_site_packages=self.system_site_packages,
-                requirements=self.requirements,
+                requirements=self.requirements
             )
 
             self._write_args(input_filename)
-            self._write_script(script_filename)
             self._write_string_args(string_args_filename)
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    
python_callable_source=dedent(inspect.getsource(self.python_callable))
+                ),
+                filename=script_filename
+            )
+
+            execute_in_subprocess(cmd=[
+                f'{tmp_dir}/bin/python',
+                script_filename,
+                input_filename,
+                output_filename,
+                string_args_filename
+            ])
 
-            # execute command in virtualenv
-            execute_in_subprocess(
-                self._generate_python_cmd(tmp_dir,
-                                          script_filename,
-                                          input_filename,
-                                          output_filename,
-                                          string_args_filename))
             return self._read_result(output_filename)
 
-    def _pass_op_args(self):
-        # we should only pass op_args if any are given to us
-        return len(self.op_args) + len(self.op_kwargs) > 0
+    def _write_args(self, filename):
+        if self.op_args or self.op_kwargs:
+            with open(filename, 'wb') as file:
+                self.pickling_library.dump({'args': self.op_args, 'kwargs': 
self.op_kwargs}, file)
+
+    def _get_serializable_context_keys(self):
+        def _is_airflow_env():
+            return self.system_site_packages or 'apache-airflow' in 
self.requirements
+
+        def _is_pendulum_env():
+            return 'pendulum' in self.requirements and 'lazy_object_proxy' in 
self.requirements
+
+        serializable_context_keys = self.BASE_SERIALIZABLE_CONTEXT_KEYS.copy()
+        if _is_airflow_env():
+            
serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
+        if _is_pendulum_env() or _is_airflow_env():
+            
serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)

Review comment:
       Oh, I see. Would this be more readable?
   ```python
   if _is_airflow_env():
       serializable_context_keys.update(self.AIRFLOW_SERIALIZABLE_CONTEXT_KEYS)
       serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
   elif _is_pendulum_env():
       serializable_context_keys.update(self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS)
   ```
   This is equivalent because if `_is_airflow_env()` is true, then both updates 
are performed. The single update is done only when `_is_airflow_env() == False` 
and `_is_pendulum_env() == True`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to