This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fd45f5f  Fix occasional deadlock on MSSQL test DagMaker cleanup (#18857)
fd45f5f is described below

commit fd45f5f3e38b80993d5624480a793be381194f04
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sat Oct 9 22:36:53 2021 +0200

    Fix occasional deadlock on MSSQL test DagMaker cleanup (#18857)
    
    Occasionally our tests in CI for MsSQL failed with deadlock on
    cleaning SerializedDag table. After closer inspection, the
    deadlock happened in the test dag_maker cleanup() code.
    
    This PR fixes it by attempting to retry the cleaning in case
    of deadlock.
---
 tests/conftest.py | 49 +++++++++++++++++++++++++++++--------------------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 36e88b3..22770c5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -473,7 +473,9 @@ def dag_maker(request):
     if serialized_marker:
         (want_serialized,) = serialized_marker.args or (True,)
 
-    class DagFactory:
+    from airflow.utils.log.logging_mixin import LoggingMixin
+
+    class DagFactory(LoggingMixin):
         def __init__(self):
             from airflow.models import DagBag
 
@@ -601,25 +603,32 @@ def dag_maker(request):
         def cleanup(self):
             from airflow.models import DagModel, DagRun, TaskInstance, XCom
             from airflow.models.serialized_dag import SerializedDagModel
-
-            dag_ids = list(self.dagbag.dag_ids)
-            if not dag_ids:
-                return
-            # To isolate problems here with problems from elsewhere on the session object
-            self.session.flush()
-
-            self.session.query(SerializedDagModel).filter(SerializedDagModel.dag_id.in_(dag_ids)).delete(
-                synchronize_session=False
-            )
-            self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(synchronize_session=False)
-            self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
-                synchronize_session=False
-            )
-            self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(synchronize_session=False)
-            self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
-                synchronize_session=False
-            )
-            self.session.commit()
+            from airflow.utils.retries import run_with_db_retries
+
+            for attempt in run_with_db_retries(logger=self.log):
+                with attempt:
+                    dag_ids = list(self.dagbag.dag_ids)
+                    if not dag_ids:
+                        return
+                    # To isolate problems here with problems from elsewhere on the session object
+                    self.session.flush()
+
+                    self.session.query(SerializedDagModel).filter(
+                        SerializedDagModel.dag_id.in_(dag_ids)
+                    ).delete(synchronize_session=False)
+                    self.session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    self.session.query(TaskInstance).filter(TaskInstance.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    self.session.query(XCom).filter(XCom.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    self.session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).delete(
+                        synchronize_session=False
+                    )
+                    self.session.commit()
 
     factory = DagFactory()
 

Reply via email to