syedahsn commented on code in PR #30463:
URL: https://github.com/apache/airflow/pull/30463#discussion_r1164719523
##########
airflow/providers/amazon/aws/operators/emr.py:
##########
@@ -992,21 +1036,40 @@ def execute(self, context: Context) -> dict:
self.log.info("EMR serverless job started: %s", response["jobRunId"])
if self.wait_for_completion:
- # This should be replaced with a boto waiter when available.
- waiter(
- get_state_callable=self.hook.conn.get_job_run,
- get_state_args={
- "applicationId": self.application_id,
- "jobRunId": response["jobRunId"],
- },
- parse_response=["jobRun", "state"],
- desired_state=EmrServerlessHook.JOB_SUCCESS_STATES,
- failure_states=EmrServerlessHook.JOB_FAILURE_STATES,
- object_type="job",
- action="run",
- countdown=self.waiter_countdown,
- check_interval_seconds=self.waiter_check_interval_seconds,
- )
+ try:
+ self.hook.get_waiter("serverless_job_running").wait(
+ applicationId=self.application_id,
+ jobRunId=response["jobRunId"],
+ WaiterConfig=prune_dict(
+ {
+ "Delay": self.waiter_delay,
+ "MaxAttempts": self.waiter_max_attempts,
+ }
+ ),
+ )
+ self.log.info("EMR serverless job is running: %s", response["jobRunId"])
+ self.hook.get_waiter("serverless_job_completed").wait(
+ applicationId=self.application_id,
+ jobRunId=response["jobRunId"],
+ WaiterConfig=prune_dict(
+ {
+ "Delay": self.waiter_delay,
+ "MaxAttempts": self.waiter_max_attempts,
+ }
+ ),
+ )
Review Comment:
The custom boto waiters don't log anything, unfortunately. In order to see
that a state change has happened, we need to use a separate waiter. In this
case, the extra log message is important because it informs the user that the
job is actually running.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]