This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8d4617a48e4ba0b43bc11653abf72cf307c26d84
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Sep 23 21:05:02 2025 +0100

    Fix issue where LocalExecutor could start tasks before the state was 
committed (#56010)
    
    * Fix issue where LocalExecutor could start tasks before the state was 
committed
    
    With some recent changes LocalExec was now able to start a task _too 
quickly_,
    and due to its custom implementation of `queue_workload` it was directly
    sending the task to the MP queue in queue_workload, which means 
if
    there is an idle worker process already it will pick it up "instantly" --
    crucially before the database transaction in which TI.state is changed from
    scheduled to queued is committed!
    
    The fix here is to correctly follow the BaseExecutor interface, and not 
start
    sending the workloads for processing until heartbeat is called (which happens 
in
    the scheduler right after the transaction is committed).
    
    * Fix test
    
    ---------
    
    Co-authored-by: Kaxil Naik <[email protected]>
    (cherry picked from commit 4394b8fdf1756210cda2e7b7971fafab1b1b5c55)
---
 airflow-core/src/airflow/executors/local_executor.py     | 12 +++++-------
 airflow-core/tests/unit/executors/test_local_executor.py | 10 ++++++++--
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/executors/local_executor.py 
b/airflow-core/src/airflow/executors/local_executor.py
index d85bb210e4d..1c15fca7d9c 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -36,7 +36,6 @@ from typing import TYPE_CHECKING
 
 from airflow.executors import workloads
 from airflow.executors.base_executor import PARALLELISM, BaseExecutor
-from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
 
 # add logger to parameter of setproctitle to support logging
@@ -48,8 +47,6 @@ else:
     setproctitle = lambda title, logger: real_setproctitle(title)
 
 if TYPE_CHECKING:
-    from sqlalchemy.orm import Session
-
     TaskInstanceStateType = tuple[workloads.TaskInstance, TaskInstanceState, 
Exception | None]
 
 
@@ -253,9 +250,10 @@ class LocalExecutor(BaseExecutor):
     def terminate(self):
         """Terminate the executor is not doing anything."""
 
-    @provide_session
-    def queue_workload(self, workload: workloads.All, session: Session = 
NEW_SESSION):
-        self.activity_queue.put(workload)
+    def _process_workloads(self, workloads):
+        for workload in workloads:
+            self.activity_queue.put(workload)
+            del self.queued_tasks[workload.ti.key]
         with self._unread_messages:
-            self._unread_messages.value += 1
+            self._unread_messages.value += len(workloads)
         self._check_workers()
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py 
b/airflow-core/tests/unit/executors/test_local_executor.py
index ec7102e2e70..6489746f220 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -28,6 +28,7 @@ from uuid6 import uuid7
 from airflow._shared.timezones import timezone
 from airflow.executors import workloads
 from airflow.executors.local_executor import LocalExecutor, _execute_work
+from airflow.settings import Session
 from airflow.utils.state import State
 
 from tests_common.test_utils.config import conf_vars
@@ -97,7 +98,8 @@ class TestLocalExecutor:
                         dag_rel_path="some/path",
                         log_path=None,
                         bundle_info=dict(name="hi", version="hi"),
-                    )
+                    ),
+                    session=mock.MagicMock(spec=Session),
                 )
 
             executor.queue_workload(
@@ -107,9 +109,13 @@ class TestLocalExecutor:
                     dag_rel_path="some/path",
                     log_path=None,
                     bundle_info=dict(name="hi", version="hi"),
-                )
+                ),
+                session=mock.MagicMock(spec=Session),
             )
 
+            # Process queued workloads to trigger worker spawning
+            executor._process_workloads(list(executor.queued_tasks.values()))
+
             executor.end()
 
             expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else 
parallelism

Reply via email to