potiuk commented on code in PR #25780:
URL: https://github.com/apache/airflow/pull/25780#discussion_r948537177


##########
airflow/operators/python.py:
##########
@@ -281,7 +283,143 @@ def execute(self, context: Context) -> Any:
         self.log.info("Done.")
 
 
-class PythonVirtualenvOperator(PythonOperator):
+class _BasePythonVirtualenvOperator(PythonOperator):
+    BASE_SERIALIZABLE_CONTEXT_KEYS = {
+        'ds',
+        'ds_nodash',
+        'inlets',
+        'next_ds',
+        'next_ds_nodash',
+        'outlets',
+        'prev_ds',
+        'prev_ds_nodash',
+        'run_id',
+        'task_instance_key_str',
+        'test_mode',
+        'tomorrow_ds',
+        'tomorrow_ds_nodash',
+        'ts',
+        'ts_nodash',
+        'ts_nodash_with_tz',
+        'yesterday_ds',
+        'yesterday_ds_nodash',
+    }
+    PENDULUM_SERIALIZABLE_CONTEXT_KEYS = {
+        'data_interval_end',
+        'data_interval_start',
+        'execution_date',
+        'logical_date',
+        'next_execution_date',
+        'prev_data_interval_end_success',
+        'prev_data_interval_start_success',
+        'prev_execution_date',
+        'prev_execution_date_success',
+        'prev_start_date_success',
+    }
+    AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'macros', 'conf', 'dag', 'dag_run', 
'task', 'params'}
+
+    def __init__(
+        self,
+        *,
+        python_callable: Callable,
+        use_dill: bool = False,
+        op_args: Optional[Collection[Any]] = None,
+        op_kwargs: Optional[MutableMapping[str, Any]] = None,
+        string_args: Optional[Iterable[str]] = None,
+        templates_dict: Optional[Dict] = None,
+        templates_exts: Optional[List[str]] = None,
+        **kwargs,
+    ):
+        if (
+            not isinstance(python_callable, types.FunctionType)
+            or isinstance(python_callable, types.LambdaType)
+            and python_callable.__name__ == "<lambda>"
+        ):
+            raise AirflowException('PythonVirtualenvOperator only supports 
functions for python_callable arg')
+        super().__init__(
+            python_callable=python_callable,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            templates_dict=templates_dict,
+            templates_exts=templates_exts,
+            **kwargs,
+        )
+        self.string_args = string_args or []
+        self.use_dill = use_dill
+        self.pickling_library = dill if self.use_dill else pickle
+
+    def execute(self, context: Context) -> Any:
+        serializable_keys = set(self._iter_serializable_context_keys())
+        serializable_context = context_copy_partial(context, serializable_keys)
+        return super().execute(context=serializable_context)
+
+    def get_python_source(self):
+        """
+        Returns the source of self.python_callable
+        @return:
+        """
+        return dedent(inspect.getsource(self.python_callable))
+
+    def _write_args(self, file: Path):
+        if self.op_args or self.op_kwargs:
+            file.write_bytes(self.pickling_library.dumps({'args': 
self.op_args, 'kwargs': self.op_kwargs}))
+
+    def _iter_serializable_context_keys(self):
+        yield from self.BASE_SERIALIZABLE_CONTEXT_KEYS
+
+    def _write_string_args(self, file: Path):
+        file.write_text('\n'.join(map(str, self.string_args)))
+
+    def _read_result(self, path: Path):
+        if path.stat().st_size == 0:
+            return None
+        try:
+            return self.pickling_library.loads(path.read_bytes())
+        except ValueError:
+            self.log.error(
+                "Error deserializing result. Note that result deserialization "
+                "is not supported across major Python versions."
+            )
+            raise
+
+    def __deepcopy__(self, memo):
+        # module objects can't be copied _at all__
+        memo[id(self.pickling_library)] = self.pickling_library
+        return super().__deepcopy__(memo)
+
+    def _execute_python_callable_in_subprocess(self, python_path: Path, 
tmp_dir: Path):
+        if self.templates_dict:
+            self.op_kwargs['templates_dict'] = self.templates_dict
+        input_path = tmp_dir / 'script.in'

Review Comment:
   I converted this to use `pathlib.Path` for the file paths.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to