dstandish commented on a change in pull request #21770:
URL: https://github.com/apache/airflow/pull/21770#discussion_r813261301



##########
File path: airflow/models/trigger.py
##########
@@ -184,7 +184,10 @@ def assign_unassigned(cls, triggerer_id, capacity, session=None):
         # Find triggers who do NOT have an alive triggerer_id, and then assign
         # up to `capacity` of those to us.
         trigger_ids_query = (
-            session.query(cls.id).filter(cls.triggerer_id.notin_(alive_triggerer_ids)).limit(capacity).all()
+            session.query(cls.id)
+            .filter(or_(cls.triggerer_id.notin_(alive_triggerer_ids), cls.triggerer_id == None))  # noqa: E711

Review comment:
       If `triggerer_id` is `None`, wouldn't it already be true that it's not in `alive_triggerer_ids`?
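
One way to check this question empirically: in SQL, `NULL NOT IN (...)` evaluates to NULL rather than true, so a plain `NOT IN` filter skips rows whose column is NULL. Below is a minimal sketch using the standard-library `sqlite3` module. The `triggers` table, its rows, and the `alive` tuple are made up for illustration and are not Airflow's actual schema or data.

```python
import sqlite3

# Hypothetical stand-in table; not Airflow's real schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE triggers (id INTEGER PRIMARY KEY, triggerer_id INTEGER)")
conn.executemany(
    "INSERT INTO triggers (id, triggerer_id) VALUES (?, ?)",
    [(1, 10), (2, 20), (3, None)],  # trigger 3 has no triggerer (NULL)
)

alive = (10,)  # assumed: only triggerer 10 is alive

# Plain NOT IN: the NULL row is not matched, because NULL NOT IN (10) is NULL.
not_in_only = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM triggers WHERE triggerer_id NOT IN (?) ORDER BY id", alive
    )
]

# NOT IN combined with an explicit IS NULL check: the NULL row is matched too.
with_null_check = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM triggers "
        "WHERE triggerer_id NOT IN (?) OR triggerer_id IS NULL ORDER BY id",
        alive,
    )
]

print(not_in_only)
print(with_null_check)
conn.close()
```

In this sketch only the second query picks up the unassigned row, which suggests the extra `== None` clause in the diff is not redundant.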

##########
File path: airflow/models/trigger.py
##########
@@ -175,7 +175,7 @@ def assign_unassigned(cls, triggerer_id, capacity, session=None):
         alive_triggerer_ids = [
             row[0]
             for row in session.query(BaseJob.id).filter(
-                BaseJob.end_date is None,
+                BaseJob.end_date == None,  # noqa: E711

Review comment:
       ```suggestion
                   BaseJob.end_date.is_(None),
   ```
   This looks a little better to me, and I _think_ we can do this, no?
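
For context on why the removed `is None` line could never produce a SQL filter: Python's `is` operator cannot be overloaded, so it is evaluated immediately in Python, whereas `==` and a method such as `is_()` can return an expression object. The sketch below uses a toy `FakeColumn` class, which is a hypothetical stand-in written for illustration and is not SQLAlchemy's real column type.

```python
class FakeColumn:
    """Hypothetical stand-in for an ORM column; NOT SQLAlchemy's actual class."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        # An ORM can overload == to build a SQL fragment instead of a bool.
        if other is None:
            return f"{self.name} IS NULL"
        return f"{self.name} = {other!r}"

    def is_(self, other):
        # Explicit spelling of the same NULL test; only None is handled here.
        if other is None:
            return f"{self.name} IS NULL"
        raise NotImplementedError("this toy only supports is_(None)")


end_date = FakeColumn("end_date")

# `is` compares object identity in Python and cannot be customized,
# so this is a plain bool computed before any query is built.
identity_check = end_date is None

# Both of these go through methods the class controls.
equality_expr = end_date == None  # noqa: E711
method_expr = end_date.is_(None)

print(identity_check)
print(equality_expr)
print(method_expr)
```

In this toy, the `==` and `is_()` spellings produce the same fragment; the `is_()` form simply avoids the `noqa: E711` marker.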

##########
File path: tests/models/test_trigger.py
##########
@@ -124,3 +129,50 @@ def test_submit_failure(session, create_task_instance):
     updated_task_instance = session.query(TaskInstance).one()
     assert updated_task_instance.state == State.SCHEDULED
     assert updated_task_instance.next_method == "__fail__"
+
+
+def test_assign_unassigned(session, create_task_instance):
+    """
+    Tests that unassigned triggers of all appropriate states are assigned.
+    """
+    finished_triggerer = TriggererJob(None, heartrate=10, state=State.SUCCESS)
+    finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1)
+    session.add(finished_triggerer)
+    assert not finished_triggerer.is_alive()
+    healthy_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    session.add(healthy_triggerer)
+    assert healthy_triggerer.is_alive()
+    new_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    session.add(new_triggerer)
+    assert new_triggerer.is_alive()
+    session.commit()
+    trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_healthy_triggerer.id = 1
+    trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+    trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_killed_triggerer.id = 2
+    trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
+    trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_unassigned_to_triggerer.id = 3
+    assert trigger_unassigned_to_triggerer.triggerer_id is None
+    session.add(trigger_on_healthy_triggerer)
+    session.add(trigger_on_killed_triggerer)
+    session.add(trigger_unassigned_to_triggerer)
+    session.commit()
+    assert session.query(Trigger).count() == 3
+    Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
+    session.expire_all()

Review comment:
       Why not `commit` instead? Genuine question, as I'm still learning SQLAlchemy. The docs indicate that when you commit, everything gets expired anyway, and committing seems like the more intuitive thing to do here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.