This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ecd5faddf fix(core): add return statement to yield within a while loop in triggers (#38389)
7ecd5faddf is described below
commit 7ecd5faddfc54cc8784638642270eefbbbf5d36d
Author: Wei Lee <[email protected]>
AuthorDate: Fri Mar 22 18:37:13 2024 +0800
fix(core): add return statement to yield within a while loop in triggers (#38389)
---
airflow/triggers/external_task.py | 2 ++
airflow/triggers/file.py | 2 ++
2 files changed, 4 insertions(+)
diff --git a/airflow/triggers/external_task.py b/airflow/triggers/external_task.py
index 98305ea4f1..5c7361a15b 100644
--- a/airflow/triggers/external_task.py
+++ b/airflow/triggers/external_task.py
@@ -121,6 +121,7 @@ class WorkflowTrigger(BaseTrigger):
)
if skipped_count > 0:
yield TriggerEvent({"status": "skipped"})
+ return
allowed_count = _get_count(
self.execution_dates,
self.external_task_ids,
@@ -289,6 +290,7 @@ class DagStateTrigger(BaseTrigger):
num_dags = await self.count_dags() # type: ignore[call-arg]
if num_dags == len(self.execution_dates):
yield TriggerEvent(self.serialize())
+ return
await asyncio.sleep(self.poll_interval)
@sync_to_async
diff --git a/airflow/triggers/file.py b/airflow/triggers/file.py
index 4a1df581bb..5f40dd4d2d 100644
--- a/airflow/triggers/file.py
+++ b/airflow/triggers/file.py
@@ -79,7 +79,9 @@ class FileTrigger(BaseTrigger):
mod_time = datetime.datetime.fromtimestamp(mod_time_f).strftime("%Y%m%d%H%M%S")
self.log.info("Found File %s last modified: %s", path, mod_time)
yield TriggerEvent(True)
+ return
for _, _, files in os.walk(self.filepath):
if files:
yield TriggerEvent(True)
+ return
await asyncio.sleep(self.poke_interval)