This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d43756c1768 Second cut at verifying team/bundle at dag parse time
(#57166)
d43756c1768 is described below
commit d43756c17688544499dceaef399a6536afbf721f
Author: Niko Oliveira <[email protected]>
AuthorDate: Mon Oct 27 11:07:02 2025 -0700
Second cut at verifying team/bundle at dag parse time (#57166)
Verify team/bundle/executor at dag parse time
Bundle path is already plumbed down to the dag bag for parsing; also
pass along the bundle name, which is used to look up the bundle config and
find the team name from the bundle configuration.
This is used to verify that tasks/dags are using executors that are
available/configured for their team.
---
.../src/airflow/cli/commands/dag_command.py | 10 +-
airflow-core/src/airflow/dag_processing/dagbag.py | 42 +++-
airflow-core/src/airflow/dag_processing/manager.py | 1 +
.../src/airflow/dag_processing/processor.py | 9 +-
airflow-core/src/airflow/utils/cli.py | 12 +-
.../tests/unit/cli/commands/test_dag_command.py | 3 +
.../tests/unit/dag_processing/test_dagbag.py | 272 +++++++++++++++------
.../tests/unit/dag_processing/test_manager.py | 33 ++-
.../tests/unit/dag_processing/test_processor.py | 39 ++-
9 files changed, 319 insertions(+), 102 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 79407bc47f6..52f1e142812 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -375,7 +375,7 @@ def dag_list_dags(args, session: Session = NEW_SESSION) ->
None:
for bundle in all_bundles:
if bundle.name in bundles_to_search:
- dagbag = DagBag(bundle.path, bundle_path=bundle.path)
+ dagbag = DagBag(bundle.path, bundle_path=bundle.path,
bundle_name=bundle.name)
dagbag.collect_dags()
dags_list.extend(list(dagbag.dags.values()))
dagbag_import_errors += len(dagbag.import_errors)
@@ -472,7 +472,7 @@ def dag_list_import_errors(args, session: Session =
NEW_SESSION) -> None:
for bundle in all_bundles:
if bundle.name in bundles_to_search:
- dagbag = DagBag(bundle.path, bundle_path=bundle.path)
+ dagbag = DagBag(bundle.path, bundle_path=bundle.path,
bundle_name=bundle.name)
for filename, errors in dagbag.import_errors.items():
data.append({"bundle_name": bundle.name, "filepath":
filename, "error": errors})
else:
@@ -524,7 +524,7 @@ def dag_report(args) -> None:
if bundle.name not in bundles_to_reserialize:
continue
bundle.initialize()
- dagbag = DagBag(bundle.path, include_examples=False)
+ dagbag = DagBag(bundle.path, bundle_name=bundle.name,
include_examples=False)
all_dagbag_stats.extend(dagbag.dagbag_stats)
AirflowConsole().print_as(
@@ -688,5 +688,7 @@ def dag_reserialize(args, session: Session = NEW_SESSION)
-> None:
if bundle.name not in bundles_to_reserialize:
continue
bundle.initialize()
- dag_bag = DagBag(bundle.path, bundle_path=bundle.path,
include_examples=False)
+ dag_bag = DagBag(
+ bundle.path, bundle_path=bundle.path, bundle_name=bundle.name,
include_examples=False
+ )
sync_bag_to_db(dag_bag, bundle.name,
bundle_version=bundle.get_current_version(), session=session)
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py
b/airflow-core/src/airflow/dag_processing/dagbag.py
index a3cc9197183..cadd26412c9 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -134,7 +134,25 @@ def timeout(seconds=1, error_message="Timeout"):
signal.setitimer(signal.ITIMER_REAL, 0)
-def _validate_executor_fields(dag: DAG) -> None:
+def _executor_exists(executor_name: str, team_name: str | None) -> bool:
+ """Check if executor exists, with global fallback for teams."""
+ try:
+ # First pass check for team-specific executor or a global executor
(i.e. team_name=None)
+ ExecutorLoader.lookup_executor_name_by_str(executor_name,
team_name=team_name)
+ return True
+ except UnknownExecutorException:
+ if team_name:
+ # If we had a team_name but didn't find an executor, check if
there is a global executor that
+ # satisfies the request.
+ try:
+ ExecutorLoader.lookup_executor_name_by_str(executor_name,
team_name=None)
+ return True
+ except UnknownExecutorException:
+ pass
+ return False
+
+
+def _validate_executor_fields(dag: DAG, bundle_name: str | None = None) ->
None:
"""Validate that executors specified in tasks are available and owned by
the same team as the dag bundle."""
import logging
@@ -144,32 +162,30 @@ def _validate_executor_fields(dag: DAG) -> None:
# Check if multi team is available by reading the multi_team configuration
(which is boolean)
if conf.getboolean("core", "multi_team"):
# Get team name from bundle configuration if available
- if hasattr(dag, "bundle_name") and dag.bundle_name:
+ if bundle_name:
from airflow.dag_processing.bundles.manager import
DagBundlesManager
bundle_manager = DagBundlesManager()
- bundle_config = bundle_manager._bundle_config[dag.bundle_name]
+ bundle_config = bundle_manager._bundle_config[bundle_name]
dag_team_name = bundle_config.team_name
if dag_team_name:
log.debug(
- "Found team '%s' for DAG '%s' via bundle '%s'",
dag_team_name, dag.dag_id, dag.bundle_name
+ "Found team '%s' for DAG '%s' via bundle '%s'",
dag_team_name, dag.dag_id, bundle_name
)
for task in dag.tasks:
if not task.executor:
continue
- try:
- # Validate that the executor exists and is available for the DAG's
team
- ExecutorLoader.lookup_executor_name_by_str(task.executor,
team_name=dag_team_name)
- except UnknownExecutorException:
+
+ if not _executor_exists(task.executor, dag_team_name):
if dag_team_name:
raise UnknownExecutorException(
f"Task '{task.task_id}' specifies executor
'{task.executor}', which is not available "
- f"for team '{dag_team_name}' (the team associated with DAG
'{dag.dag_id}'). "
- f"Make sure '{task.executor}' is configured for team
'{dag_team_name}' in your "
+ f"for team '{dag_team_name}' (the team associated with DAG
'{dag.dag_id}') or as a global executor. "
+ f"Make sure '{task.executor}' is configured for team
'{dag_team_name}' or globally in your "
"[core] executors configuration, or update the task's
executor to use one of the "
- f"configured executors for team '{dag_team_name}'."
+ f"configured executors for team '{dag_team_name}' or
available global executors."
)
raise UnknownExecutorException(
f"Task '{task.task_id}' specifies executor '{task.executor}',
which is not available. "
@@ -210,9 +226,11 @@ class DagBag(LoggingMixin):
collect_dags: bool = True,
known_pools: set[str] | None = None,
bundle_path: Path | None = None,
+ bundle_name: str | None = None,
):
super().__init__()
self.bundle_path = bundle_path
+ self.bundle_name = bundle_name
include_examples = (
include_examples
if isinstance(include_examples, bool)
@@ -528,7 +546,7 @@ class DagBag(LoggingMixin):
dag.relative_fileloc = relative_fileloc
try:
dag.validate()
- _validate_executor_fields(dag)
+ _validate_executor_fields(dag, self.bundle_name)
self.bag_dag(dag=dag)
except AirflowClusterPolicySkipDag:
pass
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 52ba1e8246b..ff6ff81ab1e 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -921,6 +921,7 @@ class DagFileProcessorManager(LoggingMixin):
id=id,
path=dag_file.absolute_path,
bundle_path=cast("Path", dag_file.bundle_path),
+ bundle_name=dag_file.bundle_name,
callbacks=callback_to_execute_for_file,
selector=self.selector,
logger=logger,
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index 0d6fdb84331..bf937eac53f 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -95,6 +95,9 @@ class DagFileParseRequest(BaseModel):
bundle_path: Path
"""Passing bundle path around lets us figure out relative file path."""
+ bundle_name: str
+ """Bundle name for team-specific executor validation."""
+
callback_requests: list[CallbackRequest] = Field(default_factory=list)
type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
@@ -203,6 +206,7 @@ def _parse_file(msg: DagFileParseRequest, log:
FilteringBoundLogger) -> DagFileP
bag = DagBag(
dag_folder=msg.file,
bundle_path=msg.bundle_path,
+ bundle_name=msg.bundle_name,
include_examples=False,
load_op_links=False,
)
@@ -493,6 +497,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
*,
path: str | os.PathLike[str],
bundle_path: Path,
+ bundle_name: str,
callbacks: list[CallbackRequest],
target: Callable[[], None] = _parse_file_entrypoint,
client: Client,
@@ -504,7 +509,7 @@ class DagFileProcessorProcess(WatchedSubprocess):
proc: Self = super().start(target=target, client=client, **kwargs)
proc.had_callbacks = bool(callbacks) # Track if this process had
callbacks
- proc._on_child_started(callbacks, path, bundle_path)
+ proc._on_child_started(callbacks, path, bundle_path, bundle_name)
return proc
def _on_child_started(
@@ -512,10 +517,12 @@ class DagFileProcessorProcess(WatchedSubprocess):
callbacks: list[CallbackRequest],
path: str | os.PathLike[str],
bundle_path: Path,
+ bundle_name: str,
) -> None:
msg = DagFileParseRequest(
file=os.fspath(path),
bundle_path=bundle_path,
+ bundle_name=bundle_name,
callback_requests=callbacks,
)
self.send_msg(msg, request_id=0)
diff --git a/airflow-core/src/airflow/utils/cli.py
b/airflow-core/src/airflow/utils/cli.py
index c96cbc005b1..a4ad7fd523c 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -280,7 +280,10 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str,
dagfile_path: str | N
bundle = manager.get_bundle(bundle_name)
with _airflow_parsing_context_manager(dag_id=dag_id):
dagbag = DagBag(
- dag_folder=dagfile_path or bundle.path,
bundle_path=bundle.path, include_examples=False
+ dag_folder=dagfile_path or bundle.path,
+ bundle_path=bundle.path,
+ bundle_name=bundle.name,
+ include_examples=False,
)
if dag := dagbag.dags.get(dag_id):
return dag
@@ -290,7 +293,10 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str,
dagfile_path: str | N
bundle.initialize()
with _airflow_parsing_context_manager(dag_id=dag_id):
dagbag = DagBag(
- dag_folder=dagfile_path or bundle.path,
bundle_path=bundle.path, include_examples=False
+ dag_folder=dagfile_path or bundle.path,
+ bundle_path=bundle.path,
+ bundle_name=bundle.name,
+ include_examples=False,
)
sync_bag_to_db(dagbag, bundle.name, bundle.version)
if dag := dagbag.dags.get(dag_id):
@@ -327,7 +333,7 @@ def get_dags(bundle_names: list | None, dag_id: str,
use_regex: bool = False, fr
return [get_bagged_dag(bundle_names=bundle_names, dag_id=dag_id)]
def _find_dag(bundle):
- dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path)
+ dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path,
bundle_name=bundle.name)
matched_dags = [dag for dag in dagbag.dags.values() if
re.search(dag_id, dag.dag_id)]
return matched_dags
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 10332a37a58..484a2121716 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -784,6 +784,7 @@ class TestCliDags:
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=TEST_DAGS_FOLDER,
+ bundle_name="testing",
include_examples=False,
)
@@ -805,6 +806,7 @@ class TestCliDags:
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=str(dag_file),
+ bundle_name="testing",
include_examples=False,
)
@@ -836,6 +838,7 @@ class TestCliDags:
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=str(dag_file),
+ bundle_name="testing",
include_examples=False,
)
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py
b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index d0703579681..0b25346814b 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -62,110 +62,210 @@ PY313 = sys.version_info >= (3, 13)
INVALID_DAG_WITH_DEPTH_FILE_CONTENTS = "def something():\n return
airflow_DAG\nsomething()"
[email protected]
-def mock_dag_bundle_manager():
- """Fixture to create a mock DAG bundle manager with team configuration."""
- from airflow.dag_processing.bundles.base import BaseDagBundle
- from airflow.dag_processing.bundles.manager import _InternalBundleConfig
+def db_clean_up():
+ db.clear_db_dags()
+ db.clear_db_runs()
+ db.clear_db_serialized_dags()
+ db.clear_dag_specific_permissions()
- class MockDagBundle(BaseDagBundle):
- @property
- def path(self):
- return None
- def get_current_version(self):
- return "1.0.0"
+class TestValidateExecutorFields:
+ """Comprehensive tests for _validate_executor_fields function."""
- def refresh(self):
- pass
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_multi_team_disabled_ignores_bundle_name(self, mock_lookup):
+ """Test that when multi_team is disabled, bundle_name is ignored and
no team lookup occurs."""
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="t1", executor="test.executor")
- @contextlib.contextmanager
- def _create_bundle_manager(bundle_name="test_bundle",
team_name="test_team"):
- mock_bundle_config = _InternalBundleConfig(bundle_class=MockDagBundle,
kwargs={}, team_name=team_name)
+ # multi_team disabled by default, no need to add conf_vars
+ _validate_executor_fields(dag, bundle_name="some_bundle")
- bundle_config = {bundle_name: mock_bundle_config}
+ # Should call ExecutorLoader without team_name (defaults to None)
+ mock_lookup.assert_called_once_with("test.executor", team_name=None)
- with patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
as mock_manager_class:
- mock_manager = mock_manager_class.return_value
- mock_manager._bundle_config = bundle_config
- yield mock_manager
+ @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_multi_team_enabled_bundle_exists_with_team(self, mock_lookup,
mock_manager_class):
+ """Test successful team lookup when bundle exists and has team_name."""
+ # Setup mock bundle manager
+ mock_bundle_config = mock.MagicMock()
+ mock_bundle_config.team_name = "test_team"
- return _create_bundle_manager
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = {"test_bundle": mock_bundle_config}
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="t1", executor="team.executor")
[email protected](ExecutorLoader, "lookup_executor_name_by_str")
-def test_validate_executor_field_with_team_restriction(mock_executor_lookup,
mock_dag_bundle_manager):
- """Test executor validation with team-based restrictions using bundle
configuration."""
- with mock_dag_bundle_manager():
- mock_executor_lookup.side_effect = UnknownExecutorException("Executor
not available for team")
+ with conf_vars({("core", "multi_team"): "True"}):
+ _validate_executor_fields(dag, bundle_name="test_bundle")
- with DAG("test-dag", schedule=None) as dag:
- # Simulate DAG having bundle_name set during parsing
- dag.bundle_name = "test_bundle"
- BaseOperator(task_id="t1", executor="team.restricted.executor")
+ # Should call ExecutorLoader with team from bundle config
+ mock_lookup.assert_called_once_with("team.executor",
team_name="test_team")
- with pytest.raises(
- UnknownExecutorException,
- match=re.escape(
- "Task 't1' specifies executor 'team.restricted.executor',
which is not available "
- "for team 'test_team' (the team associated with DAG
'test-dag'). "
- "Make sure 'team.restricted.executor' is configured for team
'test_team' in your "
- "[core] executors configuration, or update the task's executor
to use one of the "
- "configured executors for team 'test_team'."
- ),
- ):
- with conf_vars({("core", "multi_team"): "True"}):
- _validate_executor_fields(dag)
+ @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_multi_team_enabled_bundle_exists_no_team(self, mock_lookup,
mock_manager_class):
+ """Test when bundle exists but has no team_name (None or empty)."""
+ mock_bundle_config = mock.MagicMock()
+ mock_bundle_config.team_name = None # No team associated
+
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = {"test_bundle": mock_bundle_config}
- # Verify the executor lookup was called with the team name from config
- mock_executor_lookup.assert_called_with("team.restricted.executor",
team_name="test_team")
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="t1", executor="test.executor")
+ with conf_vars({("core", "multi_team"): "True"}):
+ _validate_executor_fields(dag, bundle_name="test_bundle")
[email protected](ExecutorLoader, "lookup_executor_name_by_str")
-def test_validate_executor_field_no_team_associated(mock_executor_lookup):
- """Test executor validation when no team is associated with DAG (no
bundle_name)."""
- mock_executor_lookup.side_effect = UnknownExecutorException("Unknown
executor")
+ mock_lookup.assert_called_once_with("test.executor", team_name=None)
- with DAG("test-dag", schedule=None) as dag:
- # No bundle_name attribute, so no team will be found, see final
assertion below
- BaseOperator(task_id="t1", executor="unknown.executor")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_multiple_tasks_with_executors(self, mock_lookup):
+ """Test that all tasks with executors are validated."""
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="t1", executor="executor1")
+ BaseOperator(task_id="t2", executor="executor2")
+ BaseOperator(task_id="t3") # No executor, should be skipped
- with pytest.raises(
- UnknownExecutorException,
- match=re.escape(
- "Task 't1' specifies executor 'unknown.executor', which is not
available. "
- "Make sure it is listed in your [core] executors configuration, or
update the task's "
- "executor to use one of the configured executors."
- ),
- ):
with conf_vars({("core", "multi_team"): "True"}):
_validate_executor_fields(dag)
- mock_executor_lookup.assert_called_with("unknown.executor", team_name=None)
+ # Should be called for each task with executor
+ assert mock_lookup.call_count == 2
+ mock_lookup.assert_any_call("executor1", team_name=None)
+ mock_lookup.assert_any_call("executor2", team_name=None)
+
+ @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_executor_validation_failure_with_team(self, mock_lookup,
mock_manager_class):
+ """Test executor validation failure when team is associated
(team-specific error)."""
+ mock_bundle_config = mock.MagicMock()
+ mock_bundle_config.team_name = "test_team"
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = {"test_bundle": mock_bundle_config}
[email protected](ExecutorLoader, "lookup_executor_name_by_str")
-def test_validate_executor_field_valid_team_executor(mock_executor_lookup,
mock_dag_bundle_manager):
- """Test executor validation when executor is valid for the DAG's team
(happy path)."""
- with mock_dag_bundle_manager():
- mock_executor_lookup.return_value = None
+ # ExecutorLoader raises exception
+ mock_lookup.side_effect = UnknownExecutorException("Executor not
found")
with DAG("test-dag", schedule=None) as dag:
- dag.bundle_name = "test_bundle"
- BaseOperator(task_id="t1", executor="team.valid.executor")
+ BaseOperator(task_id="task1", executor="invalid.executor")
with conf_vars({("core", "multi_team"): "True"}):
- _validate_executor_fields(dag)
+ with pytest.raises(
+ UnknownExecutorException,
+ match=re.escape(
+ "Task 'task1' specifies executor 'invalid.executor', which
is not available "
+ "for team 'test_team' (the team associated with DAG
'test-dag') or as a global executor. "
+ "Make sure 'invalid.executor' is configured for team
'test_team' or globally in your "
+ "[core] executors configuration, or update the task's
executor to use one of the "
+ "configured executors for team 'test_team' or available
global executors."
+ ),
+ ):
+ _validate_executor_fields(dag, bundle_name="test_bundle")
+
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_executor_validation_failure_no_team(self, mock_lookup):
+ """Test executor validation failure when no team is associated
(generic error)."""
+ mock_lookup.side_effect = UnknownExecutorException("Executor not
found")
- # Verify the executor lookup was called with the team name which is
fetched from bundle config
- mock_executor_lookup.assert_called_with("team.valid.executor",
team_name="test_team")
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="task1", executor="invalid.executor")
+ with conf_vars({("core", "multi_team"): "True"}):
+ with pytest.raises(
+ UnknownExecutorException,
+ match=re.escape(
+ "Task 'task1' specifies executor 'invalid.executor', which
is not available. "
+ "Make sure it is listed in your [core] executors
configuration, or update the task's "
+ "executor to use one of the configured executors."
+ ),
+ ):
+ _validate_executor_fields(dag) # No bundle_name
+
+ @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_global_executor_fallback_success(self, mock_lookup,
mock_manager_class):
+ """Test that team-specific executor failure falls back to global
executor successfully."""
+ mock_bundle_config = mock.MagicMock()
+ mock_bundle_config.team_name = "test_team"
+
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = {"test_bundle": mock_bundle_config}
+
+ # First call (team-specific) fails, second call (global) succeeds
+ mock_lookup.side_effect = [UnknownExecutorException("Team executor not
found"), None]
-def db_clean_up():
- db.clear_db_dags()
- db.clear_db_runs()
- db.clear_db_serialized_dags()
- db.clear_dag_specific_permissions()
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="task1", executor="global.executor")
+
+ with conf_vars({("core", "multi_team"): "True"}):
+ # Should not raise exception due to global fallback
+ _validate_executor_fields(dag, bundle_name="test_bundle")
+
+ # Should call lookup twice: first for team, then for global
+ assert mock_lookup.call_count == 2
+ mock_lookup.assert_any_call("global.executor", team_name="test_team")
+ mock_lookup.assert_any_call("global.executor", team_name=None)
+
+ @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_global_executor_fallback_failure(self, mock_lookup,
mock_manager_class):
+ """Test that when both team-specific and global executors fail,
appropriate error is raised."""
+ mock_bundle_config = mock.MagicMock()
+ mock_bundle_config.team_name = "test_team"
+
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = {"test_bundle": mock_bundle_config}
+
+ # Both calls fail
+ mock_lookup.side_effect = UnknownExecutorException("Executor not
found")
+
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="task1", executor="unknown.executor")
+
+ with conf_vars({("core", "multi_team"): "True"}):
+ with pytest.raises(
+ UnknownExecutorException,
+ match=re.escape(
+ "Task 'task1' specifies executor 'unknown.executor', which
is not available "
+ "for team 'test_team' (the team associated with DAG
'test-dag') or as a global executor. "
+ "Make sure 'unknown.executor' is configured for team
'test_team' or globally in your "
+ "[core] executors configuration, or update the task's
executor to use one of the "
+ "configured executors for team 'test_team' or available
global executors."
+ ),
+ ):
+ _validate_executor_fields(dag, bundle_name="test_bundle")
+
+ # Should call lookup twice: first for team, then for global fallback
+ assert mock_lookup.call_count == 2
+ mock_lookup.assert_any_call("unknown.executor", team_name="test_team")
+ mock_lookup.assert_any_call("unknown.executor", team_name=None)
+
+ @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
+ @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
+ def test_team_specific_executor_success_no_fallback(self, mock_lookup,
mock_manager_class):
+ """Test that when team-specific executor succeeds, global fallback is
not attempted."""
+ mock_bundle_config = mock.MagicMock()
+ mock_bundle_config.team_name = "test_team"
+
+ mock_manager = mock_manager_class.return_value
+ mock_manager._bundle_config = {"test_bundle": mock_bundle_config}
+
+ # First call (team-specific) succeeds
+ mock_lookup.return_value = None
+
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="task1", executor="team.executor")
+
+ with conf_vars({("core", "multi_team"): "True"}):
+ _validate_executor_fields(dag, bundle_name="test_bundle")
+
+ # Should only call lookup once for team-specific executor
+ mock_lookup.assert_called_once_with("team.executor",
team_name="test_team")
def test_validate_executor_field_executor_not_configured():
@@ -196,6 +296,15 @@ class TestDagBag:
def teardown_class(self):
db_clean_up()
+ def test_dagbag_with_bundle_name(self, tmp_path):
+ """Test that DagBag constructor accepts and stores bundle_name
parameter."""
+ dagbag = DagBag(dag_folder=os.fspath(tmp_path),
include_examples=False, bundle_name="test_bundle")
+ assert dagbag.bundle_name == "test_bundle"
+
+ # Test with None (default)
+ dagbag2 = DagBag(dag_folder=os.fspath(tmp_path),
include_examples=False)
+ assert dagbag2.bundle_name is None
+
def test_timeout_context_manager_raises_exception(self):
"""Test that the timeout context manager raises AirflowTaskTimeout
when time limit is exceeded."""
import time
@@ -211,7 +320,7 @@ class TestDagBag:
"""
Test that we're able to parse some example DAGs and retrieve them
"""
- dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=True)
+ dagbag = DagBag(dag_folder=os.fspath(tmp_path), include_examples=True,
bundle_name="test_bundle")
some_expected_dag_ids = ["example_bash_operator",
"example_branch_operator"]
@@ -823,7 +932,12 @@ with airflow.DAG(
assert "has no tags" in dagbag.import_errors[dag_file]
def test_dagbag_dag_collection(self):
- dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False,
collect_dags=False)
+ dagbag = DagBag(
+ dag_folder=TEST_DAGS_FOLDER,
+ include_examples=False,
+ collect_dags=False,
+ bundle_name="test_collection",
+ )
# since collect_dags is False, dagbag.dags should be empty
assert not dagbag.dags
@@ -831,7 +945,7 @@ with airflow.DAG(
assert dagbag.dags
# test that dagbag.dags is not empty if collect_dags is True
- dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+ dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False,
bundle_name="test_collection")
assert dagbag.dags
def test_dabgag_captured_warnings(self):
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index b7c3f23c6f4..d5d7135ea4e 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -561,6 +561,7 @@ class TestDagFileProcessorManager:
{
"file": "/opt/airflow/dags/test_dag.py",
"bundle_path": "/opt/airflow/dags",
+ "bundle_name": "testing",
"callback_requests": [],
"type": "DagFileParseRequest",
},
@@ -581,6 +582,7 @@ class TestDagFileProcessorManager:
{
"file": "/opt/airflow/dags/dag_callback_dag.py",
"bundle_path": "/opt/airflow/dags",
+ "bundle_name": "testing",
"callback_requests": [
{
"filepath": "dag_callback_dag.py",
@@ -603,7 +605,9 @@ class TestDagFileProcessorManager:
from airflow.sdk.execution_time.comms import _ResponseFrame
processor, read_socket = self.mock_processor()
- processor._on_child_started(callbacks, path,
bundle_path=Path("/opt/airflow/dags"))
+ processor._on_child_started(
+ callbacks, path, bundle_path=Path("/opt/airflow/dags"),
bundle_name="testing"
+ )
read_socket.settimeout(0.1)
# Read response from the read end of the socket
@@ -1036,6 +1040,7 @@ class TestDagFileProcessorManager:
id=mock.ANY,
path=Path(dag2_path.bundle_path, dag2_path.rel_path),
bundle_path=dag2_path.bundle_path,
+ bundle_name="testing",
callbacks=[dag2_req1],
selector=mock.ANY,
logger=mock_logger,
@@ -1046,6 +1051,7 @@ class TestDagFileProcessorManager:
id=mock.ANY,
path=Path(dag1_path.bundle_path, dag1_path.rel_path),
bundle_path=dag1_path.bundle_path,
+ bundle_name="testing",
callbacks=[dag1_req1, dag1_req2],
selector=mock.ANY,
logger=mock_logger,
@@ -1368,3 +1374,28 @@ class TestDagFileProcessorManager:
assert len(team1.dag_bundles) == 0
team2 = session.scalars(select(Team).where(Team.name ==
team2_name)).one()
assert len(team2.dag_bundles) == 0
+
+ @mock.patch.object(DagFileProcessorProcess, "start")
+ def test_create_process_passes_bundle_name_to_process_start(
+ self, mock_process_start, configure_testing_dag_bundle
+ ):
+ """Test that DagFileProcessorManager._create_process() passes
bundle_name to DagFileProcessorProcess.start()"""
+ with configure_testing_dag_bundle("/tmp"):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
+
+ # Setup test data
+ file_info = DagFileInfo(
+ bundle_name="testing", rel_path=Path("test_dag.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+
+ # Mock the process creation
+ mock_process_start.return_value = self.mock_processor()[0]
+
+ # Call _create_process (only takes one parameter: dag_file)
+ manager._create_process(file_info)
+
+ # Verify DagFileProcessorProcess.start was called with correct
bundle_name
+ mock_process_start.assert_called_once()
+ call_kwargs = mock_process_start.call_args.kwargs
+ assert call_kwargs["bundle_name"] == "testing"
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index 8f547724df4..ef1ee928bf7 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -121,6 +121,7 @@ class TestDagFileProcessor:
DagFileParseRequest(
file=file_path,
bundle_path=TEST_DAG_FOLDER,
+ bundle_name="testing",
callback_requests=callback_requests or [],
),
log=structlog.get_logger(),
@@ -160,6 +161,7 @@ class TestDagFileProcessor:
id=1,
path=path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=logger,
logger_filehandle=logger_filehandle,
@@ -195,6 +197,7 @@ class TestDagFileProcessor:
id=1,
path=path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=logger,
logger_filehandle=logger_filehandle,
@@ -228,6 +231,7 @@ class TestDagFileProcessor:
id=1,
path=path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=logger,
logger_filehandle=logger_filehandle,
@@ -271,6 +275,7 @@ class TestDagFileProcessor:
id=1,
path=path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=logger,
logger_filehandle=logger_filehandle,
@@ -308,6 +313,7 @@ class TestDagFileProcessor:
id=1,
path=path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=logger,
logger_filehandle=logger_filehandle,
@@ -337,6 +343,7 @@ class TestDagFileProcessor:
id=1,
path=path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=logger,
logger_filehandle=logger_filehandle,
@@ -370,6 +377,7 @@ class TestDagFileProcessor:
id=1,
path=dag1_path,
bundle_path=tmp_path,
+ bundle_name="testing",
callbacks=[],
logger=MagicMock(spec=FilteringBoundLogger),
logger_filehandle=MagicMock(spec=BinaryIO),
@@ -481,6 +489,7 @@ def test_parse_file_entrypoint_parses_dag_callbacks(mocker):
body={
"file": "/files/dags/wait.py",
"bundle_path": "/files/dags",
+ "bundle_name": "testing",
"callback_requests": [
{
"filepath": "wait.py",
@@ -547,7 +556,9 @@ def test_parse_file_with_dag_callbacks(spy_agency):
)
]
_parse_file(
- DagFileParseRequest(file="A", bundle_path="no matter",
callback_requests=requests),
+ DagFileParseRequest(
+ file="A", bundle_path="no matter", bundle_name="testing",
callback_requests=requests
+ ),
log=structlog.get_logger(),
)
@@ -590,7 +601,7 @@ def test_parse_file_with_task_callbacks(spy_agency):
)
]
_parse_file(
- DagFileParseRequest(file="A", bundle_path="test",
callback_requests=requests),
+ DagFileParseRequest(file="A", bundle_path="test",
bundle_name="testing", callback_requests=requests),
log=structlog.get_logger(),
)
@@ -1758,6 +1769,30 @@ class TestExecuteEmailCallbacks:
with pytest.raises(ValueError, match=expected_error):
_execute_email_callbacks(dagbag, request, log)
+ def test_parse_file_passes_bundle_name_to_dagbag(self):
+ """Test that _parse_file() creates DagBag with correct bundle_name
parameter"""
+ # Mock the DagBag constructor to capture its arguments
+ with patch("airflow.dag_processing.processor.DagBag") as
mock_dagbag_class:
+ # Create a mock instance with proper attributes for Pydantic
validation
+ mock_dagbag_instance = MagicMock()
+ mock_dagbag_instance.dags = {}
+ mock_dagbag_instance.import_errors = {} # Must be a dict, not
MagicMock for Pydantic validation
+ mock_dagbag_class.return_value = mock_dagbag_instance
+
+ request = DagFileParseRequest(
+ file="/test/dag.py",
+ bundle_path=pathlib.Path("/test"),
+ bundle_name="test_bundle",
+ callback_requests=[],
+ )
+
+ _parse_file(request, log=structlog.get_logger())
+
+ # Verify DagBag was called with correct bundle_name
+ mock_dagbag_class.assert_called_once()
+ call_kwargs = mock_dagbag_class.call_args.kwargs
+ assert call_kwargs["bundle_name"] == "test_bundle"
+
class TestDagProcessingMessageTypes:
def test_message_types_in_dag_processor(self):