jkramer-ginkgo commented on a change in pull request #21770:
URL: https://github.com/apache/airflow/pull/21770#discussion_r813290958
##########
File path: tests/models/test_trigger.py
##########
@@ -124,3 +129,50 @@ def test_submit_failure(session, create_task_instance):
updated_task_instance = session.query(TaskInstance).one()
assert updated_task_instance.state == State.SCHEDULED
assert updated_task_instance.next_method == "__fail__"
+
+
+def test_assign_unassigned(session, create_task_instance):
+ """
+ Tests that unassigned triggers of all appropriate states are assigned.
+ """
+ finished_triggerer = TriggererJob(None, heartrate=10, state=State.SUCCESS)
+ finished_triggerer.end_date = timezone.utcnow() -
datetime.timedelta(hours=1)
+ session.add(finished_triggerer)
+ assert not finished_triggerer.is_alive()
+ healthy_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+ session.add(healthy_triggerer)
+ assert healthy_triggerer.is_alive()
+ new_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+ session.add(new_triggerer)
+ assert new_triggerer.is_alive()
+ session.commit()
+ trigger_on_healthy_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+ trigger_on_healthy_triggerer.id = 1
+ trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+ trigger_on_killed_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+ trigger_on_killed_triggerer.id = 2
+ trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
+ trigger_unassigned_to_triggerer =
Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+ trigger_unassigned_to_triggerer.id = 3
+ assert trigger_unassigned_to_triggerer.triggerer_id is None
+ session.add(trigger_on_healthy_triggerer)
+ session.add(trigger_on_killed_triggerer)
+ session.add(trigger_unassigned_to_triggerer)
+ session.commit()
+ assert session.query(Trigger).count() == 3
+ Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
+ session.expire_all()
Review comment:
I had a bunch of trouble here (and I suspect it's because of the
`synchronize_session=False` in `Trigger.assign_unassigned`). At some point I
tried various combinations of `session.flush()` and `session.commit()`, but
`session.expire_all()` is what worked in getting the state synced. I'm no
SQLAlchemy expert either!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]