nuclearpinguin commented on a change in pull request #7146: [AIRFLOW-6541] Use
EmrJobFlowSensor for other states
URL: https://github.com/apache/airflow/pull/7146#discussion_r367496595
##########
File path: airflow/contrib/sensors/emr_job_flow_sensor.py
##########
@@ -16,48 +16,96 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.utils.decorators import apply_defaults
class EmrJobFlowSensor(EmrBaseSensor):
"""
- Asks for the state of the JobFlow until it reaches a terminal state.
+ Asks for the state of the EMR JobFlow (Cluster) until it reaches
+ any of the target states.
If it fails the sensor errors, failing the task.
+ With the default target states, the sensor waits for the cluster to be terminated.
+ When target_states is set to ['RUNNING', 'WAITING'], the sensor waits
+ until the job flow is ready (after the 'STARTING' and 'BOOTSTRAPPING' states).
+
:param job_flow_id: job_flow_id to check the state of
:type job_flow_id: str
+
+ :param target_states: the target states, sensor waits until
+ job flow reaches any of these states
+ :type target_states: list[str]
+
+ :param failed_states: the failure states, sensor fails when
+ job flow reaches any of these states
+ :type failed_states: list[str]
"""
- NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING',
- 'WAITING', 'TERMINATING']
- FAILED_STATE = ['TERMINATED_WITH_ERRORS']
- template_fields = ['job_flow_id']
+ template_fields = ['job_flow_id', 'target_states', 'failed_states']
template_ext = ()
@apply_defaults
def __init__(self,
job_flow_id,
+ target_states=None,
+ failed_states=None,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.job_flow_id = job_flow_id
+ if target_states is None:
+ target_states = ['TERMINATED']
+ self.target_states = target_states
+ if failed_states is None:
+ failed_states = ['TERMINATED_WITH_ERRORS']
+ self.failed_states = failed_states
def get_emr_response(self):
Review comment:
```suggestion
def get_emr_response(self) -> Dict[str, Any]:
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services