This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bbea9217a Get failure information on EMR job failure (#32151)
8bbea9217a is described below

commit 8bbea9217ae0a3f1049cd10bac838b041b07b7af
Author: Syed Hussain <[email protected]>
AuthorDate: Tue Aug 8 09:10:22 2023 -0700

    Get failure information on EMR job failure (#32151)
    
    * Use util wait method in wait_for_completion.
    * Add logs to display failure reason if EMR Job fails
    * Fix waiter parameters, use FailureDetails instead of StateChangeReason
    * Only log failure details if they are included in the response
---
 airflow/providers/amazon/aws/hooks/emr.py          | 32 +++++++++++-------
 airflow/providers/amazon/aws/operators/emr.py      |  4 +--
 tests/providers/amazon/aws/hooks/test_emr.py       | 38 ++++++++++++++++++++++
 .../amazon/aws/operators/test_emr_add_steps.py     |  4 +--
 4 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/emr.py 
b/airflow/providers/amazon/aws/hooks/emr.py
index dea61c0858..185c7ab13f 100644
--- a/airflow/providers/amazon/aws/hooks/emr.py
+++ b/airflow/providers/amazon/aws/hooks/emr.py
@@ -26,7 +26,7 @@ from botocore.exceptions import ClientError
 
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.utils.helpers import prune_dict
+from airflow.providers.amazon.aws.utils.waiter_with_logging import wait
 
 
 class EmrHook(AwsBaseHook):
@@ -158,6 +158,9 @@ class EmrHook(AwsBaseHook):
         :param execution_role_arn: The ARN of the runtime role for a step on 
the cluster.
         """
         config = {}
+        waiter_delay = waiter_delay or 30
+        waiter_max_attempts = waiter_max_attempts or 60
+
         if execution_role_arn:
             config["ExecutionRoleArn"] = execution_role_arn
         response = self.get_conn().add_job_flow_steps(JobFlowId=job_flow_id, 
Steps=steps, **config)
@@ -169,16 +172,23 @@ class EmrHook(AwsBaseHook):
         if wait_for_completion:
             waiter = self.get_conn().get_waiter("step_complete")
             for step_id in response["StepIds"]:
-                waiter.wait(
-                    ClusterId=job_flow_id,
-                    StepId=step_id,
-                    WaiterConfig=prune_dict(
-                        {
-                            "Delay": waiter_delay,
-                            "MaxAttempts": waiter_max_attempts,
-                        }
-                    ),
-                )
+                try:
+                    wait(
+                        waiter=waiter,
+                        waiter_max_attempts=waiter_max_attempts,
+                        waiter_delay=waiter_delay,
+                        args={"ClusterId": job_flow_id, "StepId": step_id},
+                        failure_message=f"EMR Steps failed: {step_id}",
+                        status_message="EMR Step status is",
+                        status_args=["Step.Status.State", 
"Step.Status.StateChangeReason"],
+                    )
+                except AirflowException as ex:
+                    if "EMR Steps failed" in str(ex):
+                        resp = 
self.get_conn().describe_step(ClusterId=job_flow_id, StepId=step_id)
+                        failure_details = 
resp["Step"]["Status"].get("FailureDetails", None)
+                        if failure_details:
+                            self.log.error("EMR Steps failed: %s", 
failure_details)
+                    raise
         return response["StepIds"]
 
     def test_connection(self):
diff --git a/airflow/providers/amazon/aws/operators/emr.py 
b/airflow/providers/amazon/aws/operators/emr.py
index f75b12327f..1bf2375a16 100644
--- a/airflow/providers/amazon/aws/operators/emr.py
+++ b/airflow/providers/amazon/aws/operators/emr.py
@@ -100,8 +100,8 @@ class EmrAddStepsOperator(BaseOperator):
         aws_conn_id: str = "aws_default",
         steps: list[dict] | str | None = None,
         wait_for_completion: bool = False,
-        waiter_delay: int | None = None,
-        waiter_max_attempts: int | None = None,
+        waiter_delay: int | None = 30,
+        waiter_max_attempts: int | None = 60,
         execution_role_arn: str | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         **kwargs,
diff --git a/tests/providers/amazon/aws/hooks/test_emr.py 
b/tests/providers/amazon/aws/hooks/test_emr.py
index 0507ad3718..c68b25fbdf 100644
--- a/tests/providers/amazon/aws/hooks/test_emr.py
+++ b/tests/providers/amazon/aws/hooks/test_emr.py
@@ -22,6 +22,7 @@ from unittest import mock
 
 import boto3
 import pytest
+from botocore.exceptions import WaiterError
 from moto import mock_emr
 
 from airflow.exceptions import AirflowException
@@ -113,6 +114,43 @@ class TestEmrHook:
 
         mock_conn.get_waiter.assert_called_once_with("step_complete")
 
+    @mock.patch("time.sleep", return_value=True)
+    @mock.patch.object(EmrHook, "conn")
+    def test_add_job_flow_steps_raises_exception_on_failure(self, mock_conn, 
mock_sleep, caplog):
+        hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default", 
region_name="us-east-1")
+        mock_conn.describe_step.return_value = {
+            "Step": {
+                "Status": {
+                    "State": "FAILED",
+                    "FailureDetails": "test failure details",
+                }
+            }
+        }
+        mock_conn.add_job_flow_steps.return_value = {
+            "StepIds": [
+                "step_id",
+            ],
+            "ResponseMetadata": {"HTTPStatusCode": 200},
+        }
+        steps = [
+            {
+                "ActionOnFailure": "test_step",
+                "HadoopJarStep": {
+                    "Args": ["test args"],
+                    "Jar": "test.jar",
+                },
+                "Name": "step_1",
+            }
+        ]
+        waiter_error = WaiterError(name="test_error", reason="test_reason", 
last_response={})
+        waiter_error_failure = WaiterError(name="test_error", reason="terminal 
failure", last_response={})
+        mock_conn.get_waiter().wait.side_effect = [waiter_error, 
waiter_error_failure]
+
+        with pytest.raises(AirflowException):
+            hook.add_job_flow_steps(job_flow_id="job_flow_id", steps=steps, 
wait_for_completion=True)
+        assert "test failure details" in caplog.messages[-1]
+        mock_conn.get_waiter.assert_called_with("step_complete")
+
     @mock_emr
     def test_create_job_flow_extra_args(self):
         """
diff --git a/tests/providers/amazon/aws/operators/test_emr_add_steps.py 
b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
index 0b279c051f..a2c69b6599 100644
--- a/tests/providers/amazon/aws/operators/test_emr_add_steps.py
+++ b/tests/providers/amazon/aws/operators/test_emr_add_steps.py
@@ -241,8 +241,8 @@ class TestEmrAddStepsOperator:
             job_flow_id=job_flow_id,
             steps=[],
             wait_for_completion=False,
-            waiter_delay=None,
-            waiter_max_attempts=None,
+            waiter_delay=30,
+            waiter_max_attempts=60,
             execution_role_arn=None,
         )
 

Reply via email to