This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6afab7b3c84 AIP-67 - Multi-team: Verify dag/task executors are present
in team (#55973)
6afab7b3c84 is described below
commit 6afab7b3c8491505e1a8667d3b8911c9e70693a6
Author: Niko Oliveira <[email protected]>
AuthorDate: Thu Oct 16 15:48:06 2025 -0700
AIP-67 - Multi-team: Verify dag/task executors are present in team (#55973)
* Verify dag/task executors are present in team
If a dag or task is configured to use a specific executor (leveraging
Multiple Executor Configuration) and the dag belongs to a team, then
validate that the executor is present in that team before adding the dag
to the dag bag.
* Add multi_team config gate
---
airflow-core/src/airflow/dag_processing/dagbag.py | 32 ++++++-
.../tests/unit/dag_processing/test_dagbag.py | 99 ++++++++++++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py
b/airflow-core/src/airflow/dag_processing/dagbag.py
index ec6f739ac19..d1ff5ee803e 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -135,12 +135,42 @@ def timeout(seconds=1, error_message="Timeout"):
def _validate_executor_fields(dag: DAG) -> None:
+ """Validate that executors specified in tasks are available and owned by
the same team as the dag bundle."""
+ import logging
+
+ log = logging.getLogger(__name__)
+ dag_team_name = None
+
+ # Check if multi team is available by reading the multi_team configuration
(which is boolean)
+ if conf.getboolean("core", "multi_team"):
+ # Get team name from bundle configuration if available
+ if hasattr(dag, "bundle_name") and dag.bundle_name:
+ from airflow.dag_processing.bundles.manager import
DagBundlesManager
+
+ bundle_manager = DagBundlesManager()
+ bundle_config = bundle_manager._bundle_config[dag.bundle_name]
+
+ dag_team_name = bundle_config.team_name
+ if dag_team_name:
+ log.debug(
+ "Found team '%s' for DAG '%s' via bundle '%s'",
dag_team_name, dag.dag_id, dag.bundle_name
+ )
+
for task in dag.tasks:
if not task.executor:
continue
try:
- ExecutorLoader.lookup_executor_name_by_str(task.executor)
+ # Validate that the executor exists and is available for the DAG's
team
+ ExecutorLoader.lookup_executor_name_by_str(task.executor,
team_name=dag_team_name)
except UnknownExecutorException:
+ if dag_team_name:
+ raise UnknownExecutorException(
+ f"Task '{task.task_id}' specifies executor
'{task.executor}', which is not available "
+ f"for team '{dag_team_name}' (the team associated with DAG
'{dag.dag_id}'). "
+ f"Make sure '{task.executor}' is configured for team
'{dag_team_name}' in your "
+ "[core] executors configuration, or update the task's
executor to use one of the "
+ f"configured executors for team '{dag_team_name}'."
+ )
raise UnknownExecutorException(
f"Task '{task.task_id}' specifies executor '{task.executor}',
which is not available. "
"Make sure it is listed in your [core] executors
configuration, or update the task's "
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py
b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index b423e79bd92..d0703579681 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -62,6 +62,105 @@ PY313 = sys.version_info >= (3, 13)
INVALID_DAG_WITH_DEPTH_FILE_CONTENTS = "def something():\n return
airflow_DAG\nsomething()"
+@pytest.fixture
+def mock_dag_bundle_manager():
+ """Fixture to create a mock DAG bundle manager with team configuration."""
+ from airflow.dag_processing.bundles.base import BaseDagBundle
+ from airflow.dag_processing.bundles.manager import _InternalBundleConfig
+
+ class MockDagBundle(BaseDagBundle):
+ @property
+ def path(self):
+ return None
+
+ def get_current_version(self):
+ return "1.0.0"
+
+ def refresh(self):
+ pass
+
+ @contextlib.contextmanager
+ def _create_bundle_manager(bundle_name="test_bundle",
team_name="test_team"):
+ mock_bundle_config = _InternalBundleConfig(bundle_class=MockDagBundle,
kwargs={}, team_name=team_name)
+
+ bundle_config = {bundle_name: mock_bundle_config}
+
+ with patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
as mock_manager_class:
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = bundle_config
+ yield mock_manager
+
+ return _create_bundle_manager
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_with_team_restriction(mock_executor_lookup,
mock_dag_bundle_manager):
+ """Test executor validation with team-based restrictions using bundle
configuration."""
+ with mock_dag_bundle_manager():
+ mock_executor_lookup.side_effect = UnknownExecutorException("Executor
not available for team")
+
+ with DAG("test-dag", schedule=None) as dag:
+ # Simulate DAG having bundle_name set during parsing
+ dag.bundle_name = "test_bundle"
+ BaseOperator(task_id="t1", executor="team.restricted.executor")
+
+ with pytest.raises(
+ UnknownExecutorException,
+ match=re.escape(
+ "Task 't1' specifies executor 'team.restricted.executor',
which is not available "
+ "for team 'test_team' (the team associated with DAG
'test-dag'). "
+ "Make sure 'team.restricted.executor' is configured for team
'test_team' in your "
+ "[core] executors configuration, or update the task's executor
to use one of the "
+ "configured executors for team 'test_team'."
+ ),
+ ):
+ with conf_vars({("core", "multi_team"): "True"}):
+ _validate_executor_fields(dag)
+
+ # Verify the executor lookup was called with the team name from config
+ mock_executor_lookup.assert_called_with("team.restricted.executor",
team_name="test_team")
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_no_team_associated(mock_executor_lookup):
+ """Test executor validation when no team is associated with DAG (no
bundle_name)."""
+ mock_executor_lookup.side_effect = UnknownExecutorException("Unknown
executor")
+
+ with DAG("test-dag", schedule=None) as dag:
+ # No bundle_name attribute, so no team will be found, see final
assertion below
+ BaseOperator(task_id="t1", executor="unknown.executor")
+
+ with pytest.raises(
+ UnknownExecutorException,
+ match=re.escape(
+ "Task 't1' specifies executor 'unknown.executor', which is not
available. "
+ "Make sure it is listed in your [core] executors configuration, or
update the task's "
+ "executor to use one of the configured executors."
+ ),
+ ):
+ with conf_vars({("core", "multi_team"): "True"}):
+ _validate_executor_fields(dag)
+
+ mock_executor_lookup.assert_called_with("unknown.executor", team_name=None)
+
+
+@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+def test_validate_executor_field_valid_team_executor(mock_executor_lookup,
mock_dag_bundle_manager):
+ """Test executor validation when executor is valid for the DAG's team
(happy path)."""
+ with mock_dag_bundle_manager():
+ mock_executor_lookup.return_value = None
+
+ with DAG("test-dag", schedule=None) as dag:
+ dag.bundle_name = "test_bundle"
+ BaseOperator(task_id="t1", executor="team.valid.executor")
+
+ with conf_vars({("core", "multi_team"): "True"}):
+ _validate_executor_fields(dag)
+
+ # Verify the executor lookup was called with the team name which is
fetched from bundle config
+ mock_executor_lookup.assert_called_with("team.valid.executor",
team_name="test_team")
+
+
def db_clean_up():
db.clear_db_dags()
db.clear_db_runs()