This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 83643e555a3 Remove core Airflow support for static hybrid executors
(#47322)
83643e555a3 is described below
commit 83643e555a3836b41ff3630c889ca5585e75f5fa
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Mar 5 16:06:59 2025 -0800
Remove core Airflow support for static hybrid executors (#47322)
Remove all the special-case handling and custom logic in core Airflow
that allows the use of static hybrid executors such as
LocalKubernetesExecutor and CeleryKubernetesExecutor. These executors
will still work on Airflow 2.x, but going forward they will
not be supported on Airflow 3.
---
airflow/config_templates/config.yml | 3 +-
airflow/executors/executor_constants.py | 4 ---
airflow/executors/executor_loader.py | 37 +++-------------------
airflow/settings.py | 6 +---
.../local_commands/test_scheduler_command.py | 1 -
.../local_commands/test_standalone_command.py | 6 ----
tests/cli/conftest.py | 10 ++----
tests/cli/test_cli_parser.py | 4 ---
tests/executors/test_executor_loader.py | 2 --
tests/sensors/test_base.py | 8 -----
tests/utils/test_log_handlers.py | 10 +-----
11 files changed, 10 insertions(+), 81 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 69767306b52..637dcd0ef8f 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -71,8 +71,7 @@ core:
description: |
The executor class that airflow should use. Choices include
``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``,
- ``KubernetesExecutor``, ``CeleryKubernetesExecutor``,
``LocalKubernetesExecutor`` or the
- full import path to the class when using a custom executor.
+ ``KubernetesExecutor`` or the full import path to the class when using
a custom executor.
version_added: ~
type: string
example: ~
diff --git a/airflow/executors/executor_constants.py
b/airflow/executors/executor_constants.py
index 65d814f28ac..5d752e23233 100644
--- a/airflow/executors/executor_constants.py
+++ b/airflow/executors/executor_constants.py
@@ -28,19 +28,15 @@ class ConnectorSource(Enum):
LOCAL_EXECUTOR = "LocalExecutor"
-LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
CELERY_EXECUTOR = "CeleryExecutor"
-CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
KUBERNETES_EXECUTOR = "KubernetesExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
CORE_EXECUTOR_NAMES = {
LOCAL_EXECUTOR,
- LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
CELERY_EXECUTOR,
- CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
MOCK_EXECUTOR,
diff --git a/airflow/executors/executor_loader.py
b/airflow/executors/executor_loader.py
index 6d6b8d115bc..80fed5c7278 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -25,12 +25,10 @@ from typing import TYPE_CHECKING
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
- CELERY_KUBERNETES_EXECUTOR,
CORE_EXECUTOR_NAMES,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
- LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
ConnectorSource,
)
@@ -59,12 +57,8 @@ class ExecutorLoader:
executors = {
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
- LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
- "executors.local_kubernetes_executor.LocalKubernetesExecutor",
SEQUENTIAL_EXECUTOR:
"airflow.executors.sequential_executor.SequentialExecutor",
CELERY_EXECUTOR:
"airflow.providers.celery.executors.celery_executor.CeleryExecutor",
- CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
- "executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.kubernetes_executor.KubernetesExecutor",
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
@@ -265,17 +259,12 @@ class ExecutorLoader:
_executor_name = executor_name
try:
- if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
- executor = cls.__load_celery_kubernetes_executor()
- elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
- executor = cls.__load_local_kubernetes_executor()
+ executor_cls, import_source =
cls.import_executor_cls(_executor_name)
+ log.debug("Loading executor %s from %s", _executor_name,
import_source.value)
+ if _executor_name.team_id:
+ executor = executor_cls(team_id=_executor_name.team_id)
else:
- executor_cls, import_source =
cls.import_executor_cls(_executor_name)
- log.debug("Loading executor %s from %s", _executor_name,
import_source.value)
- if _executor_name.team_id:
- executor = executor_cls(team_id=_executor_name.team_id)
- else:
- executor = executor_cls()
+ executor = executor_cls()
except ImportError as e:
log.error(e)
@@ -315,19 +304,3 @@ class ExecutorLoader:
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name)
return executor, source
-
- @classmethod
- def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
- celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
- kubernetes_executor =
import_string(cls.executors[KUBERNETES_EXECUTOR])()
-
- celery_kubernetes_executor_cls =
import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
- return celery_kubernetes_executor_cls(celery_executor,
kubernetes_executor)
-
- @classmethod
- def __load_local_kubernetes_executor(cls) -> BaseExecutor:
- local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
- kubernetes_executor =
import_string(cls.executors[KUBERNETES_EXECUTOR])()
-
- local_kubernetes_executor_cls =
import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
- return local_kubernetes_executor_cls(local_executor,
kubernetes_executor)
diff --git a/airflow/settings.py b/airflow/settings.py
index fb9450ef741..307ee1e668a 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -687,11 +687,7 @@ LAZY_LOAD_PLUGINS: bool = conf.getboolean("core",
"lazy_load_plugins", fallback=
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers",
fallback=True)
# Determines if the executor utilizes Kubernetes
-IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
- executor_constants.KUBERNETES_EXECUTOR,
- executor_constants.CELERY_KUBERNETES_EXECUTOR,
- executor_constants.LOCAL_KUBERNETES_EXECUTOR,
-}
+IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") ==
executor_constants.KUBERNETES_EXECUTOR
# Executors can set this to true to configure logging correctly for
# containerized executors.
diff --git a/tests/cli/commands/local_commands/test_scheduler_command.py
b/tests/cli/commands/local_commands/test_scheduler_command.py
index 2dfce8edde6..8da375b2b14 100644
--- a/tests/cli/commands/local_commands/test_scheduler_command.py
+++ b/tests/cli/commands/local_commands/test_scheduler_command.py
@@ -45,7 +45,6 @@ class TestSchedulerCommand:
("LocalExecutor", True),
("SequentialExecutor", True),
("KubernetesExecutor", False),
- ("LocalKubernetesExecutor", True),
],
)
@mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner")
diff --git a/tests/cli/commands/local_commands/test_standalone_command.py
b/tests/cli/commands/local_commands/test_standalone_command.py
index 464e0d3aec2..484596529f2 100644
--- a/tests/cli/commands/local_commands/test_standalone_command.py
+++ b/tests/cli/commands/local_commands/test_standalone_command.py
@@ -26,11 +26,9 @@ from airflow.cli.commands.local_commands.standalone_command
import StandaloneCom
from airflow.executors import executor_loader
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
- CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
- LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)
@@ -40,17 +38,13 @@ class TestStandaloneCommand:
"conf_executor_name, conf_sql_alchemy_conn,
expected_standalone_executor",
[
(LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR),
- (LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string",
SEQUENTIAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
- (CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string",
SEQUENTIAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
- (LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string",
LOCAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
- (CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string",
LOCAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
],
diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py
index 17f891c94f8..5c46e0aa01b 100644
--- a/tests/cli/conftest.py
+++ b/tests/cli/conftest.py
@@ -23,8 +23,8 @@ import pytest
from airflow.executors import local_executor
from airflow.models.dagbag import DagBag
-from airflow.providers.celery.executors import celery_executor,
celery_kubernetes_executor
-from airflow.providers.cncf.kubernetes.executors import kubernetes_executor,
local_kubernetes_executor
+from airflow.providers.celery.executors import celery_executor
+from airflow.providers.cncf.kubernetes.executors import kubernetes_executor
from tests_common.test_utils.config import conf_vars
@@ -33,15 +33,9 @@ custom_executor_module = type(sys)("custom_executor")
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
-custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
- "CustomCeleryKubernetesExecutor",
(celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
-)
custom_executor_module.CustomLocalExecutor = type( # type: ignore
"CustomLocalExecutor", (local_executor.LocalExecutor,), {}
)
-custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
- "CustomLocalKubernetesExecutor",
(local_kubernetes_executor.LocalKubernetesExecutor,), {}
-)
custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
"CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
)
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 3bfbaeb23bd..d50914c3adc 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -365,16 +365,12 @@ class TestCli:
"executor,expected_args",
[
("CeleryExecutor", ["celery"]),
- ("CeleryKubernetesExecutor", ["celery", "kubernetes"]),
("KubernetesExecutor", ["kubernetes"]),
("LocalExecutor", []),
- ("LocalKubernetesExecutor", ["kubernetes"]),
("SequentialExecutor", []),
# custom executors are mapped to the regular ones in `conftest.py`
("custom_executor.CustomLocalExecutor", []),
- ("custom_executor.CustomLocalKubernetesExecutor", ["kubernetes"]),
("custom_executor.CustomCeleryExecutor", ["celery"]),
- ("custom_executor.CustomCeleryKubernetesExecutor", ["celery",
"kubernetes"]),
("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
],
)
diff --git a/tests/executors/test_executor_loader.py
b/tests/executors/test_executor_loader.py
index de6703954b1..ebe5ae6c409 100644
--- a/tests/executors/test_executor_loader.py
+++ b/tests/executors/test_executor_loader.py
@@ -45,7 +45,6 @@ class TestExecutorLoader:
"executor_name",
[
"CeleryExecutor",
- "CeleryKubernetesExecutor",
"DebugExecutor",
"KubernetesExecutor",
"LocalExecutor",
@@ -287,7 +286,6 @@ class TestExecutorLoader:
("executor_config", "expected_value"),
[
("CeleryExecutor", "CeleryExecutor"),
- ("CeleryKubernetesExecutor", "CeleryKubernetesExecutor"),
("DebugExecutor", "DebugExecutor"),
("KubernetesExecutor", "KubernetesExecutor"),
("LocalExecutor", "LocalExecutor"),
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 970960d7f1b..f7e1021ecb1 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -35,11 +35,9 @@ from airflow.exceptions import (
from airflow.executors.debug_executor import DebugExecutor
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
- CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
- LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)
from airflow.executors.local_executor import LocalExecutor
@@ -48,9 +46,7 @@ from airflow.models import TaskInstance, TaskReschedule
from airflow.models.trigger import TriggerFailureReason
from airflow.models.xcom import XCom
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
-from airflow.providers.celery.executors.celery_kubernetes_executor import
CeleryKubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import
KubernetesExecutor
-from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor
import LocalKubernetesExecutor
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue,
poke_mode_only
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
@@ -1306,20 +1302,16 @@ class TestBaseSensor:
"executor_cls_mode",
[
(CeleryExecutor, "poke"),
- (CeleryKubernetesExecutor, "poke"),
(DebugExecutor, "reschedule"),
(KubernetesExecutor, "poke"),
(LocalExecutor, "poke"),
- (LocalKubernetesExecutor, "poke"),
(SequentialExecutor, "poke"),
],
ids=[
CELERY_EXECUTOR,
- CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
- LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
],
)
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index f1245b86363..80a20bf66e4 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -199,21 +199,13 @@ class TestFileTaskLogHandler:
@pytest.mark.parametrize(
"executor_name",
[
- (executor_constants.LOCAL_KUBERNETES_EXECUTOR),
- (executor_constants.CELERY_KUBERNETES_EXECUTOR),
(executor_constants.KUBERNETES_EXECUTOR),
(None),
],
)
@conf_vars(
{
- ("core", "EXECUTOR"): ",".join(
- [
- executor_constants.LOCAL_KUBERNETES_EXECUTOR,
- executor_constants.CELERY_KUBERNETES_EXECUTOR,
- executor_constants.KUBERNETES_EXECUTOR,
- ]
- ),
+ ("core", "EXECUTOR"): executor_constants.KUBERNETES_EXECUTOR,
}
)
@patch(