This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ee7aff654876bb23a5ba0c73d81859cf3b01ce9
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Aug 9 17:59:26 2024 +0100

    Revert "Send context using in venv operator (#41039)" (#41362)
    
    This reverts commit da553935d248f22695124c40777d3ea29e04d57f.
    
    (cherry picked from commit 0a06cc683a877d63305060766ae78f9ffe1fba0e)
---
 airflow/decorators/__init__.pyi                    |   6 -
 .../example_python_context_decorator.py            |  92 ----------------
 .../example_python_context_operator.py             |  91 ---------------
 airflow/operators/python.py                        |  36 ------
 airflow/utils/python_virtualenv_script.jinja2      |  23 ----
 docs/apache-airflow/howto/operator/python.rst      |  92 ----------------
 tests/operators/test_python.py                     | 122 +--------------------
 7 files changed, 1 insertion(+), 461 deletions(-)

diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi
index 089e453d02..faf77e8240 100644
--- a/airflow/decorators/__init__.pyi
+++ b/airflow/decorators/__init__.pyi
@@ -125,7 +125,6 @@ class TaskDecoratorCollection:
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
         use_dill: bool = False,
-        use_airflow_context: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a virtual 
environment task.
@@ -177,7 +176,6 @@ class TaskDecoratorCollection:
         :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
             the args and result (pickle is default). This allows more complex types
             but requires you to include dill in your requirements.
-        :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
         """
     @overload
     def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
@@ -194,7 +192,6 @@ class TaskDecoratorCollection:
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
         use_dill: bool = False,
-        use_airflow_context: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to convert the decorated callable to a virtual 
environment task.
@@ -228,7 +225,6 @@ class TaskDecoratorCollection:
         :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
             the args and result (pickle is default). This allows more complex types
             but requires you to include dill in your requirements.
-        :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
         """
     @overload
     def branch(  # type: ignore[misc]
@@ -262,7 +258,6 @@ class TaskDecoratorCollection:
         venv_cache_path: None | str = None,
         show_return_value_in_logs: bool = True,
         use_dill: bool = False,
-        use_airflow_context: bool = False,
         **kwargs,
     ) -> TaskDecorator:
         """Create a decorator to wrap the decorated callable into a 
BranchPythonVirtualenvOperator.
@@ -304,7 +299,6 @@ class TaskDecoratorCollection:
         :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
             the args and result (pickle is default). This allows more complex types
             but requires you to include dill in your requirements.
-        :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
         """
     @overload
     def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
diff --git a/airflow/example_dags/example_python_context_decorator.py b/airflow/example_dags/example_python_context_decorator.py
deleted file mode 100644
index 497ee08e17..0000000000
--- a/airflow/example_dags/example_python_context_decorator.py
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example DAG demonstrating the usage of the PythonOperator with `get_current_context()` to get the current context.
-
-Also, demonstrates the usage of the TaskFlow API.
-"""
-
-from __future__ import annotations
-
-import sys
-
-import pendulum
-
-from airflow.decorators import dag, task
-
-SOME_EXTERNAL_PYTHON = sys.executable
-
-
-@dag(
-    schedule=None,
-    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    catchup=False,
-    tags=["example"],
-)
-def example_python_context_decorator():
-    # [START get_current_context]
-    @task(task_id="print_the_context")
-    def print_context() -> str:
-        """Print the Airflow context."""
-        from pprint import pprint
-
-        from airflow.operators.python import get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context = print_context()
-    # [END get_current_context]
-
-    # [START get_current_context_venv]
-    @task.virtualenv(task_id="print_the_context_venv", use_airflow_context=True)
-    def print_context_venv() -> str:
-        """Print the Airflow context in venv."""
-        from pprint import pprint
-
-        from airflow.operators.python import get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_venv = print_context_venv()
-    # [END get_current_context_venv]
-
-    # [START get_current_context_external]
-    @task.external_python(
-        task_id="print_the_context_external", python=SOME_EXTERNAL_PYTHON, 
use_airflow_context=True
-    )
-    def print_context_external() -> str:
-        """Print the Airflow context in external python."""
-        from pprint import pprint
-
-        from airflow.operators.python import get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_external = print_context_external()
-    # [END get_current_context_external]
-
-    _ = print_the_context >> [print_the_context_venv, print_the_context_external]
-
-
-example_python_context_decorator()
diff --git a/airflow/example_dags/example_python_context_operator.py b/airflow/example_dags/example_python_context_operator.py
deleted file mode 100644
index f1b76c527c..0000000000
--- a/airflow/example_dags/example_python_context_operator.py
+++ /dev/null
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-Example DAG demonstrating the usage of the PythonOperator with `get_current_context()` to get the current context.
-
-Also, demonstrates the usage of the classic Python operators.
-"""
-
-from __future__ import annotations
-
-import sys
-
-import pendulum
-
-from airflow import DAG
-from airflow.operators.python import ExternalPythonOperator, PythonOperator, PythonVirtualenvOperator
-
-SOME_EXTERNAL_PYTHON = sys.executable
-
-with DAG(
-    dag_id="example_python_context_operator",
-    schedule=None,
-    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
-    catchup=False,
-    tags=["example"],
-) as dag:
-    # [START get_current_context]
-    def print_context() -> str:
-        """Print the Airflow context."""
-        from pprint import pprint
-
-        from airflow.operators.python import get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context = PythonOperator(task_id="print_the_context", python_callable=print_context)
-    # [END get_current_context]
-
-    # [START get_current_context_venv]
-    def print_context_venv() -> str:
-        """Print the Airflow context in venv."""
-        from pprint import pprint
-
-        from airflow.operators.python import get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_venv = PythonVirtualenvOperator(
-        task_id="print_the_context_venv", python_callable=print_context_venv, 
use_airflow_context=True
-    )
-    # [END get_current_context_venv]
-
-    # [START get_current_context_external]
-    def print_context_external() -> str:
-        """Print the Airflow context in external python."""
-        from pprint import pprint
-
-        from airflow.operators.python import get_current_context
-
-        context = get_current_context()
-        pprint(context)
-        return "Whatever you return gets printed in the logs"
-
-    print_the_context_external = ExternalPythonOperator(
-        task_id="print_the_context_external",
-        python_callable=print_context_external,
-        python=SOME_EXTERNAL_PYTHON,
-        use_airflow_context=True,
-    )
-    # [END get_current_context_external]
-
-    _ = print_the_context >> [print_the_context_venv, print_the_context_external]
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index ce6ccd3a40..fdfe575fb9 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -56,14 +56,12 @@ from airflow.utils.file import get_unique_dag_module_name
 from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
 from airflow.utils.process_utils import execute_in_subprocess
 from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script
-from airflow.utils.session import create_session
 
 log = logging.getLogger(__name__)
 
 if TYPE_CHECKING:
     from pendulum.datetime import DateTime
 
-    from airflow.serialization.enums import Encoding
     from airflow.utils.context import Context
 
 
@@ -444,7 +442,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
         use_dill: bool = False,
-        use_airflow_context: bool = False,
         **kwargs,
     ):
         if (
@@ -497,7 +494,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
         )
         self.env_vars = env_vars
         self.inherit_env = inherit_env
-        self.use_airflow_context = use_airflow_context
 
     @abstractmethod
     def _iter_serializable_context_keys(self):
@@ -544,7 +540,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
             string_args_path = tmp_dir / "string_args.txt"
             script_path = tmp_dir / "script.py"
             termination_log_path = tmp_dir / "termination.log"
-            airflow_context_path = tmp_dir / "airflow_context.json"
 
             self._write_args(input_path)
             self._write_string_args(string_args_path)
@@ -556,7 +551,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
                 "pickling_library": self.serializer,
                 "python_callable": self.python_callable.__name__,
                 "python_callable_source": self.get_python_source(),
-                "use_airflow_context": self.use_airflow_context,
             }
 
             if inspect.getfile(self.python_callable) == self.dag.fileloc:
@@ -567,23 +561,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
                filename=os.fspath(script_path),
                render_template_as_native_obj=self.dag.render_template_as_native_obj,
             )
-            if self.use_airflow_context:
-                from airflow.serialization.serialized_objects import BaseSerialization
-
-                context = get_current_context()
-                # TODO: `TaskInstance`` will also soon be serialized as expected.
-                # see more:
-                #   https://github.com/apache/airflow/issues/40974
-                #   https://github.com/apache/airflow/pull/41067
-                with create_session() as session:
-                    # FIXME: DetachedInstanceError
-                    dag_run, task_instance = context["dag_run"], context["task_instance"]
-                    session.add_all([dag_run, task_instance])
-                    serializable_context: dict[Encoding, Any] = BaseSerialization.serialize(
-                        context, use_pydantic_models=True
-                    )
-                with airflow_context_path.open("w+") as file:
-                    json.dump(serializable_context, file)
 
             env_vars = dict(os.environ) if self.inherit_env else {}
             if self.env_vars:
@@ -598,7 +575,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
                         os.fspath(output_path),
                         os.fspath(string_args_path),
                         os.fspath(termination_log_path),
-                        os.fspath(airflow_context_path),
                     ],
                     env=env_vars,
                 )
@@ -690,7 +666,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
     :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
         the args and result (pickle is default). This allows more complex types
         but requires you to include dill in your requirements.
-    :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
     """
 
     template_fields: Sequence[str] = tuple(
@@ -719,7 +694,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
         use_dill: bool = False,
-        use_airflow_context: bool = False,
         **kwargs,
     ):
         if (
@@ -741,9 +715,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
             )
         if not is_venv_installed():
             raise AirflowException("PythonVirtualenvOperator requires 
virtualenv, please install it.")
-        if use_airflow_context and (not expect_airflow and not 
system_site_packages):
-            error_msg = "use_airflow_context is set to True, but 
expect_airflow and system_site_packages are set to False."
-            raise AirflowException(error_msg)
         if not requirements:
             self.requirements: list[str] = []
         elif isinstance(requirements, str):
@@ -773,7 +744,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator):
             env_vars=env_vars,
             inherit_env=inherit_env,
             use_dill=use_dill,
-            use_airflow_context=use_airflow_context,
             **kwargs,
         )
 
@@ -992,7 +962,6 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
     :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
         the args and result (pickle is default). This allows more complex types
         but requires you to include dill in your requirements.
-    :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable.
     """
 
     template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields))
@@ -1014,14 +983,10 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
         env_vars: dict[str, str] | None = None,
         inherit_env: bool = True,
         use_dill: bool = False,
-        use_airflow_context: bool = False,
         **kwargs,
     ):
         if not python:
             raise ValueError("Python Path must be defined in 
ExternalPythonOperator")
-        if use_airflow_context and not expect_airflow:
-            error_msg = "use_airflow_context is set to True, but 
expect_airflow is set to False."
-            raise AirflowException(error_msg)
         self.python = python
         self.expect_pendulum = expect_pendulum
         super().__init__(
@@ -1037,7 +1002,6 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator):
             env_vars=env_vars,
             inherit_env=inherit_env,
             use_dill=use_dill,
-            use_airflow_context=use_airflow_context,
             **kwargs,
         )
 
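For reference, the operator-side mechanism deleted in the hunk above serialized the live Airflow context with BaseSerialization and wrote it to a JSON file in the operator's temporary directory, so the isolated interpreter could read it back. A minimal sketch of that idea, based on the removed lines; the helper name and target path here are illustrative, not part of Airflow's API:

    import json
    from pathlib import Path

    from airflow.operators.python import get_current_context
    from airflow.serialization.serialized_objects import BaseSerialization


    def dump_context_for_subprocess(target: Path) -> None:
        # Capture the context of the currently running task and persist it in
        # the serialized form the (now removed) generated script expected.
        context = get_current_context()
        serializable_context = BaseSerialization.serialize(context, use_pydantic_models=True)
        with target.open("w") as file:
            json.dump(serializable_context, file)
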
diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2
index 22d68acd75..2ff417985e 100644
--- a/airflow/utils/python_virtualenv_script.jinja2
+++ b/airflow/utils/python_virtualenv_script.jinja2
@@ -64,29 +64,6 @@ with open(sys.argv[3], "r") as file:
     virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
 {% endif %}
 
-{% if use_airflow_context | default(false) -%}
-if len(sys.argv) > 5:
-    import json
-    from types import ModuleType
-
-    from airflow.operators import python as airflow_python
-    from airflow.serialization.serialized_objects import BaseSerialization
-
-
-    class _MockPython(ModuleType):
-        @staticmethod
-        def get_current_context():
-            with open(sys.argv[5]) as file:
-                context = json.load(file)
-                return BaseSerialization.deserialize(context, use_pydantic_models=True)
-
-        def __getattr__(self, name: str):
-            return getattr(airflow_python, name)
-
-
-    MockPython = _MockPython("MockPython")
-    sys.modules["airflow.operators.python"] = MockPython
-{% endif %}
 
 try:
     res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
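The template block removed above is the subprocess side of the same mechanism: the generated script swapped the "airflow.operators.python" entry in sys.modules for a thin proxy whose get_current_context() read the serialized context back from an extra file path passed on the command line (sys.argv[5] in the real template). A rough standalone sketch of that pattern; the hard-coded CONTEXT_PATH is illustrative only:

    import json
    import sys
    from types import ModuleType

    from airflow.operators import python as airflow_python
    from airflow.serialization.serialized_objects import BaseSerialization

    CONTEXT_PATH = "airflow_context.json"  # illustrative; the template used sys.argv[5]


    class _MockPython(ModuleType):
        """Proxy module that serves a file-backed context and delegates everything else."""

        @staticmethod
        def get_current_context():
            with open(CONTEXT_PATH) as file:
                context = json.load(file)
            return BaseSerialization.deserialize(context, use_pydantic_models=True)

        def __getattr__(self, name: str):
            # Any other attribute lookup falls through to the real module.
            return getattr(airflow_python, name)


    # After this assignment, `from airflow.operators.python import get_current_context`
    # inside the user callable resolves to the file-backed version above.
    sys.modules["airflow.operators.python"] = _MockPython("airflow.operators.python")
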
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 5b5a60b6bc..b8619cd38b 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -102,37 +102,6 @@ is evaluated as a :ref:`Jinja template <concepts:jinja-templating>`.
             :start-after: [START howto_operator_python_render_sql]
             :end-before: [END howto_operator_python_render_sql]
 
-Context
-^^^^^^^
-
-The ``Context`` is a dictionary object that contains information
-about the environment of the ``DagRun``.
-For example, selecting ``task_instance`` will get the currently running ``TaskInstance`` object.
-
-It can be used implicitly, such as with ``**kwargs``,
-but can also be used explicitly with ``get_current_context()``.
-In this case, the type hint can be used for static analysis.
-
-.. tab-set::
-
-    .. tab-item:: @task
-        :sync: taskflow
-
-        .. exampleinclude:: /../../airflow/example_dags/example_python_context_decorator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context]
-            :end-before: [END get_current_context]
-
-    .. tab-item:: PythonOperator
-        :sync: operator
-
-        .. exampleinclude:: /../../airflow/example_dags/example_python_context_operator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context]
-            :end-before: [END get_current_context]
-
 .. _howto/operator:PythonVirtualenvOperator:
 
 PythonVirtualenvOperator
@@ -234,42 +203,6 @@ In case you have problems during runtime with broken cached virtual environments
 Note that any modification of a cached virtual environment (like temp files in binary path, post-installing further requirements) might pollute a cached virtual environment and the
 operator is not maintaining or cleaning the cache path.
 
-Context
-^^^^^^^
-
-With some limitations, you can also use ``Context`` in virtual environments.
-
-.. important::
-    Using ``Context`` in a virtual environment is a bit of a challenge
-    because it involves library dependencies and serialization issues.
-
-    You can bypass this to some extent by using :ref:`Jinja template variables <templates:variables>` and explicitly passing it as a parameter.
-
-    You can also use ``get_current_context()`` in the same way as before, but with some limitations.
-
-    * set ``use_airflow_context`` to ``True`` to call ``get_current_context()`` in the virtual environment.
-
-    * set ``system_site_packages`` to ``True`` or set ``expect_airflow`` to ``True``
-
-.. tab-set::
-
-    .. tab-item:: @task.virtualenv
-        :sync: taskflow
-
-        .. exampleinclude:: /../../airflow/example_dags/example_python_context_decorator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_venv]
-            :end-before: [END get_current_context_venv]
-
-    .. tab-item:: PythonVirtualenvOperator
-        :sync: operator
-
-        .. exampleinclude:: /../../airflow/example_dags/example_python_context_operator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_venv]
-            :end-before: [END get_current_context_venv]
 
 .. _howto/operator:ExternalPythonOperator:
 
@@ -334,31 +267,6 @@ If you want the context related to datetime objects like ``data_interval_start``
     If you want to pass variables into the classic :class:`~airflow.operators.python.ExternalPythonOperator` use
     ``op_args`` and ``op_kwargs``.
 
-Context
-^^^^^^^
-
-You can use ``Context`` under the same conditions as ``PythonVirtualenvOperator``.
-
-.. tab-set::
-
-    .. tab-item:: @task.external_python
-        :sync: taskflow
-
-        .. exampleinclude:: /../../airflow/example_dags/example_python_context_decorator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_external]
-            :end-before: [END get_current_context_external]
-
-    .. tab-item:: ExternalPythonOperator
-        :sync: operator
-
-        .. exampleinclude:: /../../airflow/example_dags/example_python_context_operator.py
-            :language: python
-            :dedent: 4
-            :start-after: [START get_current_context_external]
-            :end-before: [END get_current_context_external]
-
 .. _howto/operator:PythonBranchOperator:
 
 PythonBranchOperator
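With these doc sections gone, the remaining documented way to get run-time information into a virtualenv or external-Python callable is to pass it explicitly, for example through templated op_kwargs (plain get_current_context() is still available to ordinary @task/PythonOperator callables running in the main interpreter). A hedged sketch of that fallback; the DAG id, task id and the ds/run_id template fields chosen here are examples only:

    from __future__ import annotations

    import pendulum

    from airflow import DAG
    from airflow.operators.python import PythonVirtualenvOperator


    def show_run_info(ds: str, run_id: str) -> str:
        # Values arrive as plain strings rendered by Jinja before the virtualenv
        # starts, so no Airflow import is needed inside the isolated interpreter.
        print(f"logical date: {ds}, run id: {run_id}")
        return "done"


    with DAG(
        dag_id="example_pass_context_via_op_kwargs",
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["example"],
    ):
        PythonVirtualenvOperator(
            task_id="show_run_info",
            python_callable=show_run_info,
            op_kwargs={"ds": "{{ ds }}", "run_id": "{{ run_id }}"},
        )
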
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 9148ae18b7..993d70cad3 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -39,15 +39,10 @@ import pytest
 from slugify import slugify
 
 from airflow.decorators import task_group
-from airflow.exceptions import (
-    AirflowException,
-    DeserializingResultError,
-    RemovedInAirflow3Warning,
-)
+from airflow.exceptions import AirflowException, DeserializingResultError, RemovedInAirflow3Warning
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import TaskInstance, clear_task_instances, set_current_context
-from airflow.operators.branch import BranchMixIn
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import (
     BranchExternalPythonOperator,
@@ -1010,75 +1005,6 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
         task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"}, inherit_env=True)
         assert task.execute_callable() == "EFGHI"
 
-    def test_branch_current_context(self):
-        if not issubclass(self.opcls, BranchMixIn):
-            pytest.skip("This test is only applicable to BranchMixIn")
-
-    def test_current_context(self):
-        def f():
-            from airflow.operators.python import get_current_context
-            from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
-                raise TypeError(error_msg)
-
-            return []
-
-        ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True)
-        assert ti.state == TaskInstanceState.SUCCESS
-
-    def test_current_context_not_found_error(self):
-        def f():
-            from airflow.operators.python import get_current_context
-
-            get_current_context()
-            return []
-
-        with pytest.raises(
-            AirflowException,
-            match="Current context was requested but no context was found! "
-            "Are you running within an airflow task?",
-        ):
-            self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=False)
-
-    def test_current_context_airflow_not_found_error(self):
-        airflow_flag: dict[str, bool] = {"expect_airflow": False}
-        error_msg = "use_airflow_context is set to True, but expect_airflow is 
set to False."
-
-        if not issubclass(self.opcls, ExternalPythonOperator):
-            airflow_flag["system_site_packages"] = False
-            error_msg = "use_airflow_context is set to True, but 
expect_airflow and system_site_packages are set to False."
-
-        def f():
-            from airflow.operators.python import get_current_context
-
-            get_current_context()
-            return []
-
-        with pytest.raises(AirflowException, match=error_msg):
-            self.run_as_task(
-                f, return_ti=True, multiple_outputs=False, use_airflow_context=True, **airflow_flag
-            )
-
-    def test_use_airflow_context_touch_other_variables(self):
-        def f():
-            from airflow.operators.python import get_current_context
-            from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
-                raise TypeError(error_msg)
-
-            from airflow.operators.python import PythonOperator  # noqa: F401
-
-            return []
-
-        ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True)
-        assert ti.state == TaskInstanceState.SUCCESS
-
 
 venv_cache_path = tempfile.mkdtemp(prefix="venv_cache_path")
 
@@ -1500,29 +1426,6 @@ class TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator):
 
         self.run_as_task(f, serializer=serializer, system_site_packages=False, requirements=None)
 
-    def test_current_context_system_site_packages(self, session):
-        def f():
-            from airflow.operators.python import get_current_context
-            from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
-                raise TypeError(error_msg)
-
-            return []
-
-        ti = self.run_as_task(
-            f,
-            return_ti=True,
-            multiple_outputs=False,
-            use_airflow_context=True,
-            session=session,
-            expect_airflow=False,
-            system_site_packages=True,
-        )
-        assert ti.state == TaskInstanceState.SUCCESS
-
 
 # when venv tests are run in parallel to other test they create new processes and this might take
 # quite some time in shared docker environment and get some contention even between different containers
@@ -1842,29 +1745,6 @@ class TestBranchPythonVirtualenvOperator(BaseTestBranchPythonVirtualenvOperator)
                 kwargs["venv_cache_path"] = venv_cache_path
         return kwargs
 
-    def test_current_context_system_site_packages(self, session):
-        def f():
-            from airflow.operators.python import get_current_context
-            from airflow.utils.context import Context
-
-            context = get_current_context()
-            if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
-                raise TypeError(error_msg)
-
-            return []
-
-        ti = self.run_as_task(
-            f,
-            return_ti=True,
-            multiple_outputs=False,
-            use_airflow_context=True,
-            session=session,
-            expect_airflow=False,
-            system_site_packages=True,
-        )
-        assert ti.state == TaskInstanceState.SUCCESS
-
 
 # when venv tests are run in parallel to other test they create new processes and this might take
 # quite some time in shared docker environment and get some contention even between different containers
