zach-overflow commented on code in PR #59239:
URL: https://github.com/apache/airflow/pull/59239#discussion_r2645745144


##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -311,21 +313,33 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) ->
 
     @classmethod
     @provide_session
-    def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) -> list[int]:
+    def ids_for_triggerer(
+        cls, triggerer_id, consume_trigger_queues: set[str] | None = None, session: Session = NEW_SESSION
+    ) -> list[int]:
         """Retrieve a list of trigger ids."""
-        return list(session.scalars(select(cls.id).where(cls.triggerer_id == triggerer_id)).all())
+        query = select(cls.id).where(cls.triggerer_id == triggerer_id)
+        # By default, there is no trigger queue assignment. Only filter by queue when explicitly set in the triggerer CLI.
+        if consume_trigger_queues:
+            query = query.filter(cls.trigger_queue.in_(consume_trigger_queues))

Review Comment:
   Good catch, you are right that this does not agree with the docs. I will 
update this code, since this was a change I missed during an early refactor. 
More info in my other related comment 
[here](https://github.com/apache/airflow/pull/59239#discussion_r2645743768).
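
For reference, here is a minimal sketch of how the branch in the hunk above behaves as written. It runs on plain Python objects instead of a SQLAlchemy session; `TriggerRow` and the sample rows are hypothetical stand-ins, not Airflow code, and the sketch makes no claim about what the docs specify.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TriggerRow:
    """Hypothetical stand-in for a row of the trigger table."""

    id: int
    triggerer_id: int
    trigger_queue: str | None = None


def ids_for_triggerer(rows, triggerer_id, consume_trigger_queues=None):
    """Mirror the branching of the hunk above on in-memory rows."""
    selected = [r for r in rows if r.triggerer_id == triggerer_id]
    # None and an empty set are both falsy, so neither applies a queue filter.
    if consume_trigger_queues:
        selected = [r for r in selected if r.trigger_queue in consume_trigger_queues]
    return [r.id for r in selected]


# Made-up sample data for illustration only.
rows = [
    TriggerRow(1, triggerer_id=7),
    TriggerRow(2, triggerer_id=7, trigger_queue="fast"),
    TriggerRow(3, triggerer_id=7, trigger_queue="slow"),
    TriggerRow(4, triggerer_id=8, trigger_queue="fast"),
]
```

With these rows, omitting `consume_trigger_queues` (or passing an empty set) keeps every trigger assigned to the triggerer, including those with a queue, while passing `{"fast"}` keeps only the trigger whose queue matches.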



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
