kaxil commented on code in PR #66584:
URL: https://github.com/apache/airflow/pull/66584#discussion_r3268776893
##########
providers/standard/src/airflow/providers/standard/triggers/file.py:
##########
@@ -132,3 +132,83 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent(True)
return
await asyncio.sleep(self.poke_interval)
+
+
+class DirectoryFileDeleteTrigger(BaseEventTrigger):
+    """
+    Fire once when ``filename`` appears in ``directory``, then delete it.
+
+    Functionally equivalent to ``FileDeleteTrigger`` for a single file, but
+    sibling triggers that point at the same ``directory`` and ``poke_interval``
+    share a single underlying directory scan in the triggerer; each instance
+    only fires for its own ``filename``. This is useful when many assets are
+    driven by per-flag-file events landing in a shared inbox directory.
+
+    :param directory: Directory to scan.
+    :param filename: File name (without directory) whose appearance fires this
+        trigger. The matched file is deleted before the event is yielded.
+    :param poke_interval: Time to wait between scans.
+    """
+
+    def __init__(self, *, directory: str, filename: str, poke_interval: float = 5.0) -> None:
+        super().__init__()
+        self.directory = directory
+        self.filename = filename
+        self.poke_interval = poke_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize DirectoryFileDeleteTrigger arguments and classpath."""
+        return (
+            "airflow.providers.standard.triggers.file.DirectoryFileDeleteTrigger",
+            {
+                "directory": self.directory,
+                "filename": self.filename,
+                "poke_interval": self.poke_interval,
+            },
+        )
+
+    def shared_stream_key(self) -> Hashable | None:
+        """All triggers on the same directory + cadence share one scan."""
+        return ("directory-scan", self.directory, self.poke_interval)
Review Comment:
The directory string is keyed verbatim, so AssetWatchers configured with
`"/tmp/flags"` and `"/tmp/flags/"` (or `"./flags"` and the equivalent absolute
path) won't share a scan even though they watch the same directory. Normalising
the path before building the key tuple would make sharing robust to trivial
variants: `os.path.normpath(self.directory)` covers the trailing-slash case,
`os.path.abspath` (which also normalises) is needed to unify a relative path
with its absolute form, and `os.path.realpath` additionally resolves symlinks if
those are a concern. The cost of getting this wrong is silent: nothing fails,
you just quietly run N separate scans where one was intended.
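A minimal sketch of the normalised key, written as a standalone function so it runs outside Airflow. `directory_scan_key` and its `resolve_symlinks` flag are hypothetical names for illustration only; in the PR the body would sit inside `shared_stream_key`. It assumes relative paths should be resolved against the working directory of whichever process computes the key.

```python
import os
from collections.abc import Hashable


def directory_scan_key(
    directory: str, poke_interval: float, *, resolve_symlinks: bool = False
) -> Hashable:
    """Return a sharing key that is stable across trivial spellings of one directory.

    Hypothetical helper; in the PR this would be the body of
    ``DirectoryFileDeleteTrigger.shared_stream_key``.
    """
    if resolve_symlinks:
        # realpath(): make the path absolute, normalise it, and follow symlinks.
        normalised = os.path.realpath(directory)
    else:
        # abspath(): join relative paths onto os.getcwd(), then normpath()
        # collapses "//", "/./", ".." and any trailing slash.
        normalised = os.path.abspath(directory)
    return ("directory-scan", normalised, poke_interval)


# "/tmp/flags" and "/tmp/flags/" now map to the same key, so they share one scan.
print(directory_scan_key("/tmp/flags", 5.0) == directory_scan_key("/tmp/flags/", 5.0))  # True
```

`abspath` is purely lexical, while `realpath` consults the filesystem to follow symlinks, so the flag makes that extra cost opt-in.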
##########
providers/standard/src/airflow/providers/standard/triggers/file.py:
##########
@@ -132,3 +132,83 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent(True)
return
await asyncio.sleep(self.poke_interval)
+
+
+class DirectoryFileDeleteTrigger(BaseEventTrigger):
+    """
+    Fire once when ``filename`` appears in ``directory``, then delete it.
+
+    Functionally equivalent to ``FileDeleteTrigger`` for a single file, but
+    sibling triggers that point at the same ``directory`` and ``poke_interval``
+    share a single underlying directory scan in the triggerer; each instance
+    only fires for its own ``filename``. This is useful when many assets are
+    driven by per-flag-file events landing in a shared inbox directory.
+
+    :param directory: Directory to scan.
+    :param filename: File name (without directory) whose appearance fires this
+        trigger. The matched file is deleted before the event is yielded.
+    :param poke_interval: Time to wait between scans.
+    """
+
+    def __init__(self, *, directory: str, filename: str, poke_interval: float = 5.0) -> None:
+        super().__init__()
+        self.directory = directory
+        self.filename = filename
+        self.poke_interval = poke_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serialize DirectoryFileDeleteTrigger arguments and classpath."""
+        return (
+            "airflow.providers.standard.triggers.file.DirectoryFileDeleteTrigger",
+            {
+                "directory": self.directory,
+                "filename": self.filename,
+                "poke_interval": self.poke_interval,
+            },
+        )
+
+    def shared_stream_key(self) -> Hashable | None:
+        """All triggers on the same directory + cadence share one scan."""
+        return ("directory-scan", self.directory, self.poke_interval)
+
+    @classmethod
+    async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]:
+        """Drive one directory-listing loop and broadcast each snapshot."""
+        directory = anyio.Path(kwargs["directory"])
+        poke_interval: float = kwargs["poke_interval"]
+        while True:
+            try:
+                names = {p.name async for p in directory.iterdir()}
+            except FileNotFoundError:
Review Comment:
Only `FileNotFoundError` is recovered here. A `PermissionError`, or any other
`OSError` from `iterdir()`, propagates out of `open_shared_stream`, triggers the
`_PollFailure` path in `SharedStreamManager`, and fails every sibling
subscriber on this directory at once. That may be intended (treat configuration
or permission problems as loud rather than silent), but `FileDeleteTrigger.run()`
above just logs and tries again on the next poke in the equivalent case, so the
two behaviours diverge. It would be worth either adding a sentence to the
`open_shared_stream` docstring stating that non-existence is the only
recoverable case (and why), or widening the `except` to `OSError` and logging at
warning level so that a brief permissions blip doesn't cascade into failing
every watcher on the directory.
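A rough sketch of the second option, with one possible refinement: tolerate a bounded number of consecutive `OSError`s (each logged at warning level) before re-raising, so a transient blip is absorbed while a persistent misconfiguration still fails loudly. To stay runnable with only the standard library it swaps `anyio.Path.iterdir()` for `os.listdir` via `asyncio.to_thread`. The name `scan_directory_snapshots`, the `max_consecutive_errors` parameter, and yielding an empty snapshot for a missing directory are all assumptions for illustration, not what the PR does.

```python
import asyncio
import logging
import os
from collections.abc import AsyncIterator

log = logging.getLogger(__name__)


async def scan_directory_snapshots(
    directory: str,
    poke_interval: float,
    max_consecutive_errors: int = 3,
) -> AsyncIterator[frozenset[str]]:
    """Yield the set of entry names in ``directory`` once per ``poke_interval``.

    Hypothetical stand-in for ``open_shared_stream``.
    """
    consecutive_errors = 0
    while True:
        try:
            # Run the blocking listdir() in a worker thread to keep the loop free.
            names = frozenset(await asyncio.to_thread(os.listdir, directory))
        except FileNotFoundError:
            # Must precede "except OSError" (FileNotFoundError is a subclass).
            # Assumption: a missing directory simply means "no files yet".
            names = frozenset()
        except OSError:
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                # Persistent failure: propagate so every subscriber fails loudly.
                raise
            log.warning(
                "Scan of %s failed (%d/%d); retrying in %ss",
                directory,
                consecutive_errors,
                max_consecutive_errors,
                poke_interval,
                exc_info=True,
            )
            await asyncio.sleep(poke_interval)
            continue
        # A successful scan (or a missing directory) resets the error budget.
        consecutive_errors = 0
        yield names
        await asyncio.sleep(poke_interval)
```

Setting `max_consecutive_errors=1` reproduces the current fail-fast behaviour for non-`FileNotFoundError` errors, so the threshold is a single knob between the two options.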
--
This is an automated message from the Apache Git Service.