vandonr-amz commented on code in PR #32534:
URL: https://github.com/apache/airflow/pull/32534#discussion_r1268554818


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1227,6 +1256,45 @@ def execute(self, context: Context) -> str | None:
 
         return self.job_id
 
+    def start_job_run_after_defer(self, context: Context, event: dict[str, 
Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            self.log.info("Application %s failed to start", 
self.application_id)
+        response = self.hook.conn.start_job_run(
+            clientToken=self.client_request_token,
+            applicationId=self.application_id,
+            executionRoleArn=self.execution_role_arn,
+            jobDriver=self.job_driver,
+            configurationOverrides=self.configuration_overrides,
+            name=self.name,
+            **self.config,
+        )
+
+        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+            raise AirflowException(f"EMR serverless job failed to start: 
{response}")
+
+        self.defer(
+            trigger=EmrServerlessStartJobTrigger(
+                application_id=self.application_id,
+                job_id=response["jobRunId"],
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                aws_conn_id=self.aws_conn_id,
+            ),
+            method_name="execute_complete",
+            timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] == "success":
+            self.log.info("Serverless job completed")
+            return event["job_id"]

Review Comment:
   ok, I see it a bit as "validation" that we didn't get crap data for whatever 
reason, but you are also right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to