josh-fell commented on code in PR #26035:
URL: https://github.com/apache/airflow/pull/26035#discussion_r957349674
##########
airflow/providers/amazon/aws/sensors/emr.py:
##########
@@ -470,3 +470,102 @@ def failure_message_from_response(response: Dict[str,
Any]) -> Optional[str]:
f"with message {fail_details.get('Message')} and log file
{fail_details.get('LogFile')}"
)
return None
+
+
+class EmrStepsSensor(EmrBaseSensor):
+ """
+ Asks for the state of the step until it reaches any of the target states.
+ If it fails the sensor errors, failing the task.
+ With the default target states, sensor waits step to be completed.
+ .. seealso::
+ For more information on how to use this sensor, take a look at the
guide:
+ :ref:`howto/sensor:EmrStepSensor`
+ :param job_flow_id: job_flow_id which contains the step check the state of
+ :param step_id: step to check the state of
+ :param target_states: the target states, sensor waits until
+ step reaches any of these states
+ :param failed_states: the failure states, sensor fails when
+ step reaches any of these states
+ """
+
+ template_fields: Sequence[str] = ("job_flow_id", "target_states",
"failed_states")
+ template_ext: Sequence[str] = ()
+
+ def __init__(
+ self,
+ *,
+ job_flow_id: str,
+ target_states: Optional[Iterable[str]] = None,
+ failed_states: Optional[Iterable[str]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.job_flow_id = job_flow_id
+ self.target_states = target_states or ["COMPLETED"]
+ self.failed_states = failed_states or ["CANCELLED", "FAILED",
"INTERRUPTED"]
+
+ def get_emr_response(self) -> Dict[str, Any]:
+ """
+ Make an API call with boto3 and get details about the cluster step.
+ .. seealso::
+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.describe_step
+ :return: response
+ :rtype: dict[str, Any]
+ """
+ emr_client = self.get_hook().get_conn()
+
+ self.log.info("Poking steps on cluster %s", self.job_flow_id)
+ # return emr_client.describe_step(ClusterId=self.job_flow_id,
StepId="qq")
Review Comment:
```suggestion
```
Seems unneeded?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]