This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9da6fc3b8f8 Remove the ability to import executors from plugins (#43289)
9da6fc3b8f8 is described below

commit 9da6fc3b8f8b4989d94de381c4bdfc993113f981
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Oct 23 15:37:05 2024 +0100

    Remove the ability to import executors from plugins (#43289)
    
    Executors should no longer be registered or imported via Airflow's plugin mechanism -- these types of classes
    are just treated as plain python classes by Airflow, so there is no need to register them with Airflow.
---
 .../api_fastapi/core_api/openapi/v1-generated.yaml |  6 ---
 .../api_fastapi/core_api/serializers/plugins.py    |  1 -
 airflow/executors/executor_loader.py               | 12 ------
 airflow/plugins_manager.py                         | 33 +-------------
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  8 ----
 airflow/ui/openapi-gen/requests/types.gen.ts       |  1 -
 airflow/www/views.py                               |  1 -
 newsfragments/43289.significant.rst                |  4 ++
 .../endpoints/test_plugin_endpoint.py              |  1 -
 tests/api_connexion/schemas/test_plugin_schema.py  |  3 --
 tests/cli/commands/test_plugins_command.py         |  1 -
 tests/executors/test_executor_loader.py            | 50 +---------------------
 tests/plugins/test_plugin.py                       |  8 ----
 tests_common/test_utils/mock_plugins.py            |  2 -
 14 files changed, 7 insertions(+), 124 deletions(-)

diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 9078c90a2c8..3d890d4da31 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2212,11 +2212,6 @@ components:
             type: string
           type: array
           title: Hooks
-        executors:
-          items:
-            type: string
-          type: array
-          title: Executors
         macros:
           items:
             type: string
@@ -2274,7 +2269,6 @@ components:
       required:
       - name
       - hooks
-      - executors
       - macros
       - flask_blueprints
       - fastapi_apps
diff --git a/airflow/api_fastapi/core_api/serializers/plugins.py b/airflow/api_fastapi/core_api/serializers/plugins.py
index ee6812bb954..68bc8ea443c 100644
--- a/airflow/api_fastapi/core_api/serializers/plugins.py
+++ b/airflow/api_fastapi/core_api/serializers/plugins.py
@@ -65,7 +65,6 @@ class PluginResponse(BaseModel):
 
     name: str
     hooks: list[str]
-    executors: list[str]
     macros: list[str]
     flask_blueprints: list[str]
     fastapi_apps: list[FastAPIAppResponse]
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 4a940793df2..f74153f95fc 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -21,7 +21,6 @@ from __future__ import annotations
 import functools
 import logging
 import os
-from contextlib import suppress
 from typing import TYPE_CHECKING
 
 from airflow.api_internal.internal_api_call import InternalApiConfig
@@ -284,17 +283,6 @@ class ExecutorLoader:
                 cls.validate_database_executor_compatibility(executor)
             return executor
 
-        if executor_name.connector_source == ConnectorSource.PLUGIN:
-            with suppress(ImportError, AttributeError):
-                # Load plugins here for executors as at that time the plugins 
might not have been
-                # initialized yet
-                from airflow import plugins_manager
-
-                plugins_manager.integrate_executor_plugins()
-                return (
-                    
_import_and_validate(f"airflow.executors.{executor_name.module_path}"),
-                    ConnectorSource.PLUGIN,
-                )
         return _import_and_validate(executor_name.module_path), 
executor_name.connector_source
 
     @classmethod
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 2ec1388d163..fc7adc5993f 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -64,7 +64,6 @@ loaded_plugins: set[str] = set()
 # Plugin components to integrate as modules
 registered_hooks: list[BaseHook] | None = None
 macros_modules: list[Any] | None = None
-executors_modules: list[Any] | None = None
 
 # Plugin components to integrate directly
 admin_views: list[Any] | None = None
@@ -88,7 +87,6 @@ during deserialization
 """
 PLUGINS_ATTRIBUTES_TO_DUMP = {
     "hooks",
-    "executors",
     "macros",
     "admin_views",
     "flask_blueprints",
@@ -154,7 +152,6 @@ class AirflowPlugin:
     name: str | None = None
     source: AirflowPluginSource | None = None
     hooks: list[Any] = []
-    executors: list[Any] = []
     macros: list[Any] = []
     admin_views: list[Any] = []
     flask_blueprints: list[Any] = []
@@ -533,33 +530,6 @@ def initialize_hook_lineage_readers_plugins():
         hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)
 
 
-def integrate_executor_plugins() -> None:
-    """Integrate executor plugins to the context."""
-    global plugins
-    global executors_modules
-
-    if executors_modules is not None:
-        return
-
-    ensure_plugins_loaded()
-
-    if plugins is None:
-        raise AirflowPluginException("Can't load plugins.")
-
-    log.debug("Integrate executor plugins")
-
-    executors_modules = []
-    for plugin in plugins:
-        if plugin.name is None:
-            raise AirflowPluginException("Invalid plugin name")
-        plugin_name: str = plugin.name
-
-        executors_module = make_module("airflow.executors." + plugin_name, 
plugin.executors)
-        if executors_module:
-            executors_modules.append(executors_module)
-            sys.modules[executors_module.__name__] = executors_module
-
-
 def integrate_macros_plugins() -> None:
     """Integrates macro plugins."""
     global plugins
@@ -615,7 +585,6 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = 
None) -> list[dict[str
     :param attrs_to_dump: A list of plugin attributes to dump
     """
     ensure_plugins_loaded()
-    integrate_executor_plugins()
     integrate_macros_plugins()
     initialize_web_ui_plugins()
     initialize_fastapi_plugins()
@@ -629,7 +598,7 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None = 
None) -> list[dict[str
             for attr in attrs_to_dump:
                 if attr in ("global_operator_extra_links", 
"operator_extra_links"):
                     info[attr] = [f"<{qualname(d.__class__)} object>" for d in 
getattr(plugin, attr)]
-                elif attr in ("macros", "timetables", "hooks", "executors", 
"priority_weight_strategies"):
+                elif attr in ("macros", "timetables", "hooks", 
"priority_weight_strategies"):
                     info[attr] = [qualname(d) for d in getattr(plugin, attr)]
                 elif attr == "listeners":
                     # listeners may be modules or class instances
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 530125a19dd..356814d15d5 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1333,13 +1333,6 @@ export const $PluginResponse = {
       type: "array",
       title: "Hooks",
     },
-    executors: {
-      items: {
-        type: "string",
-      },
-      type: "array",
-      title: "Executors",
-    },
     macros: {
       items: {
         type: "string",
@@ -1419,7 +1412,6 @@ export const $PluginResponse = {
   required: [
     "name",
     "hooks",
-    "executors",
     "macros",
     "flask_blueprints",
     "fastapi_apps",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 75f2296379a..40054121470 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -311,7 +311,6 @@ export type PluginCollectionResponse = {
 export type PluginResponse = {
   name: string;
   hooks: Array<string>;
-  executors: Array<string>;
   macros: Array<string>;
   flask_blueprints: Array<string>;
   fastapi_apps: Array<FastAPIAppResponse>;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c5bbdd389b0..c153cc80597 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -4286,7 +4286,6 @@ class PluginView(AirflowBaseView):
     def list(self):
         """List loaded plugins."""
         plugins_manager.ensure_plugins_loaded()
-        plugins_manager.integrate_executor_plugins()
         plugins_manager.initialize_extra_operators_links_plugins()
         plugins_manager.initialize_web_ui_plugins()
         plugins_manager.initialize_fastapi_plugins()
diff --git a/newsfragments/43289.significant.rst b/newsfragments/43289.significant.rst
new file mode 100644
index 00000000000..15063202640
--- /dev/null
+++ b/newsfragments/43289.significant.rst
@@ -0,0 +1,4 @@
+Support for adding executors via Airflow Plugins is removed
+
+Executors should no longer be registered or imported via Airflow's plugin mechanism -- these types of classes
+are just treated as plain Python classes by Airflow, so there is no need to register them with Airflow.
diff --git a/tests/api_connexion/endpoints/test_plugin_endpoint.py b/tests/api_connexion/endpoints/test_plugin_endpoint.py
index 924de84dc0d..487ba53a300 100644
--- a/tests/api_connexion/endpoints/test_plugin_endpoint.py
+++ b/tests/api_connexion/endpoints/test_plugin_endpoint.py
@@ -145,7 +145,6 @@ class TestGetPlugins(TestPluginsEndpoint):
                 {
                     "appbuilder_menu_items": [appbuilder_menu_items],
                     "appbuilder_views": [{"view": qualname(MockView)}],
-                    "executors": [],
                     "flask_blueprints": [
                         f"<{qualname(bp.__class__)}: name={bp.name!r} 
import_name={bp.import_name!r}>"
                     ],
diff --git a/tests/api_connexion/schemas/test_plugin_schema.py b/tests/api_connexion/schemas/test_plugin_schema.py
index 0c7141e3493..951933e9ffc 100644
--- a/tests/api_connexion/schemas/test_plugin_schema.py
+++ b/tests/api_connexion/schemas/test_plugin_schema.py
@@ -86,7 +86,6 @@ class TestPluginSchema(TestPluginBase):
         assert deserialized_plugin == {
             "appbuilder_menu_items": [appbuilder_menu_items],
             "appbuilder_views": [{"view": 
self.mock_plugin.appbuilder_views[0]["view"]}],
-            "executors": [],
             "flask_blueprints": [str(bp)],
             "fastapi_apps": [
                 {"app": app, "name": "App name", "url_prefix": "/some_prefix"},
@@ -113,7 +112,6 @@ class TestPluginCollectionSchema(TestPluginBase):
                 {
                     "appbuilder_menu_items": [appbuilder_menu_items],
                     "appbuilder_views": [{"view": 
self.mock_plugin.appbuilder_views[0]["view"]}],
-                    "executors": [],
                     "flask_blueprints": [str(bp)],
                     "fastapi_apps": [
                         {"app": app, "name": "App name", "url_prefix": 
"/some_prefix"},
@@ -131,7 +129,6 @@ class TestPluginCollectionSchema(TestPluginBase):
                 {
                     "appbuilder_menu_items": [appbuilder_menu_items],
                     "appbuilder_views": [{"view": 
self.mock_plugin.appbuilder_views[0]["view"]}],
-                    "executors": [],
                     "flask_blueprints": [str(bp)],
                     "fastapi_apps": [
                         {"app": app, "name": "App name", "url_prefix": 
"/some_prefix"},
diff --git a/tests/cli/commands/test_plugins_command.py b/tests/cli/commands/test_plugins_command.py
index d07641ec841..c9807520e4e 100644
--- a/tests/cli/commands/test_plugins_command.py
+++ b/tests/cli/commands/test_plugins_command.py
@@ -69,7 +69,6 @@ class TestPluginsCommand:
                 "admin_views": [],
                 "macros": ["tests.plugins.test_plugin.plugin_macro"],
                 "menu_links": [],
-                "executors": ["tests.plugins.test_plugin.PluginExecutor"],
                 "flask_blueprints": [
                     "<flask.blueprints.Blueprint: name='test_plugin' 
import_name='tests.plugins.test_plugin'>"
                 ],
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
index 40a336bc580..68bc02a6300 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -22,7 +22,6 @@ from unittest import mock
 
 import pytest
 
-from airflow import plugins_manager
 from airflow.exceptions import AirflowConfigException
 from airflow.executors import executor_loader
 from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader, 
ExecutorName
@@ -34,9 +33,6 @@ from tests_common.test_utils.config import conf_vars
 
 pytestmark = pytest.mark.skip_if_database_isolation_mode
 
-# Plugin Manager creates new modules, which is difficult to mock, so we use 
test isolation by a unique name.
-TEST_PLUGIN_NAME = "unique_plugin_name_to_avoid_collision_i_love_kitties"
-
 
 class FakeExecutor:
     is_single_threaded = False
@@ -46,11 +42,6 @@ class FakeSingleThreadedExecutor:
     is_single_threaded = True
 
 
-class FakePlugin(plugins_manager.AirflowPlugin):
-    name = TEST_PLUGIN_NAME
-    executors = [FakeExecutor]
-
-
 class TestExecutorLoader:
     def setup_method(self) -> None:
         from airflow.executors import executor_loader
@@ -89,17 +80,6 @@ class TestExecutorLoader:
             assert executor.name == 
ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
             assert executor.name.connector_source == ConnectorSource.CORE
 
-    @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
-    @mock.patch("airflow.plugins_manager.executors_modules", None)
-    def test_should_support_plugins(self):
-        with conf_vars({("core", "executor"): 
f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
-            executor = ExecutorLoader.get_default_executor()
-            assert executor is not None
-            assert "FakeExecutor" == executor.__class__.__name__
-            assert executor.name is not None
-            assert executor.name == 
ExecutorName(f"{TEST_PLUGIN_NAME}.FakeExecutor")
-            assert executor.name.connector_source == ConnectorSource.PLUGIN
-
     def test_should_support_custom_path(self):
         with conf_vars({("core", "executor"): 
"tests.executors.test_executor_loader.FakeExecutor"}):
             executor = ExecutorLoader.get_default_executor()
@@ -124,7 +104,7 @@ class TestExecutorLoader:
             ),
             # Core executors and custom module path executor and plugin
             (
-                f"CeleryExecutor, LocalExecutor, 
tests.executors.test_executor_loader.FakeExecutor, 
{TEST_PLUGIN_NAME}.FakeExecutor",
+                "CeleryExecutor, LocalExecutor, 
tests.executors.test_executor_loader.FakeExecutor",
                 [
                     ExecutorName(
                         
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
@@ -138,17 +118,12 @@ class TestExecutorLoader:
                         "tests.executors.test_executor_loader.FakeExecutor",
                         None,
                     ),
-                    ExecutorName(
-                        f"{TEST_PLUGIN_NAME}.FakeExecutor",
-                        None,
-                    ),
                 ],
             ),
             # Core executors and custom module path executor and plugin with 
aliases
             (
                 (
-                    "CeleryExecutor, LocalExecutor, 
fake_exec:tests.executors.test_executor_loader.FakeExecutor, "
-                    f"plugin_exec:{TEST_PLUGIN_NAME}.FakeExecutor"
+                    "CeleryExecutor, LocalExecutor, 
fake_exec:tests.executors.test_executor_loader.FakeExecutor"
                 ),
                 [
                     ExecutorName(
@@ -163,10 +138,6 @@ class TestExecutorLoader:
                         "tests.executors.test_executor_loader.FakeExecutor",
                         "fake_exec",
                     ),
-                    ExecutorName(
-                        f"{TEST_PLUGIN_NAME}.FakeExecutor",
-                        "plugin_exec",
-                    ),
                 ],
             ),
         ],
@@ -194,8 +165,6 @@ class TestExecutorLoader:
             "CeleryExecutor, my.module.path, my.module.path",
             "CeleryExecutor, my_alias:my.module.path, my.module.path",
             "CeleryExecutor, my_alias:my.module.path, 
other_alias:my.module.path",
-            f"CeleryExecutor, {TEST_PLUGIN_NAME}.FakeExecutor, 
{TEST_PLUGIN_NAME}.FakeExecutor",
-            f"my_alias:{TEST_PLUGIN_NAME}.FakeExecutor, 
other_alias:{TEST_PLUGIN_NAME}.FakeExecutor",
         ],
     )
     def test_get_hybrid_executors_from_config_duplicates_should_fail(self, 
executor_config):
@@ -239,21 +208,6 @@ class TestExecutorLoader:
             assert expected_value == executor.__name__
             assert import_source == ConnectorSource.CORE
 
-    @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
-    @mock.patch("airflow.plugins_manager.executors_modules", None)
-    @pytest.mark.parametrize(
-        ("executor_config"),
-        [
-            (f"{TEST_PLUGIN_NAME}.FakeExecutor"),
-            (f"my_cool_alias:{TEST_PLUGIN_NAME}.FakeExecutor, CeleryExecutor"),
-        ],
-    )
-    def test_should_support_import_plugins(self, executor_config):
-        with conf_vars({("core", "executor"): executor_config}):
-            executor, import_source = 
ExecutorLoader.import_default_executor_cls()
-            assert "FakeExecutor" == executor.__name__
-            assert import_source == ConnectorSource.PLUGIN
-
     @pytest.mark.parametrize(
         "executor_config",
         [
diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py
index 01b18c48a63..98f64e75456 100644
--- a/tests/plugins/test_plugin.py
+++ b/tests/plugins/test_plugin.py
@@ -21,8 +21,6 @@ from fastapi import FastAPI
 from flask import Blueprint
 from flask_appbuilder import BaseView as AppBuilderBaseView, expose
 
-from airflow.executors.base_executor import BaseExecutor
-
 # Importing base classes that we need to derive
 from airflow.hooks.base import BaseHook
 
@@ -49,11 +47,6 @@ class PluginHook(BaseHook):
     pass
 
 
-# Will show up under airflow.executors.test_plugin.PluginExecutor
-class PluginExecutor(BaseExecutor):
-    pass
-
-
 # Will show up under airflow.macros.test_plugin.plugin_macro
 def plugin_macro():
     pass
@@ -123,7 +116,6 @@ class CustomPriorityWeightStrategy(PriorityWeightStrategy):
 class AirflowTestPlugin(AirflowPlugin):
     name = "test_plugin"
     hooks = [PluginHook]
-    executors = [PluginExecutor]
     macros = [plugin_macro]
     flask_blueprints = [bp]
     fastapi_apps = [app_with_metadata]
diff --git a/tests_common/test_utils/mock_plugins.py b/tests_common/test_utils/mock_plugins.py
index 3e50f1b413e..875a9abbd3a 100644
--- a/tests_common/test_utils/mock_plugins.py
+++ b/tests_common/test_utils/mock_plugins.py
@@ -25,7 +25,6 @@ PLUGINS_MANAGER_NULLABLE_ATTRIBUTES = [
     "plugins",
     "registered_hooks",
     "macros_modules",
-    "executors_modules",
     "admin_views",
     "flask_blueprints",
     "fastapi_apps",
@@ -44,7 +43,6 @@ PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V2_10 = [
     "plugins",
     "registered_hooks",
     "macros_modules",
-    "executors_modules",
     "admin_views",
     "flask_blueprints",
     "menu_links",

Reply via email to