This is an automated email from the ASF dual-hosted git repository. amoghdesai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 3ff4f2dc248 Removing feature: send context in venv operators (using `use_airflow_context`) (#46306) 3ff4f2dc248 is described below commit 3ff4f2dc248bd633ede6b4eb5b7d38e40d404157 Author: Amogh Desai <amoghrajesh1...@gmail.com> AuthorDate: Mon Feb 3 11:41:19 2025 +0530 Removing feature: send context in venv operators (using `use_airflow_context`) (#46306) --- airflow/decorators/__init__.pyi | 6 - .../example_python_context_decorator.py | 92 ------------ .../example_python_context_operator.py | 95 ------------ providers/standard/docs/operators/python.rst | 81 ---------- .../airflow/providers/standard/operators/python.py | 48 ------ .../standard/utils/python_virtualenv_script.jinja2 | 24 --- .../standard/operators/test_python.py | 165 --------------------- 7 files changed, 511 deletions(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 49504fa388a..ee42f103fb3 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -126,7 +126,6 @@ class TaskDecoratorCollection: show_return_value_in_logs: bool = True, env_vars: dict[str, str] | None = None, inherit_env: bool = True, - use_airflow_context: bool = False, **kwargs, ) -> TaskDecorator: """Create a decorator to convert the decorated callable to a virtual environment task. @@ -172,7 +171,6 @@ class TaskDecoratorCollection: environment. If set to ``True``, the virtual environment will inherit the environment variables of the parent process (``os.environ``). If set to ``False``, the virtual environment will be executed with a clean environment. - :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable. """ @overload def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... @@ -188,7 +186,6 @@ class TaskDecoratorCollection: show_return_value_in_logs: bool = True, env_vars: dict[str, str] | None = None, inherit_env: bool = True, - use_airflow_context: bool = False, **kwargs, ) -> TaskDecorator: """Create a decorator to convert the decorated callable to a virtual environment task. @@ -219,7 +216,6 @@ class TaskDecoratorCollection: environment. If set to ``True``, the virtual environment will inherit the environment variables of the parent process (``os.environ``). If set to ``False``, the virtual environment will be executed with a clean environment. - :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable. """ @overload def branch( # type: ignore[misc] @@ -252,7 +248,6 @@ class TaskDecoratorCollection: index_urls: None | Collection[str] | str = None, venv_cache_path: None | str = None, show_return_value_in_logs: bool = True, - use_airflow_context: bool = False, **kwargs, ) -> TaskDecorator: """Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator. @@ -291,7 +286,6 @@ class TaskDecoratorCollection: logs. Defaults to True, which allows return value log output. It can be set to False to prevent log output of return value when you return huge data such as transmission a large amount of XCom to TaskAPI. - :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable. """ @overload def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... diff --git a/airflow/example_dags/example_python_context_decorator.py b/airflow/example_dags/example_python_context_decorator.py deleted file mode 100644 index 9cfc3187574..00000000000 --- a/airflow/example_dags/example_python_context_decorator.py +++ /dev/null @@ -1,92 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example DAG demonstrating the usage of the PythonOperator with `get_current_context()` to get the current context. - -Also, demonstrates the usage of the TaskFlow API. -""" - -from __future__ import annotations - -import sys - -import pendulum - -from airflow.decorators import dag, task - -SOME_EXTERNAL_PYTHON = sys.executable - - -@dag( - schedule=None, - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - tags=["example"], -) -def example_python_context_decorator(): - # [START get_current_context] - @task(task_id="print_the_context") - def print_context() -> str: - """Print the Airflow context.""" - from pprint import pprint - - from airflow.providers.standard.operators.python import get_current_context - - context = get_current_context() - pprint(context) - return "Whatever you return gets printed in the logs" - - print_the_context = print_context() - # [END get_current_context] - - # [START get_current_context_venv] - @task.virtualenv(task_id="print_the_context_venv", use_airflow_context=True) - def print_context_venv() -> str: - """Print the Airflow context in venv.""" - from pprint import pprint - - from airflow.providers.standard.operators.python import get_current_context - - context = get_current_context() - pprint(context) - return "Whatever you return gets printed in the logs" - - print_the_context_venv = print_context_venv() - # [END get_current_context_venv] - - # [START get_current_context_external] - @task.external_python( - task_id="print_the_context_external", python=SOME_EXTERNAL_PYTHON, use_airflow_context=True - ) - def print_context_external() -> str: - """Print the Airflow context in external python.""" - from pprint import pprint - - from airflow.providers.standard.operators.python import get_current_context - - context = get_current_context() - pprint(context) - return "Whatever you return gets printed in the logs" - - print_the_context_external = print_context_external() - # [END get_current_context_external] - - _ = print_the_context >> [print_the_context_venv, print_the_context_external] - - -example_python_context_decorator() diff --git a/airflow/example_dags/example_python_context_operator.py b/airflow/example_dags/example_python_context_operator.py deleted file mode 100644 index 4dc9383dd06..00000000000 --- a/airflow/example_dags/example_python_context_operator.py +++ /dev/null @@ -1,95 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example DAG demonstrating the usage of the PythonOperator with `get_current_context()` to get the current context. - -Also, demonstrates the usage of the classic Python operators. -""" - -from __future__ import annotations - -import sys - -import pendulum - -from airflow import DAG -from airflow.providers.standard.operators.python import ( - ExternalPythonOperator, - PythonOperator, - PythonVirtualenvOperator, -) - -SOME_EXTERNAL_PYTHON = sys.executable - -with DAG( - dag_id="example_python_context_operator", - schedule=None, - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - tags=["example"], -) as dag: - # [START get_current_context] - def print_context() -> str: - """Print the Airflow context.""" - from pprint import pprint - - from airflow.providers.standard.operators.python import get_current_context - - context = get_current_context() - pprint(context) - return "Whatever you return gets printed in the logs" - - print_the_context = PythonOperator(task_id="print_the_context", python_callable=print_context) - # [END get_current_context] - - # [START get_current_context_venv] - def print_context_venv() -> str: - """Print the Airflow context in venv.""" - from pprint import pprint - - from airflow.providers.standard.operators.python import get_current_context - - context = get_current_context() - pprint(context) - return "Whatever you return gets printed in the logs" - - print_the_context_venv = PythonVirtualenvOperator( - task_id="print_the_context_venv", python_callable=print_context_venv, use_airflow_context=True - ) - # [END get_current_context_venv] - - # [START get_current_context_external] - def print_context_external() -> str: - """Print the Airflow context in external python.""" - from pprint import pprint - - from airflow.providers.standard.operators.python import get_current_context - - context = get_current_context() - pprint(context) - return "Whatever you return gets printed in the logs" - - print_the_context_external = ExternalPythonOperator( - task_id="print_the_context_external", - python_callable=print_context_external, - python=SOME_EXTERNAL_PYTHON, - use_airflow_context=True, - ) - # [END get_current_context_external] - - _ = print_the_context >> [print_the_context_venv, print_the_context_external] diff --git a/providers/standard/docs/operators/python.rst b/providers/standard/docs/operators/python.rst index 44cc4a43341..968c5b37a3e 100644 --- a/providers/standard/docs/operators/python.rst +++ b/providers/standard/docs/operators/python.rst @@ -113,25 +113,6 @@ It can be used implicitly, such as with ``**kwargs``, but can also be used explicitly with ``get_current_context()``. In this case, the type hint can be used for static analysis. -.. tab-set:: - - .. tab-item:: @task - :sync: taskflow - - .. exampleinclude:: /../../airflow/example_dags/example_python_context_decorator.py - :language: python - :dedent: 4 - :start-after: [START get_current_context] - :end-before: [END get_current_context] - - .. tab-item:: PythonOperator - :sync: operator - - .. exampleinclude:: /../../airflow/example_dags/example_python_context_operator.py - :language: python - :dedent: 4 - :start-after: [START get_current_context] - :end-before: [END get_current_context] .. _howto/operator:PythonVirtualenvOperator: @@ -240,44 +221,6 @@ In case you have problems during runtime with broken cached virtual environments Note that any modification of a cached virtual environment (like temp files in binary path, post-installing further requirements) might pollute a cached virtual environment and the operator is not maintaining or cleaning the cache path. -Context -^^^^^^^ - -With some limitations, you can also use ``Context`` in virtual environments. - -.. important:: - Using ``Context`` in a virtual environment is a bit of a challenge - because it involves library dependencies and serialization issues. - - You can bypass this to some extent by using :ref:`Jinja template variables <templates:variables>` and explicitly passing it as a parameter. - - You can also use ``get_current_context()`` in the same way as before, but with some limitations. - - * Requires ``apache-airflow>=3.0.0``. - - * Set ``use_airflow_context`` to ``True`` to call ``get_current_context()`` in the virtual environment. - - * Set ``system_site_packages`` to ``True`` or set ``expect_airflow`` to ``True`` - -.. tab-set:: - - .. tab-item:: @task.virtualenv - :sync: taskflow - - .. exampleinclude:: /../../airflow/example_dags/example_python_context_decorator.py - :language: python - :dedent: 4 - :start-after: [START get_current_context_venv] - :end-before: [END get_current_context_venv] - - .. tab-item:: PythonVirtualenvOperator - :sync: operator - - .. exampleinclude:: /../../airflow/example_dags/example_python_context_operator.py - :language: python - :dedent: 4 - :start-after: [START get_current_context_venv] - :end-before: [END get_current_context_venv] .. _howto/operator:ExternalPythonOperator: @@ -347,30 +290,6 @@ Templating Jinja templating can be used in same way as described for the :ref:`howto/operator:PythonOperator`. -Context -^^^^^^^ - -You can use ``Context`` under the same conditions as ``PythonVirtualenvOperator``. - -.. tab-set:: - - .. tab-item:: @task.external_python - :sync: taskflow - - .. exampleinclude:: /../../airflow/example_dags/example_python_context_decorator.py - :language: python - :dedent: 4 - :start-after: [START get_current_context_external] - :end-before: [END get_current_context_external] - - .. tab-item:: ExternalPythonOperator - :sync: operator - - .. exampleinclude:: /../../airflow/example_dags/example_python_context_operator.py - :language: python - :dedent: 4 - :start-after: [START get_current_context_external] - :end-before: [END get_current_context_external] .. _howto/operator:PythonBranchOperator: diff --git a/providers/standard/src/airflow/providers/standard/operators/python.py b/providers/standard/src/airflow/providers/standard/operators/python.py index 7afaaecfaee..6b89398ef77 100644 --- a/providers/standard/src/airflow/providers/standard/operators/python.py +++ b/providers/standard/src/airflow/providers/standard/operators/python.py @@ -418,7 +418,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): skip_on_exit_code: int | Container[int] | None = None, env_vars: dict[str, str] | None = None, inherit_env: bool = True, - use_airflow_context: bool = False, **kwargs, ): if ( @@ -460,7 +459,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): ) self.env_vars = env_vars self.inherit_env = inherit_env - self.use_airflow_context = use_airflow_context @abstractmethod def _iter_serializable_context_keys(self): @@ -519,7 +517,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): "pickling_library": self.serializer, "python_callable": self.python_callable.__name__, "python_callable_source": self.get_python_source(), - "use_airflow_context": self.use_airflow_context, } if inspect.getfile(self.python_callable) == self.dag.fileloc: @@ -530,20 +527,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): filename=os.fspath(script_path), render_template_as_native_obj=self.dag.render_template_as_native_obj, ) - if self.use_airflow_context: - # TODO: replace with commented code when context serialization is implemented in AIP-72 - raise AirflowException( - "The `use_airflow_context=True` is not yet implemented. " - "It will work in Airflow 3 after AIP-72 context " - "serialization is ready." - ) - # context = get_current_context() - # with create_session() as session: - # dag_run, task_instance = context["dag_run"], context["task_instance"] - # session.add_all([dag_run, task_instance]) - # serializable_context: dict[Encoding, Any] = # Get serializable context here - # with airflow_context_path.open("w+") as file: - # json.dump(serializable_context, file) env_vars = dict(os.environ) if self.inherit_env else {} if self.env_vars: @@ -658,8 +641,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): environment. If set to ``True``, the virtual environment will inherit the environment variables of the parent process (``os.environ``). If set to ``False``, the virtual environment will be executed with a clean environment. - :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable. - NOT YET IMPLEMENTED - waits for AIP-72 context serialization. """ template_fields: Sequence[str] = tuple( @@ -687,7 +668,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): venv_cache_path: None | os.PathLike[str] = None, env_vars: dict[str, str] | None = None, inherit_env: bool = True, - use_airflow_context: bool = False, **kwargs, ): if ( @@ -704,18 +684,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): raise AirflowException( "Passing non-string types (e.g. int or float) as python_version not supported" ) - if use_airflow_context and (not expect_airflow and not system_site_packages): - raise AirflowException( - "The `use_airflow_context` parameter is set to True, but " - "expect_airflow and system_site_packages are set to False." - ) - # TODO: remove when context serialization is implemented in AIP-72 - if use_airflow_context and not AIRFLOW_V_3_0_PLUS: - raise AirflowException( - "The `use_airflow_context=True` is not yet implemented. " - "It will work in Airflow 3 after AIP-72 context " - "serialization is ready." - ) if not requirements: self.requirements: list[str] = [] elif isinstance(requirements, str): @@ -744,7 +712,6 @@ class PythonVirtualenvOperator(_BasePythonVirtualenvOperator): skip_on_exit_code=skip_on_exit_code, env_vars=env_vars, inherit_env=inherit_env, - use_airflow_context=use_airflow_context, **kwargs, ) @@ -960,8 +927,6 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator): environment. If set to ``True``, the virtual environment will inherit the environment variables of the parent process (``os.environ``). If set to ``False``, the virtual environment will be executed with a clean environment. - :param use_airflow_context: Whether to provide ``get_current_context()`` to the python_callable. - NOT YET IMPLEMENTED - waits for AIP-72 context serialization. """ template_fields: Sequence[str] = tuple({"python"}.union(PythonOperator.template_fields)) @@ -982,22 +947,10 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator): skip_on_exit_code: int | Container[int] | None = None, env_vars: dict[str, str] | None = None, inherit_env: bool = True, - use_airflow_context: bool = False, **kwargs, ): if not python: raise ValueError("Python Path must be defined in ExternalPythonOperator") - if use_airflow_context and not expect_airflow: - raise AirflowException( - "The `use_airflow_context` parameter is set to True, but expect_airflow is set to False." - ) - # TODO: remove when context serialization is implemented in AIP-72 - if use_airflow_context: - raise AirflowException( - "The `use_airflow_context=True` is not yet implemented. " - "It will work in Airflow 3 after AIP-72 context " - "serialization is ready." - ) self.python = python self.expect_pendulum = expect_pendulum super().__init__( @@ -1012,7 +965,6 @@ class ExternalPythonOperator(_BasePythonVirtualenvOperator): skip_on_exit_code=skip_on_exit_code, env_vars=env_vars, inherit_env=inherit_env, - use_airflow_context=use_airflow_context, **kwargs, ) diff --git a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 index 6b803b408f2..da5b2cbf9e1 100644 --- a/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 +++ b/providers/standard/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 @@ -64,30 +64,6 @@ with open(sys.argv[3], "r") as file: virtualenv_string_args = list(map(lambda x: x.strip(), list(file))) {% endif %} -{% if use_airflow_context | default(false) -%} -if len(sys.argv) > 5: - import json - from types import ModuleType - - from airflow.providers.standard.operators import python as airflow_python - - - class _MockPython(ModuleType): - @staticmethod - def get_current_context(): - with open(sys.argv[5]) as file: - context = json.load(file) - raise Exception("Not yet implemented") - # TODO: return deserialized context - - def __getattr__(self, name: str): - return getattr(airflow_python, name) - - - MockPython = _MockPython("MockPython") - sys.modules["airflow.providers.standard.operators.python"] = MockPython -{% endif %} - try: res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"]) except Exception as e: diff --git a/providers/standard/tests/provider_tests/standard/operators/test_python.py b/providers/standard/tests/provider_tests/standard/operators/test_python.py index b4320e7a193..8ec569c8d22 100644 --- a/providers/standard/tests/provider_tests/standard/operators/test_python.py +++ b/providers/standard/tests/provider_tests/standard/operators/test_python.py @@ -91,8 +91,6 @@ DILL_MARKER = pytest.mark.skipif(not DILL_INSTALLED, reason="`dill` is not insta CLOUDPICKLE_INSTALLED = find_spec("cloudpickle") is not None CLOUDPICKLE_MARKER = pytest.mark.skipif(not CLOUDPICKLE_INSTALLED, reason="`cloudpickle` is not installed") -AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE = r"The `use_airflow_context=True` is not yet implemented." - class BasePythonTest: """Base test class for TestPythonOperator and TestPythonSensor classes""" @@ -1042,91 +1040,6 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest): task = self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"}, inherit_env=True) assert task.execute_callable() == "EFGHI" - def test_current_context(self): - def f(): - from airflow.providers.standard.operators.python import get_current_context - - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context - - context = get_current_context() - if not isinstance(context, Context): # type: ignore[misc] - error_msg = f"Expected Context, got {type(context)}:{context!r}" - raise TypeError(error_msg) - - return [] - - # TODO: replace with commented code when context serialization is implemented in AIP-72 - with pytest.raises(Exception, match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE): - self.run_as_task(f, return_ti=True, use_airflow_context=True) - - # ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True) - # assert ti.state == TaskInstanceState.SUCCESS - - def test_current_context_not_found_error(self): - def f(): - from airflow.providers.standard.operators.python import get_current_context - - get_current_context() - return [] - - with pytest.raises( - AirflowException, - match="Current context was requested but no context was found! " - "Are you running within an Airflow task?", - ): - self.run_as_task(f, return_ti=True, use_airflow_context=False) - - def test_current_context_airflow_not_found_error(self): - airflow_flag: dict[str, bool] = {"expect_airflow": False} - - error_msg = r"The `use_airflow_context` parameter is set to True, but expect_airflow is set to False." - - if not issubclass(self.opcls, ExternalPythonOperator): - airflow_flag["system_site_packages"] = False - error_msg = ( - r"The `use_airflow_context` parameter is set to True, but " - r"expect_airflow and system_site_packages are set to False." - ) - - def f(): - from airflow.providers.standard.operators.python import get_current_context - - get_current_context() - return [] - - with pytest.raises(AirflowException, match=error_msg): - self.run_as_task( - f, return_ti=True, multiple_outputs=False, use_airflow_context=True, **airflow_flag - ) - - def test_use_airflow_context_touch_other_variables(self): - def f(): - from airflow.providers.standard.operators.python import get_current_context - - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context - - context = get_current_context() - if not isinstance(context, Context): # type: ignore[misc] - error_msg = f"Expected Context, got {type(context)}:{context!r}" - raise TypeError(error_msg) - - return [] - - # TODO: replace with commented code when context serialization is implemented in AIP-72 - with pytest.raises(AirflowException, match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE): - self.run_as_task(f, return_ti=True, use_airflow_context=True) - - # ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True) - # assert ti.state == TaskInstanceState.SUCCESS - venv_cache_path = tempfile.mkdtemp(prefix="venv_cache_path") @@ -1486,45 +1399,6 @@ class TestPythonVirtualenvOperator(BaseTestPythonVirtualenvOperator): self.run_as_task(f, serializer=serializer, system_site_packages=False, requirements=None) - def test_current_context_system_site_packages(self, session): - def f(): - from airflow.providers.standard.operators.python import get_current_context - - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context - - context = get_current_context() - if not isinstance(context, Context): # type: ignore[misc] - error_msg = f"Expected Context, got {type(context)}:{context!r}" - raise TypeError(error_msg) - - return [] - - # TODO: replace with commented code when context serialization is implemented in AIP-72 - with pytest.raises(AirflowException, match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE): - self.run_as_task( - f, - return_ti=True, - use_airflow_context=True, - session=session, - expect_airflow=False, - system_site_packages=True, - ) - - # ti = self.run_as_task( - # f, - # return_ti=True, - # multiple_outputs=False, - # use_airflow_context=True, - # session=session, - # expect_airflow=False, - # system_site_packages=True, - # ) - # assert ti.state == TaskInstanceState.SUCCESS - # when venv tests are run in parallel to other test they create new processes and this might take # quite some time in shared docker environment and get some contention even between different containers @@ -1854,45 +1728,6 @@ class TestBranchPythonVirtualenvOperator(BaseTestBranchPythonVirtualenvOperator) kwargs["venv_cache_path"] = venv_cache_path return kwargs - def test_current_context_system_site_packages(self, session): - def f(): - from airflow.providers.standard.operators.python import get_current_context - - try: - from airflow.sdk.definitions.context import Context - except ImportError: - # TODO: Remove once provider drops support for Airflow 2 - from airflow.utils.context import Context - - context = get_current_context() - if not isinstance(context, Context): # type: ignore[misc] - error_msg = f"Expected Context, got {type(context)}:{context!r}" - raise TypeError(error_msg) - - return [] - - # TODO: replace with commented code when context serialization is implemented in AIP-72 - with pytest.raises(AirflowException, match=AIRFLOW_CONTEXT_NOT_IMPLEMENTED_YET_MESSAGE): - self.run_as_task( - f, - return_ti=True, - use_airflow_context=True, - session=session, - expect_airflow=False, - system_site_packages=True, - ) - - # ti = self.run_as_task( - # f, - # return_ti=True, - # multiple_outputs=False, - # use_airflow_context=True, - # session=session, - # expect_airflow=False, - # system_site_packages=True, - # ) - # assert ti.state == TaskInstanceState.SUCCESS - # when venv tests are run in parallel to other test they create new processes and this might take # quite some time in shared docker environment and get some contention even between different containers