This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fd45f5f Fix occasional deadlock on MSSQL test DagMaker cleanup
(#18857)
fd45f5f is described below
commit fd45f5f3e38b80993d5624480a793be381194f04
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Oct 9 22:36:53 2021 +0200
Fix occasional deadlock on MSSQL test DagMaker cleanup (#18857)
Occasionally our tests in CI for MsSQL failed with a deadlock while
cleaning the SerializedDag table. After closer inspection, the
deadlock happened in the test dag_maker cleanup() code.
This PR fixes it by retrying the cleanup in case
of a deadlock.
---
tests/conftest.py | 49 +++++++++++++++++++++++++++++--------------------
1 file changed, 29 insertions(+), 20 deletions(-)
diff --git a/tests/conftest.py b/tests/conftest.py
index 36e88b3..22770c5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -473,7 +473,9 @@ def dag_maker(request):
if serialized_marker:
(want_serialized,) = serialized_marker.args or (True,)
- class DagFactory:
+ from airflow.utils.log.logging_mixin import LoggingMixin
+
+ class DagFactory(LoggingMixin):
def __init__(self):
from airflow.models import DagBag
@@ -601,25 +603,32 @@ def dag_maker(request):
def cleanup(self):
from airflow.models import DagModel, DagRun, TaskInstance, XCom
from airflow.models.serialized_dag import SerializedDagModel
-
- dag_ids = list(self.dagbag.dag_ids)
- if not dag_ids:
- return
- # To isolate problems here with problems from elsewhere on the
session object
- self.session.flush()
-
-
self.session.query(SerializedDagModel).filter(SerializedDagModel.dag_id.in_(dag_ids)).delete(
- synchronize_session=False
- )
-
self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(synchronize_session=False)
-
self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
- synchronize_session=False
- )
-
self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(synchronize_session=False)
-
self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
- synchronize_session=False
- )
- self.session.commit()
+ from airflow.utils.retries import run_with_db_retries
+
+ for attempt in run_with_db_retries(logger=self.log):
+ with attempt:
+ dag_ids = list(self.dagbag.dag_ids)
+ if not dag_ids:
+ return
+ # To isolate problems here with problems from elsewhere on
the session object
+ self.session.flush()
+
+ self.session.query(SerializedDagModel).filter(
+ SerializedDagModel.dag_id.in_(dag_ids)
+ ).delete(synchronize_session=False)
+
self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(
+ synchronize_session=False
+ )
+
self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
+ synchronize_session=False
+ )
+
self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(
+ synchronize_session=False
+ )
+
self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
+ synchronize_session=False
+ )
+ self.session.commit()
factory = DagFactory()