feluelle commented on a change in pull request #11883:
URL: https://github.com/apache/airflow/pull/11883#discussion_r517300737



##########
File path: airflow/providers/amazon/aws/sensors/emr_step.py
##########
@@ -21,24 +21,34 @@
 from airflow.providers.amazon.aws.sensors.emr_base import EmrBaseSensor
 from airflow.utils.decorators import apply_defaults
 
+from airflow.exceptions import AirflowException
+from airflow.models.xcom import XCOM_RETURN_KEY
+
 
 class EmrStepSensor(EmrBaseSensor):
     """
     Asks for the state of the step until it reaches any of the target states.
+    If no step Id has been provided the sensor retrieves all steps and current states
+    and will continue to ask until all steps reach any of the target states.
+
     If it fails the sensor errors, failing the task.
 
     With the default target states, sensor waits step to be completed.
 
     :param job_flow_id: job_flow_id which contains the step check the state of
     :type job_flow_id: str
     :param step_id: step to check the state of
-    :type step_id: str
+    :type step_id: str or list

Review comment:
       ```suggestion
       :type step_id: Optional[str]
   ```
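The docstring above says that without a step ID the sensor retrieves all steps. EMR's `ListSteps` API is paginated: boto3's `EMR.Client.list_steps` returns a `Marker` while more pages remain, so "all steps" means following that marker. A minimal sketch, assuming a callable shaped like `list_steps`; `list_all_steps` is a hypothetical helper, not code from the PR or from Airflow:

```python
from typing import Any, Callable, Dict, List, Optional


def list_all_steps(list_steps: Callable[..., Dict[str, Any]], cluster_id: str) -> List[Dict[str, Any]]:
    """Collect every step of a cluster by following the ListSteps pagination marker.

    `list_steps` is assumed to behave like boto3's EMR `client.list_steps`:
    it returns {"Steps": [...], "Marker": "..."} and omits "Marker" on the last page.
    """
    steps: List[Dict[str, Any]] = []
    marker: Optional[str] = None
    while True:
        kwargs: Dict[str, str] = {"ClusterId": cluster_id}
        if marker:
            kwargs["Marker"] = marker  # ask for the next page
        page = list_steps(**kwargs)
        steps.extend(page.get("Steps", []))
        marker = page.get("Marker")
        if not marker:  # no marker means this was the last page
            return steps


# Usage with a fake two-page client (hypothetical data).
pages = {
    None: {"Steps": [{"Id": "s-1", "Status": {"State": "COMPLETED"}}], "Marker": "m1"},
    "m1": {"Steps": [{"Id": "s-2", "Status": {"State": "RUNNING"}}]},
}


def fake_list_steps(ClusterId: str, Marker: Optional[str] = None) -> Dict[str, Any]:
    return pages[Marker]


print([s["Id"] for s in list_all_steps(fake_list_steps, "j-123")])  # ['s-1', 's-2']
```

Against a real boto3 client, `client.get_paginator("list_steps")` would be the more idiomatic way to run the same loop.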

##########
File path: airflow/providers/amazon/aws/sensors/emr_step.py
##########
@@ -50,16 +60,72 @@ def __init__(
         *,
         job_flow_id: str,
         step_id: str,

Review comment:
       ```suggestion
           step_id: Optional[str],
   ```
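With `step_id: Optional[str]`, poking has to branch: a concrete ID keeps the existing single-step check, while `None` means every step of the cluster must reach a target state. A minimal sketch of that decision over step dicts shaped like the boto3 `list_steps` response; the state sets are assumptions modelled on `EmrStepSensor`'s defaults, and `steps_done` and `StepFailedError` are hypothetical names (the latter stands in for `AirflowException`):

```python
from typing import Any, Dict, Iterable, List, Optional

# Assumed defaults, modelled on EmrStepSensor's target/failed states.
TARGET_STATES = ("COMPLETED",)
FAILED_STATES = ("CANCELLED", "FAILED", "INTERRUPTED")


class StepFailedError(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def steps_done(
    steps: List[Dict[str, Any]],
    step_id: Optional[str],
    target_states: Iterable[str] = TARGET_STATES,
    failed_states: Iterable[str] = FAILED_STATES,
) -> bool:
    """Return True when the watched step(s) reached a target state.

    step_id=None watches every step of the cluster; otherwise only the given step.
    Raises StepFailedError as soon as a watched step is in a failed state.
    """
    watched = steps if step_id is None else [s for s in steps if s["Id"] == step_id]
    if not watched:
        return False  # nothing to judge yet: keep poking
    target, failed = set(target_states), set(failed_states)
    for step in watched:
        state = step["Status"]["State"]
        if state in failed:
            raise StepFailedError(f"EMR step {step['Id']} is in state {state}")
    return all(step["Status"]["State"] in target for step in watched)


# Usage (hypothetical data).
steps = [
    {"Id": "s-1", "Status": {"State": "COMPLETED"}},
    {"Id": "s-2", "Status": {"State": "RUNNING"}},
]
print(steps_done(steps, "s-1"))  # True
print(steps_done(steps, None))   # False: s-2 is still running
```

The sketch fails fast on the first failed step. Because the steps discussed in this thread use `CONTINUE` on failure and keep running independently, a variant that waits until every step is in a terminal state before raising would also be reasonable.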

##########
File path: airflow/providers/amazon/aws/sensors/emr_step.py
##########
@@ -21,24 +21,34 @@
 from airflow.providers.amazon.aws.sensors.emr_base import EmrBaseSensor
 from airflow.utils.decorators import apply_defaults
 
+from airflow.exceptions import AirflowException
+from airflow.models.xcom import XCOM_RETURN_KEY
+
 
 class EmrStepSensor(EmrBaseSensor):
     """
     Asks for the state of the step until it reaches any of the target states.
+    If no step Id has been provided the sensor retrieves all steps and current states
+    and will continue to ask until all steps reach any of the target states.

Review comment:
       > We've tested with EmrJobFlowSensor but it isn't behaving as we'd like. The cluster moves into TERMINATED state even if some steps fail, I guess because we are setting the steps with CONTINUE status. In our use case the steps are independent and we therefore want the others to run even if some do fail.
   > 
   > I would have expected the cluster to move into TERMINATED_WITH_ERRORS but this isn't clearly documented by AWS at all. We've logged a support case to get clarity on it. Maybe it is a bug, but unlikely... They will probably just say they will update the documentation.
   
   Fine :) That's what I had assumed: that you could distinguish a terminated state with succeeded tasks from one with failed tasks. If that's not the case, we can do it as proposed in the PR.
   
   Regarding the separation of the `EmrStepSensor`, I've changed my mind. It makes sense to keep this in `EmrStepSensor`, just as `ExternalTaskSensor` can also wait for a whole DAG and not only for a specific task.
   
   > We could add an option to the EmrJobFlowSensor to also check the status of the steps before succeeding, and enable that check by default. I think it would be unexpected behavior for the operator to succeed even if some steps weren't successful.
   
   SGTM, too.
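The proposed step check on `EmrJobFlowSensor` could look roughly like this: a terminated cluster only counts as success if none of its steps failed. A minimal sketch; `check_steps` is a hypothetical parameter name, the state sets are assumptions modelled on the sensors' defaults, and `JobFlowFailedError` stands in for `AirflowException`:

```python
from typing import Any, Dict, List

# Assumed state sets, modelled on EmrJobFlowSensor / EmrStepSensor defaults.
CLUSTER_TARGET_STATES = {"TERMINATED"}
CLUSTER_FAILED_STATES = {"TERMINATED_WITH_ERRORS"}
STEP_FAILED_STATES = {"CANCELLED", "FAILED", "INTERRUPTED"}


class JobFlowFailedError(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def job_flow_done(cluster_state: str, steps: List[Dict[str, Any]], check_steps: bool = True) -> bool:
    """Return True once the cluster terminated successfully.

    check_steps is a hypothetical option: when enabled, a TERMINATED cluster
    still fails the task if any of its steps ended in a failed state.
    """
    if cluster_state in CLUSTER_FAILED_STATES:
        raise JobFlowFailedError(f"cluster is in state {cluster_state}")
    if cluster_state not in CLUSTER_TARGET_STATES:
        return False  # cluster still starting or running: keep poking
    if check_steps:
        failed = [s["Id"] for s in steps if s["Status"]["State"] in STEP_FAILED_STATES]
        if failed:
            raise JobFlowFailedError(f"cluster terminated but steps failed: {failed}")
    return True


# Usage: one step completed, one failed (e.g. a step with ActionOnFailure=CONTINUE).
steps = [
    {"Id": "s-1", "Status": {"State": "COMPLETED"}},
    {"Id": "s-2", "Status": {"State": "FAILED"}},
]
print(job_flow_done("TERMINATED", steps, check_steps=False))  # True
try:
    job_flow_done("TERMINATED", steps)
except JobFlowFailedError as err:
    print(err)  # cluster terminated but steps failed: ['s-2']
```

Defaulting `check_steps` to `True` matches the suggestion that the check be enabled by default, while `check_steps=False` keeps the current behavior of accepting any `TERMINATED` cluster.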



