zach-overflow commented on code in PR #59239:
URL: https://github.com/apache/airflow/pull/59239#discussion_r2645814905
##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -399,7 +423,15 @@ def get_sorted_triggers(cls, capacity: int,
alive_triggerer_ids: list[int] | Sel
# picking up too many triggers and starving other triggerers for
HA setup.
remaining_capacity = min(remaining_capacity,
cls.max_trigger_to_select_per_loop)
- locked_query = with_row_locks(query.limit(remaining_capacity),
session, skip_locked=True)
+ # Filter by trigger_queues if the triggerer explicitly was called
with `--consume-trigger-queues`,
+ # otherwise, filter out Triggers which have an explicit
`trigger_queue` value since there may be other
+ # triggerer hosts explicitly assigned to that queue.
+ if consume_trigger_queues:
+ filtered_query =
query.filter(cls.trigger_queue.in_(consume_trigger_queues))
+ else:
+ filtered_query = query.filter(cls.trigger_queue.is_(None))
Review Comment:
So I believe this concern is addressed by what I described
[here](https://github.com/apache/airflow/pull/59239#discussion_r2645791823), in
that the currently proposed design intentionally does not derive the trigger
queue values from the task queues. The safety issues you are describing are
partly the motivation for keeping the trigger queue and task queue distinct,
rather than tightly coupled.
In other words, the intent here is that trigger queue assignment is specific
to the BaseTrigger instantiation / operator's `defer` call arguments,
effectively making the trigger queue an opt-in feature, which is not derived
from the task queue (unless users explicitly define their trigger queues from
their tasks' queue values).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]