This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4284d03e98df5701a3e41cbf0826b2dccbefacee
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jun 1 19:54:40 2022 +0200

    Handle occasional deadlocks in trigger with retries (#24071)

    Fixes: #23639
    (cherry picked from commit d86ae090350de97e385ca4aaf128235f4c21f158)
---
 airflow/models/trigger.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c1ccdd4964..2f332393f9 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -24,6 +24,7 @@ from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
+from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import State
@@ -88,9 +89,11 @@ class Trigger(Base):
         (triggers have a one-to-many relationship to both)
         """
         # Update all task instances with trigger IDs that are not DEFERRED to remove them
-        session.query(TaskInstance).filter(
-            TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
-        ).update({TaskInstance.trigger_id: None})
+        for attempt in run_with_db_retries():
+            with attempt:
+                session.query(TaskInstance).filter(
+                    TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+                ).update({TaskInstance.trigger_id: None})
         # Get all triggers that have no task instances depending on them...
         ids = [
             trigger_id
