This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2b26a76803 Add sorting logic by created_date for fetching triggers
(#31151)
2b26a76803 is described below
commit 2b26a76803bb41abd05b13e444fb4b51620525cd
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Wed May 10 01:35:03 2023 +0530
Add sorting logic by created_date for fetching triggers (#31151)
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow/models/trigger.py | 21 ++++++++++++++-------
tests/models/test_trigger.py | 31 +++++++++++++++++++++++++++++++
2 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index adad7ba1c3..c69ac89ff2 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -220,16 +220,23 @@ class Trigger(Base):
# Find triggers who do NOT have an alive triggerer_id, and then assign
# up to `capacity` of those to us.
- trigger_ids_query = with_row_locks(
- session.query(cls.id)
- .filter(or_(cls.triggerer_id.is_(None),
cls.triggerer_id.notin_(alive_triggerer_ids)))
- .limit(capacity),
- session,
- skip_locked=True,
- ).all()
+ trigger_ids_query = cls.get_sorted_triggers(
+ capacity=capacity, alive_triggerer_ids=alive_triggerer_ids,
session=session
+ )
if trigger_ids_query:
session.query(cls).filter(cls.id.in_([i.id for i in
trigger_ids_query])).update(
{cls.triggerer_id: triggerer_id},
synchronize_session=False,
)
session.commit()
+
+ @classmethod
+ def get_sorted_triggers(cls, capacity, alive_triggerer_ids, session):
+ return with_row_locks(
+ session.query(cls.id)
+ .filter(or_(cls.triggerer_id.is_(None),
cls.triggerer_id.notin_(alive_triggerer_ids)))
+ .order_by(cls.created_date)
+ .limit(capacity),
+ session,
+ skip_locked=True,
+ ).all()
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index ac5bba4606..4abd7e9e42 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import datetime
import pytest
+import pytz
from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -184,3 +185,33 @@ def test_assign_unassigned(session, create_task_instance):
session.query(Trigger).filter(Trigger.id ==
trigger_on_healthy_triggerer.id).one().triggerer_id
== healthy_triggerer.id
)
+
+
+def test_get_sorted_triggers(session, create_task_instance):
+ """
+ Tests that triggers are sorted by the creation_date.
+ """
+ trigger_old = Trigger(
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ kwargs={},
+ created_date=datetime.datetime(
+ 2023, 5, 9, 12, 16, 14, 474415,
tzinfo=pytz.timezone("Africa/Abidjan")
+ ),
+ )
+ trigger_old.id = 1
+ trigger_new = Trigger(
+ classpath="airflow.triggers.testing.SuccessTrigger",
+ kwargs={},
+ created_date=datetime.datetime(
+ 2023, 5, 9, 12, 17, 14, 474415,
tzinfo=pytz.timezone("Africa/Abidjan")
+ ),
+ )
+ trigger_new.id = 2
+ session.add(trigger_old)
+ session.add(trigger_new)
+ session.commit()
+ assert session.query(Trigger).count() == 2
+
+ trigger_ids_query = Trigger.get_sorted_triggers(capacity=100,
alive_triggerer_ids=[], session=session)
+
+ assert trigger_ids_query == [(1,), (2,)]