This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ea4050d41 fix(amazon): add return statement to yield within a while 
loop in triggers (#38396)
9ea4050d41 is described below

commit 9ea4050d41aa22c98523d6e6dfa6c4204fc8cf93
Author: Wei Lee <[email protected]>
AuthorDate: Fri Mar 22 18:43:48 2024 +0800

    fix(amazon): add return statement to yield within a while loop in triggers 
(#38396)
---
 airflow/providers/amazon/aws/triggers/redshift_cluster.py |  1 +
 airflow/providers/amazon/aws/triggers/s3.py               |  6 ++++--
 airflow/providers/amazon/aws/triggers/sagemaker.py        | 10 ++++++----
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/amazon/aws/triggers/redshift_cluster.py 
b/airflow/providers/amazon/aws/triggers/redshift_cluster.py
index eebbfce380..37ca6b6d9f 100644
--- a/airflow/providers/amazon/aws/triggers/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/triggers/redshift_cluster.py
@@ -311,6 +311,7 @@ class RedshiftClusterTrigger(BaseTrigger):
                     "status"
                 ] == "error":
                     yield TriggerEvent(res)
+                    return
                 await asyncio.sleep(self.poke_interval)
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e)})
diff --git a/airflow/providers/amazon/aws/triggers/s3.py 
b/airflow/providers/amazon/aws/triggers/s3.py
index fbded51d2e..adecde9aa1 100644
--- a/airflow/providers/amazon/aws/triggers/s3.py
+++ b/airflow/providers/amazon/aws/triggers/s3.py
@@ -98,8 +98,9 @@ class S3KeyTrigger(BaseTrigger):
                             )
                             await asyncio.sleep(self.poke_interval)
                             yield TriggerEvent({"status": "running", "files": 
s3_objects})
-
-                        yield TriggerEvent({"status": "success"})
+                        else:
+                            yield TriggerEvent({"status": "success"})
+                        return
 
                     self.log.info("Sleeping for %s seconds", 
self.poke_interval)
                     await asyncio.sleep(self.poke_interval)
@@ -204,6 +205,7 @@ class S3KeysUnchangedTrigger(BaseTrigger):
                     )
                     if result.get("status") in ("success", "error"):
                         yield TriggerEvent(result)
+                        return
                     elif result.get("status") == "pending":
                         self.previous_objects = result.get("previous_objects", 
set())
                         self.last_activity_time = 
result.get("last_activity_time")
diff --git a/airflow/providers/amazon/aws/triggers/sagemaker.py 
b/airflow/providers/amazon/aws/triggers/sagemaker.py
index b67759bf89..8f10418763 100644
--- a/airflow/providers/amazon/aws/triggers/sagemaker.py
+++ b/airflow/providers/amazon/aws/triggers/sagemaker.py
@@ -245,8 +245,8 @@ class SageMakerTrainingPrintLogTrigger(BaseTrigger):
         job_already_completed = status not in self.hook.non_terminal_states
         state = LogState.COMPLETE if job_already_completed else 
LogState.TAILING
         last_describe_job_call = time.time()
-        while True:
-            try:
+        try:
+            while True:
                 (
                     state,
                     last_description,
@@ -267,6 +267,7 @@ class SageMakerTrainingPrintLogTrigger(BaseTrigger):
                     reason = last_description.get("FailureReason", "(No reason 
provided)")
                     error_message = f"SageMaker job failed because {reason}"
                     yield TriggerEvent({"status": "error", "message": 
error_message})
+                    return
                 else:
                     billable_seconds = SageMakerHook.count_billable_seconds(
                         
training_start_time=last_description["TrainingStartTime"],
@@ -275,5 +276,6 @@ class SageMakerTrainingPrintLogTrigger(BaseTrigger):
                     )
                     self.log.info("Billable seconds: %d", billable_seconds)
                     yield TriggerEvent({"status": "success", "message": 
last_description})
-            except Exception as e:
-                yield TriggerEvent({"status": "error", "message": str(e)})
+                    return
+        except Exception as e:
+            yield TriggerEvent({"status": "error", "message": str(e)})

Reply via email to