This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cd713ab7cb9 Use FOR KEY SHARE UPDATE instead of FOR UPDATE (#42082)
cd713ab7cb9 is described below

commit cd713ab7cb9588559ed44be789db0dbc2c450807
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Nov 26 09:31:34 2024 -0800

    Use FOR KEY SHARE UPDATE instead of FOR UPDATE (#42082)
    
    Generally speaking, the goal of our SELECT FOR UPDATE statements is
    concurrency control: we want to make sure that nothing else is handling the
    dag run (or ti) at the same time. In no case that I'm aware of are we
    actually going to update the PK value for the record, so we can take the
    weaker lock (FOR NO KEY UPDATE, which is what SQLAlchemy renders on
    PostgreSQL for `with_for_update(key_share=True)`) and still have that
    concurrency control. But this only works on PostgreSQL; MySQL has no such
    weaker FOR UPDATE lock.
    
    One negative consequence of taking the stronger lock is that it blocks any
    insert of a row that references the locked row via a FK. This can cause
    slowdowns and pileups in some circumstances. In one case we take an extra
    FOR UPDATE lock because that FK wait issue was associated with a deadlock.
    This change makes that extra lock unnecessary in the PostgreSQL case, so we
    remove it here. And with the MySQL case in mind, we add a commit (before
    inserting the TaskReschedule row) to reduce the likelihood of deadlocking.
---
 airflow/models/taskinstance.py | 29 ++++++++++-------------------
 airflow/utils/sqlalchemy.py    |  4 ++++
 tests/utils/test_sqlalchemy.py |  2 +-
 3 files changed, 15 insertions(+), 20 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6c8798bdec5..324308f6f61 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1673,8 +1673,6 @@ def _handle_reschedule(
 
     ti = _coalesce_to_orm_ti(ti=ti, session=session)
 
-    from airflow.models.dagrun import DagRun  # Avoid circular import
-
     ti.refresh_from_db(session)
 
     if TYPE_CHECKING:
@@ -1683,16 +1681,16 @@ def _handle_reschedule(
     ti.end_date = timezone.utcnow()
     ti.set_duration()
 
-    # Lock DAG run to be sure not to get into a deadlock situation when trying 
to insert
-    # TaskReschedule which apparently also creates lock on corresponding 
DagRun entity
-    with_row_locks(
-        session.query(DagRun).filter_by(
-            dag_id=ti.dag_id,
-            run_id=ti.run_id,
-        ),
-        session=session,
-    ).one()
-    # Log reschedule request
+    # set state
+    ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+
+    ti.clear_next_method_args()
+
+    session.merge(ti)
+    session.commit()
+
+    # we add this in separate commit to reduce likelihood of deadlock
+    # see https://github.com/apache/airflow/pull/21362 for more info
     session.add(
         TaskReschedule(
             ti.task_id,
@@ -1705,13 +1703,6 @@ def _handle_reschedule(
             ti.map_index,
         )
     )
-
-    # set state
-    ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
-
-    ti.clear_next_method_args()
-
-    session.merge(ti)
     session.commit()
     return ti
 
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 14dd7cee1b2..541898c1a1c 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -312,6 +312,7 @@ def with_row_locks(
     *,
     nowait: bool = False,
     skip_locked: bool = False,
+    key_share: bool = True,
     **kwargs,
 ) -> Query:
     """
@@ -329,6 +330,7 @@ def with_row_locks(
     :param session: ORM Session
     :param nowait: If set to True, will pass NOWAIT to supported database 
backends.
     :param skip_locked: If set to True, will pass SKIP LOCKED to supported 
database backends.
+    :param key_share: If true, will lock with FOR KEY SHARE UPDATE (at least 
on postgres).
     :param kwargs: Extra kwargs to pass to with_for_update (of, nowait, 
skip_locked, etc)
     :return: updated query
     """
@@ -343,6 +345,8 @@ def with_row_locks(
         kwargs["nowait"] = True
     if skip_locked:
         kwargs["skip_locked"] = True
+    if key_share:
+        kwargs["key_share"] = True
     return query.with_for_update(**kwargs)
 
 
diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py
index 70e60203bdd..45a5ec0fe37 100644
--- a/tests/utils/test_sqlalchemy.py
+++ b/tests/utils/test_sqlalchemy.py
@@ -147,7 +147,7 @@ class TestSqlAlchemyUtils:
             returned_value = with_row_locks(query=query, session=session, 
nowait=True)
 
         if expected_use_row_level_lock:
-            query.with_for_update.assert_called_once_with(nowait=True)
+            query.with_for_update.assert_called_once_with(nowait=True, 
key_share=True)
         else:
             assert returned_value == query
             query.with_for_update.assert_not_called()

Reply via email to