This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 24d95ec2c69 Remove global from plugins_manager (#59851)
24d95ec2c69 is described below

commit 24d95ec2c69ee7ccb9a1e99314469e4057963e54
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun Dec 28 20:38:57 2025 +0100

    Remove global from plugins_manager (#59851)
    
    * Remove global from plugins_manager
    
    * Further rework and cleanup
    
    * Prevent re-build of Fab assets, change exclusions
    
    * Fixes
    
    * Fixes
    
    * Fixes
    
    * Fixes
---
 airflow-core/src/airflow/api_fastapi/app.py        |  12 +-
 .../src/airflow/api_fastapi/core_api/app.py        |  15 +-
 .../api_fastapi/core_api/routes/public/plugins.py  |   9 +-
 .../src/airflow/cli/commands/plugins_command.py    |  21 +-
 airflow-core/src/airflow/lineage/hook.py           |   3 +-
 airflow-core/src/airflow/plugins_manager.py        | 499 ++++++++-------------
 .../serialization/definitions/baseoperator.py      |   6 +-
 .../serialization/definitions/mappedoperator.py    |  14 +-
 airflow-core/src/airflow/serialization/helpers.py  |   7 +-
 .../airflow/serialization/serialized_objects.py    |  18 +-
 .../core_api/routes/public/test_dag_run.py         |   5 +-
 .../core_api/routes/public/test_plugins.py         |   4 +-
 airflow-core/tests/unit/api_fastapi/test_app.py    |   2 +-
 .../tests/unit/plugins/test_plugins_manager.py     | 101 +++--
 .../unit/serialization/test_dag_serialization.py   |   9 +-
 devel-common/src/tests_common/pytest_plugin.py     |   2 +-
 .../src/tests_common/test_utils/mock_plugins.py    |  82 ++--
 providers/fab/.pre-commit-config.yaml              |   7 +-
 .../src/airflow/providers/fab/version_compat.py    |   1 +
 .../providers/fab/www/extensions/init_views.py     |  18 +-
 providers/fab/www-hash.txt                         |   2 +-
 task-sdk/tests/conftest.py                         |   9 +-
 22 files changed, 366 insertions(+), 480 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/app.py 
b/airflow-core/src/airflow/api_fastapi/app.py
index 3261f073f19..e2b43733024 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 import logging
 from contextlib import AsyncExitStack, asynccontextmanager
 from functools import cache
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING
 from urllib.parse import urlsplit
 
 from fastapi import FastAPI
@@ -31,7 +31,6 @@ from airflow.api_fastapi.core_api.app import (
     init_error_handlers,
     init_flask_plugins,
     init_middlewares,
-    init_ui_plugins,
     init_views,
 )
 from airflow.api_fastapi.execution_api.app import create_task_execution_api_app
@@ -99,7 +98,6 @@ def create_app(apps: str = "all") -> FastAPI:
         init_plugins(app)
         init_auth_manager(app)
         init_flask_plugins(app)
-        init_ui_plugins(app)
         init_views(app)  # Core views need to be the last routes added - it 
has a catch all route
         init_error_handlers(app)
         init_middlewares(app)
@@ -171,10 +169,9 @@ def init_plugins(app: FastAPI) -> None:
     """Integrate FastAPI app, middlewares and UI plugins."""
     from airflow import plugins_manager
 
-    plugins_manager.initialize_fastapi_plugins()
+    apps, root_middlewares = plugins_manager.get_fastapi_plugins()
 
-    # After calling initialize_fastapi_plugins, fastapi_apps cannot be None 
anymore.
-    for subapp_dict in cast("list", plugins_manager.fastapi_apps):
+    for subapp_dict in apps:
         name = subapp_dict.get("name")
         subapp = subapp_dict.get("app")
         if subapp is None:
@@ -194,8 +191,7 @@ def init_plugins(app: FastAPI) -> None:
         log.debug("Adding subapplication %s under prefix %s", name, url_prefix)
         app.mount(url_prefix, subapp)
 
-    # After calling initialize_fastapi_plugins, fastapi_root_middlewares 
cannot be None anymore.
-    for middleware_dict in cast("list", 
plugins_manager.fastapi_root_middlewares):
+    for middleware_dict in root_middlewares:
         name = middleware_dict.get("name")
         middleware = middleware_dict.get("middleware")
         args = middleware_dict.get("args", [])
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/app.py 
b/airflow-core/src/airflow/api_fastapi/core_api/app.py
index 1f370f5844d..8a8f525265a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/app.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/app.py
@@ -113,14 +113,10 @@ def init_flask_plugins(app: FastAPI) -> None:
     """Integrate Flask plugins (plugins from Airflow 2)."""
     from airflow import plugins_manager
 
-    plugins_manager.initialize_flask_plugins()
+    blueprints, appbuilder_views, appbuilder_menu_links = 
plugins_manager.get_flask_plugins()
 
     # If no Airflow 2.x plugin is in the environment, no need to go further
-    if (
-        not plugins_manager.flask_blueprints
-        and not plugins_manager.flask_appbuilder_views
-        and not plugins_manager.flask_appbuilder_menu_links
-    ):
+    if not blueprints and not appbuilder_views and not appbuilder_menu_links:
         return
 
     from fastapi.middleware.wsgi import WSGIMiddleware
@@ -190,10 +186,3 @@ def init_middlewares(app: FastAPI) -> None:
         from airflow.api_fastapi.auth.managers.simple.middleware import 
SimpleAllAdminMiddleware
 
         app.add_middleware(SimpleAllAdminMiddleware)
-
-
-def init_ui_plugins(app: FastAPI) -> None:
-    """Initialize UI plugins."""
-    from airflow import plugins_manager
-
-    plugins_manager.initialize_ui_plugins()
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/plugins.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/plugins.py
index 3b5368f50d2..57c27a99b68 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/plugins.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/plugins.py
@@ -75,13 +75,10 @@ def get_plugins(
     dependencies=[Depends(requires_access_view(AccessView.PLUGINS))],
 )
 def import_errors() -> PluginImportErrorCollectionResponse:
-    plugins_manager.ensure_plugins_loaded()  # make sure import_errors are 
loaded
-
+    import_errors = plugins_manager.get_import_errors()
     return PluginImportErrorCollectionResponse.model_validate(
         {
-            "import_errors": [
-                {"source": source, "error": error} for source, error in 
plugins_manager.import_errors.items()
-            ],
-            "total_entries": len(plugins_manager.import_errors),
+            "import_errors": [{"source": source, "error": error} for source, 
error in import_errors.items()],
+            "total_entries": len(import_errors),
         }
     )
diff --git a/airflow-core/src/airflow/cli/commands/plugins_command.py 
b/airflow-core/src/airflow/cli/commands/plugins_command.py
index 29dd75674af..595ec7aba10 100644
--- a/airflow-core/src/airflow/cli/commands/plugins_command.py
+++ b/airflow-core/src/airflow/cli/commands/plugins_command.py
@@ -16,35 +16,18 @@
 # under the License.
 from __future__ import annotations
 
-import inspect
-from typing import Any
-
-from airflow import plugins_manager
 from airflow.cli.simple_table import AirflowConsole
-from airflow.plugins_manager import PluginsDirectorySource, get_plugin_info
+from airflow.plugins_manager import get_plugin_info
 from airflow.utils.cli import suppress_logs_and_warning
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
-def _get_name(class_like_object) -> str:
-    if isinstance(class_like_object, (str, PluginsDirectorySource)):
-        return str(class_like_object)
-    if inspect.isclass(class_like_object):
-        return class_like_object.__name__
-    return class_like_object.__class__.__name__
-
-
-def _join_plugins_names(value: list[Any] | Any) -> str:
-    value = value if isinstance(value, list) else [value]
-    return ",".join(_get_name(v) for v in value)
-
-
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def dump_plugins(args):
     """Dump plugins information."""
     plugins_info: list[dict[str, str]] = get_plugin_info()
-    if not plugins_manager.plugins:
+    if not plugins_info:
         print("No plugins loaded")
         return
 
diff --git a/airflow-core/src/airflow/lineage/hook.py 
b/airflow-core/src/airflow/lineage/hook.py
index 41c0e2bd4bc..7c22a367006 100644
--- a/airflow-core/src/airflow/lineage/hook.py
+++ b/airflow-core/src/airflow/lineage/hook.py
@@ -337,7 +337,6 @@ def get_hook_lineage_collector() -> HookLineageCollector:
     """Get singleton lineage collector."""
     from airflow import plugins_manager
 
-    plugins_manager.initialize_hook_lineage_readers_plugins()
-    if plugins_manager.hook_lineage_reader_classes:
+    if plugins_manager.get_hook_lineage_readers_plugins():
         return HookLineageCollector()
     return NoOpCollector()
diff --git a/airflow-core/src/airflow/plugins_manager.py 
b/airflow-core/src/airflow/plugins_manager.py
index c32bcae8ea5..8fdb3f06356 100644
--- a/airflow-core/src/airflow/plugins_manager.py
+++ b/airflow-core/src/airflow/plugins_manager.py
@@ -27,6 +27,7 @@ import os
 import sys
 import types
 from collections.abc import Iterable
+from functools import cache
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
@@ -43,10 +44,10 @@ from airflow.utils.file import find_path_from_directory
 if TYPE_CHECKING:
     from airflow.lineage.hook import HookLineageReader
 
-    try:
+    if sys.version_info >= (3, 12):
+        from importlib import metadata
+    else:
         import importlib_metadata as metadata
-    except ImportError:
-        from importlib import metadata  # type: ignore[no-redef]
     from collections.abc import Generator
     from types import ModuleType
 
@@ -55,55 +56,6 @@ if TYPE_CHECKING:
 
 log = logging.getLogger(__name__)
 
-import_errors: dict[str, str] = {}
-
-plugins: list[AirflowPlugin] | None = None
-loaded_plugins: set[str] = set()
-
-# Plugin components to integrate as modules
-macros_modules: list[Any] | None = None
-
-# Plugin components to integrate directly
-admin_views: list[Any] | None = None
-flask_blueprints: list[Any] | None = None
-fastapi_apps: list[Any] | None = None
-fastapi_root_middlewares: list[Any] | None = None
-external_views: list[Any] | None = None
-react_apps: list[Any] | None = None
-menu_links: list[Any] | None = None
-flask_appbuilder_views: list[Any] | None = None
-flask_appbuilder_menu_links: list[Any] | None = None
-global_operator_extra_links: list[Any] | None = None
-operator_extra_links: list[Any] | None = None
-registered_operator_link_classes: dict[str, type] | None = None
-timetable_classes: dict[str, type[Timetable]] | None = None
-hook_lineage_reader_classes: list[type[HookLineageReader]] | None = None
-priority_weight_strategy_classes: dict[str, type[PriorityWeightStrategy]] | 
None = None
-"""
-Mapping of class names to class of OperatorLinks registered by plugins.
-
-Used by the DAG serialization code to only allow specific classes to be created
-during deserialization
-"""
-PLUGINS_ATTRIBUTES_TO_DUMP = {
-    "macros",
-    "admin_views",
-    "flask_blueprints",
-    "fastapi_apps",
-    "fastapi_root_middlewares",
-    "external_views",
-    "react_apps",
-    "menu_links",
-    "appbuilder_views",
-    "appbuilder_menu_items",
-    "global_operator_extra_links",
-    "operator_extra_links",
-    "source",
-    "timetables",
-    "listeners",
-    "priority_weight_strategies",
-}
-
 
 class AirflowPluginSource:
     """Class used to define an AirflowPluginSource."""
@@ -205,7 +157,7 @@ class AirflowPlugin:
         """
 
 
-def is_valid_plugin(plugin_obj):
+def is_valid_plugin(plugin_obj) -> bool:
     """
     Check whether a potential object is a subclass of the AirflowPlugin class.
 
@@ -219,27 +171,11 @@ def is_valid_plugin(plugin_obj):
         and (plugin_obj is not AirflowPlugin)
     ):
         plugin_obj.validate()
-        return plugin_obj not in plugins
+        return True
     return False
 
 
-def register_plugin(plugin_instance):
-    """
-    Start plugin load and register it after success initialization.
-
-    If plugin is already registered, do nothing.
-
-    :param plugin_instance: subclass of AirflowPlugin
-    """
-    if plugin_instance.name in loaded_plugins:
-        return
-
-    loaded_plugins.add(plugin_instance.name)
-    plugin_instance.on_load()
-    plugins.append(plugin_instance)
-
-
-def load_entrypoint_plugins():
+def _load_entrypoint_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
     """
     Load and register plugins AirflowPlugin subclasses from the entrypoints.
 
@@ -247,6 +183,8 @@ def load_entrypoint_plugins():
     """
     log.debug("Loading plugins from entrypoints")
 
+    plugins: list[AirflowPlugin] = []
+    import_errors: dict[str, str] = {}
     for entry_point, dist in entry_points_with_dist("airflow.plugins"):
         log.debug("Importing entry_point plugin %s", entry_point.name)
         try:
@@ -254,28 +192,33 @@ def load_entrypoint_plugins():
             if not is_valid_plugin(plugin_class):
                 continue
 
-            plugin_instance = plugin_class()
+            plugin_instance: AirflowPlugin = plugin_class()
             plugin_instance.source = EntryPointSource(entry_point, dist)
-            register_plugin(plugin_instance)
+            plugins.append(plugin_instance)
         except Exception as e:
             log.exception("Failed to import plugin %s", entry_point.name)
             import_errors[entry_point.module] = str(e)
+    return plugins, import_errors
 
 
-def load_plugins_from_plugin_directory():
+def _load_plugins_from_plugin_directory() -> tuple[list[AirflowPlugin], 
dict[str, str]]:
     """Load and register Airflow Plugins from plugins directory."""
+    if settings.PLUGINS_FOLDER is None:
+        raise ValueError("Plugins folder is not set")
     log.debug("Loading plugins from directory: %s", settings.PLUGINS_FOLDER)
     files = find_path_from_directory(settings.PLUGINS_FOLDER, ".airflowignore")
     plugin_search_locations: list[tuple[str, Generator[str, None, None]]] = 
[("", files)]
 
     if conf.getboolean("core", "LOAD_EXAMPLES"):
         log.debug("Note: Loading plugins from examples as well: %s", 
settings.PLUGINS_FOLDER)
-        from airflow.example_dags import plugins
+        from airflow.example_dags import plugins as example_plugins
 
-        example_plugins_folder = next(iter(plugins.__path__))
+        example_plugins_folder = next(iter(example_plugins.__path__))
         example_files = find_path_from_directory(example_plugins_folder, 
".airflowignore")
-        plugin_search_locations.append((plugins.__name__, example_files))
+        plugin_search_locations.append((example_plugins.__name__, 
example_files))
 
+    plugins: list[AirflowPlugin] = []
+    import_errors: dict[str, str] = {}
     for module_prefix, plugin_files in plugin_search_locations:
         for file_path in plugin_files:
             path = Path(file_path)
@@ -286,39 +229,47 @@ def load_plugins_from_plugin_directory():
             try:
                 loader = importlib.machinery.SourceFileLoader(mod_name, 
file_path)
                 spec = importlib.util.spec_from_loader(mod_name, loader)
+                if not spec:
+                    log.error("Could not load spec for module %s at %s", 
mod_name, file_path)
+                    continue
                 mod = importlib.util.module_from_spec(spec)
                 sys.modules[spec.name] = mod
                 loader.exec_module(mod)
 
                 for mod_attr_value in (m for m in mod.__dict__.values() if 
is_valid_plugin(m)):
-                    plugin_instance = mod_attr_value()
+                    plugin_instance: AirflowPlugin = mod_attr_value()
                     plugin_instance.source = PluginsDirectorySource(file_path)
-                    register_plugin(plugin_instance)
+                    plugins.append(plugin_instance)
             except Exception as e:
                 log.exception("Failed to import plugin %s", file_path)
                 import_errors[file_path] = str(e)
+    return plugins, import_errors
 
 
-def load_providers_plugins():
+def _load_providers_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
     from airflow.providers_manager import ProvidersManager
 
     log.debug("Loading plugins from providers")
     providers_manager = ProvidersManager()
     providers_manager.initialize_providers_plugins()
+
+    plugins: list[AirflowPlugin] = []
+    import_errors: dict[str, str] = {}
     for plugin in providers_manager.plugins:
         log.debug("Importing plugin %s from class %s", plugin.name, 
plugin.plugin_class)
 
         try:
             plugin_instance = import_string(plugin.plugin_class)
             if is_valid_plugin(plugin_instance):
-                register_plugin(plugin_instance)
+                plugins.append(plugin_instance)
             else:
                 log.warning("Plugin %s is not a valid plugin", plugin.name)
         except ImportError:
             log.exception("Failed to load plugin %s from class name %s", 
plugin.name, plugin.plugin_class)
+    return plugins, import_errors
 
 
-def make_module(name: str, objects: list[Any]):
+def make_module(name: str, objects: list[Any]) -> ModuleType | None:
     """Create new module."""
     if not objects:
         return None
@@ -331,64 +282,69 @@ def make_module(name: str, objects: list[Any]):
     return module
 
 
-def ensure_plugins_loaded():
+def ensure_plugins_loaded() -> None:
     """
     Load plugins from plugins directory and entrypoints.
 
     Plugins are only loaded if they have not been previously loaded.
     """
-    from airflow.observability.stats import Stats
+    _get_plugins()
 
-    global plugins
 
-    if plugins is not None:
-        log.debug("Plugins are already loaded. Skipping.")
-        return
+@cache
+def _get_plugins() -> tuple[list[AirflowPlugin], dict[str, str]]:
+    """
+    Load plugins from plugins directory and entrypoints.
+
+    Plugins are only loaded if they have not been previously loaded.
+    """
+    from airflow.observability.stats import Stats
 
     if not settings.PLUGINS_FOLDER:
         raise ValueError("Plugins folder is not set")
 
     log.debug("Loading plugins")
 
-    with Stats.timer() as timer:
-        plugins = []
+    plugins: list[AirflowPlugin] = []
+    import_errors: dict[str, str] = {}
+    loaded_plugins: set[str | None] = set()
 
-        load_plugins_from_plugin_directory()
-        load_entrypoint_plugins()
-
-        if not settings.LAZY_LOAD_PROVIDERS:
-            load_providers_plugins()
-
-    if plugins:
-        log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), 
timer.duration)
+    def __register_plugins(plugin_instances: list[AirflowPlugin], errors: 
dict[str, str]) -> None:
+        for plugin_instance in plugin_instances:
+            if plugin_instance.name in loaded_plugins:
+                return
 
