Lee-W commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3266395501


##########
airflow-core/docs/authoring-and-scheduling/event-scheduling.rst:
##########
@@ -64,6 +64,97 @@ event-driven scheduling, then a new trigger must be created.
 This new trigger must inherit ``BaseEventTrigger`` and ensure it properly works with event-driven scheduling.
 It might inherit from the existing trigger as well if both triggers share some common code.
 
+Sharing one poll across sibling triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource**
+— one SQS queue, one Kafka topic, one directory of flag files — the triggerer would otherwise spin up one independent
+poll loop per trigger. For a shared queue with twenty subscribers that means twenty consumers, twenty connections,
+twenty sets of API calls per cadence.
+
+``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
+trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
+subclass overrides three hooks:
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — return a key identifying the shared
+  upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None``
+  (the default) opts out — the trigger runs its own independent ``run()`` loop, exactly as before.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a ``@classmethod`` coroutine the triggerer
+  drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one
+  trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.
+
+* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an instance method that consumes the
+  broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
+  (e.g. only events matching this instance's ``filename``) lives here.
+
+Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:
+
+.. code-block:: python
+
+    from airflow.triggers.base import BaseEventTrigger, TriggerEvent
+
+
+    class DirectoryFileDeleteTrigger(BaseEventTrigger):
+        def __init__(self, *, directory, filename, poke_interval=5.0):
+            super().__init__()
+            self.directory = directory
+            self.filename = filename
+            self.poke_interval = poke_interval
+
+        def shared_stream_key(self):
+            # All triggers on the same directory + cadence share one scan.
+            return ("directory-scan", self.directory, self.poke_interval)
+
+        @classmethod
+        async def open_shared_stream(cls, kwargs):

Review Comment:
   fixed
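
For readers following the hunk above, here is a minimal sketch of the grouping idea it describes: triggers with equal keys share one poll, and each instance filters the shared events for itself. It does not use Airflow. `FlagFileWatcher`, `matches`, `run_groups`, and the `listing` dictionary are hypothetical names invented for illustration, the directory scan is simulated with a fixed list, and the stream-consuming `filter_shared_stream` hook is collapsed into a per-event predicate to keep the sketch short. It is not how the triggerer is actually implemented.

```python
import asyncio
from collections import defaultdict


class FlagFileWatcher:
    """Hypothetical stand-in for a trigger; not Airflow's BaseEventTrigger."""

    def __init__(self, directory, filename):
        self.directory = directory
        self.filename = filename

    def shared_stream_key(self):
        # Watchers whose keys compare equal share one scan.
        return ("directory-scan", self.directory)

    @classmethod
    async def open_shared_stream(cls, kwargs):
        # Simulated upstream: iterate a fixed listing instead of polling a
        # real directory. Only fields that feed the key (plus the fake
        # listing) are read from kwargs.
        for name in kwargs["listing"]:
            await asyncio.sleep(0)  # yield control, as a real poll would
            yield name

    def matches(self, raw_event):
        # Per-instance filtering: react only to this watcher's own file.
        return raw_event == self.filename


async def run_groups(watchers, listing):
    """Group watchers by key, open one stream per group, fan events out."""
    groups = defaultdict(list)
    for watcher in watchers:
        groups[watcher.shared_stream_key()].append(watcher)

    polls = 0
    fired = []
    for members in groups.values():
        polls += 1  # one shared poll per group, however many members
        first = members[0]
        kwargs = {"directory": first.directory, "listing": listing[first.directory]}
        async for raw_event in type(first).open_shared_stream(kwargs):
            for watcher in members:
                if watcher.matches(raw_event):
                    fired.append(watcher.filename)
    return polls, fired
```

With two watchers on `/inbox` and one on `/other`, `run_groups` opens two streams rather than three, and each watcher fires only for its own file name.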



-- 
This is an automated message from the Apache Git Service.