This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cd713ab7cb9 Use FOR KEY SHARE UPDATE instead of FOR UPDATE (#42082)
cd713ab7cb9 is described below
commit cd713ab7cb9588559ed44be789db0dbc2c450807
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Nov 26 09:31:34 2024 -0800
Use FOR KEY SHARE UPDATE instead of FOR UPDATE (#42082)
Generally speaking, the goal of our SELECT FOR UPDATE statements is
concurrency control. We want to make sure that nothing else is handling the
dag run (or ti) at the same time. In no case that I'm aware of are we
actually going to update the PK value for the record. So we can take the
weaker lock (rendered by SQLAlchemy as FOR NO KEY UPDATE when key_share=True)
and still have that concurrency control. But this only works on
PostgreSQL. MySQL has no such weaker FOR UPDATE lock.
One negative consequence of taking the stronger lock is that it blocks any
insert into a table of a row that references the locked row in a FK. This can
cause slowdowns and pileups in some circumstances. In one case we do an extra
FOR UPDATE lock because that FK wait issue was associated with a deadlock. This
change makes that extra lock unnecessary in the PostgreSQL case, so we remove
it here. And with the MySQL case in mind, we add a commit to avoid the
deadlocking.
---
airflow/models/taskinstance.py | 29 ++++++++++-------------------
airflow/utils/sqlalchemy.py | 4 ++++
tests/utils/test_sqlalchemy.py | 2 +-
3 files changed, 15 insertions(+), 20 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6c8798bdec5..324308f6f61 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1673,8 +1673,6 @@ def _handle_reschedule(
ti = _coalesce_to_orm_ti(ti=ti, session=session)
- from airflow.models.dagrun import DagRun # Avoid circular import
-
ti.refresh_from_db(session)
if TYPE_CHECKING:
@@ -1683,16 +1681,16 @@ def _handle_reschedule(
ti.end_date = timezone.utcnow()
ti.set_duration()
- # Lock DAG run to be sure not to get into a deadlock situation when trying
to insert
- # TaskReschedule which apparently also creates lock on corresponding
DagRun entity
- with_row_locks(
- session.query(DagRun).filter_by(
- dag_id=ti.dag_id,
- run_id=ti.run_id,
- ),
- session=session,
- ).one()
- # Log reschedule request
+ # set state
+ ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+
+ ti.clear_next_method_args()
+
+ session.merge(ti)
+ session.commit()
+
+ # we add this in separate commit to reduce likelihood of deadlock
+ # see https://github.com/apache/airflow/pull/21362 for more info
session.add(
TaskReschedule(
ti.task_id,
@@ -1705,13 +1703,6 @@ def _handle_reschedule(
ti.map_index,
)
)
-
- # set state
- ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
-
- ti.clear_next_method_args()
-
- session.merge(ti)
session.commit()
return ti
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 14dd7cee1b2..541898c1a1c 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -312,6 +312,7 @@ def with_row_locks(
*,
nowait: bool = False,
skip_locked: bool = False,
+ key_share: bool = True,
**kwargs,
) -> Query:
"""
@@ -329,6 +330,7 @@ def with_row_locks(
:param session: ORM Session
:param nowait: If set to True, will pass NOWAIT to supported database
backends.
:param skip_locked: If set to True, will pass SKIP LOCKED to supported
database backends.
+ :param key_share: If true, will lock with FOR KEY SHARE UPDATE (at least
on postgres).
:param kwargs: Extra kwargs to pass to with_for_update (of, nowait,
skip_locked, etc)
:return: updated query
"""
@@ -343,6 +345,8 @@ def with_row_locks(
kwargs["nowait"] = True
if skip_locked:
kwargs["skip_locked"] = True
+ if key_share:
+ kwargs["key_share"] = True
return query.with_for_update(**kwargs)
diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py
index 70e60203bdd..45a5ec0fe37 100644
--- a/tests/utils/test_sqlalchemy.py
+++ b/tests/utils/test_sqlalchemy.py
@@ -147,7 +147,7 @@ class TestSqlAlchemyUtils:
returned_value = with_row_locks(query=query, session=session,
nowait=True)
if expected_use_row_level_lock:
- query.with_for_update.assert_called_once_with(nowait=True)
+ query.with_for_update.assert_called_once_with(nowait=True,
key_share=True)
else:
assert returned_value == query
query.with_for_update.assert_not_called()