This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new ad0b99b2cb1 [v3-2-test] Fix deadlock in ti_update_state caused by FOR UPDATE locking dag_run (#67246) (#67264)
ad0b99b2cb1 is described below

commit ad0b99b2cb16ecbcd421e4031da8b27d9901c2bf
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 21 08:14:00 2026 +0530

    [v3-2-test] Fix deadlock in ti_update_state caused by FOR UPDATE locking dag_run (#67246) (#67264)
    
    session.get(TI, id, with_for_update=True) emits a SELECT that joins
    dag_run (via the lazy="joined" relationship) and applies FOR UPDATE to
    both tables. Under concurrent task completions this serialises all
    workers on the same dag_run row, producing deadlock cycles with the
    scheduler's trigger-rule dependency checks.
    
    Three other callsites in this file already use with_for_update={"of": TI}
    for exactly this reason. Apply the same fix to the two remaining callsites
    in _create_ti_state_update_query_and_update_state and its error-recovery
    path.
    (cherry picked from commit 315d1591644629cca400e723769ba01408b343f6)
    
    Co-authored-by: Arthur <[email protected]>
---
 .../src/airflow/api_fastapi/execution_api/routes/task_instances.py    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 15feb13ac3d..44a2674efb8 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -418,7 +418,7 @@ def ti_update_state(
             "Error updating Task Instance state. Setting the task to failed.",
             payload=ti_patch_payload,
         )
-        ti = session.get(TI, task_instance_id, with_for_update=True)
+        ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
         if session.bind is not None:
             query = TI.duration_expression_update(timezone.utcnow(), query, session.bind)
         query = query.values(state=(updated_state := TaskInstanceState.FAILED))
@@ -521,7 +521,7 @@ def _create_ti_state_update_query_and_update_state(
     dag_id: str,
 ) -> tuple[Update, TaskInstanceState]:
     if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)):
-        ti = session.get(TI, task_instance_id, with_for_update=True)
+        ti = session.get(TI, task_instance_id, with_for_update={"of": TI})
         updated_state = TaskInstanceState(ti_patch_payload.state.value)
         if session.bind is not None:
             query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
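
The locking difference described in the commit message can be illustrated with a minimal sketch. It does not use Airflow's real models: the two-column DagRun and TI classes below are hypothetical stand-ins, reduced to a primary key and a lazy="joined" relationship, and the statements are only compiled against the PostgreSQL dialect rather than executed. It assumes that session.get(..., with_for_update=...) applies the same locking clause as Select.with_for_update().

```python
from sqlalchemy import Column, ForeignKey, Integer, String, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class DagRun(Base):
    # Hypothetical stand-in for Airflow's dag_run table.
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)


class TI(Base):
    # Hypothetical stand-in for Airflow's task_instance table.
    __tablename__ = "task_instance"
    id = Column(String, primary_key=True)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"))
    # lazy="joined" makes every SELECT of TI also JOIN dag_run.
    dag_run = relationship(DagRun, lazy="joined")


def render(stmt) -> str:
    """Compile a statement to PostgreSQL SQL text without a database."""
    return str(stmt.compile(dialect=postgresql.dialect()))


base = select(TI).where(TI.id == "some-ti-id")

# Plain FOR UPDATE: the lock applies to every table in the join,
# including the eagerly joined dag_run row.
lock_all = render(base.with_for_update())

# FOR UPDATE OF task_instance: only the task_instance row is locked.
lock_ti_only = render(base.with_for_update(of=TI))

print(lock_all)
print(lock_ti_only)
```

In the printed SQL, both statements should contain the join to dag_run, but only the second restricts the lock with an OF clause naming task_instance. That restriction is what lets concurrent task completions in the same DAG run proceed without queuing on the shared dag_run row.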
