jason810496 commented on code in PR #66584: URL: https://github.com/apache/airflow/pull/66584#discussion_r3287754985
########## airflow-core/docs/authoring-and-scheduling/event-scheduling.rst: ########## @@ -64,6 +64,128 @@ event-driven scheduling, then a new trigger must be created. This new trigger must inherit ``BaseEventTrigger`` and ensure it properly works with event-driven scheduling. It might inherit from the existing trigger as well if both triggers share some common code. +Sharing one poll across sibling triggers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. versionadded:: 3.3 + +When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource** +— a directory of flag files, a polling REST endpoint, a Kafka topic with auto-commit, and similar idempotent or +subscriber-side-effect sources — the triggerer would otherwise spin up one independent poll loop per trigger. For a +shared source with twenty subscribers that means twenty poll loops, twenty connections, twenty sets of API calls per +cadence. See "Suitable upstreams" below for the precise scope.

Review Comment:
Even though we are tracking the "add producer-side ack channel" work in https://github.com/apache/airflow/issues/67179, would it be better not to mention Kafka or SQS in the docs here, just in case https://github.com/apache/airflow/issues/67179 doesn't land in the 3.3 release?

########## airflow-core/src/airflow/triggers/shared_stream.py: ########## @@ -0,0 +1,388 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Shared underlying I/O between :class:`BaseEventTrigger` instances in the triggerer. + +When multiple triggers declare the same non-``None`` +:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key`, the +triggerer routes them through :class:`SharedStreamManager` so that one +underlying poll loop produces raw events that are broadcast to every +participating trigger. Each trigger then runs +:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` to +convert the broadcast into its own :class:`~airflow.triggers.base.TriggerEvent` +instances. Triggers that opt out (the default) keep their independent +``run()``-based poll loops untouched. + +Scope and the missing ack channel +--------------------------------- + +The shared-stream channel is **one-way**: events flow from +``open_shared_stream`` out to each subscriber's ``filter_shared_stream``, +with no path back. Subscribers cannot tell the producer "I accepted this +event; please advance / commit / ack". The pattern is therefore only safe +for upstreams whose consumption does not need a producer-side side effect +tied to a subscriber's accept / reject decision: + +* Idempotent / read-only reads (filesystem listings, polling REST APIs). +* Auto-commit Kafka consumers (``enable.auto.commit=true``). +* Subscriber-side-effect cleanup (``unlink``, local marking, …) where the + per-event action goes through APIs the subscriber owns independently. 
+ +Kafka manual-commit consumers, SQS delete-on-process / visibility +extension, and similar message-broker patterns where progress is per-message +and tied to the subscriber's decision are **not** in scope here today. A +producer-side ack channel to cover them is a follow-up that should be +designed against a concrete Kafka or SQS consumer rather than against an +abstract API. See :class:`~airflow.triggers.base.BaseEventTrigger` for the +matching subclass-facing notes. + +Lifecycle invariants +-------------------- + +The manager and groups cooperate to keep a single invariant true at every +``await``-point: + + A key is present in :attr:`SharedStreamManager._groups` only while its + group's poll task is alive and accepting new subscribers. + +This rules out the late-subscriber races that the naive design admits — a +new subscriber for a key whose poll has died or is in the middle of being +torn down always falls through to "create a fresh group" rather than +attaching to a dead one and hanging on an empty queue. The invariant is +maintained synchronously: + +* When ``_poll`` ends for any reason other than cancellation (the upstream + iterator raised, or returned), the group's ``finally`` block evicts the + key from ``_groups`` and broadcasts a terminal sentinel to current + subscribers — all without yielding, so no other coroutine can interleave. +* When the last subscriber leaves, :meth:`SharedStreamManager.unsubscribe` + evicts the key from ``_groups`` *before* awaiting ``group.stop()``, so a + new subscriber arriving while we wait for cancellation creates a fresh + group. +* :meth:`SharedStreamManager.stop_all` clears ``_groups`` in one synchronous + step before awaiting any stop, applying the same rule to shutdown. 
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncGenerator, AsyncIterator, Callable, Hashable +from contextlib import suppress +from typing import TYPE_CHECKING, Any + +import structlog + +if TYPE_CHECKING: + from structlog.stdlib import BoundLogger + + from airflow.triggers.base import BaseEventTrigger + +log = structlog.get_logger(__name__) + +DEFAULT_SUBSCRIBER_QUEUE_MAX = 1024 +"""Default per-subscriber queue size for shared streams. + +The :class:`SharedStreamManager` admits up to this many unconsumed raw events +per subscriber before treating the subscriber as too slow to keep up — at +which point the subscriber's trigger is failed with +:class:`_SubscriberOverflow` rather than the queue growing without bound. + +Used as the fallback when no value is passed to ``SharedStreamManager``; +in the triggerer this is overridden from the +``[triggerer] shared_stream_subscriber_queue_size`` config option. +""" + + +class _PollTerminated(Exception): + """ + Raised inside subscribers when ``open_shared_stream`` returns without yielding more events. + + Implementations are expected to run for the lifetime of the group; an + early return would otherwise leave subscribers waiting forever on an + empty queue. + """ + + +class _SubscriberOverflow(Exception): + """ + Raised in a subscriber whose queue exceeded its maxsize. + + Surfaces the slow subscriber loudly through the standard trigger-failure + path (rather than silently dropping events) so Airflow's retry / failure + semantics apply. Other subscribers in the same group are unaffected. + """ + + +class _PollFailure: + """Sentinel propagated through subscriber queues when the shared poll ends.""" + + __slots__ = ("exc",) + + def __init__(self, exc: BaseException) -> None: + self.exc = exc + + +async def _drain(queue: asyncio.Queue) -> AsyncGenerator[Any, None]: + """ + Yield items from ``queue`` until a poll termination sentinel arrives. 
+ + Subscribers exit either by their consuming task being cancelled + (Airflow's standard idiom — :class:`CancelledError` propagates through + ``queue.get()``) or by the shared poll ending, in which case the + :class:`_PollFailure` sentinel re-raises here. + """ + while True: + item = await queue.get() + if isinstance(item, _PollFailure): + raise item.exc + yield item + + +class _SharedStreamGroup: + """One shared poll loop broadcasting raw events to N subscriber queues.""" + + def __init__( + self, + *, + key: Hashable, + trigger_class: type[BaseEventTrigger], + kwargs: dict[str, Any], + on_poll_terminate: Callable[[_SharedStreamGroup], None], + max_subscriber_queue: int, + log: BoundLogger, + ) -> None: + self.key = key + self.trigger_class = trigger_class + self.kwargs = kwargs + self.log = log + self._on_poll_terminate = on_poll_terminate + self._max_subscriber_queue = max_subscriber_queue + self._subscribers: dict[int, asyncio.Queue] = {} + self._overflowed: set[int] = set() + self._poll_task: asyncio.Task | None = None + + def start(self) -> None: + """Start the underlying poll loop. Call exactly once per group.""" + if self._poll_task is not None: + raise RuntimeError(f"Shared stream group {self.key!r} already started") + self._poll_task = asyncio.create_task( + self._poll(), + name=f"shared-stream-poll[{self.key!r}]", + ) + + async def _poll(self) -> None: + terminal_exc: BaseException | None = None + try: + async for raw_event in self.trigger_class.open_shared_stream(self.kwargs): + for trigger_id, queue in self._subscribers.items(): + if trigger_id in self._overflowed: + # Subscriber has been force-failed on a previous + # overflow; the failure sentinel is already in its + # queue and unsubscribe will drop it on next pass. 
+ continue + try: + queue.put_nowait(raw_event) + except asyncio.QueueFull: + self._fail_overflowed_subscriber(trigger_id, queue) + terminal_exc = _PollTerminated( + f"open_shared_stream for {self.key!r} returned without raising; " + "shared streams are expected to run for the lifetime of the group" + ) + except asyncio.CancelledError: + # ``stop()`` initiated this; the manager has already evicted the + # group and is awaiting our exit. Do not run the terminate path. + raise + except Exception as exc: + terminal_exc = exc + self.log.exception("Shared stream poll failed; propagating to subscribers", key=self.key) + finally: + if terminal_exc is not None: + # Synchronous: evict from the manager and broadcast the + # sentinel before returning to the loop, so no coroutine can + # observe ``_groups[key]`` pointing at a dead poll. + self._on_poll_terminate(self) + failure = _PollFailure(terminal_exc) + for queue in self._subscribers.values(): + # Drain stale events then put the failure sentinel so every + # subscriber wakes up even if its queue was at capacity. + self._drain_and_offer_failure(queue, failure) + + def subscribe(self, trigger_id: int) -> AsyncIterator[Any]: + """Register ``trigger_id`` as a subscriber and return its raw event stream.""" + if trigger_id in self._subscribers: + raise RuntimeError(f"Trigger {trigger_id} already subscribed to shared stream {self.key!r}") + queue: asyncio.Queue = asyncio.Queue(maxsize=self._max_subscriber_queue) + self._subscribers[trigger_id] = queue + return _drain(queue) + + def unsubscribe(self, trigger_id: int) -> None: + # Active subscribers exit through their consuming task being cancelled + # (Airflow's standard idiom); dropping the queue is enough here. + self._subscribers.pop(trigger_id, None) + self._overflowed.discard(trigger_id) + + def _fail_overflowed_subscriber(self, trigger_id: int, queue: asyncio.Queue) -> None: + """ + Force a slow subscriber to fail with :class:`_SubscriberOverflow`. 
+ + The broadcast hit ``QueueFull`` for this subscriber's queue, which + means the subscriber's :meth:`filter_shared_stream` is falling behind + the upstream cadence. Rather than dropping events silently — which + would invisibly violate Asset event-driven semantics — we drain + whatever stale events are pending and replace them with a + :class:`_PollFailure` so the subscriber's ``run_trigger`` sees the + error on its next ``__anext__``. Other subscribers in the same group + are unaffected. + """ + self.log.warning( + "Shared stream subscriber overflowed; failing this trigger", + key=self.key, + trigger_id=trigger_id, + queue_maxsize=queue.maxsize, + ) + self._drain_and_offer_failure( + queue, + _PollFailure( + _SubscriberOverflow( + f"shared stream {self.key!r} fell behind for trigger {trigger_id}: " + f"subscriber queue exceeded maxsize={queue.maxsize}" + ) + ), + ) + self._overflowed.add(trigger_id) + + def _drain_and_offer_failure(self, queue: asyncio.Queue, failure: _PollFailure) -> None: + """ + Drain ``queue`` and put ``failure`` so the subscriber wakes on the failure. + + The drain releases capacity so the subsequent ``put_nowait`` cannot raise + ``QueueFull``; this is the single point that both the terminal-broadcast + and the per-subscriber overflow path go through. + """ + while not queue.empty(): + try: + queue.get_nowait() + except asyncio.QueueEmpty: + break + queue.put_nowait(failure) + + def is_empty(self) -> bool: + return not self._subscribers + + async def stop(self) -> None: + """Cancel the poll task if it is still running and wait for it to exit.""" + if self._poll_task is None or self._poll_task.done(): + return + self._poll_task.cancel() + with suppress(asyncio.CancelledError): + await self._poll_task + + +class SharedStreamManager: + """ + Coordinate :class:`BaseEventTrigger` instances that share underlying I/O. + + The manager owns one :class:`_SharedStreamGroup` per distinct + ``shared_stream_key``. 
Each group runs a single async task that drives + ``open_shared_stream``; subscribers receive raw events through their own + asyncio queues and convert them to :class:`TriggerEvent` instances + independently. + + The manager is single-event-loop and not thread-safe. The triggerer's + ``TriggerRunner`` is its sole owner. + """ + + def __init__( + self, + *, + log: BoundLogger | None = None, + max_subscriber_queue: int = DEFAULT_SUBSCRIBER_QUEUE_MAX, + ) -> None: + self.log = log or structlog.get_logger(__name__) + self._max_subscriber_queue = max_subscriber_queue + self._groups: dict[Hashable, _SharedStreamGroup] = {} + + def subscribe( + self, + *, + trigger_id: int, + trigger: BaseEventTrigger, + key: Hashable, + ) -> AsyncIterator[Any]: + """ + Subscribe a trigger to the shared stream identified by ``key``. + + On first subscriber for a given key the group is created and the + underlying poll loop is started. Returns an async iterator of raw + events the trigger should feed into ``filter_shared_stream``. + """ + if key is None: + raise ValueError("shared stream key must not be None") + group = self._groups.get(key) + if group is None:

Review Comment:
```suggestion
        if (group := self._groups.get(key)) is None:
```

########## airflow-core/docs/authoring-and-scheduling/event-scheduling.rst: ########## @@ -64,6 +64,128 @@ event-driven scheduling, then a new trigger must be created. This new trigger must inherit ``BaseEventTrigger`` and ensure it properly works with event-driven scheduling. It might inherit from the existing trigger as well if both triggers share some common code. +Sharing one poll across sibling triggers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. 
versionadded:: 3.3 + +When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource** +— a directory of flag files, a polling REST endpoint, a Kafka topic with auto-commit, and similar idempotent or +subscriber-side-effect sources — the triggerer would otherwise spin up one independent poll loop per trigger. For a +shared source with twenty subscribers that means twenty poll loops, twenty connections, twenty sets of API calls per +cadence. See "Suitable upstreams" below for the precise scope. + +``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each +trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a +subclass overrides three hooks: + +* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — return a key identifying the shared + upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None`` + (the default) opts out — the trigger runs its own independent ``run()`` loop, exactly as before. The return value + is read **once** when the triggerer starts this trigger; changing it mid-lifetime has no effect on group + membership, so siblings that should share a poll must return the same key from the outset. + +* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a ``@classmethod`` coroutine the triggerer + drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one + trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``. + +* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an instance method that consumes the + broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering + (e.g. 
only events matching this instance's ``filename``) lives here. + +Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory: + +.. code-block:: python + + from collections.abc import AsyncIterator, Hashable + from typing import Any + + from airflow.triggers.base import BaseEventTrigger, TriggerEvent + + + class DirectoryFileDeleteTrigger(BaseEventTrigger): + def __init__(self, *, directory, filename, poke_interval=5.0): + super().__init__() + self.directory = directory + self.filename = filename + self.poke_interval = poke_interval + + def shared_stream_key(self) -> Hashable | None: + # All triggers on the same directory + cadence share one scan. + return ("directory-scan", self.directory, self.poke_interval)

Review Comment:
Inspired by https://github.com/apache/airflow/pull/66859#discussion_r3264426096: how about stating, in both the user-facing docs and the base interface docstring, that `shared_stream_key` should be deterministic? Users should not derive it from the current timestamp or a runtime-generated UUID; otherwise sibling triggers never return equal keys and each one ends up in its own group with its own poll.

########## airflow-core/src/airflow/triggers/shared_stream.py: ########## @@ -0,0 +1,388 @@ ... + def _fail_overflowed_subscriber(self, trigger_id: int, queue: asyncio.Queue) -> None: + """ + Force a slow subscriber to fail with :class:`_SubscriberOverflow`.

Review Comment:
Should the user-facing docs explain how to resolve the overflow? For example: increase the threshold (`[triggerer] shared_stream_subscriber_queue_size`), or redesign `shared_stream_key` so that fewer triggers produce the same key.
########## airflow-core/src/airflow/triggers/base.py: ########## @@ -269,6 +313,80 @@ def hash(classpath: str, kwargs: dict[str, Any]) -> int: normalized = encode_trigger({"classpath": classpath, "kwargs": kwargs})["kwargs"] return hash((classpath, json.dumps(BaseSerialization.serialize(normalized)).encode("utf-8"))) + def shared_stream_key(self) -> Hashable | None: + """ + Identify an underlying I/O stream that can be shared with sibling triggers. + + Two trigger instances whose ``shared_stream_key()`` return values + compare equal (and are not ``None``) will share a single underlying + poll loop in the triggerer. Each instance still receives the events + it cares about through its own :meth:`filter_shared_stream` call. + + Returning ``None`` (the default) opts out of sharing — the trigger + runs its own independent poll loop via :meth:`run`, exactly as today. + + The return value is read **once** when ``run_trigger`` first starts + this trigger; any change to the key afterwards has no effect on + group membership for this instance. To share one poll across a set + of sibling triggers, ensure every trigger in the set returns the + same key from the outset. + + .. note:: + + This method is called **after** :meth:`render_template_fields`, + so any templated attribute (for example a ``directory`` derived + from a Jinja expression) is already resolved when the key is + constructed. Two sibling triggers that render to the same path + will correctly share their poll. + """ + return None + + @classmethod + async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]:

Review Comment:
Not sure whether it's a good idea, but we could introduce a method-level generic so that users can define their own TypedDict or internal data model shared between `open_shared_stream` and `filter_shared_stream`, e.g. `async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[MyType]:`

########## airflow-core/src/airflow/triggers/shared_stream.py: ########## @@ -0,0 +1,388 @@ ... +class _PollTerminated(Exception):

Review Comment:
Do these exceptions need to inherit from `AirflowException`?
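One way to read the generic-type suggestion on `open_shared_stream`, sketched with hypothetical names (`TypedSharedStream`, `FileSeen`, `DirectoryTrigger`, and a stand-in `TriggerEvent`) rather than the PR's actual `BaseEventTrigger` API. Because the raw event type has to match between `open_shared_stream` and `filter_shared_stream`, this sketch uses a class-level type parameter: a `TypeVar` that appears in only one method's signature is solved independently per call and would not tie the two hooks together.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any, Generic, TypedDict, TypeVar

RawT = TypeVar("RawT")


@dataclass
class TriggerEvent:
    """Hypothetical stand-in for airflow.triggers.base.TriggerEvent."""

    payload: Any


class TypedSharedStream(Generic[RawT]):
    """Hypothetical base class: one class-level type parameter links both hooks."""

    @classmethod
    def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[RawT]:
        # Subclasses override this with an ``async def`` generator.
        raise NotImplementedError

    def filter_shared_stream(self, raw: AsyncIterator[RawT]) -> AsyncIterator[TriggerEvent]:
        raise NotImplementedError


class FileSeen(TypedDict):
    """Hypothetical raw event shape produced by one directory scan."""

    directory: str
    name: str


class DirectoryTrigger(TypedSharedStream[FileSeen]):
    def __init__(self, directory: str, filename: str) -> None:
        self.directory = directory
        self.filename = filename

    @classmethod
    async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[FileSeen]:
        # A real stream polls for the lifetime of the group; a fixed listing keeps this finite.
        for name in ("a.flag", "b.flag"):
            yield FileSeen(directory=kwargs["directory"], name=name)

    async def filter_shared_stream(self, raw: AsyncIterator[FileSeen]) -> AsyncIterator[TriggerEvent]:
        async for item in raw:
            # A type checker knows ``item`` is a FileSeen, so a key typo would be flagged.
            if item["name"] == self.filename:
                yield TriggerEvent(payload=item)


async def collect(trigger: DirectoryTrigger) -> list[TriggerEvent]:
    """Drive one shared poll and feed it through this trigger's filter."""
    raw = DirectoryTrigger.open_shared_stream({"directory": trigger.directory})
    return [event async for event in trigger.filter_shared_stream(raw)]


if __name__ == "__main__":
    print(asyncio.run(collect(DirectoryTrigger("/inbox", "b.flag"))))
    # [TriggerEvent(payload={'directory': '/inbox', 'name': 'b.flag'})]
```

Declaring the base hooks as plain `def` methods that return `AsyncIterator[RawT]` lets subclasses override them with `async def` generators, which is one common way to type an abstract async-generator hook.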