+            loaded_plugins.add(plugin_instance.name)
+            try:
+                plugin_instance.on_load()
+                plugins.append(plugin_instance)
+            except Exception as e:
+                log.exception("Failed to load plugin %s", plugin_instance.name)
+                name = str(plugin_instance.source) if plugin_instance.source 
else plugin_instance.name or ""
+                import_errors[name] = str(e)
+        import_errors.update(errors)
 
-def initialize_ui_plugins():
-    """Collect extension points for the UI."""
-    global external_views
-    global react_apps
+    with Stats.timer() as timer:
+        __register_plugins(*_load_plugins_from_plugin_directory())
+        __register_plugins(*_load_entrypoint_plugins())
 
-    if external_views is not None and react_apps is not None:
-        return
+        if not settings.LAZY_LOAD_PROVIDERS:
+            __register_plugins(*_load_providers_plugins())
 
-    ensure_plugins_loaded()
+    log.debug("Loading %d plugin(s) took %.2f seconds", len(plugins), 
timer.duration)
+    return plugins, import_errors
 
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
 
+@cache
+def _get_ui_plugins() -> tuple[list[Any], list[Any]]:
+    """Collect extension points for the UI."""
     log.debug("Initialize UI plugin")
 
-    seen_url_route = {}
-    external_views = []
-    react_apps = []
-
-    def _remove_list_item(lst, item):
-        # Mutate in place the plugin's external views and react apps list to 
remove the invalid items
-        # because some function still access these plugin's attribute and not 
the
-        # global variables `external_views` `react_apps`. (get_plugin_info, 
for example)
-        lst.remove(item)
+    seen_url_routes: dict[str, str | None] = {}
 
