This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9da6fc3b8f8 Remove the ability to import executors from plugins (#43289)
9da6fc3b8f8 is described below
commit 9da6fc3b8f8b4989d94de381c4bdfc993113f981
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Oct 23 15:37:05 2024 +0100
Remove the ability to import executors from plugins (#43289)
Executors should no longer be registered or imported via Airflow's plugin
mechanism -- these types of classes are just treated as plain Python classes
by Airflow, so there is no need to register them with Airflow.
---
.../api_fastapi/core_api/openapi/v1-generated.yaml | 6 ---
.../api_fastapi/core_api/serializers/plugins.py | 1 -
airflow/executors/executor_loader.py | 12 ------
airflow/plugins_manager.py | 33 +-------------
airflow/ui/openapi-gen/requests/schemas.gen.ts | 8 ----
airflow/ui/openapi-gen/requests/types.gen.ts | 1 -
airflow/www/views.py | 1 -
newsfragments/43289.significant.rst | 4 ++
.../endpoints/test_plugin_endpoint.py | 1 -
tests/api_connexion/schemas/test_plugin_schema.py | 3 --
tests/cli/commands/test_plugins_command.py | 1 -
tests/executors/test_executor_loader.py | 50 +---------------------
tests/plugins/test_plugin.py | 8 ----
tests_common/test_utils/mock_plugins.py | 2 -
14 files changed, 7 insertions(+), 124 deletions(-)
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 9078c90a2c8..3d890d4da31 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2212,11 +2212,6 @@ components:
type: string
type: array
title: Hooks
- executors:
- items:
- type: string
- type: array
- title: Executors
macros:
items:
type: string
@@ -2274,7 +2269,6 @@ components:
required:
- name
- hooks
- - executors
- macros
- flask_blueprints
- fastapi_apps
diff --git a/airflow/api_fastapi/core_api/serializers/plugins.py
b/airflow/api_fastapi/core_api/serializers/plugins.py
index ee6812bb954..68bc8ea443c 100644
--- a/airflow/api_fastapi/core_api/serializers/plugins.py
+++ b/airflow/api_fastapi/core_api/serializers/plugins.py
@@ -65,7 +65,6 @@ class PluginResponse(BaseModel):
name: str
hooks: list[str]
- executors: list[str]
macros: list[str]
flask_blueprints: list[str]
fastapi_apps: list[FastAPIAppResponse]
diff --git a/airflow/executors/executor_loader.py
b/airflow/executors/executor_loader.py
index 4a940793df2..f74153f95fc 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -21,7 +21,6 @@ from __future__ import annotations
import functools
import logging
import os
-from contextlib import suppress
from typing import TYPE_CHECKING
from airflow.api_internal.internal_api_call import InternalApiConfig
@@ -284,17 +283,6 @@ class ExecutorLoader:
cls.validate_database_executor_compatibility(executor)
return executor
- if executor_name.connector_source == ConnectorSource.PLUGIN:
- with suppress(ImportError, AttributeError):
- # Load plugins here for executors as at that time the plugins
might not have been
- # initialized yet
- from airflow import plugins_manager
-
- plugins_manager.integrate_executor_plugins()
- return (
-
_import_and_validate(f"airflow.executors.{executor_name.module_path}"),
- ConnectorSource.PLUGIN,
- )
return _import_and_validate(executor_name.module_path),
executor_name.connector_source
@classmethod
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 2ec1388d163..fc7adc5993f 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -64,7 +64,6 @@ loaded_plugins: set[str] = set()
# Plugin components to integrate as modules
registered_hooks: list[BaseHook] | None = None
macros_modules: list[Any] | None = None
-executors_modules: list[Any] | None = None
# Plugin components to integrate directly
admin_views: list[Any] | None = None
@@ -88,7 +87,6 @@ during deserialization
"""
PLUGINS_ATTRIBUTES_TO_DUMP = {
"hooks",
- "executors",
"macros",
"admin_views",
"flask_blueprints",
@@ -154,7 +152,6 @@ class AirflowPlugin:
name: str | None = None
source: AirflowPluginSource | None = None
hooks: list[Any] = []
- executors: list[Any] = []
macros: list[Any] = []
admin_views: list[Any] = []
flask_blueprints: list[Any] = []
@@ -533,33 +530,6 @@ def initialize_hook_lineage_readers_plugins():
hook_lineage_reader_classes.extend(plugin.hook_lineage_readers)
-def integrate_executor_plugins() -> None:
- """Integrate executor plugins to the context."""
- global plugins
- global executors_modules
-
- if executors_modules is not None:
- return
-
- ensure_plugins_loaded()
-
- if plugins is None:
- raise AirflowPluginException("Can't load plugins.")
-
- log.debug("Integrate executor plugins")
-
- executors_modules = []
- for plugin in plugins:
- if plugin.name is None:
- raise AirflowPluginException("Invalid plugin name")
- plugin_name: str = plugin.name
-
- executors_module = make_module("airflow.executors." + plugin_name,
plugin.executors)
- if executors_module:
- executors_modules.append(executors_module)
- sys.modules[executors_module.__name__] = executors_module
-
-
def integrate_macros_plugins() -> None:
"""Integrates macro plugins."""
global plugins
@@ -615,7 +585,6 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None =
None) -> list[dict[str
:param attrs_to_dump: A list of plugin attributes to dump
"""
ensure_plugins_loaded()
- integrate_executor_plugins()
integrate_macros_plugins()
initialize_web_ui_plugins()
initialize_fastapi_plugins()
@@ -629,7 +598,7 @@ def get_plugin_info(attrs_to_dump: Iterable[str] | None =
None) -> list[dict[str
for attr in attrs_to_dump:
if attr in ("global_operator_extra_links",
"operator_extra_links"):
info[attr] = [f"<{qualname(d.__class__)} object>" for d in
getattr(plugin, attr)]
- elif attr in ("macros", "timetables", "hooks", "executors",
"priority_weight_strategies"):
+ elif attr in ("macros", "timetables", "hooks",
"priority_weight_strategies"):
info[attr] = [qualname(d) for d in getattr(plugin, attr)]
elif attr == "listeners":
# listeners may be modules or class instances
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 530125a19dd..356814d15d5 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1333,13 +1333,6 @@ export const $PluginResponse = {
type: "array",
title: "Hooks",
},
- executors: {
- items: {
- type: "string",
- },
- type: "array",
- title: "Executors",
- },
macros: {
items: {
type: "string",
@@ -1419,7 +1412,6 @@ export const $PluginResponse = {
required: [
"name",
"hooks",
- "executors",
"macros",
"flask_blueprints",
"fastapi_apps",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index 75f2296379a..40054121470 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -311,7 +311,6 @@ export type PluginCollectionResponse = {
export type PluginResponse = {
name: string;
hooks: Array<string>;
- executors: Array<string>;
macros: Array<string>;
flask_blueprints: Array<string>;
fastapi_apps: Array<FastAPIAppResponse>;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c5bbdd389b0..c153cc80597 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -4286,7 +4286,6 @@ class PluginView(AirflowBaseView):
def list(self):
"""List loaded plugins."""
plugins_manager.ensure_plugins_loaded()
- plugins_manager.integrate_executor_plugins()
plugins_manager.initialize_extra_operators_links_plugins()
plugins_manager.initialize_web_ui_plugins()
plugins_manager.initialize_fastapi_plugins()
diff --git a/newsfragments/43289.significant.rst
b/newsfragments/43289.significant.rst
new file mode 100644
index 00000000000..15063202640
--- /dev/null
+++ b/newsfragments/43289.significant.rst
@@ -0,0 +1,4 @@
+Support for adding executors via Airflow Plugins is removed
+
+Executors should no longer be registered or imported via Airflow's plugin
mechanism -- these types of classes
+are just treated as plain Python classes by Airflow, so there is no need to
register them with Airflow.
diff --git a/tests/api_connexion/endpoints/test_plugin_endpoint.py
b/tests/api_connexion/endpoints/test_plugin_endpoint.py
index 924de84dc0d..487ba53a300 100644
--- a/tests/api_connexion/endpoints/test_plugin_endpoint.py
+++ b/tests/api_connexion/endpoints/test_plugin_endpoint.py
@@ -145,7 +145,6 @@ class TestGetPlugins(TestPluginsEndpoint):
{
"appbuilder_menu_items": [appbuilder_menu_items],
"appbuilder_views": [{"view": qualname(MockView)}],
- "executors": [],
"flask_blueprints": [
f"<{qualname(bp.__class__)}: name={bp.name!r}
import_name={bp.import_name!r}>"
],
diff --git a/tests/api_connexion/schemas/test_plugin_schema.py
b/tests/api_connexion/schemas/test_plugin_schema.py
index 0c7141e3493..951933e9ffc 100644
--- a/tests/api_connexion/schemas/test_plugin_schema.py
+++ b/tests/api_connexion/schemas/test_plugin_schema.py
@@ -86,7 +86,6 @@ class TestPluginSchema(TestPluginBase):
assert deserialized_plugin == {
"appbuilder_menu_items": [appbuilder_menu_items],
"appbuilder_views": [{"view":
self.mock_plugin.appbuilder_views[0]["view"]}],
- "executors": [],
"flask_blueprints": [str(bp)],
"fastapi_apps": [
{"app": app, "name": "App name", "url_prefix": "/some_prefix"},
@@ -113,7 +112,6 @@ class TestPluginCollectionSchema(TestPluginBase):
{
"appbuilder_menu_items": [appbuilder_menu_items],
"appbuilder_views": [{"view":
self.mock_plugin.appbuilder_views[0]["view"]}],
- "executors": [],
"flask_blueprints": [str(bp)],
"fastapi_apps": [
{"app": app, "name": "App name", "url_prefix":
"/some_prefix"},
@@ -131,7 +129,6 @@ class TestPluginCollectionSchema(TestPluginBase):
{
"appbuilder_menu_items": [appbuilder_menu_items],
"appbuilder_views": [{"view":
self.mock_plugin.appbuilder_views[0]["view"]}],
- "executors": [],
"flask_blueprints": [str(bp)],
"fastapi_apps": [
{"app": app, "name": "App name", "url_prefix":
"/some_prefix"},
diff --git a/tests/cli/commands/test_plugins_command.py
b/tests/cli/commands/test_plugins_command.py
index d07641ec841..c9807520e4e 100644
--- a/tests/cli/commands/test_plugins_command.py
+++ b/tests/cli/commands/test_plugins_command.py
@@ -69,7 +69,6 @@ class TestPluginsCommand:
"admin_views": [],
"macros": ["tests.plugins.test_plugin.plugin_macro"],
"menu_links": [],
- "executors": ["tests.plugins.test_plugin.PluginExecutor"],
"flask_blueprints": [
"<flask.blueprints.Blueprint: name='test_plugin'
import_name='tests.plugins.test_plugin'>"
],
diff --git a/tests/executors/test_executor_loader.py
b/tests/executors/test_executor_loader.py
index 40a336bc580..68bc02a6300 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -22,7 +22,6 @@ from unittest import mock
import pytest
-from airflow import plugins_manager
from airflow.exceptions import AirflowConfigException
from airflow.executors import executor_loader
from airflow.executors.executor_loader import ConnectorSource, ExecutorLoader,
ExecutorName
@@ -34,9 +33,6 @@ from tests_common.test_utils.config import conf_vars
pytestmark = pytest.mark.skip_if_database_isolation_mode
-# Plugin Manager creates new modules, which is difficult to mock, so we use
test isolation by a unique name.
-TEST_PLUGIN_NAME = "unique_plugin_name_to_avoid_collision_i_love_kitties"
-
class FakeExecutor:
is_single_threaded = False
@@ -46,11 +42,6 @@ class FakeSingleThreadedExecutor:
is_single_threaded = True
-class FakePlugin(plugins_manager.AirflowPlugin):
- name = TEST_PLUGIN_NAME
- executors = [FakeExecutor]
-
-
class TestExecutorLoader:
def setup_method(self) -> None:
from airflow.executors import executor_loader
@@ -89,17 +80,6 @@ class TestExecutorLoader:
assert executor.name ==
ExecutorName(ExecutorLoader.executors[executor_name], alias=executor_name)
assert executor.name.connector_source == ConnectorSource.CORE
- @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
- @mock.patch("airflow.plugins_manager.executors_modules", None)
- def test_should_support_plugins(self):
- with conf_vars({("core", "executor"):
f"{TEST_PLUGIN_NAME}.FakeExecutor"}):
- executor = ExecutorLoader.get_default_executor()
- assert executor is not None
- assert "FakeExecutor" == executor.__class__.__name__
- assert executor.name is not None
- assert executor.name ==
ExecutorName(f"{TEST_PLUGIN_NAME}.FakeExecutor")
- assert executor.name.connector_source == ConnectorSource.PLUGIN
-
def test_should_support_custom_path(self):
with conf_vars({("core", "executor"):
"tests.executors.test_executor_loader.FakeExecutor"}):
executor = ExecutorLoader.get_default_executor()
@@ -124,7 +104,7 @@ class TestExecutorLoader:
),
# Core executors and custom module path executor and plugin
(
- f"CeleryExecutor, LocalExecutor,
tests.executors.test_executor_loader.FakeExecutor,
{TEST_PLUGIN_NAME}.FakeExecutor",
+ "CeleryExecutor, LocalExecutor,
tests.executors.test_executor_loader.FakeExecutor",
[
ExecutorName(
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
@@ -138,17 +118,12 @@ class TestExecutorLoader:
"tests.executors.test_executor_loader.FakeExecutor",
None,
),
- ExecutorName(
- f"{TEST_PLUGIN_NAME}.FakeExecutor",
- None,
- ),
],
),
# Core executors and custom module path executor and plugin with
aliases
(
(
- "CeleryExecutor, LocalExecutor,
fake_exec:tests.executors.test_executor_loader.FakeExecutor, "
- f"plugin_exec:{TEST_PLUGIN_NAME}.FakeExecutor"
+ "CeleryExecutor, LocalExecutor,
fake_exec:tests.executors.test_executor_loader.FakeExecutor"
),
[
ExecutorName(
@@ -163,10 +138,6 @@ class TestExecutorLoader:
"tests.executors.test_executor_loader.FakeExecutor",
"fake_exec",
),
- ExecutorName(
- f"{TEST_PLUGIN_NAME}.FakeExecutor",
- "plugin_exec",
- ),
],
),
],
@@ -194,8 +165,6 @@ class TestExecutorLoader:
"CeleryExecutor, my.module.path, my.module.path",
"CeleryExecutor, my_alias:my.module.path, my.module.path",
"CeleryExecutor, my_alias:my.module.path,
other_alias:my.module.path",
- f"CeleryExecutor, {TEST_PLUGIN_NAME}.FakeExecutor,
{TEST_PLUGIN_NAME}.FakeExecutor",
- f"my_alias:{TEST_PLUGIN_NAME}.FakeExecutor,
other_alias:{TEST_PLUGIN_NAME}.FakeExecutor",
],
)
def test_get_hybrid_executors_from_config_duplicates_should_fail(self,
executor_config):
@@ -239,21 +208,6 @@ class TestExecutorLoader:
assert expected_value == executor.__name__
assert import_source == ConnectorSource.CORE
- @mock.patch("airflow.plugins_manager.plugins", [FakePlugin()])
- @mock.patch("airflow.plugins_manager.executors_modules", None)
- @pytest.mark.parametrize(
- ("executor_config"),
- [
- (f"{TEST_PLUGIN_NAME}.FakeExecutor"),
- (f"my_cool_alias:{TEST_PLUGIN_NAME}.FakeExecutor, CeleryExecutor"),
- ],
- )
- def test_should_support_import_plugins(self, executor_config):
- with conf_vars({("core", "executor"): executor_config}):
- executor, import_source =
ExecutorLoader.import_default_executor_cls()
- assert "FakeExecutor" == executor.__name__
- assert import_source == ConnectorSource.PLUGIN
-
@pytest.mark.parametrize(
"executor_config",
[
diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py
index 01b18c48a63..98f64e75456 100644
--- a/tests/plugins/test_plugin.py
+++ b/tests/plugins/test_plugin.py
@@ -21,8 +21,6 @@ from fastapi import FastAPI
from flask import Blueprint
from flask_appbuilder import BaseView as AppBuilderBaseView, expose
-from airflow.executors.base_executor import BaseExecutor
-
# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
@@ -49,11 +47,6 @@ class PluginHook(BaseHook):
pass
-# Will show up under airflow.executors.test_plugin.PluginExecutor
-class PluginExecutor(BaseExecutor):
- pass
-
-
# Will show up under airflow.macros.test_plugin.plugin_macro
def plugin_macro():
pass
@@ -123,7 +116,6 @@ class CustomPriorityWeightStrategy(PriorityWeightStrategy):
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
hooks = [PluginHook]
- executors = [PluginExecutor]
macros = [plugin_macro]
flask_blueprints = [bp]
fastapi_apps = [app_with_metadata]
diff --git a/tests_common/test_utils/mock_plugins.py
b/tests_common/test_utils/mock_plugins.py
index 3e50f1b413e..875a9abbd3a 100644
--- a/tests_common/test_utils/mock_plugins.py
+++ b/tests_common/test_utils/mock_plugins.py
@@ -25,7 +25,6 @@ PLUGINS_MANAGER_NULLABLE_ATTRIBUTES = [
"plugins",
"registered_hooks",
"macros_modules",
- "executors_modules",
"admin_views",
"flask_blueprints",
"fastapi_apps",
@@ -44,7 +43,6 @@ PLUGINS_MANAGER_NULLABLE_ATTRIBUTES_V2_10 = [
"plugins",
"registered_hooks",
"macros_modules",
- "executors_modules",
"admin_views",
"flask_blueprints",
"menu_links",