This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c48c219b1d2 Don't validate team existence during dag validation
(#62596)
c48c219b1d2 is described below
commit c48c219b1d2968ac158489bd95979d79b410881b
Author: Niko Oliveira <[email protected]>
AuthorDate: Sat Feb 28 09:09:31 2026 -0800
Don't validate team existence during dag validation (#62596)
During Dag parsing we validate Dags that use Multiple Executor
Configuration to ensure that the executor requested for a task is
available in config and for that team. We do that by examining various
Airflow and bundle config. However, downstream of getting all the
currently configured executors, a new multi-team check was asserting the
existence of teams in the DB. This check is really only meant to be done
at Scheduler startup; we should not perform it on each parse loop.
Skipping it also saves the DB queries that were unintentionally happening
on each validation.
---
airflow-core/src/airflow/dag_processing/dagbag.py | 6 ++-
.../src/airflow/executors/executor_loader.py | 4 +-
.../tests/unit/dag_processing/test_dagbag.py | 51 +++++++++++++++++-----
.../tests/unit/executors/test_executor_loader.py | 21 +++++++++
4 files changed, 68 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py
b/airflow-core/src/airflow/dag_processing/dagbag.py
index 649be758477..5062a47bc7e 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -103,14 +103,16 @@ def _executor_exists(executor_name: str, team_name: str |
None) -> bool:
"""Check if executor exists, with global fallback for teams."""
try:
# First pass check for team-specific executor or a global executor
(i.e. team_name=None)
- ExecutorLoader.lookup_executor_name_by_str(executor_name,
team_name=team_name)
+ ExecutorLoader.lookup_executor_name_by_str(executor_name,
team_name=team_name, validate_teams=False)
return True
except UnknownExecutorException:
if team_name:
# If we had a team_name but didn't find an executor, check if
there is a global executor that
# satisfies the request.
try:
- ExecutorLoader.lookup_executor_name_by_str(executor_name,
team_name=None)
+ ExecutorLoader.lookup_executor_name_by_str(
+ executor_name, team_name=None, validate_teams=False
+ )
return True
except UnknownExecutorException:
pass
diff --git a/airflow-core/src/airflow/executors/executor_loader.py
b/airflow-core/src/airflow/executors/executor_loader.py
index 6a86426f210..ef951bb1f7c 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -301,7 +301,7 @@ class ExecutorLoader:
@classmethod
def lookup_executor_name_by_str(
- cls, executor_name_str: str, team_name: str | None = None
+ cls, executor_name_str: str, team_name: str | None = None,
validate_teams: bool = True
) -> ExecutorName:
# lookup the executor by alias first, if not check if we're given a
module path
if (
@@ -310,7 +310,7 @@ class ExecutorLoader:
or not _alias_to_executors_per_team
):
# if we haven't loaded the executors yet, such as directly calling
load_executor
- cls._get_executor_names()
+ cls._get_executor_names(validate_teams)
if executor_name := _alias_to_executors_per_team.get(team_name,
{}).get(executor_name_str):
return executor_name
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py
b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index b8eaccd8e78..a673f9f1b0d 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -87,7 +87,7 @@ class TestValidateExecutorFields:
_validate_executor_fields(dag, bundle_name="some_bundle")
# Should call ExecutorLoader without team_name (defaults to None)
- mock_lookup.assert_called_once_with("test.executor", team_name=None)
+ mock_lookup.assert_called_once_with("test.executor", team_name=None,
validate_teams=False)
@patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -107,7 +107,7 @@ class TestValidateExecutorFields:
_validate_executor_fields(dag, bundle_name="test_bundle")
# Should call ExecutorLoader with team from bundle config
- mock_lookup.assert_called_once_with("team.executor",
team_name="test_team")
+ mock_lookup.assert_called_once_with("team.executor",
team_name="test_team", validate_teams=False)
@patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -125,7 +125,7 @@ class TestValidateExecutorFields:
with conf_vars({("core", "multi_team"): "True"}):
_validate_executor_fields(dag, bundle_name="test_bundle")
- mock_lookup.assert_called_once_with("test.executor", team_name=None)
+ mock_lookup.assert_called_once_with("test.executor", team_name=None,
validate_teams=False)
@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
def test_multiple_tasks_with_executors(self, mock_lookup):
@@ -140,8 +140,8 @@ class TestValidateExecutorFields:
# Should be called for each task with executor
assert mock_lookup.call_count == 2
- mock_lookup.assert_any_call("executor1", team_name=None)
- mock_lookup.assert_any_call("executor2", team_name=None)
+ mock_lookup.assert_any_call("executor1", team_name=None,
validate_teams=False)
+ mock_lookup.assert_any_call("executor2", team_name=None,
validate_teams=False)
@patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -213,8 +213,8 @@ class TestValidateExecutorFields:
# Should call lookup twice: first for team, then for global
assert mock_lookup.call_count == 2
- mock_lookup.assert_any_call("global.executor", team_name="test_team")
- mock_lookup.assert_any_call("global.executor", team_name=None)
+ mock_lookup.assert_any_call("global.executor", team_name="test_team",
validate_teams=False)
+ mock_lookup.assert_any_call("global.executor", team_name=None,
validate_teams=False)
@patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -247,8 +247,8 @@ class TestValidateExecutorFields:
# Should call lookup twice: first for team, then for global fallback
assert mock_lookup.call_count == 2
- mock_lookup.assert_any_call("unknown.executor", team_name="test_team")
- mock_lookup.assert_any_call("unknown.executor", team_name=None)
+ mock_lookup.assert_any_call("unknown.executor", team_name="test_team",
validate_teams=False)
+ mock_lookup.assert_any_call("unknown.executor", team_name=None,
validate_teams=False)
@patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
@patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -270,7 +270,38 @@ class TestValidateExecutorFields:
_validate_executor_fields(dag, bundle_name="test_bundle")
# Should only call lookup once for team-specific executor
- mock_lookup.assert_called_once_with("team.executor",
team_name="test_team")
+ mock_lookup.assert_called_once_with("team.executor",
team_name="test_team", validate_teams=False)
+
+ @pytest.mark.usefixtures("clean_executor_loader")
+ def test_validate_executor_fields_does_not_access_database(self):
+ """Regression test: executor validation during DAG parsing must not
access the database.
+
+ In Airflow 3, DAG parsing happens in isolated subprocesses where
database access
+ is blocked via block_orm_access(). The _validate_executor_fields
function must
+ validate executors using only local config (validate_teams=False),
without querying
+ the database to verify team names exist. If validate_teams were True,
the call chain
+ would reach Team.get_all_team_names() which does a DB query, raising
RuntimeError
+ in the parsing subprocess.
+ """
+ with DAG("test-dag", schedule=None) as dag:
+ BaseOperator(task_id="t1", executor="LocalExecutor")
+
+ with conf_vars(
+ {
+ ("core", "executor"): "LocalExecutor;team1=CeleryExecutor",
+ ("core", "multi_team"): "True",
+ }
+ ):
+ # Patch _validate_teams_exist_in_database to raise RuntimeError,
+ # simulating what happens in the DAG parsing subprocess where DB
is blocked.
+ # If the fix is correct, this should never be called.
+ with patch.object(
+ ExecutorLoader,
+ "_validate_teams_exist_in_database",
+ side_effect=RuntimeError("Direct database access via the ORM
is not allowed in Airflow 3.0"),
+ ):
+ # Should succeed without hitting the database
+ _validate_executor_fields(dag)
def test_validate_executor_field_executor_not_configured():
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 6c2116836d7..07efc11b9a3 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -614,6 +614,27 @@ class TestExecutorLoader:
executor_loader.ExecutorLoader.get_executor_names(validate_teams=False)
mock_get_team_names.assert_not_called()
+ def
test_lookup_executor_name_by_str_skips_db_when_validate_teams_false(self):
+ """Regression test: lookup_executor_name_by_str with
validate_teams=False must not access the DB.
+
+ During DAG parsing in Airflow 3, database access is blocked. The
lookup_executor_name_by_str
+ method is called from _validate_executor_fields during parsing, so it
must be able to resolve
+ executor names using only local config without triggering
_validate_teams_exist_in_database.
+ """
+ with conf_vars(
+ {("core", "executor"): "=LocalExecutor;team1=CeleryExecutor",
("core", "multi_team"): "True"}
+ ):
+ with patch.object(
+ executor_loader.ExecutorLoader,
+ "_validate_teams_exist_in_database",
+ side_effect=RuntimeError("Direct database access via the ORM
is not allowed in Airflow 3.0"),
+ ):
+ # Should succeed without hitting the database
+ result =
executor_loader.ExecutorLoader.lookup_executor_name_by_str(
+ "LocalExecutor", validate_teams=False
+ )
+ assert result.alias == "LocalExecutor"
+
def test_get_executor_names_default_validates_teams(self):
"""Test that get_executor_names validates teams by default."""
with patch.object(executor_loader.Team, "get_all_team_names") as
mock_get_team_names: