This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eb6844f58cb SQLA2: avoid unnecessary locking when working with TIs (#59686)
eb6844f58cb is described below
commit eb6844f58cb2a0a46d2222bdc78c864d38d05887
Author: Dev-iL <[email protected]>
AuthorDate: Mon Dec 22 21:00:48 2025 +0200
SQLA2: avoid unnecessary locking when working with TIs (#59686)
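In SQLAlchemy 2.0, we need to tell `Session.get()` explicitly that we want the
locked version of the TaskInstance row. Passing `with_for_update=True` makes it
load the row with `SELECT ... FOR UPDATE` rather than returning the object
already held in the session's identity map.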
---
.../execution_api/routes/task_instances.py | 28 +++++++++++-----------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 514516e68d3..b5e8b172307 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -412,7 +412,7 @@ def ti_update_state(
"Error updating Task Instance state. Setting the task to failed.",
payload=ti_patch_payload,
)
- ti = session.get(TI, ti_id_str)
+ ti = session.get(TI, ti_id_str, with_for_update=True)
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(), query,
session.bind)
query = query.values(state=(updated_state := TaskInstanceState.FAILED))
@@ -462,7 +462,7 @@ def _create_ti_state_update_query_and_update_state(
dag_id: str,
) -> tuple[Update, TaskInstanceState]:
if isinstance(ti_patch_payload, (TITerminalStatePayload,
TIRetryStatePayload, TISuccessStatePayload)):
- ti = session.get(TI, ti_id_str)
+ ti = session.get(TI, ti_id_str, with_for_update=True)
updated_state = TaskInstanceState(ti_patch_payload.state.value)
if session.bind is not None:
query = TI.duration_expression_update(ti_patch_payload.end_date,
query, session.bind)
@@ -544,22 +544,22 @@ def _create_ti_state_update_query_and_update_state(
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(),
query, session.bind)
query = query.values(state=TaskInstanceState.FAILED)
- ti = session.get(TI, ti_id_str)
- if ti is not None:
- _handle_fail_fast_for_dag(ti=ti, dag_id=dag_id,
session=session, dag_bag=dag_bag)
+ # We skip fail_fast handling in this error case to avoid
fetching the TI object while the row
+ # is still locked from the earlier with_for_update() query,
which might cause deadlock issues
+ # in SQLA2. The task is marked as FAILED regardless.
return query, TaskInstanceState.FAILED
- task_instance = session.get(TI, ti_id_str)
+ # We can directly use ti_id_str instead of fetching the TaskInstance
object to avoid SQLA2
+ # lock contention issues when the TaskInstance row is already locked
from before.
actual_start_date = timezone.utcnow()
- if task_instance is not None and task_instance.id is not None:
- session.add(
- TaskReschedule(
- UUID(str(task_instance.id)),
- actual_start_date,
- ti_patch_payload.end_date,
- ti_patch_payload.reschedule_date,
- )
+ session.add(
+ TaskReschedule(
+ ti_id_str,
+ actual_start_date,
+ ti_patch_payload.end_date,
+ ti_patch_payload.reschedule_date,
)
+ )
query = update(TI).where(TI.id == ti_id_str)
# calculate the duration for TI table too