o-nikolas commented on code in PR #31169:
URL: https://github.com/apache/airflow/pull/31169#discussion_r1189219476


##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -1052,7 +1054,41 @@ def execute(self, context: Context) -> dict:
                 countdown=self.waiter_countdown,
                 check_interval_seconds=self.waiter_check_interval_seconds,
             )
-        return response["jobRunId"]
+        return self.job_id
+
+    def on_kill(self) -> None:
+        """Cancel the submitted job run"""
+        if self.job_id:
+            self.log.info("Stopping job run with jobId - %s", self.job_id)
+            response = 
self.hook.conn.cancel_job_run(applicationId=self.application_id, 
jobRunId=self.job_id)
+            http_status_code = None
+            try:
+                http_status_code = 
response["ResponseMetadata"]["HTTPStatusCode"]
+            except Exception as ex:
+                self.log.error("Exception while cancelling query: %s", ex)
+            finally:
+                if http_status_code is None or http_status_code != 200:
+                    self.log.error("Unable to request query cancel on EMR 
Serverless. Exiting")
+                else:
+                    self.log.info(
+                        "Polling EMR Serverless for query with id %s to reach 
final state",
+                        self.job_id,
+                    )
+                    # This should be replaced with a boto waiter when 
available.
+                    waiter(
+                        get_state_callable=self.hook.conn.get_job_run,
+                        get_state_args={
+                            "applicationId": self.application_id,
+                            "jobRunId": self.job_id,
+                        },
+                        parse_response=["jobRun", "state"],
+                        desired_state=EmrServerlessHook.JOB_TERMINAL_STATES,
+                        failure_states=set(),
+                        object_type="job",
+                        action="cancelled",
+                        countdown=self.waiter_countdown,
+                        
check_interval_seconds=self.waiter_check_interval_seconds,
+                    )

Review Comment:
   Was the main concern behind wrapping the index into `response` in a 
`try`/`except Exception` that the key might not exist? If so, you can use 
`.get()`, which will return `None` or a given default.
   Also, the waiter being buried in the `else` of an `if` inside the `finally` 
of a `try`/`except` feels a little odd.
   
   Here's a suggested refactoring with both of those points in mind (**NOTE**: 
this is not tested and was written in the crummy GitHub text editor, so do 
double-check the logic/indenting):
   ```suggestion
              
               http_status_code = response.get("ResponseMetadata", 
{}).get("HTTPStatusCode") if response else None
               if http_status_code is None or http_status_code != 200:
                   self.log.error("Unable to request query cancel on EMR 
Serverless. Exiting")
                   return
                   
               self.log.info(
                   "Polling EMR Serverless for query with id %s to reach final 
state",
                   self.job_id,
               )
               # This should be replaced with a boto waiter when available.
               waiter(
                   get_state_callable=self.hook.conn.get_job_run,
                   get_state_args={
                       "applicationId": self.application_id,
                       "jobRunId": self.job_id,
                   },
                   parse_response=["jobRun", "state"],
                   desired_state=EmrServerlessHook.JOB_TERMINAL_STATES,
                   failure_states=set(),
                   object_type="job",
                   action="cancelled",
                   countdown=self.waiter_countdown,
                   check_interval_seconds=self.waiter_check_interval_seconds,
               )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to