This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7e9787265ac Allow teams to use global executors by default (#59116)
7e9787265ac is described below
commit 7e9787265ac929b4d2943c3975cfc7b99790914d
Author: Niko Oliveira <[email protected]>
AuthorDate: Sun Dec 7 13:59:16 2025 -0800
Allow teams to use global executors by default (#59116)
If someone has configured a team, but has not added any executors to it,
allow the team to then use the default global executor (which we enforce
must exist). This was always possible, but previously it had to be
configured directly in the tasks/dags; now it is used by default.
---
.../src/airflow/jobs/scheduler_job_runner.py | 4 +++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 32 ++++++++++++++++++++++
2 files changed, 36 insertions(+)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index a7fd0049810..e50af60db88 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2935,6 +2935,10 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# First executor that resolves should be the default for
that team
if _executor.team_name == team_name:
executor = _executor
+ break
+ else:
+ # No executor found for that team, fall back to global
default
+ executor = self.job.executor
else:
# An executor is specified on the TaskInstance (as a str), so we
need to find it in the list of executors
for _executor in self.job.executors:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 767159bde59..b1c38c1c120 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -7570,6 +7570,38 @@ class TestSchedulerJob:
# Should return the team-specific default executor set above
assert result == mock_executors[1]
+ @conf_vars({("core", "multi_team"): "true"})
+ def
test_multi_team_try_to_load_executor_no_explicit_executor_with_team_no_team_default(
+ self, dag_maker, mock_executors, session
+ ):
+ """Test executor selection when no explicit executor but team exists
and team has no executors (should
+ fall back to the global executor)."""
+ clear_db_teams()
+ clear_db_dag_bundles()
+
+ team = Team(name="team_a")
+ session.add(team)
+ session.flush()
+
+ bundle = DagBundleModel(name="bundle_a")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ with dag_maker(dag_id="dag_a", bundle_name="bundle_a",
session=session):
+ task = EmptyOperator(task_id="test_task") # No explicit executor
+
+ dr = dag_maker.create_dagrun()
+ ti = dr.get_task_instance(task.task_id, session)
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ result = self.job_runner._try_to_load_executor(ti, session)
+
+ # Should fall back to the global default executor, since team_a has no executors of its own
+ assert result == mock_executors[0]
+
@conf_vars({("core", "multi_team"): "true"})
def test_multi_team_try_to_load_executor_explicit_executor_matches_team(
self, dag_maker, mock_executors, session