This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 503b4670107 Add multi-team support for KubernetesExecutor (#61798)
503b4670107 is described below
commit 503b4670107533ffb604234cdcdae08c5918b0c7
Author: Vinod Bottu <[email protected]>
AuthorDate: Sat Mar 7 15:09:15 2026 -0600
Add multi-team support for KubernetesExecutor (#61798)
* Add multi-team support for KubernetesExecutor
* Address review feedback and fix configuration fallback bug
* use AIRFLOW_V_3_2_PLUS version guard and narrow exception types
---------
Co-authored-by: Niko Oliveira <[email protected]>
---
airflow-core/src/airflow/configuration.py | 3 +
.../providers/cncf/kubernetes/cli/definition.py | 8 +-
.../cncf/kubernetes/cli/kubernetes_command.py | 13 +-
.../kubernetes/executors/kubernetes_executor.py | 27 +++-
.../providers/cncf/kubernetes/kube_config.py | 82 +++++++----
.../cncf/kubernetes/template_rendering.py | 20 ++-
.../providers/cncf/kubernetes/version_compat.py | 2 +
.../executors/test_kubernetes_executor.py | 153 +++++++++++++++++++++
8 files changed, 269 insertions(+), 39 deletions(-)
diff --git a/airflow-core/src/airflow/configuration.py
b/airflow-core/src/airflow/configuration.py
index 1f4fc07e4c0..556821d71fb 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -269,6 +269,9 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
def get_provider_config_fallback_defaults(self, section: str, key: str,
**kwargs) -> Any:
"""Get provider config fallback default values."""
+ # Remove team_name from kwargs as the fallback defaults ConfigParser
+ # does not support team-aware lookups (it's a standard ConfigParser).
+ kwargs.pop("team_name", None)
return self._provider_config_fallback_default_values.get(section, key,
fallback=None, **kwargs)
# A mapping of old default values that we want to change and warn the user
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
index 210187bf9ce..01300d1c4cd 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
@@ -68,6 +68,12 @@ ARG_MIN_PENDING_MINUTES = Arg(
),
)
+ARG_TEAM = Arg(
+ ("--team",),
+ default=None,
+ help="Team name for multi-team configuration. When set, team-specific
config overrides are applied.",
+)
+
# CLI Commands
KUBERNETES_COMMANDS = (
ActionCommand(
@@ -85,7 +91,7 @@ KUBERNETES_COMMANDS = (
help="Generate YAML files for all tasks in DAG. Useful for debugging
tasks without "
"launching into a cluster",
func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"),
- args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_COMPAT, ARG_OUTPUT_PATH,
ARG_VERBOSE),
+ args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_COMPAT, ARG_OUTPUT_PATH,
ARG_TEAM, ARG_VERBOSE),
),
)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
index 2b9a2cba9dc..0d0d15c57b1 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
@@ -32,7 +32,11 @@ from
airflow.providers.cncf.kubernetes.executors.kubernetes_executor import Kube
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
create_unique_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator,
generate_pod_command_args
-from airflow.providers.cncf.kubernetes.version_compat import
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from airflow.providers.cncf.kubernetes.version_compat import (
+ AIRFLOW_V_3_0_PLUS,
+ AIRFLOW_V_3_1_PLUS,
+ AIRFLOW_V_3_2_PLUS,
+)
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.types import DagRunType
@@ -72,7 +76,12 @@ def generate_pod_yaml(args):
dr = DagRun(dag.dag_id, execution_date=logical_date)
dr.run_id = DagRun.generate_run_id(run_type=DagRunType.MANUAL,
execution_date=logical_date)
- kube_config = KubeConfig()
+ executor_conf = None
+ if AIRFLOW_V_3_2_PLUS and args.team:
+ from airflow.executors.base_executor import ExecutorConf
+
+ executor_conf = ExecutorConf(team_name=args.team)
+ kube_config = KubeConfig(executor_conf=executor_conf)
for task in dag.tasks:
if AIRFLOW_V_3_0_PLUS:
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 3d7391a46c7..bb94c76c447 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -79,14 +79,29 @@ class KubernetesExecutor(BaseExecutor):
RUNNING_POD_LOG_LINES = 100
supports_ad_hoc_ti_run: bool = True
+ supports_multi_team: bool = True
if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
# In the v3 path, we store workloads, not commands as strings.
# TODO: TaskSDK: move this type change into BaseExecutor
queued_tasks: dict[TaskInstanceKey, workloads.All] # type:
ignore[assignment]
- def __init__(self):
- self.kube_config = KubeConfig()
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ # Check if self has the ExecutorConf set on the self.conf attribute
with all required methods.
+ # In older Airflow versions, ExecutorConf exists but lacks methods
like getint, getboolean, etc.
+ # In such cases, fall back to the global configuration object.
+ # This allows the changes to be backwards compatible with older
versions of Airflow.
+ # Can be removed when minimum supported provider version is equal to
the version of core airflow
+ # which introduces multi-team configuration (3.2+).
+ if not hasattr(self, "conf") or not hasattr(self.conf, "getint"):
+ self.conf = conf
+
+ self.kube_config = KubeConfig(executor_conf=self.conf)
+ # Override parallelism with team-aware config value
+ self.parallelism = self.kube_config.parallelism
+
self._manager = multiprocessing.Manager()
self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
@@ -96,11 +111,10 @@ class KubernetesExecutor(BaseExecutor):
self.last_handled: dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: str | None = None
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
- self.task_publish_max_retries = conf.getint(
+ self.task_publish_max_retries = self.conf.getint(
"kubernetes_executor", "task_publish_max_retries", fallback=0
)
self.completed: set[KubernetesResults] = set()
- super().__init__(parallelism=self.kube_config.parallelism)
def _list_pods(self, query_kwargs):
query_kwargs["header_params"] = {
@@ -453,14 +467,13 @@ class KubernetesExecutor(BaseExecutor):
self.event_buffer[key] = state, termination_reason
- @staticmethod
- def _get_pod_namespace(ti: TaskInstance):
+ def _get_pod_namespace(self, ti: TaskInstance):
pod_override = ti.executor_config.get("pod_override")
namespace = None
with suppress(Exception):
if pod_override is not None:
namespace = pod_override.metadata.namespace
- return namespace or conf.get("kubernetes_executor", "namespace")
+ return namespace or self.conf.get("kubernetes_executor", "namespace")
def get_task_log(self, ti: TaskInstance, try_number: int) ->
tuple[list[str], list[str]]:
messages = []
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
index ef2a8c1ef0f..eaf02b455f8 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
@@ -16,71 +16,95 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.settings import AIRFLOW_HOME
+if TYPE_CHECKING:
+ from airflow.executors.base_executor import ExecutorConf
+
class KubeConfig:
- """Configuration for Kubernetes."""
+ """
+ Configuration for Kubernetes.
+
+ :param executor_conf: Optional team-aware configuration object. If not
provided,
+ falls back to the global configuration for backwards compatibility.
This parameter
+ supports the multi-team feature introduced in AIP-67.
+ """
core_section = "core"
kubernetes_section = "kubernetes_executor"
logging_section = "logging"
- def __init__(self):
+ def __init__(self, executor_conf: ExecutorConf | None = None):
+ # Use the provided executor_conf for team-aware configuration, or fall
back to global conf
+ # for backwards compatibility with older versions of Airflow.
+ self._conf = executor_conf if executor_conf is not None else conf
+
configuration_dict = conf.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict[self.core_section]
self.airflow_home = AIRFLOW_HOME
- self.dags_folder = conf.get(self.core_section, "dags_folder")
- self.parallelism = conf.getint(self.core_section, "parallelism")
- self.pod_template_file = conf.get(self.kubernetes_section,
"pod_template_file", fallback=None)
+ self.dags_folder = self._conf.get(self.core_section, "dags_folder")
+ self.parallelism = self._conf.getint(self.core_section, "parallelism")
+ self.pod_template_file = self._conf.get(self.kubernetes_section,
"pod_template_file", fallback=None)
- self.delete_worker_pods = conf.getboolean(self.kubernetes_section,
"delete_worker_pods")
- self.delete_worker_pods_on_failure = conf.getboolean(
+ self.delete_worker_pods =
self._conf.getboolean(self.kubernetes_section, "delete_worker_pods")
+ self.delete_worker_pods_on_failure = self._conf.getboolean(
self.kubernetes_section, "delete_worker_pods_on_failure"
)
- self.worker_pod_pending_fatal_container_state_reasons = []
- if conf.get(self.kubernetes_section,
"worker_pod_pending_fatal_container_state_reasons", fallback=""):
+ self.worker_pod_pending_fatal_container_state_reasons: list[str] = []
+ fatal_reasons = self._conf.get(
+ self.kubernetes_section,
"worker_pod_pending_fatal_container_state_reasons", fallback=""
+ )
+ if fatal_reasons:
self.worker_pod_pending_fatal_container_state_reasons = [
- r.strip()
- for r in conf.get(
- self.kubernetes_section,
"worker_pod_pending_fatal_container_state_reasons"
- ).split(",")
+ r.strip() for r in fatal_reasons.split(",")
]
- self.worker_pods_creation_batch_size = conf.getint(
+ self.worker_pods_creation_batch_size = self._conf.getint(
self.kubernetes_section, "worker_pods_creation_batch_size"
)
- self.worker_container_repository = conf.get(self.kubernetes_section,
"worker_container_repository")
- self.worker_container_tag = conf.get(self.kubernetes_section,
"worker_container_tag")
+ self.worker_container_repository = self._conf.get(
+ self.kubernetes_section, "worker_container_repository"
+ )
+ self.worker_container_tag = self._conf.get(self.kubernetes_section,
"worker_container_tag")
if self.worker_container_repository and self.worker_container_tag:
self.kube_image =
f"{self.worker_container_repository}:{self.worker_container_tag}"
else:
- self.kube_image = None
+ # Ignore needed because ExecutorConf.get() returns str | None (no
overloads),
+ # so mypy infers kube_image as str from the f-string branch and
rejects None here.
+ # This is a pre-existing type inconsistency: kube_image can be
None at runtime,
+ # but PodGenerator.construct_pod() declares kube_image: str.
Passing None is
+ # intentional — the K8s client omits None fields, whereas "" would
serialize
+ # as 'image': '' in the pod spec.
+ self.kube_image = None # type: ignore[assignment]
# The Kubernetes Namespace in which the Scheduler and Webserver
reside. Note
# that if your
# cluster has RBAC enabled, your scheduler may need service account
permissions to
# create, watch, get, and delete pods in this namespace.
- self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
- self.multi_namespace_mode = conf.getboolean(self.kubernetes_section,
"multi_namespace_mode")
- if self.multi_namespace_mode and conf.get(
- self.kubernetes_section, "multi_namespace_mode_namespace_list"
- ):
- self.multi_namespace_mode_namespace_list = conf.get(
- self.kubernetes_section, "multi_namespace_mode_namespace_list"
- ).split(",")
+ self.kube_namespace = self._conf.get(self.kubernetes_section,
"namespace")
+ self.multi_namespace_mode =
self._conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
+ multi_ns_list = self._conf.get(
+ self.kubernetes_section, "multi_namespace_mode_namespace_list",
fallback=""
+ )
+ if self.multi_namespace_mode and multi_ns_list:
+ self.multi_namespace_mode_namespace_list: list[str] | None =
multi_ns_list.split(",")
else:
self.multi_namespace_mode_namespace_list = None
# The Kubernetes Namespace in which pods will be created by the
executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account
permissions to
# interact with cluster components.
- self.executor_namespace = conf.get(self.kubernetes_section,
"namespace")
+ self.executor_namespace: str = (
+ self._conf.get(self.kubernetes_section, "namespace",
fallback="default") or "default"
+ )
- self.kube_client_request_args = conf.getjson(
+ self.kube_client_request_args = self._conf.getjson(
self.kubernetes_section, "kube_client_request_args", fallback={}
)
if not isinstance(self.kube_client_request_args, dict):
@@ -95,7 +119,9 @@ class KubeConfig:
self.kube_client_request_args["_request_timeout"] = tuple(
self.kube_client_request_args["_request_timeout"]
)
- self.delete_option_kwargs = conf.getjson(self.kubernetes_section,
"delete_option_kwargs", fallback={})
+ self.delete_option_kwargs = self._conf.getjson(
+ self.kubernetes_section, "delete_option_kwargs", fallback={}
+ )
if not isinstance(self.delete_option_kwargs, dict):
raise AirflowConfigException(
f"[{self.kubernetes_section}] 'delete_option_kwargs' expected
a JSON dict, got "
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
index bf3f47b7827..b80c5b1eb00 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
@@ -32,9 +32,27 @@ if TYPE_CHECKING:
from airflow.models.taskinstance import TaskInstance
+def _get_executor_conf(dag_id: str):
+ """Build a team-aware ExecutorConf for the given dag_id, if multi-team is
available."""
+ try:
+ from airflow.configuration import conf
+ from airflow.executors.base_executor import ExecutorConf
+ from airflow.models.dag import DagModel
+
+ if not conf.getboolean("core", "multi_team", fallback=False):
+ return None
+ team_name = DagModel.get_team_name(dag_id)
+ if team_name:
+ return ExecutorConf(team_name=team_name)
+ except (ImportError, AttributeError):
+ pass
+ return None
+
+
def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None:
"""Render k8s pod yaml."""
- kube_config = KubeConfig()
+ executor_conf = _get_executor_conf(task_instance.dag_id)
+ kube_config = KubeConfig(executor_conf=executor_conf)
if task_instance.executor_config and
task_instance.executor_config.get("pod_template_file"):
# If a specific pod_template_file was passed to the executor, we make
# sure to render the k8s pod spec using this one, and not the default
one.
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
index 2fb2ac93a12..7751da07081 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
@@ -34,9 +34,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
+AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
+ "AIRFLOW_V_3_2_PLUS",
]
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index f8b4a59356f..86cfdedc49b 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1932,3 +1932,156 @@ class TestKubernetesJobWatcher:
executor = KubernetesExecutor()
assert
executor.kube_config.worker_pod_pending_fatal_container_state_reasons ==
expected_result
+
+
+class TestKubernetesExecutorMultiTeam:
+ """Tests for AIP-67 multi-team support in KubernetesExecutor."""
+
+ def test_supports_multi_team(self):
+ """Test that KubernetesExecutor declares multi-team support."""
+ assert KubernetesExecutor.supports_multi_team is True
+
+ def test_global_executor_without_team_name(self):
+ """Test that global executor (no team) works correctly with backwards
compatibility."""
+ executor = KubernetesExecutor()
+
+ # Verify executor has conf
+ assert hasattr(executor, "conf")
+ # On older Airflow versions, conf is the global AirflowConfigParser
(no team_name attr).
+ # On newer versions, conf is an ExecutorConf with team_name=None.
+ assert getattr(executor.conf, "team_name", None) is None
+
+ # Verify KubeConfig was created with the executor's conf
+ assert executor.kube_config is not None
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires
Airflow 3.2+")
+ def test_executor_with_team_name(self):
+ """Test that executor created with a team_name has team-specific
conf."""
+ executor = KubernetesExecutor(team_name="ml_team")
+
+ assert executor.conf.team_name == "ml_team"
+ assert executor.team_name == "ml_team"
+ assert executor.kube_config is not None
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires
Airflow 3.2+")
+ def test_multiple_team_executors_isolation(self, monkeypatch):
+ """Test that multiple team executors can coexist with isolated
resources."""
+
monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE",
"4")
+
monkeypatch.setenv("AIRFLOW__TEAM_B___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE",
"8")
+
+ team_a_executor = KubernetesExecutor(team_name="team_a")
+ team_b_executor = KubernetesExecutor(team_name="team_b")
+
+ try:
+ assert team_a_executor.task_queue is not team_b_executor.task_queue
+ assert team_a_executor.result_queue is not
team_b_executor.result_queue
+ assert team_a_executor.running is not team_b_executor.running
+ assert team_a_executor.queued_tasks is not
team_b_executor.queued_tasks
+
+ assert team_a_executor.conf.team_name == "team_a"
+ assert team_b_executor.conf.team_name == "team_b"
+
+ assert team_a_executor.kube_config.worker_pods_creation_batch_size
== 4
+ assert team_b_executor.kube_config.worker_pods_creation_batch_size
== 8
+
+ finally:
+ team_a_executor.end()
+ team_b_executor.end()
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires
Airflow 3.2+")
+ def test_team_specific_kube_config(self, monkeypatch):
+ """Test that KubeConfig uses team-specific configuration when provided
via env vars."""
+ # Set team-specific namespace via environment variable
+ monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__NAMESPACE",
"team-a-namespace")
+ monkeypatch.setenv(
+ "AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__DELETE_WORKER_PODS",
+ "false",
+ )
+
+ team_a_executor = KubernetesExecutor(team_name="team_a")
+
+ try:
+ # Verify the KubeConfig picked up the team-specific namespace
+ assert team_a_executor.kube_config.kube_namespace ==
"team-a-namespace"
+ assert team_a_executor.kube_config.delete_worker_pods is False
+ finally:
+ team_a_executor.end()
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires
Airflow 3.2+")
+ def test_team_and_global_config_isolation(self, monkeypatch):
+ """Test that team-specific and global executors use correct
configurations side-by-side."""
+ global_namespace = "default"
+ team_namespace = "team-ml-namespace"
+
+ # Set up global configuration
+ config_overrides = {
+ ("kubernetes_executor", "namespace"): global_namespace,
+ }
+
+ # Set up team-specific config via environment variable
+
monkeypatch.setenv("AIRFLOW__ML_TEAM___KUBERNETES_EXECUTOR__NAMESPACE",
team_namespace)
+
+ with conf_vars(config_overrides):
+ # Create team-specific executor
+ team_executor = KubernetesExecutor(team_name="ml_team")
+
+ # Create global executor (no team)
+ global_executor = KubernetesExecutor()
+
+ try:
+ # Verify team-specific namespace was used
+ assert team_executor.kube_config.kube_namespace ==
team_namespace
+
+ # Verify global namespace was used for global executor
+ assert global_executor.kube_config.kube_namespace ==
global_namespace
+ finally:
+ team_executor.end()
+ global_executor.end()
+
+ def test_kube_config_fallback_to_global_conf(self):
+ """Test that KubeConfig falls back to global conf when no
executor_conf is provided."""
+ from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
+
+ # KubeConfig without executor_conf should fall back to global
configuration
+ kube_config = KubeConfig()
+
+ # Should still have valid configuration from global defaults
+ assert kube_config.kube_namespace is not None
+ assert kube_config.airflow_home is not None
+
+ def test_executor_conf_passed_to_kube_config(self):
+ """Test that the executor's conf is passed through to KubeConfig."""
+ executor = KubernetesExecutor()
+
+ # The executor should pass its conf to KubeConfig
+ assert executor.kube_config._conf is executor.conf
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires
Airflow 3.2+")
+ def test_task_publish_max_retries_uses_team_conf(self, monkeypatch):
+ """Test that task_publish_max_retries reads from team-specific conf."""
+
monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__TASK_PUBLISH_MAX_RETRIES",
"5")
+
+ executor = KubernetesExecutor(team_name="team_a")
+
+ try:
+ assert executor.task_publish_max_retries == 5
+ finally:
+ executor.end()
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires
Airflow 3.2+")
+ def test_get_pod_namespace_uses_instance_conf(self, monkeypatch):
+ """Test that _get_pod_namespace uses self.conf instead of global
conf."""
+ monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__NAMESPACE",
"team-a-ns")
+
+ executor = KubernetesExecutor(team_name="team_a")
+
+ try:
+ mock_ti = mock.MagicMock()
+ mock_ti.executor_config = {}
+
+ namespace = executor._get_pod_namespace(mock_ti)
+
+ # Should return the team-specific namespace from the executor's
conf
+ assert namespace == "team-a-ns"
+ finally:
+ executor.end()