Repository: incubator-airflow Updated Branches: refs/heads/master 61419ddc8 -> 404bee8d8
[AIRFLOW-1436][AIRFLOW-1475] EmrJobFlowSensor considers Cancelled step as Successful Closes #2937 from Swalloow/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/404bee8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/404bee8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/404bee8d Branch: refs/heads/master Commit: 404bee8d859ee110ae1cbc80372aadf36edc58a5 Parents: 61419dd Author: Swalloow <[email protected]> Authored: Fri Jan 12 09:26:10 2018 +0100 Committer: Fokko Driesprong <[email protected]> Committed: Fri Jan 12 09:26:10 2018 +0100 ---------------------------------------------------------------------- airflow/contrib/sensors/emr_base_sensor.py | 2 +- airflow/contrib/sensors/emr_job_flow_sensor.py | 2 +- airflow/contrib/sensors/emr_step_sensor.py | 4 +- tests/contrib/sensors/test_emr_base_sensor.py | 9 +-- tests/contrib/sensors/test_emr_step_sensor.py | 87 +++++++++++++++------ 5 files changed, 69 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_base_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py index 3ecaa42..c6f96f8 100644 --- a/airflow/contrib/sensors/emr_base_sensor.py +++ b/airflow/contrib/sensors/emr_base_sensor.py @@ -45,7 +45,7 @@ class EmrBaseSensor(BaseSensorOperator): if state in self.NON_TERMINAL_STATES: return False - if state == self.FAILED_STATE: + if state in self.FAILED_STATE: raise AirflowException('EMR job failed') return True http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_job_flow_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py index 87b65c8..a437fc3 100644 --- a/airflow/contrib/sensors/emr_job_flow_sensor.py +++ b/airflow/contrib/sensors/emr_job_flow_sensor.py @@ -26,7 +26,7 @@ class EmrJobFlowSensor(EmrBaseSensor): """ NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING'] - FAILED_STATE = 'TERMINATED_WITH_ERRORS' + FAILED_STATE = ['TERMINATED_WITH_ERRORS'] template_fields = ['job_flow_id'] template_ext = () http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/airflow/contrib/sensors/emr_step_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py index 003d2d1..c5a450d 100644 --- a/airflow/contrib/sensors/emr_step_sensor.py +++ b/airflow/contrib/sensors/emr_step_sensor.py @@ -21,14 +21,14 @@ class EmrStepSensor(EmrBaseSensor): Asks for the state of the step until it reaches a terminal state. If it fails the sensor errors, failing the task. - :param job_flow_id: job_flow_idwhich contains the step check the state of + :param job_flow_id: job_flow_id which contains the step check the state of :type job_flow_id: string :param step_id: step to check the state of :type step_id: string """ NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' + FAILED_STATE = ['CANCELLED', 'FAILED'] template_fields = ['job_flow_id', 'step_id'] template_ext = () http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/tests/contrib/sensors/test_emr_base_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py index 9c39abb..970d189 100644 --- a/tests/contrib/sensors/test_emr_base_sensor.py +++ b/tests/contrib/sensors/test_emr_base_sensor.py @@ -26,7 +26,7 @@ class TestEmrBaseSensor(unittest.TestCase): def test_subclasses_that_implment_required_methods_and_constants_succeed_when_response_is_good(self): class EmrBaseSensorSubclass(EmrBaseSensor): NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' + FAILED_STATE = ['FAILED'] def get_emr_response(self): return { @@ -49,7 +49,7 @@ class TestEmrBaseSensor(unittest.TestCase): def test_poke_returns_false_when_state_is_a_non_terminal_state(self): class EmrBaseSensorSubclass(EmrBaseSensor): NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' + FAILED_STATE = ['FAILED'] def get_emr_response(self): return { @@ -72,7 +72,7 @@ class TestEmrBaseSensor(unittest.TestCase): def test_poke_returns_false_when_http_response_is_bad(self): class EmrBaseSensorSubclass(EmrBaseSensor): NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' + FAILED_STATE = ['FAILED'] def get_emr_response(self): return { @@ -96,7 +96,7 @@ class TestEmrBaseSensor(unittest.TestCase): def test_poke_raises_error_when_job_has_failed(self): class EmrBaseSensorSubclass(EmrBaseSensor): NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' + FAILED_STATE = ['FAILED'] def get_emr_response(self): return { @@ -118,7 +118,6 @@ class TestEmrBaseSensor(unittest.TestCase): operator.poke(None) - self.assertIn('EMR job failed', str(context.exception)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/404bee8d/tests/contrib/sensors/test_emr_step_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_emr_step_sensor.py b/tests/contrib/sensors/test_emr_step_sensor.py index 58ee461..b5d43fb 100644 --- a/tests/contrib/sensors/test_emr_step_sensor.py +++ b/tests/contrib/sensors/test_emr_step_sensor.py @@ -13,12 +13,11 @@ # limitations under the License. import unittest -import datetime -from dateutil.tz import tzlocal +from datetime import datetime from mock import MagicMock, patch -import boto3 +from dateutil.tz import tzlocal -from airflow import configuration +from airflow import configuration, AirflowException from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor DESCRIBE_JOB_STEP_RUNNING_RETURN = { @@ -43,8 +42,37 @@ DESCRIBE_JOB_STEP_RUNNING_RETURN = { 'State': 'RUNNING', 'StateChangeReason': {}, 'Timeline': { - 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()), - 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal()) + 'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()), + 'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal()) + } + } + } +} + +DESCRIBE_JOB_STEP_CANCELLED_RETURN = { + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6' + }, + 'Step': { + 'ActionOnFailure': 'CONTINUE', + 'Config': { + 'Args': [ + '/usr/lib/spark/bin/run-example', + 'SparkPi', + '10' + ], + 'Jar': 'command-runner.jar', + 'Properties': {} + }, + 'Id': 's-VK57YR1Z9Z5N', + 'Name': 'calculate_pi', + 'Status': { + 'State': 'CANCELLED', + 'StateChangeReason': {}, + 'Timeline': { + 'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()), + 'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal()) } } } @@ -72,8 +100,8 @@ DESCRIBE_JOB_STEP_COMPLETED_RETURN = { 'State': 'COMPLETED', 'StateChangeReason': {}, 'Timeline': { - 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()), - 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal()) + 'CreationDateTime': datetime(2016, 6, 20, 19, 0, 18, tzinfo=tzlocal()), + 'StartDateTime': datetime(2016, 6, 20, 19, 2, 34, tzinfo=tzlocal()) } } } @@ -84,35 +112,42 @@ class TestEmrStepSensor(unittest.TestCase): def setUp(self): configuration.load_test_config() - # Mock out the emr_client (moto has incorrect response) - self.mock_emr_client = MagicMock() - self.mock_emr_client.describe_step.side_effect = [ + self.emr_client_mock = MagicMock() + self.sensor = EmrStepSensor( + task_id='test_task', + poke_interval=1, + job_flow_id='j-8989898989', + step_id='s-VK57YR1Z9Z5N', + aws_conn_id='aws_default', + ) + + def test_step_completed(self): + self.emr_client_mock.describe_step.side_effect = [ DESCRIBE_JOB_STEP_RUNNING_RETURN, DESCRIBE_JOB_STEP_COMPLETED_RETURN ] - # Mock out the emr_client creator - self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client) - + self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock) - def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self): with patch('boto3.client', self.boto3_client_mock): + self.sensor.execute(None) - operator = EmrStepSensor( - task_id='test_task', - poke_interval=1, - job_flow_id='j-8989898989', - step_id='s-VK57YR1Z9Z5N', - aws_conn_id='aws_default', + self.assertEqual(self.emr_client_mock.describe_step.call_count, 2) + self.emr_client_mock.describe_step.assert_called_with( + ClusterId='j-8989898989', + StepId='s-VK57YR1Z9Z5N' ) - operator.execute(None) + def test_step_cancelled(self): + self.emr_client_mock.describe_step.side_effect = [ + DESCRIBE_JOB_STEP_RUNNING_RETURN, + DESCRIBE_JOB_STEP_CANCELLED_RETURN + ] - # make sure we called twice - self.assertEqual(self.mock_emr_client.describe_step.call_count, 2) + self.boto3_client_mock = MagicMock(return_value=self.emr_client_mock) - # make sure it was called with the job_flow_id and step_id - self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N') + with patch('boto3.client', self.boto3_client_mock): + self.assertRaises(AirflowException, self.sensor.execute, None) if __name__ == '__main__':
