dacort commented on code in PR #32151:
URL: https://github.com/apache/airflow/pull/32151#discussion_r1267061309


##########
airflow/providers/amazon/aws/hooks/emr.py:
##########
@@ -169,16 +172,21 @@ def add_job_flow_steps(
         if wait_for_completion:
             waiter = self.get_conn().get_waiter("step_complete")
             for step_id in response["StepIds"]:
-                waiter.wait(
-                    ClusterId=job_flow_id,
-                    StepId=step_id,
-                    WaiterConfig=prune_dict(
-                        {
-                            "Delay": waiter_delay,
-                            "MaxAttempts": waiter_max_attempts,
-                        }
-                    ),
-                )
+                try:
+                    wait(
+                        waiter=waiter,
+                        max_attempts=waiter_max_attempts,
+                        waiter_delay=waiter_delay,
+                        args={"ClusterId": job_flow_id, "StepId": step_id},
+                        failure_message=f"EMR Steps failed: {step_id}",
+                        status_message="EMR Step status is",
+                        status_args=["Step.Status.State", "Step.Status.StateChangeReason"],
+                    )
+                except AirflowException as ex:
+                    if "EMR Steps failed" in str(ex):
+                        resp = self.get_conn().describe_step(ClusterId=job_flow_id, StepId=step_id)
+                        self.log.error("EMR Steps failed: %s", resp["Step"]["Status"]["StateChangeReason"])

Review Comment:
   @o-nikolas `FailureDetails` should surface common reasons that a step failed, and it does so before the logs are synced to S3. I'm less familiar with `StateChangeReason`, but I'm not sure it will contain details about _why_ the job failed. I think it relates more to service-side reasons why a step's state changed (e.g. somebody cancelled it). Here's an example of a job I ran with an invalid entrypoint file: `FailureDetails` is populated immediately, while the log file it points to didn't sync to S3 until 4 minutes after the step failed.
   
   ```json
   {
       "Step": {
           "Id": "s-00928142VC3M3WYAG8XN",
           "Name": "test-fail",
           "Config": {
               "Jar": "command-runner.jar",
               "Properties": {},
               "Args": [
                   "spark-submit",
                   "--deploy-mode",
                   "cluster",
                   "/tmp/fail.py"
               ]
           },
           "ActionOnFailure": "CONTINUE",
           "Status": {
               "State": "FAILED",
               "StateChangeReason": {},
               "FailureDetails": {
                   "Reason": "The given path is invalid.",
                   "Message": "Exception in thread \"main\" java.io.FileNotFoundException: File file:/mnt/tmp/fail.py does not exist",
                   "LogFile": "s3://aws-logs-568026268536-us-west-2/elasticmapreduce/j-2WPQL41AO1169/steps/s-00928142VC3M3WYAG8XN/stderr.gz"
               },
               "Timeline": {
                   "CreationDateTime": "2023-07-18T09:41:40.728000-07:00",
                   "StartDateTime": "2023-07-18T09:41:57.422000-07:00",
                   "EndDateTime": "2023-07-18T09:42:03.499000-07:00"
               }
           }
       }
   }
   ```
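A minimal sketch of how a caller could turn a `DescribeStep` response like the one above into a single log message, preferring `FailureDetails` and falling back to `StateChangeReason`. The function name `describe_step_failure` is hypothetical and not part of the Amazon provider; only the field names come from the responses shown in this comment.

```python
from typing import Any, Mapping


def describe_step_failure(response: Mapping[str, Any]) -> str:
    """Summarize why an EMR step ended, given a DescribeStep response dict.

    Hypothetical helper: prefers Status.FailureDetails (populated as soon as
    the step fails) and falls back to Status.StateChangeReason.
    """
    status = response["Step"]["Status"]
    details = status.get("FailureDetails") or {}
    if details:
        # FailureDetails may carry Reason, Message and LogFile; keep whichever are present.
        parts = [details.get(key) for key in ("Reason", "Message", "LogFile")]
        return " | ".join(part for part in parts if part)
    # StateChangeReason can be an empty dict, as in the FAILED example above.
    reason = status.get("StateChangeReason") or {}
    return reason.get("Message") or f"Step ended in state {status['State']} with no reason given"
```

For the FAILED step above this yields the reason, the `FileNotFoundException` message and the S3 log path, without waiting for the log file itself to appear.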
   
   And here's one where I cancelled the job before it ran: only `StateChangeReason` is populated, and there is no `FailureDetails` key.
   
   ```json
   {
       "Step": {
           "Id": "s-07516842NY2M4IHHH71T",
           "Name": "test-fail",
           "Config": {
               "Jar": "command-runner.jar",
               "Properties": {},
               "Args": [
                   "spark-submit",
                   "--deploy-mode",
                   "cluster",
                   "/tmp/fail.py"
               ]
           },
           "ActionOnFailure": "CONTINUE",
           "Status": {
               "State": "CANCELLED",
               "StateChangeReason": {
                   "Message": "Cancelled by user"
               },
               "Timeline": {
                   "CreationDateTime": "2023-07-18T09:48:45.141000-07:00"
               }
           }
       }
   }
   ```
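A sketch of how the `except AirflowException` branch in the diff could log both fields, assuming an EMR client that exposes boto3's `describe_step(ClusterId=..., StepId=...)`. `log_step_failure` and `StepDescriber` are hypothetical names; the Protocol only exists so a stub client can stand in for boto3 when trying this out.

```python
import logging
from typing import Any, Protocol

log = logging.getLogger(__name__)


class StepDescriber(Protocol):
    """Anything shaped like boto3's EMR client describe_step method (hypothetical type)."""

    def describe_step(self, *, ClusterId: str, StepId: str) -> dict[str, Any]: ...


def log_step_failure(client: StepDescriber, cluster_id: str, step_id: str) -> dict[str, Any]:
    """Log the most useful end-of-step reason and return the step's Status dict."""
    status = client.describe_step(ClusterId=cluster_id, StepId=step_id)["Step"]["Status"]
    if status.get("FailureDetails"):
        # Populated right away, before the step logs are synced to S3.
        log.error("EMR step %s failed: %s", step_id, status["FailureDetails"])
    else:
        # Service-side state changes, e.g. a user cancelling the step.
        log.error("EMR step %s ended in %s: %s", step_id, status["State"], status.get("StateChangeReason"))
    return status
```

Inside the hook this would presumably be called with `self.get_conn()`; re-raising the original exception afterwards, so the task still fails, is an assumption, since the hunk above doesn't show what the branch does after logging.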


