This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 503b4670107 Add multi-team support for KubernetesExecutor (#61798)
503b4670107 is described below

commit 503b4670107533ffb604234cdcdae08c5918b0c7
Author: Vinod Bottu <[email protected]>
AuthorDate: Sat Mar 7 15:09:15 2026 -0600

    Add multi-team support for KubernetesExecutor (#61798)
    
    * Add multi-team support for KubernetesExecutor
    
    * Address review feedback and fix configuration fallback bug
    
    * use AIRFLOW_V_3_2_PLUS version guard and narrow exception types
    
    ---------
    
    Co-authored-by: Niko Oliveira <[email protected]>
---
 airflow-core/src/airflow/configuration.py          |   3 +
 .../providers/cncf/kubernetes/cli/definition.py    |   8 +-
 .../cncf/kubernetes/cli/kubernetes_command.py      |  13 +-
 .../kubernetes/executors/kubernetes_executor.py    |  27 +++-
 .../providers/cncf/kubernetes/kube_config.py       |  82 +++++++----
 .../cncf/kubernetes/template_rendering.py          |  20 ++-
 .../providers/cncf/kubernetes/version_compat.py    |   2 +
 .../executors/test_kubernetes_executor.py          | 153 +++++++++++++++++++++
 8 files changed, 269 insertions(+), 39 deletions(-)

diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py
index 1f4fc07e4c0..556821d71fb 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -269,6 +269,9 @@ class AirflowConfigParser(_SharedAirflowConfigParser):
 
     def get_provider_config_fallback_defaults(self, section: str, key: str, 
**kwargs) -> Any:
         """Get provider config fallback default values."""
+        # Remove team_name from kwargs as the fallback defaults ConfigParser
+        # does not support team-aware lookups (it's a standard ConfigParser).
+        kwargs.pop("team_name", None)
         return self._provider_config_fallback_default_values.get(section, key, 
fallback=None, **kwargs)
 
     # A mapping of old default values that we want to change and warn the user
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
index 210187bf9ce..01300d1c4cd 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/definition.py
@@ -68,6 +68,12 @@ ARG_MIN_PENDING_MINUTES = Arg(
     ),
 )
 
+ARG_TEAM = Arg(
+    ("--team",),
+    default=None,
+    help="Team name for multi-team configuration. When set, team-specific 
config overrides are applied.",
+)
+
 # CLI Commands
 KUBERNETES_COMMANDS = (
     ActionCommand(
@@ -85,7 +91,7 @@ KUBERNETES_COMMANDS = (
         help="Generate YAML files for all tasks in DAG. Useful for debugging 
tasks without "
         "launching into a cluster",
         
func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"),
-        args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_COMPAT, ARG_OUTPUT_PATH, 
ARG_VERBOSE),
+        args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_COMPAT, ARG_OUTPUT_PATH, 
ARG_TEAM, ARG_VERBOSE),
     ),
 )
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
index 2b9a2cba9dc..0d0d15c57b1 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/cli/kubernetes_command.py
@@ -32,7 +32,11 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import Kube
 from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
create_unique_id
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, 
generate_pod_command_args
-from airflow.providers.cncf.kubernetes.version_compat import 
AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from airflow.providers.cncf.kubernetes.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_1_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 from airflow.utils import cli as cli_utils, yaml
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.types import DagRunType
@@ -72,7 +76,12 @@ def generate_pod_yaml(args):
         dr = DagRun(dag.dag_id, execution_date=logical_date)
         dr.run_id = DagRun.generate_run_id(run_type=DagRunType.MANUAL, 
execution_date=logical_date)
 
-    kube_config = KubeConfig()
+    executor_conf = None
+    if AIRFLOW_V_3_2_PLUS and args.team:
+        from airflow.executors.base_executor import ExecutorConf
+
+        executor_conf = ExecutorConf(team_name=args.team)
+    kube_config = KubeConfig(executor_conf=executor_conf)
 
     for task in dag.tasks:
         if AIRFLOW_V_3_0_PLUS:
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 3d7391a46c7..bb94c76c447 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -79,14 +79,29 @@ class KubernetesExecutor(BaseExecutor):
 
     RUNNING_POD_LOG_LINES = 100
     supports_ad_hoc_ti_run: bool = True
