dacort commented on code in PR #31169:
URL: https://github.com/apache/airflow/pull/31169#discussion_r1189392447


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1052,7 +1054,41 @@ def execute(self, context: Context) -> dict:
                 countdown=self.waiter_countdown,
                 check_interval_seconds=self.waiter_check_interval_seconds,
             )
-        return response["jobRunId"]
+        return self.job_id
+
+    def on_kill(self) -> None:
+        """Cancel the submitted job run"""
+        if self.job_id:
+            self.log.info("Stopping job run with jobId - %s", self.job_id)
+            response = self.hook.conn.cancel_job_run(applicationId=self.application_id, jobRunId=self.job_id)
+            http_status_code = None
+            try:
+                http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
+            except Exception as ex:
+                self.log.error("Exception while cancelling query: %s", ex)
+            finally:
+                if http_status_code is None or http_status_code != 200:
+                    self.log.error("Unable to request query cancel on EMR Serverless. Exiting")
+                else:
+                    self.log.info(
+                        "Polling EMR Serverless for query with id %s to reach final state",
+                        self.job_id,
+                    )
+                    # This should be replaced with a boto waiter when available.
+                    waiter(
+                        get_state_callable=self.hook.conn.get_job_run,
+                        get_state_args={
+                            "applicationId": self.application_id,
+                            "jobRunId": self.job_id,
+                        },
+                        parse_response=["jobRun", "state"],
+                        desired_state=EmrServerlessHook.JOB_TERMINAL_STATES,
+                        failure_states=set(),
+                        object_type="job",
+                        action="cancelled",
+                        countdown=self.waiter_countdown,
+                        check_interval_seconds=self.waiter_check_interval_seconds,
+                    )

Review Comment:
   Thanks, I like both those suggestions. Will validate the suggestion tomorrow!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to