Lee-W commented on code in PR #32513:
URL: https://github.com/apache/airflow/pull/32513#discussion_r1265491042


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -256,9 +256,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         kwargs["client_type"] = "emr-serverless"
         super().__init__(*args, **kwargs)
 
-    def cancel_running_jobs(self, application_id: str, waiter_config: dict = {}):
+    def cancel_running_jobs(
+        self, application_id: str, waiter_config: dict = {}, wait_for_completion: bool = True

Review Comment:
   It's best not to use a mutable object such as `{}` as a default argument value, since the same dict instance is shared across calls.



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:

Review Comment:
   nitpick: missing type annotation



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.hook.conn.stop_application(applicationId=self.application_id)
+            self.defer(
+                trigger=EmrServerlessStopApplicationTrigger(
+                    application_id=self.application_id,
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                ),
+                timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.log.info("EMR serverless application %s stopped successfully", self.application_id)

Review Comment:
   Do we need to handle the non-success cases, as the other operators do?



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_started",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStopApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be stopped.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_started",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStopApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be stopped.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_stopped",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessDeleteApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be deleted.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1355,6 +1438,24 @@ def execute(self, context: Context) -> None:
             )
             self.log.info("EMR serverless application %s stopped successfully", self.application_id)
 
+    def stop_application(self, context, event=None) -> None:
+        if event["status"] == "success":
+            self.hook.conn.stop_application(applicationId=self.application_id)
+            self.defer(
+                trigger=EmrServerlessStopApplicationTrigger(
+                    application_id=self.application_id,
+                    aws_conn_id=self.aws_conn_id,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                ),
+                timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context, event=None) -> None:

Review Comment:
   nitpick: missing type annotation



##########
airflow/providers/amazon/aws/triggers/emr.py:
##########
@@ -283,3 +283,180 @@ def __init__(
 
     def hook(self) -> AwsGenericHook:
         return EmrHook(self.aws_conn_id)
+
+
+class EmrServerlessCreateApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be created.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_created",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application creation failed",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStartApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be started.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_started",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessStopApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be stopped.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_stopped",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessDeleteApplicationTrigger(AwsBaseWaiterTrigger):
+    """
+    Poll an Emr Serverless application and wait for it to be deleted.
+
+    :param application_id: The ID of the application being polled.
+    :waiter_delay: polling period in seconds to check for the status
+    :param waiter_max_attempts: The maximum number of attempts to be made
+    :param aws_conn_id: Reference to AWS connection id
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 60,
+        aws_conn_id: str = "aws_default",
+    ):
+        super().__init__(
+            serialized_fields={"application_id": application_id},
+            waiter_name="serverless_app_terminated",
+            waiter_args={"applicationId": application_id},
+            failure_message="Application failed to start",
+            status_message="Application status is",
+            status_queries=["application.state", "application.stateDetails"],
+            return_key="application_id",
+            return_value=application_id,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EmrServerlessHook(self.aws_conn_id)
+
+
+class EmrServerlessCancelJobsTrigger(AwsBaseWaiterTrigger):
+    """
+    Trigger for canceling a list of jobs in an EMR Serverless application.
+
+    :param application_id: EMR Serverless application ID
+    :param aws_conn_id: Reference to AWS connection id
+    :param waiter_delay: Delay in seconds between each attempt to check the status
+    :param waiter_max_attempts: Maximum number of attempts to check the status
+    """
+
+    def __init__(
+        self,
+        application_id: str,
+        aws_conn_id: str,
+        waiter_delay: int,
+        waiter_max_attempts: int,
+    ):

Review Comment:
   ```suggestion
       ) -> None:
   ```



##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -256,9 +256,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         kwargs["client_type"] = "emr-serverless"
         super().__init__(*args, **kwargs)
 
-    def cancel_running_jobs(self, application_id: str, waiter_config: dict = {}):
+    def cancel_running_jobs(
+        self, application_id: str, waiter_config: dict = {}, wait_for_completion: bool = True
+    ):

Review Comment:
   nitpick: missing return annotation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to