jason810496 commented on code in PR #59239:
URL: https://github.com/apache/airflow/pull/59239#discussion_r2616126652
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -2402,6 +2402,25 @@ triggerer:
type: integer
example: ~
default: "50"
+ default_trigger_queue:
+ description: |
+ Default queue to assign triggers to. If using a non-default value, make sure this default is also
+ present in ``[triggerer] consume_trigger_queues`` otherwise triggers created without an explicit
+ ``trigger_queue``.
+ will not run.
+ version_added: 3.2.0
+ type: string
+ example: ~
+ default: "default"
+ consume_trigger_queues:
Review Comment:
Which means all the Triggerer instances could still consume from the same
default queue without these new configs (as long as none of the SQL statements
apply the `trigger_queue.in_` filter).
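A rough sketch of the "unset means no filtering" idea, under some assumptions: `parse_consume_trigger_queues` is a hypothetical helper (not something in this PR), and the comma-separated format for the option value is assumed for illustration only.

```python
from typing import Optional, Set


def parse_consume_trigger_queues(raw: Optional[str]) -> Optional[Set[str]]:
    """Hypothetical helper: map the raw option value to a queue set or None.

    None means "option not provided", so callers can skip queue filtering
    and every Triggerer keeps consuming all triggers as before.
    Assumption: the option is a comma-separated list of queue names.
    """
    if raw is None:
        return None
    queues = {name.strip() for name in raw.split(",") if name.strip()}
    # Treat a blank value the same as an unset one.
    return queues or None
```

With this shape, deployments that never touch the new options keep today's behaviour, and only an explicit value opts a Triggerer into queue-based filtering.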
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -311,19 +314,30 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) ->
@classmethod
@provide_session
- def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) -> list[int]:
+ def ids_for_triggerer(
+ cls, triggerer_id, trigger_queues: set[str], session: Session = NEW_SESSION
+ ) -> list[int]:
"""Retrieve a list of trigger ids."""
- return list(session.scalars(select(cls.id).where(cls.triggerer_id == triggerer_id)).all())
+ return list(
+ session.scalars(
+ select(cls.id).where(cls.triggerer_id == triggerer_id, cls.trigger_queue.in_(trigger_queues))
+ ).all()
+ )
Review Comment:
So that `trigger_queue` can still be passed as `set[str] | None` along the
path, and the `trigger_queue.in_` filter is only added if
`consume-trigger-queues` is explicitly provided.
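To illustrate, here is a minimal sketch of the optional filter. It uses the standard-library `sqlite3` module as a stand-in for the SQLAlchemy query in the diff, so it runs on its own. The `triggers` table layout and the `make_demo_db` helper are assumptions for the example, not the real Airflow schema.

```python
import sqlite3
from typing import List, Optional, Set


def make_demo_db() -> sqlite3.Connection:
    """Build a tiny in-memory table standing in for the trigger table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE triggers ("
        "id INTEGER PRIMARY KEY, triggerer_id INTEGER, trigger_queue TEXT)"
    )
    conn.executemany(
        "INSERT INTO triggers VALUES (?, ?, ?)",
        [(1, 7, "default"), (2, 7, "gpu"), (3, 8, "default")],
    )
    return conn


def ids_for_triggerer(
    conn: sqlite3.Connection,
    triggerer_id: int,
    trigger_queues: Optional[Set[str]] = None,
) -> List[int]:
    """Retrieve trigger ids, filtering by queue only when queues are given."""
    sql = "SELECT id FROM triggers WHERE triggerer_id = ?"
    params: list = [triggerer_id]
    if trigger_queues is not None:
        if not trigger_queues:
            # An explicit empty set matches nothing.
            return []
        placeholders = ", ".join("?" for _ in trigger_queues)
        sql += f" AND trigger_queue IN ({placeholders})"
        params.extend(sorted(trigger_queues))
    sql += " ORDER BY id"
    return [row[0] for row in conn.execute(sql, params)]
```

In the SQLAlchemy version the same branch would presumably decide whether `cls.trigger_queue.in_(trigger_queues)` is added to the `where` clause, so passing `None` keeps the existing query unchanged.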
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.