This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ea4050d41 fix(amazon): add return statement to yield within a while loop in triggers (#38396)
9ea4050d41 is described below
commit 9ea4050d41aa22c98523d6e6dfa6c4204fc8cf93
Author: Wei Lee <[email protected]>
AuthorDate: Fri Mar 22 18:43:48 2024 +0800
fix(amazon): add return statement to yield within a while loop in triggers (#38396)
---
airflow/providers/amazon/aws/triggers/redshift_cluster.py | 1 +
airflow/providers/amazon/aws/triggers/s3.py | 6 ++++--
airflow/providers/amazon/aws/triggers/sagemaker.py | 10 ++++++----
3 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/amazon/aws/triggers/redshift_cluster.py b/airflow/providers/amazon/aws/triggers/redshift_cluster.py
index eebbfce380..37ca6b6d9f 100644
--- a/airflow/providers/amazon/aws/triggers/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/triggers/redshift_cluster.py
@@ -311,6 +311,7 @@ class RedshiftClusterTrigger(BaseTrigger):
"status"
] == "error":
yield TriggerEvent(res)
+ return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
diff --git a/airflow/providers/amazon/aws/triggers/s3.py b/airflow/providers/amazon/aws/triggers/s3.py
index fbded51d2e..adecde9aa1 100644
--- a/airflow/providers/amazon/aws/triggers/s3.py
+++ b/airflow/providers/amazon/aws/triggers/s3.py
@@ -98,8 +98,9 @@ class S3KeyTrigger(BaseTrigger):
)
await asyncio.sleep(self.poke_interval)
yield TriggerEvent({"status": "running", "files":
s3_objects})
-
- yield TriggerEvent({"status": "success"})
+ else:
+ yield TriggerEvent({"status": "success"})
+ return
self.log.info("Sleeping for %s seconds",
self.poke_interval)
await asyncio.sleep(self.poke_interval)
@@ -204,6 +205,7 @@ class S3KeysUnchangedTrigger(BaseTrigger):
)
if result.get("status") in ("success", "error"):
yield TriggerEvent(result)
+ return
elif result.get("status") == "pending":
self.previous_objects = result.get("previous_objects",
set())
self.last_activity_time =
result.get("last_activity_time")
diff --git a/airflow/providers/amazon/aws/triggers/sagemaker.py b/airflow/providers/amazon/aws/triggers/sagemaker.py
index b67759bf89..8f10418763 100644
--- a/airflow/providers/amazon/aws/triggers/sagemaker.py
+++ b/airflow/providers/amazon/aws/triggers/sagemaker.py
@@ -245,8 +245,8 @@ class SageMakerTrainingPrintLogTrigger(BaseTrigger):
job_already_completed = status not in self.hook.non_terminal_states
state = LogState.COMPLETE if job_already_completed else
LogState.TAILING
last_describe_job_call = time.time()
- while True:
- try:
+ try:
+ while True:
(
state,
last_description,
@@ -267,6 +267,7 @@ class SageMakerTrainingPrintLogTrigger(BaseTrigger):
reason = last_description.get("FailureReason", "(No reason
provided)")
error_message = f"SageMaker job failed because {reason}"
yield TriggerEvent({"status": "error", "message":
error_message})
+ return
else:
billable_seconds = SageMakerHook.count_billable_seconds(
training_start_time=last_description["TrainingStartTime"],
@@ -275,5 +276,6 @@ class SageMakerTrainingPrintLogTrigger(BaseTrigger):
)
self.log.info("Billable seconds: %d", billable_seconds)
yield TriggerEvent({"status": "success", "message":
last_description})
- except Exception as e:
- yield TriggerEvent({"status": "error", "message": str(e)})
+ return
+ except Exception as e:
+ yield TriggerEvent({"status": "error", "message": str(e)})