This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b26a76803 Add sorting logic by created_date for fetching triggers 
(#31151)
2b26a76803 is described below

commit 2b26a76803bb41abd05b13e444fb4b51620525cd
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Wed May 10 01:35:03 2023 +0530

    Add sorting logic by created_date for fetching triggers (#31151)
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
---
 airflow/models/trigger.py    | 21 ++++++++++++++-------
 tests/models/test_trigger.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index adad7ba1c3..c69ac89ff2 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -220,16 +220,23 @@ class Trigger(Base):
 
         # Find triggers who do NOT have an alive triggerer_id, and then assign
         # up to `capacity` of those to us.
-        trigger_ids_query = with_row_locks(
-            session.query(cls.id)
-            .filter(or_(cls.triggerer_id.is_(None), 
cls.triggerer_id.notin_(alive_triggerer_ids)))
-            .limit(capacity),
-            session,
-            skip_locked=True,
-        ).all()
+        trigger_ids_query = cls.get_sorted_triggers(
+            capacity=capacity, alive_triggerer_ids=alive_triggerer_ids, 
session=session
+        )
         if trigger_ids_query:
             session.query(cls).filter(cls.id.in_([i.id for i in 
trigger_ids_query])).update(
                 {cls.triggerer_id: triggerer_id},
                 synchronize_session=False,
             )
         session.commit()
+
+    @classmethod
+    def get_sorted_triggers(cls, capacity, alive_triggerer_ids, session):
+        return with_row_locks(
+            session.query(cls.id)
+            .filter(or_(cls.triggerer_id.is_(None), 
cls.triggerer_id.notin_(alive_triggerer_ids)))
+            .order_by(cls.created_date)
+            .limit(capacity),
+            session,
+            skip_locked=True,
+        ).all()
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index ac5bba4606..4abd7e9e42 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import datetime
 
 import pytest
+import pytz
 
 from airflow.jobs.job import Job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -184,3 +185,33 @@ def test_assign_unassigned(session, create_task_instance):
         session.query(Trigger).filter(Trigger.id == 
trigger_on_healthy_triggerer.id).one().triggerer_id
         == healthy_triggerer.id
     )
+
+
+def test_get_sorted_triggers(session, create_task_instance):
+    """
+    Tests that triggers are sorted by the creation_date.
+    """
+    trigger_old = Trigger(
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        kwargs={},
+        created_date=datetime.datetime(
+            2023, 5, 9, 12, 16, 14, 474415, 
tzinfo=pytz.timezone("Africa/Abidjan")
+        ),
+    )
+    trigger_old.id = 1
+    trigger_new = Trigger(
+        classpath="airflow.triggers.testing.SuccessTrigger",
+        kwargs={},
+        created_date=datetime.datetime(
+            2023, 5, 9, 12, 17, 14, 474415, 
tzinfo=pytz.timezone("Africa/Abidjan")
+        ),
+    )
+    trigger_new.id = 2
+    session.add(trigger_old)
+    session.add(trigger_new)
+    session.commit()
+    assert session.query(Trigger).count() == 2
+
+    trigger_ids_query = Trigger.get_sorted_triggers(capacity=100, 
alive_triggerer_ids=[], session=session)
+
+    assert trigger_ids_query == [(1,), (2,)]

Reply via email to