feluelle commented on a change in pull request #11883:
URL: https://github.com/apache/airflow/pull/11883#discussion_r517300737
##########
File path: airflow/providers/amazon/aws/sensors/emr_step.py
##########
@@ -21,24 +21,34 @@
from airflow.providers.amazon.aws.sensors.emr_base import EmrBaseSensor
from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+from airflow.models.xcom import XCOM_RETURN_KEY
+
class EmrStepSensor(EmrBaseSensor):
"""
Asks for the state of the step until it reaches any of the target states.
+ If no step Id has been provided the sensor retrieves all steps and current states
+ and will continue to ask until all steps reaches any of the target states.
+
If it fails the sensor errors, failing the task.
With the default target states, sensor waits step to be completed.
:param job_flow_id: job_flow_id which contains the step check the state of
:type job_flow_id: str
:param step_id: step to check the state of
- :type step_id: str
+ :type step_id: str or list
Review comment:
```suggestion
:type step_id: Optional[str]
```
##########
File path: airflow/providers/amazon/aws/sensors/emr_step.py
##########
@@ -50,16 +60,72 @@ def __init__(
*,
job_flow_id: str,
step_id: str,
Review comment:
```suggestion
step_id: Optional[str],
```
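To illustrate what an optional `step_id` could mean for the sensor, here is a minimal sketch: when a value is given, only that step is watched, otherwise every step of the job flow is. The helper `resolve_step_ids` and its `all_step_ids` argument are hypothetical names used for illustration; they are not part of this PR or of Airflow.

```python
from typing import List, Optional


def resolve_step_ids(step_id: Optional[str], all_step_ids: List[str]) -> List[str]:
    """Return the step ids the sensor should watch.

    Hypothetical helper, not Airflow API: `all_step_ids` stands in for
    whatever the sensor would fetch from EMR when no step id is given.
    """
    if step_id is not None:
        # An explicit step id narrows the check to that single step.
        return [step_id]
    # No step id provided: fall back to every step of the job flow.
    return list(all_step_ids)
```

With this shape, existing callers that pass a single id keep the current behavior, and passing `None` opts in to the new one.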
##########
File path: airflow/providers/amazon/aws/sensors/emr_step.py
##########
@@ -21,24 +21,34 @@
from airflow.providers.amazon.aws.sensors.emr_base import EmrBaseSensor
from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+from airflow.models.xcom import XCOM_RETURN_KEY
+
class EmrStepSensor(EmrBaseSensor):
"""
Asks for the state of the step until it reaches any of the target states.
+ If no step Id has been provided the sensor retrieves all steps and current states
+ and will continue to ask until all steps reaches any of the target states.
Review comment:
> We've tested with EmrJobFlowSensor but it isn't behaving as we'd like. The cluster moves in to TERMINATED state even if some steps fail, I guess because we are setting the steps with CONTINUE status. In our use case the steps are independent and we therefore want the others to run even if some do fail.
>
> I would have expected the cluster to move into TERMINATED_WITH_ERRORS but this isn't clearly documented by AWS at all. We've logged a support case to get clarity on it. Maybe it is a bug, but unlikely.... They will probably just say they will update the documentation.

Fine :) That's what I thought: that you could differentiate between a terminated state with succeeded tasks and one with failed tasks. If that's not the case, we can do it as proposed in the PR.
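As I understand the proposal, the check over all steps could look roughly like the sketch below. It assumes the step dicts have the shape returned by boto3's EMR `list_steps` call (`Status.State` per step), and it assumes the sensor should fail as soon as any step is in a failed state; the PR may choose differently. `all_steps_done` is a hypothetical name, and `RuntimeError` stands in for `AirflowException` so the snippet runs without Airflow installed.

```python
from typing import Dict, Iterable

# Assumed defaults for illustration; the real sensor takes these as parameters.
TARGET_STATES = {"COMPLETED"}
FAILED_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


def all_steps_done(steps: Iterable[Dict]) -> bool:
    """Return True once every step is in a target state.

    Raises RuntimeError (a stand-in for AirflowException) if any step
    has reached a failed state. Returns False while steps are still
    pending or running, which means "poke again".
    """
    states = {step["Id"]: step["Status"]["State"] for step in steps}
    failed = {sid: s for sid, s in states.items() if s in FAILED_STATES}
    if failed:
        raise RuntimeError(f"EMR steps failed: {failed}")
    return all(s in TARGET_STATES for s in states.values())
```

Note that with no steps at all, `all()` over an empty collection returns `True`; whether that should count as success is a separate decision.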
Regarding the separation of the `EmrStepSensor`, I've changed my mind. It makes sense to keep this in `EmrStepSensor`, just as `ExternalTaskSensor` can wait on a whole DAG and not only on a specific task.
> We could add to the EmrJobFlowSensor to optionally also check the status of the steps before succeeding, and enable that check by default. I think it would be unexpected behavior that the operator is successful even if some steps weren't successful.

SGTM, too.
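For the `EmrJobFlowSensor` variant, a rough sketch of the success condition might be the following. The function name `job_flow_succeeded` and the `check_steps` flag are hypothetical and only illustrate the idea of an opt-out check that is enabled by default; nothing here is existing Airflow API.

```python
from typing import Iterable


def job_flow_succeeded(
    cluster_state: str,
    step_states: Iterable[str],
    check_steps: bool = True,  # hypothetical flag, on by default as suggested
) -> bool:
    """Decide whether a terminated job flow counts as successful.

    Sketch only: treats TERMINATED as the cluster target state and
    COMPLETED as the only successful step state.
    """
    if cluster_state != "TERMINATED":
        return False
    if not check_steps:
        # Old behavior: the cluster state alone decides.
        return True
    # New behavior: every step must also have completed.
    return all(state == "COMPLETED" for state in step_states)
```

With `check_steps=True`, a cluster that reaches TERMINATED while a CONTINUE step has failed would no longer be reported as successful.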
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]