+    supports_multi_team: bool = True
 
     if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
         # In the v3 path, we store workloads, not commands as strings.
         # TODO: TaskSDK: move this type change into BaseExecutor
         queued_tasks: dict[TaskInstanceKey, workloads.All]  # type: 
ignore[assignment]
 
-    def __init__(self):
-        self.kube_config = KubeConfig()
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Check if self has the ExecutorConf set on the self.conf attribute with all required methods.
+        # In older Airflow versions, ExecutorConf exists but lacks methods like getint, getboolean, etc.
+        # In such cases, fall back to the global configuration object.
+        # This allows the changes to be backwards compatible with older versions of Airflow.
+        # Can be removed when minimum supported provider version is equal to the version of core airflow
+        # which introduces multi-team configuration (3.2+).
+        if not hasattr(self, "conf") or not hasattr(self.conf, "getint"):
+            self.conf = conf
+
+        self.kube_config = KubeConfig(executor_conf=self.conf)
+        # Override parallelism with team-aware config value
+        self.parallelism = self.kube_config.parallelism
+
         self._manager = multiprocessing.Manager()
         self.task_queue: Queue[KubernetesJob] = self._manager.Queue()
         self.result_queue: Queue[KubernetesResults] = self._manager.Queue()
@@ -96,11 +111,10 @@ class KubernetesExecutor(BaseExecutor):
         self.last_handled: dict[TaskInstanceKey, float] = {}
         self.kubernetes_queue: str | None = None
         self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
