This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6afab7b3c84 AIP-67 - Multi-team: Verify dag/task executors are present in team (#55973)
6afab7b3c84 is described below

commit 6afab7b3c8491505e1a8667d3b8911c9e70693a6
Author: Niko Oliveira <[email protected]>
AuthorDate: Thu Oct 16 15:48:06 2025 -0700

    AIP-67 - Multi-team: Verify dag/task executors are present in team (#55973)
    
    * Verify dag/task executors are present in team
    
    If a dag or task is configured to use a specific executor (leveraging
    Multiple Executor Configuration) and the dag belongs to a team, then
    validate that the executor is present in that team before adding the dag
    to the dag bag.
    
    * Add multi_team config gate
---
 airflow-core/src/airflow/dag_processing/dagbag.py  | 32 ++++++-
 .../tests/unit/dag_processing/test_dagbag.py       | 99 ++++++++++++++++++++++
 2 files changed, 130 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py
index ec6f739ac19..d1ff5ee803e 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -135,12 +135,42 @@ def timeout(seconds=1, error_message="Timeout"):
 
 
 def _validate_executor_fields(dag: DAG) -> None:
+    """Validate that executors specified in tasks are available and owned by 
the same team as the dag bundle."""
+    import logging
+
+    log = logging.getLogger(__name__)
+    dag_team_name = None
+
+    # Check if multi team is available by reading the multi_team configuration 
(which is boolean)
+    if conf.getboolean("core", "multi_team"):
+        # Get team name from bundle configuration if available
+        if hasattr(dag, "bundle_name") and dag.bundle_name:
+            from airflow.dag_processing.bundles.manager import 
DagBundlesManager
+
+            bundle_manager = DagBundlesManager()
+            bundle_config = bundle_manager._bundle_config[dag.bundle_name]
+
+            dag_team_name = bundle_config.team_name
+            if dag_team_name:
+                log.debug(
+                    "Found team '%s' for DAG '%s' via bundle '%s'", 
dag_team_name, dag.dag_id, dag.bundle_name
+                )
+
     for task in dag.tasks:
         if not task.executor:
             continue
         try:
-            ExecutorLoader.lookup_executor_name_by_str(task.executor)
+            # Validate that the executor exists and is available for the DAG's 
team
+            ExecutorLoader.lookup_executor_name_by_str(task.executor, 
team_name=dag_team_name)
         except UnknownExecutorException:
+            if dag_team_name:
+                raise UnknownExecutorException(
+                    f"Task '{task.task_id}' specifies executor 
'{task.executor}', which is not available "
+                    f"for team '{dag_team_name}' (the team associated with DAG 
'{dag.dag_id}'). "
+                    f"Make sure '{task.executor}' is configured for team 
'{dag_team_name}' in your "
+                    "[core] executors configuration, or update the task's 
executor to use one of the "
+                    f"configured executors for team '{dag_team_name}'."
+                )
             raise UnknownExecutorException(
                 f"Task '{task.task_id}' specifies executor '{task.executor}', 
which is not available. "
                 "Make sure it is listed in your [core] executors 
configuration, or update the task's "
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index b423e79bd92..d0703579681 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -62,6 +62,105 @@ PY313 = sys.version_info >= (3, 13)
 INVALID_DAG_WITH_DEPTH_FILE_CONTENTS = "def something():\n    return 
airflow_DAG\nsomething()"
 
 
+@pytest.fixture
+def mock_dag_bundle_manager():
+    """Fixture to create a mock DAG bundle manager with team configuration."""
+    from airflow.dag_processing.bundles.base import BaseDagBundle
+    from airflow.dag_processing.bundles.manager import _InternalBundleConfig
+
+    class MockDagBundle(BaseDagBundle):
+        @property
+        def path(self):
+            return None
+
+        def get_current_version(self):
+            return "1.0.0"
+
+        def refresh(self):
+            pass
+
+    @contextlib.contextmanager
+    def _create_bundle_manager(bundle_name="test_bundle", 
team_name="test_team"):
+        mock_bundle_config = _InternalBundleConfig(bundle_class=MockDagBundle, 
kwargs={}, team_name=team_name)
+
+        bundle_config = {bundle_name: mock_bundle_config}
+
+        with patch("airflow.dag_processing.bundles.manager.DagBundlesManager") 
as mock_manager_class:
+            mock_manager = mock_manager_class.return_value
+            mock_manager._bundle_config = bundle_config
+            yield mock_manager
+
+    return _create_bundle_manager
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_with_team_restriction(mock_executor_lookup, 
mock_dag_bundle_manager):
+    """Test executor validation with team-based restrictions using bundle 
configuration."""
+    with mock_dag_bundle_manager():
+        mock_executor_lookup.side_effect = UnknownExecutorException("Executor 
not available for team")
+
+        with DAG("test-dag", schedule=None) as dag:
+            # Simulate DAG having bundle_name set during parsing
+            dag.bundle_name = "test_bundle"
+            BaseOperator(task_id="t1", executor="team.restricted.executor")
+
+        with pytest.raises(
+            UnknownExecutorException,
+            match=re.escape(
+                "Task 't1' specifies executor 'team.restricted.executor', 
which is not available "
+                "for team 'test_team' (the team associated with DAG 
'test-dag'). "
+                "Make sure 'team.restricted.executor' is configured for team 
'test_team' in your "
+                "[core] executors configuration, or update the task's executor 
to use one of the "
+                "configured executors for team 'test_team'."
+            ),
+        ):
+            with conf_vars({("core", "multi_team"): "True"}):
+                _validate_executor_fields(dag)
+
+        # Verify the executor lookup was called with the team name from config
+        mock_executor_lookup.assert_called_with("team.restricted.executor", 
team_name="test_team")
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_no_team_associated(mock_executor_lookup):
+    """Test executor validation when no team is associated with DAG (no 
bundle_name)."""
+    mock_executor_lookup.side_effect = UnknownExecutorException("Unknown 
executor")
+
+    with DAG("test-dag", schedule=None) as dag:
+        # No bundle_name attribute, so no team will be found, see final 
assertion below
+        BaseOperator(task_id="t1", executor="unknown.executor")
+
+    with pytest.raises(
+        UnknownExecutorException,
+        match=re.escape(
+            "Task 't1' specifies executor 'unknown.executor', which is not 
available. "
+            "Make sure it is listed in your [core] executors configuration, or 
update the task's "
+            "executor to use one of the configured executors."
+        ),
+    ):
+        with conf_vars({("core", "multi_team"): "True"}):
+            _validate_executor_fields(dag)
+
+    mock_executor_lookup.assert_called_with("unknown.executor", team_name=None)
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_valid_team_executor(mock_executor_lookup, 
mock_dag_bundle_manager):
+    """Test executor validation when executor is valid for the DAG's team 
(happy path)."""
+    with mock_dag_bundle_manager():
+        mock_executor_lookup.return_value = None
+
+        with DAG("test-dag", schedule=None) as dag:
+            dag.bundle_name = "test_bundle"
+            BaseOperator(task_id="t1", executor="team.valid.executor")
+
+        with conf_vars({("core", "multi_team"): "True"}):
+            _validate_executor_fields(dag)
+
+        # Verify the executor lookup was called with the team name which is 
fetched from bundle config
+        mock_executor_lookup.assert_called_with("team.valid.executor", 
team_name="test_team")
+
+
 def db_clean_up():
     db.clear_db_dags()
     db.clear_db_runs()

Reply via email to