-    for plugin in plugins:
+    external_views: list[Any] = []
+    react_apps: list[Any] = []
+    for plugin in _get_plugins()[0]:
         external_views_to_remove = []
         react_apps_to_remove = []
         for external_view in plugin.external_views:
@@ -402,18 +358,18 @@ def initialize_ui_plugins():
             url_route = external_view.get("url_route")
             if url_route is None:
                 continue
-            if url_route in seen_url_route:
+            if url_route in seen_url_routes:
                 log.warning(
                     "Plugin '%s' has an external view with an URL route '%s' "
                     "that conflicts with another plugin '%s'. The view will 
not be loaded.",
                     plugin.name,
                     url_route,
-                    seen_url_route[url_route],
+                    seen_url_routes[url_route],
                 )
                 external_views_to_remove.append(external_view)
                 continue
             external_views.append(external_view)
-            seen_url_route[url_route] = plugin.name
+            seen_url_routes[url_route] = plugin.name
 
         for react_app in plugin.react_apps:
             if not isinstance(react_app, dict):
@@ -426,50 +382,35 @@ def initialize_ui_plugins():
             url_route = react_app.get("url_route")
             if url_route is None:
                 continue
-            if url_route in seen_url_route:
+            if url_route in seen_url_routes:
                 log.warning(
                     "Plugin '%s' has a React App with an URL route '%s' "
                     "that conflicts with another plugin '%s'. The React App 
will not be loaded.",
                     plugin.name,
                     url_route,
-                    seen_url_route[url_route],
+                    seen_url_routes[url_route],
                 )
                 react_apps_to_remove.append(react_app)
                 continue
             react_apps.append(react_app)
-            seen_url_route[url_route] = plugin.name
+            seen_url_routes[url_route] = plugin.name
 
         for item in external_views_to_remove:
-            _remove_list_item(plugin.external_views, item)
+            plugin.external_views.remove(item)
         for item in react_apps_to_remove:
-            _remove_list_item(plugin.react_apps, item)
-
+            plugin.react_apps.remove(item)
+    return external_views, react_apps
 
-def initialize_flask_plugins():
-    """Collect flask extension points for WEB UI (legacy)."""
-    global flask_blueprints
-    global flask_appbuilder_views
-    global flask_appbuilder_menu_links
-
-    if (
-        flask_blueprints is not None
-        and flask_appbuilder_views is not None
-        and flask_appbuilder_menu_links is not None
-    ):
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
 
+@cache
+def get_flask_plugins() -> tuple[list[Any], list[Any], list[Any]]:
+    """Collect and get flask extension points for WEB UI (legacy)."""
     log.debug("Initialize legacy Web UI plugin")
 
-    flask_blueprints = []
-    flask_appbuilder_views = []
-    flask_appbuilder_menu_links = []
-
-    for plugin in plugins:
+    flask_appbuilder_views: list[Any] = []
+    flask_appbuilder_menu_links: list[Any] = []
+    flask_blueprints: list[Any] = []
+    for plugin in _get_plugins()[0]:
         flask_appbuilder_views.extend(plugin.appbuilder_views)
         flask_appbuilder_menu_links.extend(plugin.appbuilder_menu_items)
         flask_blueprints.extend([{"name": plugin.name, "blueprint": bp} for bp 
in plugin.flask_blueprints])
@@ -482,130 +423,82 @@ def initialize_flask_plugins():
                 "Please contact the author of the plugin.",
                 plugin.name,
             )
+    return flask_blueprints, flask_appbuilder_views, 
flask_appbuilder_menu_links
 
 
-def initialize_fastapi_plugins():
+@cache
+def get_fastapi_plugins() -> tuple[list[Any], list[Any]]:
     """Collect extension points for the API."""
-    global fastapi_apps
-    global fastapi_root_middlewares
-
-    if fastapi_apps is not None and fastapi_root_middlewares is not None:
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
-
     log.debug("Initialize FastAPI plugins")
 
