Repository: incubator-airflow
Updated Branches:
  refs/heads/master a8dd6d86d -> 0c5ebcbd1


[AIRFLOW-2706] AWS Batch Operator should use top-level job state to determine status

Rather than inspecting the state of job attempts,
the operator should use the top-level job status
to determine the overall success or failure of the
task. This means the following cases are handled
correctly:

1. Any infrastructure failure that results in no
   attempts being performed is now detected.
2. Any retry policy that AWS Batch applies is now
   honored -- the job isn't marked FAILED until all
   retry attempts have failed. Previously, the
   first failed *attempt* would mark the task as
   failed.

Closes #3567 from craigforster/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0c5ebcbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0c5ebcbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0c5ebcbd

Branch: refs/heads/master
Commit: 0c5ebcbd1e1b26664061f2db889748f0085d02fe
Parents: a8dd6d8
Author: Craig Forster <[email protected]>
Authored: Fri Jul 6 22:30:02 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jul 6 22:30:02 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/awsbatch_operator.py  | 22 +++++++++++---------
 .../contrib/operators/test_awsbatch_operator.py |  6 ++++--
 2 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0c5ebcbd/airflow/contrib/operators/awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/awsbatch_operator.py 
b/airflow/contrib/operators/awsbatch_operator.py
index e07838b..75706aa 100644
--- a/airflow/contrib/operators/awsbatch_operator.py
+++ b/airflow/contrib/operators/awsbatch_operator.py
@@ -152,17 +152,19 @@ class AWSBatchOperator(BaseOperator):
             raise AirflowException('No job found for {}'.format(response))
 
         for job in response['jobs']:
-            if 'attempts' in job:
-                containers = job['attempts']
-                for container in containers:
-                    if (job['status'] == 'FAILED' or
-                            container['container']['exitCode'] != 0):
-                        raise AirflowException(
-                            'This containers encounter an error during '
-                            'execution {}'.format(job))
-            elif job['status'] is not 'SUCCEEDED':
+            job_status = job['status']
+            if job_status is 'FAILED':
+                reason = job['statusReason']
+                raise AirflowException('Job failed with status 
{}'.format(reason))
+            elif job_status in [
+                'SUBMITTED',
+                'PENDING',
+                'RUNNABLE',
+                'STARTING',
+                'RUNNING'
+            ]:
                 raise AirflowException(
-                    'This task is still pending {}'.format(job['status']))
+                    'This task is still pending {}'.format(job_status))
 
     def get_hook(self):
         return AwsHook(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0c5ebcbd/tests/contrib/operators/test_awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_awsbatch_operator.py 
b/tests/contrib/operators/test_awsbatch_operator.py
index d09327d..11725ca 100644
--- a/tests/contrib/operators/test_awsbatch_operator.py
+++ b/tests/contrib/operators/test_awsbatch_operator.py
@@ -147,6 +147,7 @@ class TestAWSBatchOperator(unittest.TestCase):
         client_mock.describe_jobs.return_value = {
             'jobs': [{
                 'status': 'FAILED',
+                'statusReason': 'This is an error reason',
                 'attempts': [{
                     'exitCode': 1
                 }]
@@ -157,7 +158,7 @@ class TestAWSBatchOperator(unittest.TestCase):
             self.batch._check_success_task()
 
         # Ordering of str(dict) is not guaranteed.
-        self.assertIn('This containers encounter an error during execution ', 
str(e.exception))
+        self.assertIn('Job failed with status ', str(e.exception))
 
     def test_check_success_tasks_raises_pending(self):
         client_mock = mock.Mock()
@@ -184,6 +185,7 @@ class TestAWSBatchOperator(unittest.TestCase):
         client_mock.describe_jobs.return_value = {
             'jobs': [{
                 'status': 'FAILED',
+                'statusReason': 'This is an error reason',
                 'attempts': [{
                     'exitCode': 1
                 }, {
@@ -196,7 +198,7 @@ class TestAWSBatchOperator(unittest.TestCase):
             self.batch._check_success_task()
 
         # Ordering of str(dict) is not guaranteed.
-        self.assertIn('This containers encounter an error during execution ', 
str(e.exception))
+        self.assertIn('Job failed with status ', str(e.exception))
 
     def test_check_success_task_not_raises(self):
         client_mock = mock.Mock()

Reply via email to