This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f094e6f917447fed4fbd986175432176f448fe93 Author: Jarek Potiuk <[email protected]> AuthorDate: Sat Oct 9 22:36:53 2021 +0200 Fix occasional deadlock on MSSQL test DagMaker cleanup (#18857) Occasionally our tests in CI for MsSQL failed with deadlock on cleaning SerializedDag table. After closer inspection, the deadlock happened in the test dag_maker cleanup() code. This PR fixes it by attempting to retry the cleaning in case of deadlock. (cherry picked from commit fd45f5f3e38b80993d5624480a793be381194f04) --- tests/conftest.py | 49 +++++++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 36e88b3..22770c5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -473,7 +473,9 @@ def dag_maker(request): if serialized_marker: (want_serialized,) = serialized_marker.args or (True,) - class DagFactory: + from airflow.utils.log.logging_mixin import LoggingMixin + + class DagFactory(LoggingMixin): def __init__(self): from airflow.models import DagBag @@ -601,25 +603,32 @@ def dag_maker(request): def cleanup(self): from airflow.models import DagModel, DagRun, TaskInstance, XCom from airflow.models.serialized_dag import SerializedDagModel - - dag_ids = list(self.dagbag.dag_ids) - if not dag_ids: - return - # To isolate problems here with problems from elsewhere on the session object - self.session.flush() - - self.session.query(SerializedDagModel).filter(SerializedDagModel.dag_id.in_(dag_ids)).delete( - synchronize_session=False - ) - self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(synchronize_session=False) - self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete( - synchronize_session=False - ) - self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(synchronize_session=False) - self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete( - synchronize_session=False - ) - self.session.commit() + from airflow.utils.retries import run_with_db_retries + + for attempt in run_with_db_retries(logger=self.log): + with attempt: + dag_ids = list(self.dagbag.dag_ids) + if not dag_ids: + return + # To isolate problems here with problems from elsewhere on the session object + self.session.flush() + + self.session.query(SerializedDagModel).filter( + SerializedDagModel.dag_id.in_(dag_ids) + ).delete(synchronize_session=False) + self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete( + synchronize_session=False + ) + self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete( + synchronize_session=False + ) + self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete( + synchronize_session=False + ) + self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete( + synchronize_session=False + ) + self.session.commit() factory = DagFactory()
