o-nikolas commented on code in PR #25780:
URL: https://github.com/apache/airflow/pull/25780#discussion_r952871169


##########
airflow/operators/python.py:
##########
@@ -501,27 +561,150 @@ def _iter_serializable_context_keys(self):
         elif 'pendulum' in self.requirements:
             yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
 
-    def _write_string_args(self, filename):
-        with open(filename, 'w') as file:
-            file.write('\n'.join(map(str, self.string_args)))
 
-    def _read_result(self, filename):
-        if os.stat(filename).st_size == 0:
-            return None
-        with open(filename, 'rb') as file:
-            try:
-                return self.pickling_library.load(file)
-            except ValueError:
-                self.log.error(
-                    "Error deserializing result. Note that result 
deserialization "
-                    "is not supported across major Python versions."
+class PythonPreexistingVirtualenvOperator(_BasePythonVirtualenvOperator):
+    """
+    Allows one to run a function in a virtualenv that is not re-created but 
used as is
+    without the overhead necessary overhead to create the virtualenv (with 
certain caveats).
+
+    The function must be defined using def, and not be
+    part of a class. All imports must happen inside the function
+    and no variables outside the scope may be referenced. A global scope
+    variable named virtualenv_string_args will be available (populated by
+    string_args). In addition, one can pass stuff through op_args and 
op_kwargs, and one
+    can use a return value.
+    Note that if your virtualenv runs in a different Python major version than 
Airflow,
+    you cannot use return values, op_args, op_kwargs, or use any macros that 
are being provided to
+    Airflow through plugins. You can use string_args though.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:PythonPreexistingVirtualenvOperator`
+
+    :param python: Full path string (file-system specific) that points to a 
Python binary inside
+        a virtualenv that should be used (in ``VENV/bin`` folder). Should be 
absolute path
+        (so usually start with "/" or "X:/" depending on the filesystem/os 
used).
+    :param python_callable: A python function with no references to outside 
variables,
+        defined with def, which will be run in a virtualenv
+    :param use_dill: Whether to use dill to serialize
+        the args and result (pickle is default). This allow more complex types
+        but if dill is not preinstalled in your venv, the task will fail with 
use_dill enabled.
+    :param op_args: A list of positional arguments to pass to python_callable.
+    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
+    :param string_args: Strings that are present in the global var 
virtualenv_string_args,
+        available to python_callable at runtime as a list[str]. Note that args 
are split
+        by newline.
+    :param templates_dict: a dictionary where the values are templates that
+        will get templated by the Airflow engine sometime between
+        ``__init__`` and ``execute`` takes place and are made available
+        in your callable's context after the template has been applied
+    :param templates_exts: a list of file extensions to resolve while
+        processing templated fields, for examples ``['.sql', '.hql']``
+    """
+
+    template_fields: Sequence[str] = tuple({'python_path'} | 
set(PythonOperator.template_fields))
+
+    def __init__(
+        self,
+        *,
+        python: str,
+        python_callable: Callable,
+        use_dill: bool = False,
+        op_args: Optional[Collection[Any]] = None,
+        op_kwargs: Optional[Mapping[str, Any]] = None,
+        string_args: Optional[Iterable[str]] = None,
+        templates_dict: Optional[Dict] = None,
+        templates_exts: Optional[List[str]] = None,
+        **kwargs,
+    ):
+        if not python:
+            raise ValueError("Python Path must be defined in 
PythonPreexistingVirtualenvOperator")
+        self.python_path = Path(python)
+        if not self.python_path.exists():
+            raise ValueError(f"Python Path '{self.python_path}' must exists")
+        if not self.python_path.is_file():
+            raise ValueError(f"Python Path '{self.python_path}' must be a 
file")
+        if not self.python_path.is_absolute():
+            raise ValueError(f"Python Path '{self.python_path}' must be an 
absolute path.")
+        super().__init__(
+            python_callable=python_callable,
+            use_dill=use_dill,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            string_args=string_args,
+            templates_dict=templates_dict,
+            templates_exts=templates_exts,
+            **kwargs,
+        )
+
+    def execute_callable(self):
+        python_version_as_list_of_strings = 
self._get_python_version_from_venv()
+        if (
+            python_version_as_list_of_strings
+            and str(python_version_as_list_of_strings[0]) != 
str(sys.version_info.major)
+            and (self.op_args or self.op_kwargs)
+        ):
+            raise AirflowException(
+                "Passing op_args or op_kwargs is not supported across 
different Python "
+                "major versions for PythonVirtualenvOperator. Please use 
string_args."

Review Comment:
   ```suggestion
                   "major versions for PythonPreexistingVirtualenvOperator. Please use string_args."
   ```



##########
airflow/operators/python.py:
##########
@@ -501,27 +561,150 @@ def _iter_serializable_context_keys(self):
         elif 'pendulum' in self.requirements:
             yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
 
-    def _write_string_args(self, filename):
-        with open(filename, 'w') as file:
-            file.write('\n'.join(map(str, self.string_args)))
 
-    def _read_result(self, filename):
-        if os.stat(filename).st_size == 0:
-            return None
-        with open(filename, 'rb') as file:
-            try:
-                return self.pickling_library.load(file)
-            except ValueError:
-                self.log.error(
-                    "Error deserializing result. Note that result 
deserialization "
-                    "is not supported across major Python versions."
+class PythonPreexistingVirtualenvOperator(_BasePythonVirtualenvOperator):
+    """
+    Allows one to run a function in a virtualenv that is not re-created but 
used as is
+    without the overhead necessary overhead to create the virtualenv (with 
certain caveats).

Review Comment:
   ```suggestion
       without the overhead of creating the virtualenv (with certain caveats).
   ```



##########
airflow/operators/python.py:
##########
@@ -501,27 +561,150 @@ def _iter_serializable_context_keys(self):
         elif 'pendulum' in self.requirements:
             yield from self.PENDULUM_SERIALIZABLE_CONTEXT_KEYS
 
-    def _write_string_args(self, filename):
-        with open(filename, 'w') as file:
-            file.write('\n'.join(map(str, self.string_args)))
 
-    def _read_result(self, filename):
-        if os.stat(filename).st_size == 0:
-            return None
-        with open(filename, 'rb') as file:
-            try:
-                return self.pickling_library.load(file)
-            except ValueError:
-                self.log.error(
-                    "Error deserializing result. Note that result 
deserialization "
-                    "is not supported across major Python versions."
+class PythonPreexistingVirtualenvOperator(_BasePythonVirtualenvOperator):
+    """
+    Allows one to run a function in a virtualenv that is not re-created but 
used as is
+    without the overhead necessary overhead to create the virtualenv (with 
certain caveats).
+
+    The function must be defined using def, and not be
+    part of a class. All imports must happen inside the function
+    and no variables outside the scope may be referenced. A global scope
+    variable named virtualenv_string_args will be available (populated by
+    string_args). In addition, one can pass stuff through op_args and 
op_kwargs, and one
+    can use a return value.
+    Note that if your virtualenv runs in a different Python major version than 
Airflow,
+    you cannot use return values, op_args, op_kwargs, or use any macros that 
are being provided to
+    Airflow through plugins. You can use string_args though.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:PythonPreexistingVirtualenvOperator`
+
+    :param python: Full path string (file-system specific) that points to a 
Python binary inside
+        a virtualenv that should be used (in ``VENV/bin`` folder). Should be 
absolute path
+        (so usually start with "/" or "X:/" depending on the filesystem/os 
used).
+    :param python_callable: A python function with no references to outside 
variables,
+        defined with def, which will be run in a virtualenv
+    :param use_dill: Whether to use dill to serialize
+        the args and result (pickle is default). This allow more complex types
+        but if dill is not preinstalled in your venv, the task will fail with 
use_dill enabled.
+    :param op_args: A list of positional arguments to pass to python_callable.
+    :param op_kwargs: A dict of keyword arguments to pass to python_callable.
+    :param string_args: Strings that are present in the global var 
virtualenv_string_args,
+        available to python_callable at runtime as a list[str]. Note that args 
are split
+        by newline.
+    :param templates_dict: a dictionary where the values are templates that
+        will get templated by the Airflow engine sometime between
+        ``__init__`` and ``execute`` takes place and are made available
+        in your callable's context after the template has been applied
+    :param templates_exts: a list of file extensions to resolve while
+        processing templated fields, for examples ``['.sql', '.hql']``
+    """
+
+    template_fields: Sequence[str] = tuple({'python_path'} | 
set(PythonOperator.template_fields))
+
+    def __init__(
+        self,
+        *,
+        python: str,
+        python_callable: Callable,
+        use_dill: bool = False,
+        op_args: Optional[Collection[Any]] = None,
+        op_kwargs: Optional[Mapping[str, Any]] = None,
+        string_args: Optional[Iterable[str]] = None,
+        templates_dict: Optional[Dict] = None,
+        templates_exts: Optional[List[str]] = None,
+        **kwargs,
+    ):
+        if not python:
+            raise ValueError("Python Path must be defined in 
PythonPreexistingVirtualenvOperator")
+        self.python_path = Path(python)
+        if not self.python_path.exists():
+            raise ValueError(f"Python Path '{self.python_path}' must exists")
+        if not self.python_path.is_file():
+            raise ValueError(f"Python Path '{self.python_path}' must be a 
file")
+        if not self.python_path.is_absolute():
+            raise ValueError(f"Python Path '{self.python_path}' must be an 
absolute path.")
+        super().__init__(
+            python_callable=python_callable,
+            use_dill=use_dill,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            string_args=string_args,
+            templates_dict=templates_dict,
+            templates_exts=templates_exts,
+            **kwargs,
+        )
+
+    def execute_callable(self):
+        python_version_as_list_of_strings = 
self._get_python_version_from_venv()
+        if (
+            python_version_as_list_of_strings
+            and str(python_version_as_list_of_strings[0]) != 
str(sys.version_info.major)
+            and (self.op_args or self.op_kwargs)
+        ):
+            raise AirflowException(
+                "Passing op_args or op_kwargs is not supported across 
different Python "
+                "major versions for PythonVirtualenvOperator. Please use 
string_args."
+                f"Sys version: {sys.version_info}. Venv version: 
{python_version_as_list_of_strings}"
+            )
+        with TemporaryDirectory(prefix='tmd') as tmp_dir:
+            tmp_path = Path(tmp_dir)
+            return 
self._execute_python_callable_in_subprocess(self.python_path, tmp_path)
+
+    def _get_virtualenv_path(self) -> Path:
+        return self.python_path.parents[1]
+
+    def _get_python_version_from_venv(self) -> List[str]:
+        try:
+            result = subprocess.check_output([self.python_path, "--version"], 
text=True)
+            return result.strip().split(" ")[-1].split(".")
+        except Exception as e:
+            raise ValueError(f"Error while executing {self.python_path}: {e}")
+
+    def _get_airflow_version_from_venv(self) -> Optional[str]:
+        try:
+            result = subprocess.check_output(
+                [self.python_path, "-c", "from airflow import version; 
print(version.version)"], text=True
+            )
+            venv_airflow_version = result.strip()
+            if venv_airflow_version != airflow_version:
+                raise AirflowConfigException(
+                    f"The version of airflow installed in virtualenv 
{self._get_virtualenv_path()} is "
+                    f"different than runtime Airflow error: {airflow_version}. 
Make sure your venv"
+                    f" has the same airflow version installed as Airflow 
runtime."

Review Comment:
   ```suggestion
                       f"The version of Airflow installed in the virtualenv {self._get_virtualenv_path()}: {venv_airflow_version} is "
                       f"different than the runtime Airflow version: {airflow_version}. Make sure your venv"
                       f" has the same Airflow version installed as the Airflow runtime."
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to