This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eb6844f58cb SQLA2: avoid unnecessary locking when working with TIs 
(#59686)
eb6844f58cb is described below

commit eb6844f58cb2a0a46d2222bdc78c864d38d05887
Author: Dev-iL <[email protected]>
AuthorDate: Mon Dec 22 21:00:48 2025 +0200

    SQLA2: avoid unnecessary locking when working with TIs (#59686)
    
    In SQLAlchemy 2.0, we need to explicitly tell get() not to issue a new 
SELECT for an already-loaded object. Using `with_for_update=True` tells it to 
use the locked version.
---
 .../execution_api/routes/task_instances.py         | 28 +++++++++++-----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 514516e68d3..b5e8b172307 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -412,7 +412,7 @@ def ti_update_state(
             "Error updating Task Instance state. Setting the task to failed.",
             payload=ti_patch_payload,
         )
-        ti = session.get(TI, ti_id_str)
+        ti = session.get(TI, ti_id_str, with_for_update=True)
         if session.bind is not None:
             query = TI.duration_expression_update(timezone.utcnow(), query, 
session.bind)
         query = query.values(state=(updated_state := TaskInstanceState.FAILED))
@@ -462,7 +462,7 @@ def _create_ti_state_update_query_and_update_state(
     dag_id: str,
 ) -> tuple[Update, TaskInstanceState]:
     if isinstance(ti_patch_payload, (TITerminalStatePayload, 
TIRetryStatePayload, TISuccessStatePayload)):
-        ti = session.get(TI, ti_id_str)
+        ti = session.get(TI, ti_id_str, with_for_update=True)
         updated_state = TaskInstanceState(ti_patch_payload.state.value)
         if session.bind is not None:
             query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
@@ -544,22 +544,22 @@ def _create_ti_state_update_query_and_update_state(
                 if session.bind is not None:
                     query = TI.duration_expression_update(timezone.utcnow(), 
query, session.bind)
                 query = query.values(state=TaskInstanceState.FAILED)
-                ti = session.get(TI, ti_id_str)
-                if ti is not None:
-                    _handle_fail_fast_for_dag(ti=ti, dag_id=dag_id, 
session=session, dag_bag=dag_bag)
+                # We skip fail_fast handling in this error case to avoid 
fetching the TI object while the row
+                # is still locked from the earlier with_for_update() query, 
which might cause deadlock issues
+                # in SQLA2. The task is marked as FAILED regardless.
                 return query, TaskInstanceState.FAILED
 
-        task_instance = session.get(TI, ti_id_str)
+        # We can directly use ti_id_str instead of fetching the TaskInstance 
object to avoid SQLA2
+        #  lock contention issues when the TaskInstance row is already locked 
from before.
         actual_start_date = timezone.utcnow()
-        if task_instance is not None and task_instance.id is not None:
-            session.add(
-                TaskReschedule(
-                    UUID(str(task_instance.id)),
-                    actual_start_date,
-                    ti_patch_payload.end_date,
-                    ti_patch_payload.reschedule_date,
-                )
+        session.add(
+            TaskReschedule(
+                ti_id_str,
+                actual_start_date,
+                ti_patch_payload.end_date,
+                ti_patch_payload.reschedule_date,
             )
+        )
 
         query = update(TI).where(TI.id == ti_id_str)
         # calculate the duration for TI table too

Reply via email to the commits mailing list.