vandonr-amz commented on code in PR #32534:
URL: https://github.com/apache/airflow/pull/32534#discussion_r1268555763


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1227,6 +1256,45 @@ def execute(self, context: Context) -> str | None:
 
         return self.job_id
 
+    def start_job_run_after_defer(self, context: Context, event: dict[str, 
Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            self.log.info("Application %s failed to start", 
self.application_id)
+        response = self.hook.conn.start_job_run(
+            clientToken=self.client_request_token,
+            applicationId=self.application_id,
+            executionRoleArn=self.execution_role_arn,
+            jobDriver=self.job_driver,
+            configurationOverrides=self.configuration_overrides,
+            name=self.name,
+            **self.config,
+        )
+
+        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
+            raise AirflowException(f"EMR serverless job failed to start: 
{response}")
+
+        self.defer(
+            trigger=EmrServerlessStartJobTrigger(
+                application_id=self.application_id,
+                job_id=response["jobRunId"],
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                aws_conn_id=self.aws_conn_id,
+            ),
+            method_name="execute_complete",
+            timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] == "success":
+            self.log.info("Serverless job completed")
+            return event["job_id"]
+
     def on_kill(self) -> None:

Review Comment:
   it's the operator's on_kill method, not the executor. And as soon as you 
deffer, you're not in the operator anymore, so this won't get called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to