This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8d4617a48e4ba0b43bc11653abf72cf307c26d84 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Tue Sep 23 21:05:02 2025 +0100 Fix issue where LocalExecutor could start tasks before the state was committed (#56010) * Fix issue where LocalExecutor could start tasks before the state was committed With some recent changes LocalExecutor was now able to start a task _too quickly_. Due to its custom implementation of `queue_workload`, it sent the task directly to the multiprocessing (MP) queue from within queue_workload. This means that if an idle worker process already exists, it will pick the task up "instantly" -- crucially, before the database transaction in which TI.state is changed from scheduled to queued is committed! The fix is to correctly follow the BaseExecutor interface and not send the workloads for processing until heartbeat is called (which happens in the scheduler right after the transaction is committed). * Fix test --------- Co-authored-by: Kaxil Naik <[email protected]> (cherry picked from commit 4394b8fdf1756210cda2e7b7971fafab1b1b5c55) --- airflow-core/src/airflow/executors/local_executor.py | 12 +++++------- airflow-core/tests/unit/executors/test_local_executor.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/executors/local_executor.py b/airflow-core/src/airflow/executors/local_executor.py index d85bb210e4d..1c15fca7d9c 100644 --- a/airflow-core/src/airflow/executors/local_executor.py +++ b/airflow-core/src/airflow/executors/local_executor.py @@ -36,7 +36,6 @@ from typing import TYPE_CHECKING from airflow.executors import workloads from airflow.executors.base_executor import PARALLELISM, BaseExecutor -from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import TaskInstanceState # add logger to parameter of setproctitle to support logging @@ -48,8 +47,6 @@ else: setproctitle = lambda title, logger: real_setproctitle(title) if TYPE_CHECKING: - from 
sqlalchemy.orm import Session - TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, Exception | None] @@ -253,9 +250,10 @@ class LocalExecutor(BaseExecutor): def terminate(self): """Terminate the executor is not doing anything.""" - @provide_session - def queue_workload(self, workload: workloads.All, session: Session = NEW_SESSION): - self.activity_queue.put(workload) + def _process_workloads(self, workloads): + for workload in workloads: + self.activity_queue.put(workload) + del self.queued_tasks[workload.ti.key] with self._unread_messages: - self._unread_messages.value += 1 + self._unread_messages.value += len(workloads) self._check_workers() diff --git a/airflow-core/tests/unit/executors/test_local_executor.py b/airflow-core/tests/unit/executors/test_local_executor.py index ec7102e2e70..6489746f220 100644 --- a/airflow-core/tests/unit/executors/test_local_executor.py +++ b/airflow-core/tests/unit/executors/test_local_executor.py @@ -28,6 +28,7 @@ from uuid6 import uuid7 from airflow._shared.timezones import timezone from airflow.executors import workloads from airflow.executors.local_executor import LocalExecutor, _execute_work +from airflow.settings import Session from airflow.utils.state import State from tests_common.test_utils.config import conf_vars @@ -97,7 +98,8 @@ class TestLocalExecutor: dag_rel_path="some/path", log_path=None, bundle_info=dict(name="hi", version="hi"), - ) + ), + session=mock.MagicMock(spec=Session), ) executor.queue_workload( @@ -107,9 +109,13 @@ class TestLocalExecutor: dag_rel_path="some/path", log_path=None, bundle_info=dict(name="hi", version="hi"), - ) + ), + session=mock.MagicMock(spec=Session), ) + # Process queued workloads to trigger worker spawning + executor._process_workloads(list(executor.queued_tasks.values())) + executor.end() expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism
