This is an automated email from the ASF dual-hosted git repository.
ashb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2def8027d7c Correctly pre-allocate `external_executor_id` with
multiple executors. (#67388)
2def8027d7c is described below
commit 2def8027d7c22f49c206593b71cc8808bd6ad642
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Mon May 25 06:58:36 2026 +0100
Correctly pre-allocate `external_executor_id` with multiple executors.
(#67388)
---
.../src/airflow/jobs/scheduler_job_runner.py | 6 ++--
airflow-core/tests/unit/jobs/test_scheduler_job.py | 38 +++++++++++++++++-----
2 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d04ee85c202..6acd92fbe92 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -35,8 +35,10 @@ from typing import TYPE_CHECKING, Any, cast
from sqlalchemy import (
CTE,
+ Text,
and_,
case,
+ cast as sql_cast,
delete,
exists,
func,
@@ -953,9 +955,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
opt_in_names.add(exc.name.module_path)
whens = []
if opt_in_names:
- whens.append((TI.executor.in_(opt_in_names),
random_db_uuid()))
+ whens.append((TI.executor.in_(opt_in_names),
sql_cast(random_db_uuid(), Text)))
if default_opts_in:
- whens.append((TI.executor.is_(None), random_db_uuid()))
+ whens.append((TI.executor.is_(None),
sql_cast(random_db_uuid(), Text)))
if whens:
queued_values["external_executor_id"] = case(*whens,
else_=TI.external_executor_id)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 3580136ad70..05803f5fd5b 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2820,31 +2820,51 @@ class TestSchedulerJob:
dag_id = "SchedulerJobTest.test_executable_sets_external_executor_id"
session = settings.Session()
with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE,
session=session):
- EmptyOperator(task_id="dummy")
+ EmptyOperator(task_id="a_task_pre_assign")
+ EmptyOperator(task_id="b_task_regular")
class PreAssigningExecutor(MockExecutor):
pre_assigns_external_executor_id = True
+ mock_module_path = "mock.pre_assigning.executor"
+ mock_alias = "pre_assigning_executor"
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job,
executors=[PreAssigningExecutor()])
+ regular_exec = MockExecutor()
+ assert regular_exec.pre_assigns_external_executor_id is False,
"Pre-condition"
+
+ pre_assigning_exec = PreAssigningExecutor()
+
+ self.job_runner = SchedulerJobRunner(job=Job(),
executors=(regular_exec, pre_assigning_exec))
dr = dag_maker.create_dagrun()
- ti = dr.get_task_instance("dummy", session)
- ti.state = State.SCHEDULED
- session.merge(ti)
+ ti_pre_assign = dr.get_task_instance("a_task_pre_assign", session)
+ ti_regular = dr.get_task_instance("b_task_regular", session)
+
+ ti_regular.state = State.SCHEDULED
+ ti_regular.executor = regular_exec.name.module_path
+ ti_pre_assign.state = State.SCHEDULED
+ ti_pre_assign.executor = pre_assigning_exec.name.module_path
session.flush()
returned_tis =
self.job_runner._executable_task_instances_to_queued(max_tis=32,
session=session)
+ returned_tis.sort(key=lambda ti: ti.task_id)
+
+ assert len(returned_tis) == 2
- assert len(returned_tis) == 1
# In-memory object (post make_transient) should carry the UUID
+ assert returned_tis[0].id == ti_pre_assign.id
assert returned_tis[0].external_executor_id is not None
- UUID(returned_tis[0].external_executor_id)
+ assert UUID(returned_tis[0].external_executor_id), "is valid uuid"
# DB row should also have it (the whole point — survives a crash)
- db_value =
session.scalar(select(TaskInstance.external_executor_id).where(TaskInstance.id
== ti.id))
+ db_value = session.scalar(
+ select(TaskInstance.external_executor_id).where(TaskInstance.id ==
ti_pre_assign.id)
+ )
assert db_value == returned_tis[0].external_executor_id
+ # In mixed-executor mode, only TIs routed to a pre-assigning executor
get an external_executor_id.
+ assert returned_tis[1].id == ti_regular.id
+ assert returned_tis[1].external_executor_id is None
+
session.rollback()
@pytest.mark.parametrize("state", [State.FAILED, State.SUCCESS])