This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3ff4f2dc248 Removing feature: send context in venv operators (using 
`use_airflow_context`) (#46306)
3ff4f2dc248 is described below

commit 3ff4f2dc248bd633ede6b4eb5b7d38e40d404157
Author: Amogh Desai <amoghrajesh1...@gmail.com>
AuthorDate: Mon Feb 3 11:41:19 2025 +0530

    Removing feature: send context in venv operators (using 
`use_airflow_context`) (#46306)
---
 airflow/decorators/__init__.pyi                    |   6 -
 .../example_python_context_decorator.py            |  92 ------------
 .../example_python_context_operator.py             |  95 ------------
 providers/standard/docs/operators/python.rst       |  81 ----------
 .../airflow/providers/standard/operators/python.py |  48 ------
 .../standard/utils/python_virtualenv_script.jinja2 |  24 ---
 .../standard/operators/test_python.py              | 165 ---------------------
 7 files changed, 511 deletions(-)

diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index 49504fa388a..ee42f103fb3 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -126,7 +126,6 @@ class TaskDecoratorCollection:
         show_return_value_in_logs: bool = True,
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
-        use_airflow_context: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a virtual 
environment task.
@@ -172,7 +171,6 @@ class TaskDecoratorCollection:
             environment. If set to ``True``, the virtual environment will 
inherit the environment variables
             of the parent process (``os.environ``). If set to ``False``, the 
virtual environment will be
             executed with a clean environment.
-        :param use_airflow_context: Whether to provide 
``get_current_context()`` to the python_callable.
         """
     @overload
     def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> 
Task[FParams, FReturn]: ...
@@ -188,7 +186,6 @@ class TaskDecoratorCollection:
         show_return_value_in_logs: bool = True,
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
-        use_airflow_context: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a virtual 
environment task.
@@ -219,7 +216,6 @@ class TaskDecoratorCollection:
             environment. If set to ``True``, the virtual environment will 
inherit the environment variables
             of the parent process (``os.environ``). If set to ``False``, the 
virtual environment will be
             executed with a clean environment.
-        :param use_airflow_context: Whether to provide 
``get_current_context()`` to the python_callable.
         """
     @overload
     def branch(  # type: ignore[misc]
@@ -252,7 +248,6 @@ class TaskDecoratorCollection:
         index_urls: None | Collection[str] | str = None,
         venv_cache_path: None | str = None,
         show_return_value_in_logs: bool = True,
-        use_airflow_context: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to wrap the decorated callable into a 
BranchPythonVirtualenvOperator.
@@ -291,7 +286,6 @@ class TaskDecoratorCollection:
             logs. Defaults to True, which allows return value log output.
             It can be set to False to prevent log output of return value when 
you return huge data
             such as transmission a large amount of XCom to TaskAPI.
-        :param use_airflow_context: Whether to provide 
``get_current_context()`` to the python_callable.
         """
     @overload
     def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) 
-> Task[FParams, FReturn]: ...
diff --git a/airflow/example_dags/example_python_context_decorator.py 
b/airflow/example_dags/example_python_context_decorator.py
deleted file mode 100644
index 9cfc3187574..00000000000
--- a/airflow/example_dags/example_python_context_decorator.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example DAG demonstrating the usage of the PythonOperator with 
`get_current_context()` to get the current context.
-
-Also, demonstrates the usage of the TaskFlow API.
-"""
-
-from __future__ import annotations
-
-import sys
-
-import pendulum
-
-from airflow.decorators import dag, task
-
-SOME_EXTERNAL_PYTHON = sys.executable
-
-
-@dag(
-    schedule=None,
-    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    catchup=False,
-    tags=["example"],
-)
-def example_python_context_decorator():
-    # [START get_current_context]
-    @task(task_id="print_the_context")
-    def print_context() -> str:
-        """Print the Airflow context."""
-        from pprint import pprint
-
-        from airflow.providers.standard.operators.python import 
get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context = print_context()
-    # [END get_current_context]
-
-    # [START get_current_context_venv]
-    @task.virtualenv(task_id="print_the_context_venv", 
use_airflow_context=True)
-    def print_context_venv() -> str:
-        """Print the Airflow context in venv."""
-        from pprint import pprint
-
-        from airflow.providers.standard.operators.python import 
get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_venv = print_context_venv()
-    # [END get_current_context_venv]
-
-    # [START get_current_context_external]
-    @task.external_python(
-        task_id="print_the_context_external", python=SOME_EXTERNAL_PYTHON, 
use_airflow_context=True
-    )
-    def print_context_external() -> str:
-        """Print the Airflow context in external python."""
-        from pprint import pprint
-
-        from airflow.providers.standard.operators.python import 
get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_external = print_context_external()
-    # [END get_current_context_external]
-
-    _ = print_the_context >> [print_the_context_venv, 
print_the_context_external]
-
-
-example_python_context_decorator()
diff --git a/airflow/example_dags/example_python_context_operator.py 
b/airflow/example_dags/example_python_context_operator.py
deleted file mode 100644
index 4dc9383dd06..00000000000
--- a/airflow/example_dags/example_python_context_operator.py
+++ /dev/null
@@ -1,95 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example DAG demonstrating the usage of the PythonOperator with 
`get_current_context()` to get the current context.
-
-Also, demonstrates the usage of the classic Python operators.
-"""
-
-from __future__ import annotations
-
-import sys
-
-import pendulum
-
-from airflow import DAG
-from airflow.providers.standard.operators.python import (
-    ExternalPythonOperator,
-    PythonOperator,
-    PythonVirtualenvOperator,
-)
-
-SOME_EXTERNAL_PYTHON = sys.executable
-
-with DAG(
-    dag_id="example_python_context_operator",
-    schedule=None,
-    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    catchup=False,
-    tags=["example"],
-) as dag:
-    # [START get_current_context]
-    def print_context() -> str:
-        """Print the Airflow context."""
-        from pprint import pprint
-
-        from airflow.providers.standard.operators.python import 
get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context = PythonOperator(task_id="print_the_context", 
python_callable=print_context)
-    # [END get_current_context]
-
-    # [START get_current_context_venv]
-    def print_context_venv() -> str:
-        """Print the Airflow context in venv."""
-        from pprint import pprint
-
-        from airflow.providers.standard.operators.python import 
get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_venv = PythonVirtualenvOperator(
-        task_id="print_the_context_venv", python_callable=print_context_venv, 
use_airflow_context=True
-    )
-    # [END get_current_context_venv]
-
-    # [START get_current_context_external]
-    def print_context_external() -> str:
-        """Print the Airflow context in external python."""
-        from pprint import pprint
-
-        from airflow.providers.standard.operators.python import 
get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_external = ExternalPythonOperator(
-        task_id="print_the_context_external",
-        python_callable=print_context_external,
-        python=SOME_EXTERNAL_PYTHON,
-        use_airflow_context=True,
-    )
-    # [END get_current_context_external]
-
-    _ = print_the_context >> [print_the_context_venv, 
print_the_context_external]
diff --git a/providers/standard/docs/operators/python.rst 
b/providers/standard/docs/operators/python.rst
index 44cc4a43341..968c5b37a3e 100644
--- a/providers/standard/docs/operators/python.rst
+++ b/providers/standard/docs/operators/python.rst
@@ -113,25 +113,6 @@ It can be used implicitly, such as with ``**kwargs``,
 but can also be used explicitly with ``get_current_context()``.
 In this case, the type hint can be used for static analysis.
 
-.. tab-set::
-
-    .. tab-item:: @task
-        :sync: taskflow
-
-        .. exampleinclude:: 
/../../airflow/example_dags/example_python_context_decorator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context]
-            :end-before: [END get_current_context]
-
-    .. tab-item:: PythonOperator
-        :sync: operator
-
-        .. exampleinclude:: 
/../../airflow/example_dags/example_python_context_operator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context]
-            :end-before: [END get_current_context]
 
 .. _howto/operator:PythonVirtualenvOperator:
 
@@ -240,44 +221,6 @@ In case you have problems during runtime with broken 
cached virtual environments
 Note that any modification of a cached virtual environment (like temp files in 
binary path, post-installing further requirements) might pollute a cached 
virtual environment and the
 operator is not maintaining or cleaning the cache path.
 
-Context
-^^^^^^^
-
-With some limitations, you can also use ``Context`` in virtual environments.
-
-.. important::
-    Using ``Context`` in a virtual environment is a bit of a challenge
-    because it involves library dependencies and serialization issues.
-
-    You can bypass this to some extent by using :ref:`Jinja template variables 
<templates:variables>` and explicitly passing it as a parameter.
-
-    You can also use ``get_current_context()`` in the same way as before, but 
with some limitations.
-
-    * Requires ``apache-airflow>=3.0.0``.
-
-    * Set ``use_airflow_context`` to ``True`` to call 
``get_current_context()`` in the virtual environment.
-
-    * Set ``system_site_packages`` to ``True`` or set ``expect_airflow`` to 
``True``
-
-.. tab-set::
-
-    .. tab-item:: @task.virtualenv
-        :sync: taskflow
-
-        .. exampleinclude:: 
/../../airflow/example_dags/example_python_context_decorator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_venv]
-            :end-before: [END get_current_context_venv]
-
-    .. tab-item:: PythonVirtualenvOperator
-        :sync: operator
-
-        .. exampleinclude:: 
/../../airflow/example_dags/example_python_context_operator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_venv]
-            :end-before: [END get_current_context_venv]
 
 .. _howto/operator:ExternalPythonOperator:
 
@@ -347,30 +290,6 @@ Templating
 
 Jinja templating can be used in same way as described for the 
:ref:`howto/operator:PythonOperator`.
 
-Context
-^^^^^^^
-
-You can use ``Context`` under the same conditions as 
``PythonVirtualenvOperator``.
-
-.. tab-set::
-
-    .. tab-item:: @task.external_python
-        :sync: taskflow
-
-        .. exampleinclude:: 
/../../airflow/example_dags/example_python_context_decorator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_external]
-            :end-before: [END get_current_context_external]
-
-    .. tab-item:: ExternalPythonOperator
-        :sync: operator
-
-        .. exampleinclude:: 
/../../airflow/example_dags/example_python_context_operator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_external]
-            :end-before: [END get_current_context_external]
 
 .. _howto/operator:PythonBranchOperator:
 
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/python.py 
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 7afaaecfaee..6b89398ef77 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -418,7 +418,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         skip_on_exit_code: int | Container[int] | None = None,
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
-        use_airflow_context: bool = False,
         **kwargs,
     ):
         if (
@@ -460,7 +459,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         )
         self.env_vars = env_vars
         self.inherit_env = inherit_env
-        self.use_airflow_context = use_airflow_context
 
     @abstractmethod
     def _iter_serializable_context_keys(self):
@@ -519,7 +517,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
                 "pickling_library": self.serializer,
                 "python_callable": self.python_callable.__name__,
                 "python_callable_source": self.get_python_source(),
-                "use_airflow_context": self.use_airflow_context,
             }
 
             if inspect.getfile(self.python_callable) == self.dag.fileloc:
@@ -530,20 +527,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
                 filename=os.fspath(script_path),
                 
render_template_as_native_obj=self.dag.render_template_as_native_obj,
             )
-            if self.use_airflow_context:
-                # TODO: replace with commented code when context serialization 
is implemented in AIP-72
-                raise AirflowException(
-                    "The `use_airflow_context=True` is not yet implemented. "
-                    "It will work in Airflow 3 after AIP-72 context "
-                    "serialization is ready."
-                )
-                # context = get_current_context()
-                # with create_session() as session:
-                #     dag_run, task_instance = context["dag_run"], 
context["task_instance"]
-                #     session.add_all([dag_run, task_instance])
-                #     serializable_context: dict[Encoding, Any] = # Get 
serializable context here
-                # with airflow_context_path.open("w+") as file:
-                #     json.dump(serializable_context, file)
 
             env_vars = dict(os.environ) if self.inherit_env else {}
             if self.env_vars:
@@ -658,8 +641,6 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         environment. If set to ``True``, the virtual environment will inherit 
the environment variables
         of the parent process (``os.environ``). If set to ``False``, the 
virtual environment will be
         executed with a clean environment.
-    :param use_airflow_context: Whether to provide ``get_current_context()`` 
to the python_callable.
-        NOT YET IMPLEMENTED - waits for AIP-72 context serialization.
     """
 
     template_fields: Sequence[str] = tuple(
@@ -687,7 +668,6 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         venv_cache_path: None | os.PathLike[str] = None,
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
-        use_airflow_context: bool = False,
         **kwargs,
     ):
         if (
@@ -704,18 +684,6 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
             raise AirflowException(
                 "Passing non-string types (e.g. int or float) as 
python_version not supported"
             )
-        if use_airflow_context and (not expect_airflow and not 
system_site_packages):
-            raise AirflowException(
-                "The `use_airflow_context` parameter is set to True, but "
-                "expect_airflow and system_site_packages are set to False."
-            )
-        # TODO: remove when context serialization is implemented in AIP-72
-        if use_airflow_context and not AIRFLOW_V_3_0_PLUS:
-            raise AirflowException(
-                "The `use_airflow_context=True` is not yet implemented. "
-                "It will work in Airflow 3 after AIP-72 context "
-                "serialization is ready."
-            )
         if not requirements:
             self.requirements: list[str] = []
         elif isinstance(requirements, str):
@@ -744,7 +712,6 @@ class 
PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
             skip_on_exit_code=skip_on_exit_code,
             env_vars=env_vars,
             inherit_env=inherit_env,
-            use_airflow_context=use_airflow_context,
             **kwargs,
         )
 
@@ -960,8 +927,6 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
         environment. If set to ``True``, the virtual environment will inherit 
the environment variables
         of the parent process (``os.environ``). If set to ``False``, the 
virtual environment will be
         executed with a clean environment.
-    :param use_airflow_context: Whether to provide ``get_current_context()`` 
to the python_callable.
-        NOT YET IMPLEMENTED - waits for AIP-72 context serialization.
     """
 
     template_fields: Sequence[str] = 
tuple({"python"}.union(PythonOperator.template_fields))
@@ -982,22 +947,10 @@ class 
ExternalPythonOperator(_BasePythonVirtualenvOperator):
         skip_on_exit_code: int | Container[int] | None = None,
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
-        use_airflow_context: bool = False,
         **kwargs,
     ):
         if not python:
             raise ValueError("Python Path must be defined in 
ExternalPythonOperator")
-        if use_airflow_context and not expect_airflow:
-            raise AirflowException(
-                "The `use_airflow_context` parameter is set to True, but 
expect_airflow is set to False."
-            )
-        # TODO: remove when context serialization is implemented in AIP-72
-        if use_airflow_context:
-            raise AirflowException(
-                "The `use_airflow_context=True` is not yet implemented. "
-                "It will work in Airflow 3 after AIP-72 context "
-                "serialization is ready."
-            )
         self.python = python
         self.expect_pendulum = expect_pendulum
         super().__init__(
@@ -1012,7 +965,6 @@ class 
ExternalPythonOperator(_BasePythonVirtualenvOperator):
             skip_on_exit_code=skip_on_exit_code,
             env_vars=env_vars,
             inherit_env=inherit_env,
-            use_airflow_context=use_airflow_context,
             **kwargs,
         )
 
diff --git 
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
 
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
index 6b803b408f2..da5b2cbf9e1 100644
--- 
a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
+++ 
b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2
@@ -64,30 +64,6 @@ with open(sys.argv[3], "r") as file:
     virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
 {% endif %}
 
-{% if use_airflow_context | default(false) -%}
-if len(sys.argv) > 5:
-    import json
-    from types import ModuleType
-
-    from airflow.providers.standard.operators import python as airflow_python
-
-
-    class _MockPython(ModuleType):
-        @staticmethod
-        def get_current_context():
-            with open(sys.argv[5]) as file:
-                context = json.load(file)
-                raise Exception("Not yet implemented")
-                # TODO: return deserialized context
-
-        def __getattr__(self, name: str):
-            return getattr(airflow_python, name)
-
-
-    MockPython = _MockPython("MockPython")
-    sys.modules["airflow.providers.standard.operators.python"] = MockPython
-{% endif %}
-
 try:
     res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
 except Exception as e:
diff --git 
a/providers/standard/tests/provider_tests/standard/operators/test_python.py 
b/providers/standard/tests/provider_tests/standard/operators/test_python.py
index b4320e7a193..8ec569c8d22 100644
--- a/providers/standard/tests/provider_tests/standard/operators/test_python.py
+++ b/providers/standard/tests/provider_tests/standard/operators/test_python.py
@@ -91,8 +91,6 @@ DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, 
reason="`dill` is not insta
 CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None
 CLOUDPICKLE_MARKER = pytest.mark.skipif(not CLOUDPICKLE_INSTALLED, 
reason="`cloudpickle` is not installed")
 
-AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE = r"The `use_airflow_context=True` 
is not yet implemented."
-
 
 class BasePythonTest:
     """Base test class for TestPythonOperator and TestPythonSensor classes"""
@@ -1042,91 +1040,6 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
         task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"}, 
inherit_env=True)
         assert task.execute_callable() == "EFGHI"
 
-    def test_current_context(self):
-        def f():
-            from airflow.providers.standard.operators.python import 
get_current_context
-
-            try:
-                from airflow.sdk.definitions.context import Context
-            except ImportError:
-                # TODO: Remove once provider drops support for Airflow 2
-                from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got 
{type(context)}:{context!r}"
-                raise TypeError(error_msg)
-
-            return []
-
-        # TODO: replace with commented code when context serialization is 
implemented in AIP-72
-        with pytest.raises(Exception, 
match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE):
-            self.run_as_task(f, return_ti=True, use_airflow_context=True)
-
-        # ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, 
use_airflow_context=True)
-        # assert ti.state == TaskInstanceState.SUCCESS
-
-    def test_current_context_not_found_error(self):
-        def f():
-            from airflow.providers.standard.operators.python import 
get_current_context
-
-            get_current_context()
-            return []
-
-        with pytest.raises(
-            AirflowException,
-            match="Current context was requested but no context was found! "
-            "Are you running within an Airflow task?",
-        ):
-            self.run_as_task(f, return_ti=True, use_airflow_context=False)
-
-    def test_current_context_airflow_not_found_error(self):
-        airflow_flag: dict[str, bool] = {"expect_airflow": False}
-
-        error_msg = r"The `use_airflow_context` parameter is set to True, but 
expect_airflow is set to False."
-
-        if not issubclass(self.opcls, ExternalPythonOperator):
-            airflow_flag["system_site_packages"] = False
-            error_msg = (
-                r"The `use_airflow_context` parameter is set to True, but "
-                r"expect_airflow and system_site_packages are set to False."
-            )
-
-        def f():
-            from airflow.providers.standard.operators.python import 
get_current_context
-
-            get_current_context()
-            return []
-
-        with pytest.raises(AirflowException, match=error_msg):
-            self.run_as_task(
-                f, return_ti=True, multiple_outputs=False, 
use_airflow_context=True, **airflow_flag
-            )
-
-    def test_use_airflow_context_touch_other_variables(self):
-        def f():
-            from airflow.providers.standard.operators.python import 
get_current_context
-
-            try:
-                from airflow.sdk.definitions.context import Context
-            except ImportError:
-                # TODO: Remove once provider drops support for Airflow 2
-                from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got 
{type(context)}:{context!r}"
-                raise TypeError(error_msg)
-
-            return []
-
-        # TODO: replace with commented code when context serialization is 
implemented in AIP-72
-        with pytest.raises(AirflowException, 
match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE):
-            self.run_as_task(f, return_ti=True, use_airflow_context=True)
-
-        # ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, 
use_airflow_context=True)
-        # assert ti.state == TaskInstanceState.SUCCESS
-
 
 venv_cache_path = tempfile.mkdtemp(prefix="venv_cache_path")
 
@@ -1486,45 +1399,6 @@ class 
TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
 
         self.run_as_task(f, serializer=serializer, system_site_packages=False, 
requirements=None)
 
-    def test_current_context_system_site_packages(self, session):
-        def f():
-            from airflow.providers.standard.operators.python import 
get_current_context
-
-            try:
-                from airflow.sdk.definitions.context import Context
-            except ImportError:
-                # TODO: Remove once provider drops support for Airflow 2
-                from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got 
{type(context)}:{context!r}"
-                raise TypeError(error_msg)
-
-            return []
-
-        # TODO: replace with commented code when context serialization is 
implemented in AIP-72
-        with pytest.raises(AirflowException, 
match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE):
-            self.run_as_task(
-                f,
-                return_ti=True,
-                use_airflow_context=True,
-                session=session,
-                expect_airflow=False,
-                system_site_packages=True,
-            )
-
-        # ti = self.run_as_task(
-        #     f,
-        #     return_ti=True,
-        #     multiple_outputs=False,
-        #     use_airflow_context=True,
-        #     session=session,
-        #     expect_airflow=False,
-        #     system_site_packages=True,
-        # )
-        # assert ti.state == TaskInstanceState.SUCCESS
-
 
 # when venv tests are run in parallel to other test they create new processes 
and this might take
 # quite some time in shared docker environment and get some contention even 
between different containers
@@ -1854,45 +1728,6 @@ class 
TestBranchPythonVirtualenvOperator(BaseTestBranchPythonVirtualenvOperator)
                 kwargs["venv_cache_path"] = venv_cache_path
         return kwargs
 
-    def test_current_context_system_site_packages(self, session):
-        def f():
-            from airflow.providers.standard.operators.python import 
get_current_context
-
-            try:
-                from airflow.sdk.definitions.context import Context
-            except ImportError:
-                # TODO: Remove once provider drops support for Airflow 2
-                from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got 
{type(context)}:{context!r}"
-                raise TypeError(error_msg)
-
-            return []
-
-        # TODO: replace with commented code when context serialization is 
implemented in AIP-72
-        with pytest.raises(AirflowException, 
match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE):
-            self.run_as_task(
-                f,
-                return_ti=True,
-                use_airflow_context=True,
-                session=session,
-                expect_airflow=False,
-                system_site_packages=True,
-            )
-
-        # ti = self.run_as_task(
-        #     f,
-        #     return_ti=True,
-        #     multiple_outputs=False,
-        #     use_airflow_context=True,
-        #     session=session,
-        #     expect_airflow=False,
-        #     system_site_packages=True,
-        # )
-        # assert ti.state == TaskInstanceState.SUCCESS
-
 
 # when venv tests are run in parallel to other test they create new processes 
and this might take
 # quite some time in shared docker environment and get some contention even 
between different containers

Reply via email to