This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c48c219b1d2 Don't validate team existence during dag validation 
(#62596)
c48c219b1d2 is described below

commit c48c219b1d2968ac158489bd95979d79b410881b
Author: Niko Oliveira <[email protected]>
AuthorDate: Sat Feb 28 09:09:31 2026 -0800

    Don't validate team existence during dag validation (#62596)
    
    During Dag parsing we validate Dags that use Multiple Executor
    Configuration, to ensure the executor being requested for a task is
    available in config and for that team. We do that by examining various
    Airflow and bundle config. However, downstream of getting all the
    currently configured executors, a new multi-team check was asserting the
    existence of teams in the DB. This check is really only meant to be done
    at Scheduler startup; we should not perform it on every parse loop.
    
    This will also save on DB queries that were unintentionally happening
    on each validation.
---
 airflow-core/src/airflow/dag_processing/dagbag.py  |  6 ++-
 .../src/airflow/executors/executor_loader.py       |  4 +-
 .../tests/unit/dag_processing/test_dagbag.py       | 51 +++++++++++++++++-----
 .../tests/unit/executors/test_executor_loader.py   | 21 +++++++++
 4 files changed, 68 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py 
b/airflow-core/src/airflow/dag_processing/dagbag.py
index 649be758477..5062a47bc7e 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -103,14 +103,16 @@ def _executor_exists(executor_name: str, team_name: str | 
None) -> bool:
     """Check if executor exists, with global fallback for teams."""
     try:
         # First pass check for team-specific executor or a global executor 
(i.e. team_name=None)
-        ExecutorLoader.lookup_executor_name_by_str(executor_name, 
team_name=team_name)
+        ExecutorLoader.lookup_executor_name_by_str(executor_name, 
team_name=team_name, validate_teams=False)
         return True
     except UnknownExecutorException:
         if team_name:
             # If we had a team_name but didn't find an executor, check if 
there is a global executor that
             # satisfies the request.
             try:
-                ExecutorLoader.lookup_executor_name_by_str(executor_name, 
team_name=None)
+                ExecutorLoader.lookup_executor_name_by_str(
+                    executor_name, team_name=None, validate_teams=False
+                )
                 return True
             except UnknownExecutorException:
                 pass
diff --git a/airflow-core/src/airflow/executors/executor_loader.py 
b/airflow-core/src/airflow/executors/executor_loader.py
index 6a86426f210..ef951bb1f7c 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -301,7 +301,7 @@ class ExecutorLoader:
 
     @classmethod
     def lookup_executor_name_by_str(
-        cls, executor_name_str: str, team_name: str | None = None
+        cls, executor_name_str: str, team_name: str | None = None, 
validate_teams: bool = True
     ) -> ExecutorName:
         # lookup the executor by alias first, if not check if we're given a 
module path
         if (
@@ -310,7 +310,7 @@ class ExecutorLoader:
             or not _alias_to_executors_per_team
         ):
             # if we haven't loaded the executors yet, such as directly calling 
load_executor
-            cls._get_executor_names()
+            cls._get_executor_names(validate_teams)
 
         if executor_name := _alias_to_executors_per_team.get(team_name, 
{}).get(executor_name_str):
             return executor_name
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py 
b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index b8eaccd8e78..a673f9f1b0d 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -87,7 +87,7 @@ class TestValidateExecutorFields:
         _validate_executor_fields(dag, bundle_name="some_bundle")
 
         # Should call ExecutorLoader without team_name (defaults to None)
-        mock_lookup.assert_called_once_with("test.executor", team_name=None)
+        mock_lookup.assert_called_once_with("test.executor", team_name=None, 
validate_teams=False)
 
     @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
     @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -107,7 +107,7 @@ class TestValidateExecutorFields:
             _validate_executor_fields(dag, bundle_name="test_bundle")
 
         # Should call ExecutorLoader with team from bundle config
-        mock_lookup.assert_called_once_with("team.executor", 
team_name="test_team")
+        mock_lookup.assert_called_once_with("team.executor", 
team_name="test_team", validate_teams=False)
 
     @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
     @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -125,7 +125,7 @@ class TestValidateExecutorFields:
         with conf_vars({("core", "multi_team"): "True"}):
             _validate_executor_fields(dag, bundle_name="test_bundle")
 
-        mock_lookup.assert_called_once_with("test.executor", team_name=None)
+        mock_lookup.assert_called_once_with("test.executor", team_name=None, 
validate_teams=False)
 
     @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
     def test_multiple_tasks_with_executors(self, mock_lookup):
@@ -140,8 +140,8 @@ class TestValidateExecutorFields:
 
         # Should be called for each task with executor
         assert mock_lookup.call_count == 2
-        mock_lookup.assert_any_call("executor1", team_name=None)
-        mock_lookup.assert_any_call("executor2", team_name=None)
+        mock_lookup.assert_any_call("executor1", team_name=None, 
validate_teams=False)
+        mock_lookup.assert_any_call("executor2", team_name=None, 
validate_teams=False)
 
     @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
     @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -213,8 +213,8 @@ class TestValidateExecutorFields:
 
         # Should call lookup twice: first for team, then for global
         assert mock_lookup.call_count == 2
-        mock_lookup.assert_any_call("global.executor", team_name="test_team")
-        mock_lookup.assert_any_call("global.executor", team_name=None)
+        mock_lookup.assert_any_call("global.executor", team_name="test_team", 
validate_teams=False)
+        mock_lookup.assert_any_call("global.executor", team_name=None, 
validate_teams=False)
 
     @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
     @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -247,8 +247,8 @@ class TestValidateExecutorFields:
 
         # Should call lookup twice: first for team, then for global fallback
         assert mock_lookup.call_count == 2
-        mock_lookup.assert_any_call("unknown.executor", team_name="test_team")
-        mock_lookup.assert_any_call("unknown.executor", team_name=None)
+        mock_lookup.assert_any_call("unknown.executor", team_name="test_team", 
validate_teams=False)
+        mock_lookup.assert_any_call("unknown.executor", team_name=None, 
validate_teams=False)
 
     @patch("airflow.dag_processing.bundles.manager.DagBundlesManager")
     @patch.object(ExecutorLoader, "lookup_executor_name_by_str")
@@ -270,7 +270,38 @@ class TestValidateExecutorFields:
             _validate_executor_fields(dag, bundle_name="test_bundle")
 
         # Should only call lookup once for team-specific executor
-        mock_lookup.assert_called_once_with("team.executor", 
team_name="test_team")
+        mock_lookup.assert_called_once_with("team.executor", 
team_name="test_team", validate_teams=False)
+
+    @pytest.mark.usefixtures("clean_executor_loader")
+    def test_validate_executor_fields_does_not_access_database(self):
+        """Regression test: executor validation during DAG parsing must not 
access the database.
+
+        In Airflow 3, DAG parsing happens in isolated subprocesses where 
database access
+        is blocked via block_orm_access(). The _validate_executor_fields 
function must
+        validate executors using only local config (validate_teams=False), 
without querying
+        the database to verify team names exist. If validate_teams were True, 
the call chain
+        would reach Team.get_all_team_names() which does a DB query, raising 
RuntimeError
+        in the parsing subprocess.
+        """
+        with DAG("test-dag", schedule=None) as dag:
+            BaseOperator(task_id="t1", executor="LocalExecutor")
+
+        with conf_vars(
+            {
+                ("core", "executor"): "LocalExecutor;team1=CeleryExecutor",
+                ("core", "multi_team"): "True",
+            }
+        ):
+            # Patch _validate_teams_exist_in_database to raise RuntimeError,
+            # simulating what happens in the DAG parsing subprocess where DB 
is blocked.
+            # If the fix is correct, this should never be called.
+            with patch.object(
+                ExecutorLoader,
+                "_validate_teams_exist_in_database",
+                side_effect=RuntimeError("Direct database access via the ORM 
is not allowed in Airflow 3.0"),
+            ):
+                # Should succeed without hitting the database
+                _validate_executor_fields(dag)
 
 
 def test_validate_executor_field_executor_not_configured():
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py 
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 6c2116836d7..07efc11b9a3 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -614,6 +614,27 @@ class TestExecutorLoader:
                 
executor_loader.ExecutorLoader.get_executor_names(validate_teams=False)
                 mock_get_team_names.assert_not_called()
 
+    def 
test_lookup_executor_name_by_str_skips_db_when_validate_teams_false(self):
+        """Regression test: lookup_executor_name_by_str with 
validate_teams=False must not access the DB.
+
+        During DAG parsing in Airflow 3, database access is blocked. The 
lookup_executor_name_by_str
+        method is called from _validate_executor_fields during parsing, so it 
must be able to resolve
+        executor names using only local config without triggering 
_validate_teams_exist_in_database.
+        """
+        with conf_vars(
+            {("core", "executor"): "=LocalExecutor;team1=CeleryExecutor", 
("core", "multi_team"): "True"}
+        ):
+            with patch.object(
+                executor_loader.ExecutorLoader,
+                "_validate_teams_exist_in_database",
+                side_effect=RuntimeError("Direct database access via the ORM 
is not allowed in Airflow 3.0"),
+            ):
+                # Should succeed without hitting the database
+                result = 
executor_loader.ExecutorLoader.lookup_executor_name_by_str(
+                    "LocalExecutor", validate_teams=False
+                )
+                assert result.alias == "LocalExecutor"
+
     def test_get_executor_names_default_validates_teams(self):
         """Test that get_executor_names validates teams by default."""
         with patch.object(executor_loader.Team, "get_all_team_names") as 
mock_get_team_names:

Reply via email to