-    fastapi_apps = []
-    fastapi_root_middlewares = []
-
-    for plugin in plugins:
+    fastapi_apps: list[Any] = []
+    fastapi_root_middlewares: list[Any] = []
+    for plugin in _get_plugins()[0]:
         fastapi_apps.extend(plugin.fastapi_apps)
         fastapi_root_middlewares.extend(plugin.fastapi_root_middlewares)
+    return fastapi_apps, fastapi_root_middlewares
 
 
-def initialize_extra_operators_links_plugins():
-    """Create modules for loaded extension from extra operators links 
plugins."""
-    global global_operator_extra_links
-    global operator_extra_links
-    global registered_operator_link_classes
-
-    if (
-        global_operator_extra_links is not None
-        and operator_extra_links is not None
-        and registered_operator_link_classes is not None
-    ):
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
-
+@cache
+def _get_extra_operators_links_plugins() -> tuple[list[Any], list[Any]]:
+    """Create and get modules for loaded extension from extra operators links 
plugins."""
     log.debug("Initialize extra operators links plugins")
 
-    global_operator_extra_links = []
-    operator_extra_links = []
-    registered_operator_link_classes = {}
-
-    for plugin in plugins:
+    global_operator_extra_links: list[Any] = []
+    operator_extra_links: list[Any] = []
+    for plugin in _get_plugins()[0]:
         global_operator_extra_links.extend(plugin.global_operator_extra_links)
         operator_extra_links.extend(list(plugin.operator_extra_links))
+    return global_operator_extra_links, operator_extra_links
 
-        registered_operator_link_classes.update(
-            {qualname(link.__class__): link.__class__ for link in 
plugin.operator_extra_links}
-        )
 
+def get_global_operator_extra_links() -> list[Any]:
+    """Get global operator extra links registered by plugins."""
+    return _get_extra_operators_links_plugins()[0]
 
-def initialize_timetables_plugins():
-    """Collect timetable classes registered by plugins."""
-    global timetable_classes
 
-    if timetable_classes is not None:
-        return
+def get_operator_extra_links() -> list[Any]:
+    """Get operator extra links registered by plugins."""
+    return _get_extra_operators_links_plugins()[1]
 
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
 
+@cache
+def get_timetables_plugins() -> dict[str, type[Timetable]]:
+    """Collect and get timetable classes registered by plugins."""
     log.debug("Initialize extra timetables plugins")
 
-    timetable_classes = {
+    return {
         qualname(timetable_class): timetable_class
-        for plugin in plugins
+        for plugin in _get_plugins()[0]
         for timetable_class in plugin.timetables
     }
 
 
-def initialize_hook_lineage_readers_plugins():
-    """Collect hook lineage reader classes registered by plugins."""
-    global hook_lineage_reader_classes
-
-    if hook_lineage_reader_classes is not None:
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
-
+@cache
+def get_hook_lineage_readers_plugins() -> list[type[HookLineageReader]]:
+    """Collect and get hook lineage reader classes registered by plugins."""
     log.debug("Initialize hook lineage readers plugins")
+    result: list[type[HookLineageReader]] = []
 
-    hook_lineage_reader_classes = []
-    for plugin in plugins:
-        hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)
+    for plugin in _get_plugins()[0]:
+        result.extend(plugin.hook_lineage_readers)
+    return result
 
 
+@cache
 def integrate_macros_plugins() -> None:
     """Integrates macro plugins."""
-    global macros_modules
-
     from airflow.sdk.execution_time import macros
 
-    if macros_modules is not None:
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
-
     log.debug("Integrate Macros plugins")
 
-    macros_modules = []
-
-    for plugin in plugins:
+    for plugin in _get_plugins()[0]:
         if plugin.name is None:
             raise AirflowPluginException("Invalid plugin name")
 
         macros_module = 
make_module(f"airflow.sdk.execution_time.macros.{plugin.name}", plugin.macros)
 
         if macros_module:
-            macros_modules.append(macros_module)
             sys.modules[macros_module.__name__] = macros_module
             # Register the newly created module on airflow.macros such that it
             # can be accessed when rendering templates.
@@ -614,15 +507,12 @@ def integrate_macros_plugins() -> None:
 
 def integrate_listener_plugins(listener_manager: ListenerManager) -> None:
     """Add listeners from plugins."""
-    ensure_plugins_loaded()
-
-    if plugins:
-        for plugin in plugins:
-            if plugin.name is None:
-                raise AirflowPluginException("Invalid plugin name")
+    for plugin in _get_plugins()[0]:
+        if plugin.name is None:
+            raise AirflowPluginException("Invalid plugin name")
 
-            for listener in plugin.listeners:
-                listener_manager.add_listener(listener)
+        for listener in plugin.listeners:
+            listener_manager.add_listener(listener)
 
 
 def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> 
list[dict[str, Any]]:
@@ -631,79 +521,88 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = 
None) -> list[dict[str
 
     :param attrs_to_dump: A list of plugin attributes to dump
     """
-    ensure_plugins_loaded()
-    integrate_macros_plugins()
-    initialize_flask_plugins()
-    initialize_fastapi_plugins()
-    initialize_ui_plugins()
-    initialize_extra_operators_links_plugins()
+    get_flask_plugins()
+    get_fastapi_plugins()
+    get_global_operator_extra_links()
+    get_operator_extra_links()
+    _get_ui_plugins()
     if not attrs_to_dump:
-        attrs_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
+        attrs_to_dump = {
+            "macros",
+            "admin_views",
+            "flask_blueprints",
+            "fastapi_apps",
+            "fastapi_root_middlewares",
+            "external_views",
+            "react_apps",
+            "menu_links",
+            "appbuilder_views",
+            "appbuilder_menu_items",
+            "global_operator_extra_links",
+            "operator_extra_links",
+            "source",
+            "timetables",
+            "listeners",
+            "priority_weight_strategies",
+        }
     plugins_info = []
-    if plugins:
-        for plugin in plugins:
-            info: dict[str, Any] = {"name": plugin.name}
-            for attr in attrs_to_dump:
-                if attr in ("global_operator_extra_links", 
"operator_extra_links"):
-                    info[attr] = [f"<{qualname(d.__class__)} object>" for d in 
getattr(plugin, attr)]
-                elif attr in ("macros", "timetables", 
"priority_weight_strategies"):
-                    info[attr] = [qualname(d) for d in getattr(plugin, attr)]
-                elif attr == "listeners":
-                    # listeners may be modules or class instances
-                    info[attr] = [
-                        d.__name__ if inspect.ismodule(d) else qualname(d) for 
d in getattr(plugin, attr)
-                    ]
-                elif attr == "appbuilder_views":
-                    info[attr] = [
-                        {**d, "view": qualname(d["view"].__class__) if "view" 
in d else None}
-                        for d in getattr(plugin, attr)
-                    ]
-                elif attr == "flask_blueprints":
-                    info[attr] = [
-                        f"<{qualname(d.__class__)}: name={d.name!r} 
import_name={d.import_name!r}>"
-                        for d in getattr(plugin, attr)
-                    ]
-                elif attr == "fastapi_apps":
-                    info[attr] = [
-                        {**d, "app": qualname(d["app"].__class__) if "app" in 
d else None}
-                        for d in getattr(plugin, attr)
-                    ]
-                elif attr == "fastapi_root_middlewares":
-                    # remove args and kwargs from plugin info to hide 
potentially sensitive info.
-                    info[attr] = [
-                        {
-                            k: (v if k != "middleware" else 
qualname(middleware_dict["middleware"]))
-                            for k, v in middleware_dict.items()
-                            if k not in ("args", "kwargs")
-                        }
-                        for middleware_dict in getattr(plugin, attr)
-                    ]
-                else:
-                    info[attr] = getattr(plugin, attr)
-            plugins_info.append(info)
+    for plugin in _get_plugins()[0]:
+        info: dict[str, Any] = {"name": plugin.name}
+        for attr in attrs_to_dump:
+            if attr in ("global_operator_extra_links", "operator_extra_links"):
+                info[attr] = [f"<{qualname(d.__class__)} object>" for d in 
getattr(plugin, attr)]
+            elif attr in ("macros", "timetables", 
"priority_weight_strategies"):
+                info[attr] = [qualname(d) for d in getattr(plugin, attr)]
+            elif attr == "listeners":
+                # listeners may be modules or class instances
+                info[attr] = [d.__name__ if inspect.ismodule(d) else 
qualname(d) for d in plugin.listeners]
+            elif attr == "appbuilder_views":
+                info[attr] = [
+                    {**d, "view": qualname(d["view"].__class__) if "view" in d 
else None}
+                    for d in plugin.appbuilder_views
+                ]
+            elif attr == "flask_blueprints":
+                info[attr] = [
+                    f"<{qualname(d.__class__)}: name={d.name!r} 
import_name={d.import_name!r}>"
+                    for d in plugin.flask_blueprints
+                ]
+            elif attr == "fastapi_apps":
+                info[attr] = [
+                    {**d, "app": qualname(d["app"].__class__) if "app" in d 
else None}
+                    for d in plugin.fastapi_apps
+                ]
+            elif attr == "fastapi_root_middlewares":
+                # remove args and kwargs from plugin info to hide potentially 
sensitive info.
+                info[attr] = [
+                    {
+                        k: (v if k != "middleware" else 
qualname(middleware_dict["middleware"]))
+                        for k, v in middleware_dict.items()
+                        if k not in ("args", "kwargs")
+                    }
+                    for middleware_dict in plugin.fastapi_root_middlewares
+                ]
+            else:
+                info[attr] = getattr(plugin, attr)
+        plugins_info.append(info)
     return plugins_info
 
 
-def initialize_priority_weight_strategy_plugins():
-    """Collect priority weight strategy classes registered by plugins."""
-    global priority_weight_strategy_classes
-
-    if priority_weight_strategy_classes is not None:
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
-
+@cache
+def get_priority_weight_strategy_plugins() -> dict[str, 
type[PriorityWeightStrategy]]:
+    """Collect and get priority weight strategy classes registered by 
plugins."""
     log.debug("Initialize extra priority weight strategy plugins")
 
     plugins_priority_weight_strategy_classes = {
         qualname(priority_weight_strategy_class): 
priority_weight_strategy_class
-        for plugin in plugins
+        for plugin in _get_plugins()[0]
         for priority_weight_strategy_class in plugin.priority_weight_strategies
     }
-    priority_weight_strategy_classes = {
+    return {
         **airflow_priority_weight_strategies,
         **plugins_priority_weight_strategy_classes,
     }
+
+
+def get_import_errors() -> dict[str, str]:
+    """Get import errors encountered during plugin loading."""
+    return _get_plugins()[1]
diff --git a/airflow-core/src/airflow/serialization/definitions/baseoperator.py 
b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
index 13ea87ae47c..20cac69435f 100644
--- a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
@@ -24,7 +24,6 @@ from typing import TYPE_CHECKING, Any
 
 import methodtools
 
-from airflow.exceptions import AirflowException
 from airflow.serialization.definitions.node import DAGNode
 from airflow.serialization.definitions.param import SerializedParamsDict
 from airflow.serialization.enums import DagAttributeTypes
@@ -262,10 +261,7 @@ class SerializedBaseOperator(DAGNode):
         """All global extra links."""
         from airflow import plugins_manager
 
-        plugins_manager.initialize_extra_operators_links_plugins()
-        if plugins_manager.global_operator_extra_links is None:
-            raise AirflowException("Can't load operators")
-        return {link.name: link for link in 
plugins_manager.global_operator_extra_links}
+        return {link.name: link for link in 
plugins_manager.get_global_operator_extra_links()}
 
     @functools.cached_property
     def extra_links(self) -> list[str]:
diff --git 
a/airflow-core/src/airflow/serialization/definitions/mappedoperator.py 
b/airflow-core/src/airflow/serialization/definitions/mappedoperator.py
index 561a23f0b9a..1cf6d357e65 100644
--- a/airflow-core/src/airflow/serialization/definitions/mappedoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/mappedoperator.py
@@ -28,7 +28,7 @@ import methodtools
 import structlog
 from sqlalchemy.orm import Session
 
-from airflow.exceptions import AirflowException, NotMapped
+from airflow.exceptions import NotMapped
 from airflow.sdk import BaseOperator as TaskSDKBaseOperator
 from airflow.sdk.definitions.mappedoperator import MappedOperator as 
TaskSDKMappedOperator
 from airflow.serialization.definitions.baseoperator import 
DEFAULT_OPERATOR_DEPS, SerializedBaseOperator
@@ -372,11 +372,9 @@ class SerializedMappedOperator(DAGNode):
         op_extra_links_from_plugin: dict[str, Any] = {}
         from airflow import plugins_manager
 
-        plugins_manager.initialize_extra_operators_links_plugins()
-        if plugins_manager.operator_extra_links is None:
-            raise AirflowException("Can't load operators")
+        operator_extra_links = plugins_manager.get_operator_extra_links()
         operator_class_type = self.operator_class["task_type"]  # type: ignore
-        for ope in plugins_manager.operator_extra_links:
+        for ope in operator_extra_links:
             if ope.operators and any(operator_class_type in cls.__name__ for 
cls in ope.operators):
                 op_extra_links_from_plugin.update({ope.name: ope})
 
@@ -391,10 +389,8 @@ class SerializedMappedOperator(DAGNode):
         """Returns dictionary of all global extra links."""
         from airflow import plugins_manager
 
-        plugins_manager.initialize_extra_operators_links_plugins()
-        if plugins_manager.global_operator_extra_links is None:
-            raise AirflowException("Can't load operators")
-        return {link.name: link for link in 
plugins_manager.global_operator_extra_links}
+        global_operator_extra_links = 
plugins_manager.get_global_operator_extra_links()
+        return {link.name: link for link in global_operator_extra_links}
 
     @functools.cached_property
     def extra_links(self) -> list[str]:
diff --git a/airflow-core/src/airflow/serialization/helpers.py 
b/airflow-core/src/airflow/serialization/helpers.py
index 3af7f3c07e7..723c113709a 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -116,10 +116,9 @@ def find_registered_custom_timetable(importable_string: 
str) -> type[CoreTimetab
     """Find a user-defined custom timetable class registered via a plugin."""
     from airflow import plugins_manager
 
-    plugins_manager.initialize_timetables_plugins()
-    if plugins_manager.timetable_classes is not None:
-        with contextlib.suppress(KeyError):
-            return plugins_manager.timetable_classes[importable_string]
+    timetable_classes = plugins_manager.get_timetables_plugins()
+    with contextlib.suppress(KeyError):
+        return timetable_classes[importable_string]
     raise TimetableNotRegistered(importable_string)
 
 
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py 
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 170879c5339..53b6377b96e 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -131,11 +131,7 @@ def _get_registered_priority_weight_strategy(
 
     if importable_string in airflow_priority_weight_strategies:
         return airflow_priority_weight_strategies[importable_string]
-    plugins_manager.initialize_priority_weight_strategy_plugins()
-    if plugins_manager.priority_weight_strategy_classes:
-        return 
plugins_manager.priority_weight_strategy_classes.get(importable_string)
-    else:
-        return None
+    return 
plugins_manager.get_priority_weight_strategy_plugins().get(importable_string)
 
 
 class _PartitionMapperNotFound(ValueError):
@@ -1159,12 +1155,7 @@ class OperatorSerialization(DAGNode, BaseSerialization):
         if cls._load_operator_extra_links:
             from airflow import plugins_manager
 
-            plugins_manager.initialize_extra_operators_links_plugins()
-
-            if plugins_manager.operator_extra_links is None:
-                raise AirflowException("Can not load plugins")
-
-            for ope in plugins_manager.operator_extra_links:
+            for ope in plugins_manager.get_operator_extra_links():
                 for operator in ope.operators:
                     if (
                         operator.__name__ == encoded_op["task_type"]
@@ -1545,10 +1536,7 @@ class OperatorSerialization(DAGNode, BaseSerialization):
         """
         from airflow import plugins_manager
 
-        plugins_manager.initialize_extra_operators_links_plugins()
-
-        if plugins_manager.registered_operator_link_classes is None:
-            raise AirflowException("Can't load plugins")
+        plugins_manager.get_operator_extra_links()
         op_predefined_extra_links = {}
 
         for name, xcom_key in encoded_op_links.items():
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 597c1f81987..3720994e04d 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -82,11 +82,10 @@ def custom_timetable_plugin(monkeypatch):
     timetable_class_name = qualname(CustomTimetable)
     existing_timetables = getattr(plugins_manager, "timetable_classes", None) 
or {}
 
-    monkeypatch.setattr(plugins_manager, "initialize_timetables_plugins", 
lambda: None)
     monkeypatch.setattr(
         plugins_manager,
-        "timetable_classes",
-        {**existing_timetables, timetable_class_name: CustomTimetable},
+        "get_timetables_plugins",
+        lambda: {**existing_timetables, timetable_class_name: CustomTimetable},
     )
 
 
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_plugins.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_plugins.py
index 348b8d9526e..4220837ebb1 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_plugins.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_plugins.py
@@ -162,8 +162,8 @@ class TestGetPlugins:
 @skip_if_force_lowest_dependencies_marker
 class TestGetPluginImportErrors:
     @patch(
-        "airflow.plugins_manager.import_errors",
-        new={"plugins/test_plugin.py": "something went wrong"},
+        "airflow.plugins_manager.get_import_errors",
+        new=lambda: {"plugins/test_plugin.py": "something went wrong"},
     )
     def test_should_respond_200(self, test_client, session):
         with assert_queries_count(2):
diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py 
b/airflow-core/tests/unit/api_fastapi/test_app.py
index 1e0925f8e73..1eb692e1864 100644
--- a/airflow-core/tests/unit/api_fastapi/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/test_app.py
@@ -113,7 +113,7 @@ def test_catch_all_route_last(client):
 )
 def test_plugin_with_invalid_url_prefix(caplog, fastapi_apps, 
expected_message, invalid_path):
     app = FastAPI()
-    with mock.patch.object(plugins_manager, "fastapi_apps", fastapi_apps):
+    with mock.patch.object(plugins_manager, "get_fastapi_plugins", 
return_value=(fastapi_apps, [])):
         app_module.init_plugins(app)
 
     assert any(expected_message in rec.message for rec in caplog.records)
diff --git a/airflow-core/tests/unit/plugins/test_plugins_manager.py 
b/airflow-core/tests/unit/plugins/test_plugins_manager.py
index c463f6a6273..2b872f23c0b 100644
--- a/airflow-core/tests/unit/plugins/test_plugins_manager.py
+++ b/airflow-core/tests/unit/plugins/test_plugins_manager.py
@@ -75,8 +75,7 @@ class TestPluginsManager:
     def clean_plugins(self):
         from airflow import plugins_manager
 
-        plugins_manager.loaded_plugins = set()
-        plugins_manager.plugins = []
+        plugins_manager._get_plugins.cache_clear()
 
     def test_no_log_when_no_plugins(self, caplog):
         with mock_plugin_manager(plugins=[]):
@@ -89,33 +88,37 @@ class TestPluginsManager:
     def test_loads_filesystem_plugins(self, caplog):
         from airflow import plugins_manager
 
-        with mock.patch("airflow.plugins_manager.plugins", []):
-            plugins_manager.load_plugins_from_plugin_directory()
+        plugins, import_errors = 
plugins_manager._load_plugins_from_plugin_directory()
 
-            assert len(plugins_manager.plugins) == 10
-            for plugin in plugins_manager.plugins:
-                if "AirflowTestOnLoadPlugin" in str(plugin):
-                    assert plugin.name == "postload"
-                    break
-            else:
-                pytest.fail("Wasn't able to find a registered 
`AirflowTestOnLoadPlugin`")
+        assert len(plugins) == 10
+        assert not import_errors
+        for plugin in plugins:
+            if "AirflowTestOnLoadPlugin" in str(plugin):
+                assert plugin.name == "preload"  # on_init() is not called here
+                break
+        else:
+            pytest.fail("Wasn't able to find a registered 
`AirflowTestOnLoadPlugin`")
 
-            assert caplog.record_tuples == []
+        assert caplog.record_tuples == []
 
     def test_loads_filesystem_plugins_exception(self, caplog, tmp_path):
         from airflow import plugins_manager
 
-        with mock.patch("airflow.plugins_manager.plugins", []):
-            (tmp_path / "testplugin.py").write_text(ON_LOAD_EXCEPTION_PLUGIN)
+        (tmp_path / "testplugin.py").write_text(ON_LOAD_EXCEPTION_PLUGIN)
 
-            with conf_vars({("core", "plugins_folder"): os.fspath(tmp_path)}):
-                plugins_manager.load_plugins_from_plugin_directory()
+        with (
+            conf_vars({("core", "plugins_folder"): os.fspath(tmp_path)}),
+            mock.patch("airflow.plugins_manager._load_entrypoint_plugins", 
return_value=([], [])),
+            mock.patch("airflow.plugins_manager._load_providers_plugins", 
return_value=([], [])),
+        ):
+            plugins, import_errors = plugins_manager._get_plugins()
 
-            assert len(plugins_manager.plugins) == 3  # three are loaded from 
examples
+        assert len(plugins) == 3  # three are loaded from examples
+        assert len(import_errors) == 1
 
-            received_logs = caplog.text
-            assert "Failed to import plugin" in received_logs
-            assert "testplugin.py" in received_logs
+        received_logs = caplog.text
+        assert "Failed to load plugin" in received_logs
+        assert "testplugin.py" in received_logs
 
     def test_should_warning_about_incompatible_plugins(self, caplog):
         class AirflowAdminViewsPlugin(AirflowPlugin):
@@ -134,7 +137,7 @@ class TestPluginsManager:
         ):
             from airflow import plugins_manager
 
-            plugins_manager.initialize_flask_plugins()
+            plugins_manager.get_flask_plugins()
 
         assert caplog.record_tuples == [
             (
@@ -169,14 +172,16 @@ class TestPluginsManager:
         ):
             from airflow import plugins_manager
 
-            plugins_manager.initialize_ui_plugins()
+            external_views, react_apps = plugins_manager._get_ui_plugins()
 
             # Verify that the conflicting external view and react app are not 
loaded
-            plugin_b = next(plugin for plugin in plugins_manager.plugins if 
plugin.name == "test_plugin_b")
+            plugin_b = next(
+                plugin for plugin in plugins_manager._get_plugins()[0] if 
plugin.name == "test_plugin_b"
+            )
             assert plugin_b.external_views == []
             assert plugin_b.react_apps == []
-            assert len(plugins_manager.external_views) == 1
-            assert len(plugins_manager.react_apps) == 0
+            assert len(external_views) == 1
+            assert len(react_apps) == 0
 
     def 
test_should_warning_about_external_views_or_react_app_wrong_object(self, 
caplog):
         class TestPluginA(AirflowPlugin):
@@ -191,14 +196,16 @@ class TestPluginsManager:
         ):
             from airflow import plugins_manager
 
-            plugins_manager.initialize_ui_plugins()
+            external_views, react_apps = plugins_manager._get_ui_plugins()
 
             # Verify that the conflicting external view and react app are not 
loaded
-            plugin_a = next(plugin for plugin in plugins_manager.plugins if 
plugin.name == "test_plugin_a")
+            plugin_a = next(
+                plugin for plugin in plugins_manager._get_plugins()[0] if 
plugin.name == "test_plugin_a"
+            )
             assert plugin_a.external_views == [{"url_route": "/test_route"}]
             assert plugin_a.react_apps == [{"url_route": 
"/test_route_react_app"}]
-            assert len(plugins_manager.external_views) == 1
-            assert len(plugins_manager.react_apps) == 1
+            assert len(external_views) == 1
+            assert len(react_apps) == 1
 
         assert caplog.record_tuples == [
             (
@@ -232,7 +239,7 @@ class TestPluginsManager:
         ):
             from airflow import plugins_manager
 
-            plugins_manager.initialize_flask_plugins()
+            plugins_manager.get_flask_plugins()
 
         assert caplog.record_tuples == []
 
@@ -255,7 +262,7 @@ class TestPluginsManager:
         ):
             from airflow import plugins_manager
 
-            plugins_manager.initialize_flask_plugins()
+            plugins_manager.get_flask_plugins()
 
         assert caplog.record_tuples == []
 
@@ -263,7 +270,7 @@ class TestPluginsManager:
         """
         Test that Airflow does not raise an error if there is any Exception 
because of a plugin.
         """
-        from airflow.plugins_manager import import_errors, 
load_entrypoint_plugins
+        from airflow.plugins_manager import _load_entrypoint_plugins
 
         mock_dist = mock.Mock()
         mock_dist.metadata = {"Name": "test-dist"}
@@ -279,14 +286,17 @@ class TestPluginsManager:
             mock_metadata_distribution(return_value=[mock_dist]),
             caplog.at_level(logging.ERROR, logger="airflow.plugins_manager"),
         ):
-            load_entrypoint_plugins()
+            _, import_errors = _load_entrypoint_plugins()
 
             received_logs = caplog.text
             # Assert Traceback is shown too
             assert "Traceback (most recent call last):" in received_logs
             assert "my_fake_module not found" in received_logs
             assert "Failed to import plugin test-entrypoint" in received_logs
-            assert ("test.plugins.test_plugins_manager", "my_fake_module not 
found") in import_errors.items()
+            assert (
+                "test.plugins.test_plugins_manager",
+                "my_fake_module not found",
+            ) in import_errors.items()
 
     def test_registering_plugin_macros(self, request):
         """
@@ -326,15 +336,14 @@ class TestPluginsManager:
             # Verify that the symbol table in 
airflow.sdk.execution_time.macros has been updated with an entry for
             # this plugin, this is necessary in order to allow the plugin's 
macros to be used when
             # rendering templates.
-            assert hasattr(macros, MacroPlugin.name)
+            assert hasattr(macros, MacroPlugin.name or "")
 
     @skip_if_force_lowest_dependencies_marker
     def test_registering_plugin_listeners(self):
         from airflow import plugins_manager
 
         assert not get_listener_manager().has_listeners
-        with mock.patch("airflow.plugins_manager.plugins", []):
-            plugins_manager.load_plugins_from_plugin_directory()
+        with 
mock_plugin_manager(plugins=plugins_manager._load_plugins_from_plugin_directory()[0]):
             plugins_manager.integrate_listener_plugins(get_listener_manager())
 
             assert get_listener_manager().has_listeners
@@ -351,10 +360,9 @@ class TestPluginsManager:
     def test_should_import_plugin_from_providers(self):
         from airflow import plugins_manager
 
-        with mock.patch("airflow.plugins_manager.plugins", []):
-            assert len(plugins_manager.plugins) == 0
-            plugins_manager.load_providers_plugins()
-            assert len(plugins_manager.plugins) >= 2
+        plugins, import_errors = plugins_manager._load_providers_plugins()
+        assert len(plugins) >= 2
+        assert not import_errors
 
     @skip_if_force_lowest_dependencies_marker
     def test_does_not_double_import_entrypoint_provider_plugins(self):
@@ -369,11 +377,10 @@ class TestPluginsManager:
         mock_dist.version = "1.0.0"
         mock_dist.entry_points = [mock_entrypoint]
 
-        with mock.patch("airflow.plugins_manager.plugins", []):
-            assert len(plugins_manager.plugins) == 0
-            plugins_manager.load_entrypoint_plugins()
-            plugins_manager.load_providers_plugins()
-            assert len(plugins_manager.plugins) == 4
+        # Mock/skip loading from plugin dir
+        with 
mock.patch("airflow.plugins_manager._load_plugins_from_plugin_directory", 
return_value=([], [])):
+            plugins = plugins_manager._get_plugins()[0]
+        assert len(plugins) == 4
 
 
 class TestPluginsDirectorySource:
@@ -400,7 +407,7 @@ class TestEntryPointSource:
         mock_dist.entry_points = [mock_entrypoint]
 
         with mock_metadata_distribution(return_value=[mock_dist]):
-            plugins_manager.load_entrypoint_plugins()
+            plugins_manager._load_entrypoint_plugins()
 
         source = plugins_manager.EntryPointSource(mock_entrypoint, mock_dist)
         assert str(mock_entrypoint) == source.entrypoint
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py 
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 7b87c6e00a8..8e25e7663d1 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -479,15 +479,16 @@ def serialize_subprocess(queue, dag_folder):
 
 
 @pytest.fixture
-def timetable_plugin(monkeypatch):
+def timetable_plugin(monkeypatch: pytest.MonkeyPatch):
     """Patch plugins manager to always and only return our custom timetable."""
     from airflow import plugins_manager
 
-    monkeypatch.setattr(plugins_manager, "initialize_timetables_plugins", 
lambda: None)
     monkeypatch.setattr(
         plugins_manager,
-        "timetable_classes",
-        {"tests_common.test_utils.timetables.CustomSerializationTimetable": 
CustomSerializationTimetable},
+        "get_timetables_plugins",
+        lambda: {
+            "tests_common.test_utils.timetables.CustomSerializationTimetable": 
CustomSerializationTimetable
+        },
     )
 
 
diff --git a/devel-common/src/tests_common/pytest_plugin.py 
b/devel-common/src/tests_common/pytest_plugin.py
index e622fd6a128..29ef46a1fec 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -217,7 +217,7 @@ def mock_plugins_manager_for_all_non_db_tests():
         return
     from tests_common.test_utils.mock_plugins import mock_plugin_manager
 
-    with mock_plugin_manager() as _fixture:
+    with mock_plugin_manager(plugins=[]) as _fixture:
         yield _fixture
 
 
diff --git a/devel-common/src/tests_common/test_utils/mock_plugins.py 
b/devel-common/src/tests_common/test_utils/mock_plugins.py
index 36d65a82358..dfc654d15ee 100644
--- a/devel-common/src/tests_common/test_utils/mock_plugins.py
+++ b/devel-common/src/tests_common/test_utils/mock_plugins.py
@@ -19,9 +19,9 @@ from __future__ import annotations
 from contextlib import ExitStack, contextmanager
 from unittest import mock
 
-from airflow import __version__ as airflow_version
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, 
AIRFLOW_V_3_2_PLUS
 
-PLUGINS_MANAGER_NULLABLE_ATTRIBUTES = [
+PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V3_0 = [
     "plugins",
     "macros_modules",
     "admin_views",
@@ -75,41 +75,71 @@ def mock_plugin_manager(plugins=None, **kwargs):
     Use this context if you want your test to not have side effects in 
airflow.plugins_manager, and
     other tests do not affect the results of this test.
     """
-    illegal_arguments = set(kwargs.keys()) - 
set(PLUGINS_MANAGER_NULLABLE_ATTRIBUTES) - {"import_errors"}
+    illegal_arguments = set(kwargs.keys()) - 
set(PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V3_0) - {"import_errors"}
     if illegal_arguments:
         raise TypeError(
             f"TypeError: mock_plugin_manager got an unexpected keyword 
arguments: {illegal_arguments}"
         )
     # Handle plugins specially
     with ExitStack() as exit_stack:
+        if AIRFLOW_V_3_2_PLUS:
+            # Always start the block with an non-initialized plugins, so 
ensure_plugins_loaded runs.
+            from airflow import plugins_manager
+
+            plugins_manager._get_plugins.cache_clear()
+            plugins_manager._get_ui_plugins.cache_clear()
+            plugins_manager.get_flask_plugins.cache_clear()
+            plugins_manager.get_fastapi_plugins.cache_clear()
+            plugins_manager._get_extra_operators_links_plugins.cache_clear()
+            plugins_manager.get_timetables_plugins.cache_clear()
+            plugins_manager.get_hook_lineage_readers_plugins.cache_clear()
+            plugins_manager.integrate_macros_plugins.cache_clear()
+            plugins_manager.get_priority_weight_strategy_plugins.cache_clear()
+
+            if plugins is not None or "import_errors" in kwargs:
+                exit_stack.enter_context(
+                    mock.patch(
+                        "airflow.plugins_manager._get_plugins",
+                        return_value=(
+                            plugins or [],
+                            kwargs.get("import_errors", {}),
+                        ),
+                    )
+                )
+            elif kwargs:
+                raise NotImplementedError(
+                    "mock_plugin_manager does not support patching other 
attributes in Airflow 3.2+"
+                )
+        else:
 
-        def mock_loaded_plugins():
-            
exit_stack.enter_context(mock.patch("airflow.plugins_manager.plugins", plugins 
or []))
+            def mock_loaded_plugins():
+                
exit_stack.enter_context(mock.patch("airflow.plugins_manager.plugins", plugins 
or []))
 
-        exit_stack.enter_context(
-            mock.patch(
-                "airflow.plugins_manager.load_plugins_from_plugin_directory", 
side_effect=mock_loaded_plugins
+            exit_stack.enter_context(
+                mock.patch(
+                    
"airflow.plugins_manager.load_plugins_from_plugin_directory",
+                    side_effect=mock_loaded_plugins,
+                )
+            )
+            exit_stack.enter_context(
+                mock.patch("airflow.plugins_manager.load_providers_plugins", 
side_effect=mock_loaded_plugins)
+            )
+            exit_stack.enter_context(
+                mock.patch("airflow.plugins_manager.load_entrypoint_plugins", 
side_effect=mock_loaded_plugins)
             )
-        )
-        exit_stack.enter_context(
-            mock.patch("airflow.plugins_manager.load_providers_plugins", 
side_effect=mock_loaded_plugins)
-        )
-        exit_stack.enter_context(
-            mock.patch("airflow.plugins_manager.load_entrypoint_plugins", 
side_effect=mock_loaded_plugins)
-        )
 
-        if airflow_version <= "3":
-            ATTR_TO_PATCH = PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V2_10
-        else:
-            ATTR_TO_PATCH = PLUGINS_MANAGER_NULLABLE_ATTRIBUTES
+            if AIRFLOW_V_3_0_PLUS:
+                ATTR_TO_PATCH = PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V3_0
+            else:
+                ATTR_TO_PATCH = PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V2_10
 
-        for attr in ATTR_TO_PATCH:
-            
exit_stack.enter_context(mock.patch(f"airflow.plugins_manager.{attr}", 
kwargs.get(attr)))
+            for attr in ATTR_TO_PATCH:
+                
exit_stack.enter_context(mock.patch(f"airflow.plugins_manager.{attr}", 
kwargs.get(attr)))
 
-        # Always start the block with an empty plugins, so 
ensure_plugins_loaded runs.
-        exit_stack.enter_context(mock.patch("airflow.plugins_manager.plugins", 
None))
-        exit_stack.enter_context(
-            mock.patch("airflow.plugins_manager.import_errors", 
kwargs.get("import_errors", {}))
-        )
+            # Always start the block with an non-initialized plugins, so 
ensure_plugins_loaded runs.
+            
exit_stack.enter_context(mock.patch("airflow.plugins_manager.plugins", None))
+            exit_stack.enter_context(
+                mock.patch("airflow.plugins_manager.import_errors", 
kwargs.get("import_errors", {}))
+            )
 
         yield
diff --git a/providers/fab/.pre-commit-config.yaml 
b/providers/fab/.pre-commit-config.yaml
index 5dcd81e5107..7f05001e80e 100644
--- a/providers/fab/.pre-commit-config.yaml
+++ b/providers/fab/.pre-commit-config.yaml
@@ -27,7 +27,12 @@ repos:
       - id: compile-fab-assets
         name: Compile FAB provider assets
         language: node
-        files: ^.*/www/
+        files: ^src/airflow/providers/fab/www/
+        exclude: |
+          (?x)
+          ^src/airflow/providers/fab/www/api_connexion/.*|
+          ^src/airflow/providers/fab/www/extensions/.*|
+          ^src/airflow/providers/fab/www/security/.*
         entry: ../../scripts/ci/prek/compile_provider_assets.py fab
         pass_filenames: false
         additional_dependencies: ['[email protected]']
diff --git a/providers/fab/src/airflow/providers/fab/version_compat.py 
b/providers/fab/src/airflow/providers/fab/version_compat.py
index e1d9559cc31..350e25ce81b 100644
--- a/providers/fab/src/airflow/providers/fab/version_compat.py
+++ b/providers/fab/src/airflow/providers/fab/version_compat.py
@@ -34,3 +34,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
 AIRFLOW_V_3_1_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 1)
+AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
diff --git 
a/providers/fab/src/airflow/providers/fab/www/extensions/init_views.py 
b/providers/fab/src/airflow/providers/fab/www/extensions/init_views.py
index e7623e8217e..f26b6981615 100644
--- a/providers/fab/src/airflow/providers/fab/www/extensions/init_views.py
+++ b/providers/fab/src/airflow/providers/fab/www/extensions/init_views.py
@@ -26,7 +26,7 @@ from connexion.exceptions import BadRequestProblem, 
ProblemException
 from flask import request
 
 from airflow.api_fastapi.app import get_auth_manager
-from airflow.providers.fab.version_compat import AIRFLOW_V_3_1_PLUS
+from airflow.providers.fab.version_compat import AIRFLOW_V_3_1_PLUS, 
AIRFLOW_V_3_2_PLUS
 from airflow.providers.fab.www.api_connexion.exceptions import 
common_error_handler
 
 if TYPE_CHECKING:
@@ -102,11 +102,17 @@ def init_plugins(app):
     """Integrate Flask and FAB with plugins."""
     from airflow import plugins_manager
 
-    plugins_manager.initialize_flask_plugins()
+    if AIRFLOW_V_3_2_PLUS:
+        blueprints, appbuilder_views, appbuilder_menu_links = 
plugins_manager.get_flask_plugins()
+    else:
+        plugins_manager.initialize_flask_plugins()  # type: ignore
+        blueprints = plugins_manager.flask_blueprints  # type: ignore
+        appbuilder_views = plugins_manager.flask_appbuilder_views  # type: 
ignore
+        appbuilder_menu_links = plugins_manager.flask_appbuilder_menu_links  # 
type: ignore
 
     appbuilder = app.appbuilder
 
-    for view in plugins_manager.flask_appbuilder_views:
+    for view in appbuilder_views:
         name = view.get("name")
         if name:
             filtered_view_kwargs = {k: v for k, v in view.items() if k not in 
["view"]}
@@ -124,13 +130,11 @@ def init_plugins(app):
     # Since Airflow 3.1 flask_appbuilder_menu_links are added to the Airflow 3 
UI
navbar.
     if not AIRFLOW_V_3_1_PLUS:
-        for menu_link in sorted(
-            plugins_manager.flask_appbuilder_menu_links, key=lambda x: 
(x.get("category", ""), x["name"])
-        ):
+        for menu_link in sorted(appbuilder_menu_links, key=lambda x: 
(x.get("category", ""), x["name"])):
             log.debug("Adding menu link %s to %s", menu_link["name"], 
menu_link["href"])
             appbuilder.add_link(**menu_link)
 
-    for blue_print in plugins_manager.flask_blueprints:
+    for blue_print in blueprints:
         log.debug("Adding blueprint %s:%s", blue_print["name"], 
blue_print["blueprint"].import_name)
         app.register_blueprint(blue_print["blueprint"])
 
diff --git a/providers/fab/www-hash.txt b/providers/fab/www-hash.txt
index 144721dd69c..965b4823c96 100644
--- a/providers/fab/www-hash.txt
+++ b/providers/fab/www-hash.txt
@@ -1 +1 @@
-b1cb162c904247bab244b9fb163a340665e64301d6c62926ee583e82edf3023d
+b8ebef1806aed0a26ed8fd468040a51eaec700520bd41dbbdbf8136a4663eb6e
diff --git a/task-sdk/tests/conftest.py b/task-sdk/tests/conftest.py
index 6ae329e5345..eadb2e7d594 100644
--- a/task-sdk/tests/conftest.py
+++ b/task-sdk/tests/conftest.py
@@ -166,15 +166,12 @@ def _disable_ol_plugin():
     # And we load plugins when setting the priority_weight field
     import airflow.plugins_manager
 
-    old = airflow.plugins_manager.plugins
-
-    assert old is None, "Plugins already loaded, too late to stop them being 
loaded!"
-
-    airflow.plugins_manager.plugins = []
+    old = airflow.plugins_manager._get_plugins
+    airflow.plugins_manager._get_plugins = lambda: ([], {})
 
     yield
 
-    airflow.plugins_manager.plugins = None
+    airflow.plugins_manager._get_plugins = old
 
 
 @pytest.fixture(autouse=True)

Reply via email to