jscheffl commented on code in PR #59239:
URL: https://github.com/apache/airflow/pull/59239#discussion_r2645305654
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -311,21 +313,33 @@ def submit_failure(cls, trigger_id, exc=None, session:
Session = NEW_SESSION) ->
@classmethod
@provide_session
- def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION)
-> list[int]:
+ def ids_for_triggerer(
+ cls, triggerer_id, consume_trigger_queues: set[str] | None = None,
session: Session = NEW_SESSION
+ ) -> list[int]:
"""Retrieve a list of trigger ids."""
- return list(session.scalars(select(cls.id).where(cls.triggerer_id ==
triggerer_id)).all())
+ query = select(cls.id).where(cls.triggerer_id == triggerer_id)
+ # By default, there is no trigger queue assignment. Only filter by
queue when explicitly set in the triggerer CLI.
+ if consume_trigger_queues:
+ query = query.filter(cls.trigger_queue.in_(consume_trigger_queues))
Review Comment:
With this filter only conditionally applied it is inconsistent to the
documentation above. If no queue is provided, no filter is applied and a
triggerer would pull _all_ items. (Not only the ones which have no queue
assigned!)
I assume the docs above are wrong. I think the behavior is OK and one must
be aware of that selective triggerers need to be set-up per queue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]