-        self.task_publish_max_retries = conf.getint(
+        self.task_publish_max_retries = self.conf.getint(
             "kubernetes_executor", "task_publish_max_retries", fallback=0
         )
         self.completed: set[KubernetesResults] = set()
-        super().__init__(parallelism=self.kube_config.parallelism)
 
     def _list_pods(self, query_kwargs):
         query_kwargs["header_params"] = {
@@ -453,14 +467,13 @@ class KubernetesExecutor(BaseExecutor):
 
         self.event_buffer[key] = state, termination_reason
 
-    @staticmethod
-    def _get_pod_namespace(ti: TaskInstance):
+    def _get_pod_namespace(self, ti: TaskInstance):
         pod_override = ti.executor_config.get("pod_override")
         namespace = None
         with suppress(Exception):
             if pod_override is not None:
                 namespace = pod_override.metadata.namespace
-        return namespace or conf.get("kubernetes_executor", "namespace")
+        return namespace or self.conf.get("kubernetes_executor", "namespace")
 
     def get_task_log(self, ti: TaskInstance, try_number: int) -> 
tuple[list[str], list[str]]:
         messages = []
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
index ef2a8c1ef0f..eaf02b455f8 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kube_config.py
@@ -16,71 +16,95 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException
 from airflow.settings import AIRFLOW_HOME
 
+if TYPE_CHECKING:
+    from airflow.executors.base_executor import ExecutorConf
+
 
 class KubeConfig:
-    """Configuration for Kubernetes."""
+    """
+    Configuration for Kubernetes.
+
+    :param executor_conf: Optional team-aware configuration object. If not 
provided,
+        falls back to the global configuration for backwards compatibility. 
This parameter
+        supports the multi-team feature introduced in AIP-67.
+    """
 
     core_section = "core"
     kubernetes_section = "kubernetes_executor"
     logging_section = "logging"
 
-    def __init__(self):
+    def __init__(self, executor_conf: ExecutorConf | None = None):
+        # Use the provided executor_conf for team-aware configuration, or fall back to global conf
+        # for backwards compatibility with older versions of Airflow.
+        self._conf = executor_conf if executor_conf is not None else conf
+
         configuration_dict = conf.as_dict(display_sensitive=True)
         self.core_configuration = configuration_dict[self.core_section]
         self.airflow_home = AIRFLOW_HOME
-        self.dags_folder = conf.get(self.core_section, "dags_folder")
-        self.parallelism = conf.getint(self.core_section, "parallelism")
-        self.pod_template_file = conf.get(self.kubernetes_section, 
"pod_template_file", fallback=None)
+        self.dags_folder = self._conf.get(self.core_section, "dags_folder")
+        self.parallelism = self._conf.getint(self.core_section, "parallelism")
+        self.pod_template_file = self._conf.get(self.kubernetes_section, 
"pod_template_file", fallback=None)
 
-        self.delete_worker_pods = conf.getboolean(self.kubernetes_section, 
"delete_worker_pods")
-        self.delete_worker_pods_on_failure = conf.getboolean(
+        self.delete_worker_pods = 
self._conf.getboolean(self.kubernetes_section, "delete_worker_pods")
+        self.delete_worker_pods_on_failure = self._conf.getboolean(
             self.kubernetes_section, "delete_worker_pods_on_failure"
         )
-        self.worker_pod_pending_fatal_container_state_reasons = []
-        if conf.get(self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons", fallback=""):
+        self.worker_pod_pending_fatal_container_state_reasons: list[str] = []
+        fatal_reasons = self._conf.get(
+            self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons", fallback=""
+        )
+        if fatal_reasons:
             self.worker_pod_pending_fatal_container_state_reasons = [
-                r.strip()
-                for r in conf.get(
-                    self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons"
-                ).split(",")
+                r.strip() for r in fatal_reasons.split(",")
             ]
 
-        self.worker_pods_creation_batch_size = conf.getint(
+        self.worker_pods_creation_batch_size = self._conf.getint(
             self.kubernetes_section, "worker_pods_creation_batch_size"
         )
-        self.worker_container_repository = conf.get(self.kubernetes_section, 
"worker_container_repository")
-        self.worker_container_tag = conf.get(self.kubernetes_section, 
"worker_container_tag")
+        self.worker_container_repository = self._conf.get(
+            self.kubernetes_section, "worker_container_repository"
+        )
+        self.worker_container_tag = self._conf.get(self.kubernetes_section, 
"worker_container_tag")
 
         if self.worker_container_repository and self.worker_container_tag:
             self.kube_image = 
f"{self.worker_container_repository}:{self.worker_container_tag}"
         else:
-            self.kube_image = None
+            # Ignore needed because ExecutorConf.get() returns str | None (no overloads),
+            # so mypy infers kube_image as str from the f-string branch and rejects None here.
+            # This is a pre-existing type inconsistency: kube_image can be None at runtime,
+            # but PodGenerator.construct_pod() declares kube_image: str. Passing None is
+            # intentional — the K8s client omits None fields, whereas "" would serialize
+            # as 'image': '' in the pod spec.
+            self.kube_image = None  # type: ignore[assignment]
 
         # The Kubernetes Namespace in which the Scheduler and Webserver 
reside. Note
         # that if your
         # cluster has RBAC enabled, your scheduler may need service account 
permissions to
         # create, watch, get, and delete pods in this namespace.
-        self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
-        self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, 
"multi_namespace_mode")
-        if self.multi_namespace_mode and conf.get(
-            self.kubernetes_section, "multi_namespace_mode_namespace_list"
-        ):
-            self.multi_namespace_mode_namespace_list = conf.get(
-                self.kubernetes_section, "multi_namespace_mode_namespace_list"
-            ).split(",")
+        self.kube_namespace = self._conf.get(self.kubernetes_section, 
"namespace")
+        self.multi_namespace_mode = 
self._conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
+        multi_ns_list = self._conf.get(
+            self.kubernetes_section, "multi_namespace_mode_namespace_list", 
fallback=""
+        )
+        if self.multi_namespace_mode and multi_ns_list:
+            self.multi_namespace_mode_namespace_list: list[str] | None = 
multi_ns_list.split(",")
         else:
             self.multi_namespace_mode_namespace_list = None
         # The Kubernetes Namespace in which pods will be created by the 
executor. Note
         # that if your
         # cluster has RBAC enabled, your workers may need service account 
permissions to
         # interact with cluster components.
-        self.executor_namespace = conf.get(self.kubernetes_section, 
"namespace")
+        self.executor_namespace: str = (
+            self._conf.get(self.kubernetes_section, "namespace", 
fallback="default") or "default"
+        )
 
-        self.kube_client_request_args = conf.getjson(
+        self.kube_client_request_args = self._conf.getjson(
             self.kubernetes_section, "kube_client_request_args", fallback={}
         )
         if not isinstance(self.kube_client_request_args, dict):
@@ -95,7 +119,9 @@ class KubeConfig:
                 self.kube_client_request_args["_request_timeout"] = tuple(
                     self.kube_client_request_args["_request_timeout"]
                 )
-        self.delete_option_kwargs = conf.getjson(self.kubernetes_section, 
"delete_option_kwargs", fallback={})
+        self.delete_option_kwargs = self._conf.getjson(
+            self.kubernetes_section, "delete_option_kwargs", fallback={}
+        )
         if not isinstance(self.delete_option_kwargs, dict):
             raise AirflowConfigException(
                 f"[{self.kubernetes_section}] 'delete_option_kwargs' expected 
a JSON dict, got "
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
index bf3f47b7827..b80c5b1eb00 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
@@ -32,9 +32,27 @@ if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
 
 
+def _get_executor_conf(dag_id: str):
+    """Build a team-aware ExecutorConf for the given dag_id, if multi-team is 
available."""
+    try:
+        from airflow.configuration import conf
+        from airflow.executors.base_executor import ExecutorConf
+        from airflow.models.dag import DagModel
+
+        if not conf.getboolean("core", "multi_team", fallback=False):
+            return None
+        team_name = DagModel.get_team_name(dag_id)
+        if team_name:
+            return ExecutorConf(team_name=team_name)
+    except (ImportError, AttributeError):
+        pass
+    return None
+
+
 def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None:
     """Render k8s pod yaml."""
-    kube_config = KubeConfig()
+    executor_conf = _get_executor_conf(task_instance.dag_id)
+    kube_config = KubeConfig(executor_conf=executor_conf)
     if task_instance.executor_config and 
task_instance.executor_config.get("pod_template_file"):
         # If a specific pod_template_file was passed to the executor, we make
         # sure to render the k8s pod spec using this one, and not the default 
one.
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
index 2fb2ac93a12..7751da07081 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
@@ -34,9 +34,11 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
 AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
+AIRFLOW_V_3_2_PLUS = get_base_airflow_version_tuple() >= (3, 2, 0)
 
 
 __all__ = [
     "AIRFLOW_V_3_0_PLUS",
     "AIRFLOW_V_3_1_PLUS",
+    "AIRFLOW_V_3_2_PLUS",
 ]
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index f8b4a59356f..86cfdedc49b 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1932,3 +1932,156 @@ class TestKubernetesJobWatcher:
             executor = KubernetesExecutor()
 
         assert 
executor.kube_config.worker_pod_pending_fatal_container_state_reasons == 
expected_result
+
+
+class TestKubernetesExecutorMultiTeam:
+    """Tests for AIP-67 multi-team support in KubernetesExecutor."""
+
+    def test_supports_multi_team(self):
+        """Test that KubernetesExecutor declares multi-team support."""
+        assert KubernetesExecutor.supports_multi_team is True
+
+    def test_global_executor_without_team_name(self):
+        """Test that global executor (no team) works correctly with backwards 
compatibility."""
+        executor = KubernetesExecutor()
+
+        # Verify executor has conf
+        assert hasattr(executor, "conf")
+        # On older Airflow versions, conf is the global AirflowConfigParser 
(no team_name attr).
+        # On newer versions, conf is an ExecutorConf with team_name=None.
+        assert getattr(executor.conf, "team_name", None) is None
+
+        # Verify KubeConfig was created with the executor's conf
+        assert executor.kube_config is not None
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_executor_with_team_name(self):
+        """Test that executor created with a team_name has team-specific 
conf."""
+        executor = KubernetesExecutor(team_name="ml_team")
+
+        assert executor.conf.team_name == "ml_team"
+        assert executor.team_name == "ml_team"
+        assert executor.kube_config is not None
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_multiple_team_executors_isolation(self, monkeypatch):
+        """Test that multiple team executors can coexist with isolated 
resources."""
+        
monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE",
 "4")
+        
monkeypatch.setenv("AIRFLOW__TEAM_B___KUBERNETES_EXECUTOR__WORKER_PODS_CREATION_BATCH_SIZE",
 "8")
+
+        team_a_executor = KubernetesExecutor(team_name="team_a")
+        team_b_executor = KubernetesExecutor(team_name="team_b")
+
+        try:
+            assert team_a_executor.task_queue is not team_b_executor.task_queue
+            assert team_a_executor.result_queue is not 
team_b_executor.result_queue
+            assert team_a_executor.running is not team_b_executor.running
+            assert team_a_executor.queued_tasks is not 
team_b_executor.queued_tasks
+
+            assert team_a_executor.conf.team_name == "team_a"
+            assert team_b_executor.conf.team_name == "team_b"
+
+            assert team_a_executor.kube_config.worker_pods_creation_batch_size 
== 4
+            assert team_b_executor.kube_config.worker_pods_creation_batch_size 
== 8
+
+        finally:
+            team_a_executor.end()
+            team_b_executor.end()
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_team_specific_kube_config(self, monkeypatch):
+        """Test that KubeConfig uses team-specific configuration when provided 
via env vars."""
+        # Set team-specific namespace via environment variable
+        monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__NAMESPACE", 
"team-a-namespace")
+        monkeypatch.setenv(
+            "AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__DELETE_WORKER_PODS",
+            "false",
+        )
+
+        team_a_executor = KubernetesExecutor(team_name="team_a")
+
+        try:
+            # Verify the KubeConfig picked up the team-specific namespace
+            assert team_a_executor.kube_config.kube_namespace == 
"team-a-namespace"
+            assert team_a_executor.kube_config.delete_worker_pods is False
+        finally:
+            team_a_executor.end()
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_team_and_global_config_isolation(self, monkeypatch):
+        """Test that team-specific and global executors use correct 
configurations side-by-side."""
+        global_namespace = "default"
+        team_namespace = "team-ml-namespace"
+
+        # Set up global configuration
+        config_overrides = {
+            ("kubernetes_executor", "namespace"): global_namespace,
+        }
+
+        # Set up team-specific config via environment variable
+        
monkeypatch.setenv("AIRFLOW__ML_TEAM___KUBERNETES_EXECUTOR__NAMESPACE", 
team_namespace)
+
+        with conf_vars(config_overrides):
+            # Create team-specific executor
+            team_executor = KubernetesExecutor(team_name="ml_team")
+
+            # Create global executor (no team)
+            global_executor = KubernetesExecutor()
+
+            try:
+                # Verify team-specific namespace was used
+                assert team_executor.kube_config.kube_namespace == 
team_namespace
+
+                # Verify global namespace was used for global executor
+                assert global_executor.kube_config.kube_namespace == 
global_namespace
+            finally:
+                team_executor.end()
+                global_executor.end()
+
+    def test_kube_config_fallback_to_global_conf(self):
+        """Test that KubeConfig falls back to global conf when no 
executor_conf is provided."""
+        from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
+
+        # KubeConfig without executor_conf should fall back to global 
configuration
+        kube_config = KubeConfig()
+
+        # Should still have valid configuration from global defaults
+        assert kube_config.kube_namespace is not None
+        assert kube_config.airflow_home is not None
+
+    def test_executor_conf_passed_to_kube_config(self):
+        """Test that the executor's conf is passed through to KubeConfig."""
+        executor = KubernetesExecutor()
+
+        # The executor should pass its conf to KubeConfig
+        assert executor.kube_config._conf is executor.conf
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_task_publish_max_retries_uses_team_conf(self, monkeypatch):
+        """Test that task_publish_max_retries reads from team-specific conf."""
+        
monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__TASK_PUBLISH_MAX_RETRIES",
 "5")
+
+        executor = KubernetesExecutor(team_name="team_a")
+
+        try:
+            assert executor.task_publish_max_retries == 5
+        finally:
+            executor.end()
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_2_PLUS, reason="Multi-team requires 
Airflow 3.2+")
+    def test_get_pod_namespace_uses_instance_conf(self, monkeypatch):
+        """Test that _get_pod_namespace uses self.conf instead of global 
conf."""
+        monkeypatch.setenv("AIRFLOW__TEAM_A___KUBERNETES_EXECUTOR__NAMESPACE", 
"team-a-ns")
+
+        executor = KubernetesExecutor(team_name="team_a")
+
+        try:
+            mock_ti = mock.MagicMock()
+            mock_ti.executor_config = {}
+
+            namespace = executor._get_pod_namespace(mock_ti)
+
+            # Should return the team-specific namespace from the executor's 
conf
+            assert namespace == "team-a-ns"
+        finally:
+            executor.end()

Reply via email to