Repository: incubator-airflow Updated Branches: refs/heads/master a8dd6d86d -> 0c5ebcbd1
[AIRFLOW-2706] AWS Batch Operator should use top-level job state to determine status Rather than inspecting the state of job attempts, the operator should use the top-level job status to determine the overall success or failure of the task. This means the following cases are handled correctly: 1. Any infrastructure failure that results in no attempts being performed is now detected. 2. Any retry policy that AWS Batch will do is now honored -- the job isn't marked FAILED until all attempts to retry have failed. Previously, the first failed *attempt* would mark the task as failed. Closes #3567 from craigforster/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0c5ebcbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0c5ebcbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0c5ebcbd Branch: refs/heads/master Commit: 0c5ebcbd1e1b26664061f2db889748f0085d02fe Parents: a8dd6d8 Author: Craig Forster <[email protected]> Authored: Fri Jul 6 22:30:02 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Fri Jul 6 22:30:02 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/operators/awsbatch_operator.py | 22 +++++++++++--------- .../contrib/operators/test_awsbatch_operator.py | 6 ++++-- 2 files changed, 16 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0c5ebcbd/airflow/contrib/operators/awsbatch_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/awsbatch_operator.py b/airflow/contrib/operators/awsbatch_operator.py index e07838b..75706aa 100644 --- a/airflow/contrib/operators/awsbatch_operator.py +++ b/airflow/contrib/operators/awsbatch_operator.py @@ -152,17 +152,19 @@ class 
AWSBatchOperator(BaseOperator): raise AirflowException('No job found for {}'.format(response)) for job in response['jobs']: - if 'attempts' in job: - containers = job['attempts'] - for container in containers: - if (job['status'] == 'FAILED' or - container['container']['exitCode'] != 0): - raise AirflowException( - 'This containers encounter an error during ' - 'execution {}'.format(job)) - elif job['status'] is not 'SUCCEEDED': + job_status = job['status'] + if job_status is 'FAILED': + reason = job['statusReason'] + raise AirflowException('Job failed with status {}'.format(reason)) + elif job_status in [ + 'SUBMITTED', + 'PENDING', + 'RUNNABLE', + 'STARTING', + 'RUNNING' + ]: raise AirflowException( - 'This task is still pending {}'.format(job['status'])) + 'This task is still pending {}'.format(job_status)) def get_hook(self): return AwsHook( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0c5ebcbd/tests/contrib/operators/test_awsbatch_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_awsbatch_operator.py b/tests/contrib/operators/test_awsbatch_operator.py index d09327d..11725ca 100644 --- a/tests/contrib/operators/test_awsbatch_operator.py +++ b/tests/contrib/operators/test_awsbatch_operator.py @@ -147,6 +147,7 @@ class TestAWSBatchOperator(unittest.TestCase): client_mock.describe_jobs.return_value = { 'jobs': [{ 'status': 'FAILED', + 'statusReason': 'This is an error reason', 'attempts': [{ 'exitCode': 1 }] @@ -157,7 +158,7 @@ class TestAWSBatchOperator(unittest.TestCase): self.batch._check_success_task() # Ordering of str(dict) is not guaranteed. 
- self.assertIn('This containers encounter an error during execution ', str(e.exception)) + self.assertIn('Job failed with status ', str(e.exception)) def test_check_success_tasks_raises_pending(self): client_mock = mock.Mock() @@ -184,6 +185,7 @@ class TestAWSBatchOperator(unittest.TestCase): client_mock.describe_jobs.return_value = { 'jobs': [{ 'status': 'FAILED', + 'statusReason': 'This is an error reason', 'attempts': [{ 'exitCode': 1 }, { @@ -196,7 +198,7 @@ class TestAWSBatchOperator(unittest.TestCase): self.batch._check_success_task() # Ordering of str(dict) is not guaranteed. - self.assertIn('This containers encounter an error during execution ', str(e.exception)) + self.assertIn('Job failed with status ', str(e.exception)) def test_check_success_task_not_raises(self): client_mock = mock.Mock()
