This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12d38e1a55 Clean up unused triggers in a single query for all dialects
except MySQL (#38663)
12d38e1a55 is described below
commit 12d38e1a5592e230a9843aae9090837285b51b97
Author: Hussein Awala <[email protected]>
AuthorDate: Tue Apr 2 06:14:35 2024 +0200
Clean up unused triggers in a single query for all dialects except MySQL
(#38663)
---
airflow/models/trigger.py | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c3d6919645..225e729489 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -169,14 +169,16 @@ class Trigger(Base):
.values(trigger_id=None)
)
- # Get all triggers that have no task instances depending on them...
- ids = session.scalars(
+ # Get all triggers that have no task instances depending on them and
delete them
+ ids = (
select(cls.id)
.join(TaskInstance, cls.id == TaskInstance.trigger_id,
isouter=True)
.group_by(cls.id)
.having(func.count(TaskInstance.trigger_id) == 0)
- ).all()
- # ...and delete them (we can't do this in one query due to MySQL)
+ )
+ if session.bind.dialect.name == "mysql":
+ # MySQL doesn't support DELETE with JOIN, so we need to do it in
two steps
+ ids = session.scalars(ids).all()
session.execute(
delete(Trigger).where(Trigger.id.in_(ids)).execution_options(synchronize_session=False)
)