uranusjr commented on code in PR #30330:
URL: https://github.com/apache/airflow/pull/30330#discussion_r1161512150


##########
airflow/www/utils.py:
##########
@@ -812,6 +819,56 @@ def get_col_default(self, col_name: str) -> Any:
     filter_converter_class = AirflowFilterConverter
 
 
+class DagRunCustomSQLAInterface(CustomSQLAInterface):
+    """
+    Overwrite of custom delete and delete_all methods for speeding up
+    the DagRun deletion when DagRun has a lot of task instances.
+    The default cascade deletion was performing very slowly when task 
instances were more than 10k.
+
+    """
+
+    def delete(self, item: Model, raise_exception: bool = False) -> bool:
+        try:
+            self.session.query(TaskInstance).where(TaskInstance.run_id == 
item.run_id).delete()
+            return super().delete(item, raise_exception=raise_exception)
+        except IntegrityError as e:
+            self.message = (as_unicode(self.delete_integrity_error_message), 
"warning")
+            log.warning(LOGMSG_WAR_DBI_DEL_INTEGRITY.format(str(e)))
+            self.session.rollback()
+            if raise_exception:
+                raise e
+            return False
+        except Exception as e:
+            self.message = (
+                as_unicode(self.general_error_message + " " + 
str(sys.exc_info()[0])),
+                "danger",
+            )
+            log.exception(LOGMSG_ERR_DBI_DEL_GENERIC.format(str(e)))
+            self.session.rollback()
+            if raise_exception:
+                raise e
+            return False

Review Comment:
   ```suggestion
           self.session.query(TaskInstance).where(TaskInstance.run_id == item.run_id).delete()
           return super().delete(item, raise_exception=raise_exception)
   ```
   
   I’m not 100% sure, but since `super().delete` contains some re-raising, 
duplicating the exception blocks here would cause an error to be logged twice 
(once in `super()` and once here) whenever it is re-raised. And since those 
errors can only happen when you call `commit()` (in `super()`), the error 
handling here is never reached in any other case.



##########
airflow/www/utils.py:
##########
@@ -812,6 +819,56 @@ def get_col_default(self, col_name: str) -> Any:
     filter_converter_class = AirflowFilterConverter
 
 
+class DagRunCustomSQLAInterface(CustomSQLAInterface):
+    """
+    Overwrite of custom delete and delete_all methods for speeding up
+    the DagRun deletion when DagRun has a lot of task instances.
+    The default cascade deletion was performing very slowly when task 
instances were more than 10k.
+
+    """
+
+    def delete(self, item: Model, raise_exception: bool = False) -> bool:
+        try:
+            self.session.query(TaskInstance).where(TaskInstance.run_id == 
item.run_id).delete()
+            return super().delete(item, raise_exception=raise_exception)
+        except IntegrityError as e:
+            self.message = (as_unicode(self.delete_integrity_error_message), 
"warning")
+            log.warning(LOGMSG_WAR_DBI_DEL_INTEGRITY.format(str(e)))
+            self.session.rollback()
+            if raise_exception:
+                raise e
+            return False
+        except Exception as e:
+            self.message = (
+                as_unicode(self.general_error_message + " " + 
str(sys.exc_info()[0])),
+                "danger",
+            )
+            log.exception(LOGMSG_ERR_DBI_DEL_GENERIC.format(str(e)))
+            self.session.rollback()
+            if raise_exception:
+                raise e
+            return False
+
+    def delete_all(self, items: list[Model]) -> bool:
+        try:
+            for item in items:
+                self.session.query(TaskInstance).where(TaskInstance.run_id == 
item.run_id).delete()
+            return super().delete_all(items)
+        except IntegrityError as e:
+            self.message = (as_unicode(self.delete_integrity_error_message), 
"warning")
+            log.warning(LOGMSG_WAR_DBI_DEL_INTEGRITY.format(str(e)))
+            self.session.rollback()
+            return False
+        except Exception as e:
+            self.message = (
+                as_unicode(self.general_error_message + " " + 
str(sys.exc_info()[0])),
+                "danger",
+            )
+            log.exception(LOGMSG_ERR_DBI_DEL_GENERIC.format(str(e)))
+            self.session.rollback()
+            return False

Review Comment:
   See above, the same should apply here (if I’m correct). I also wonder if 
it’s better to use `IN` here:
   
   ```python
   run_ids = [item.run_id for item in items]
   self.session.query(TaskInstance).where(TaskInstance.run_id.in_(run_ids)).delete()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to