Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2b93f8888 -> 6c3c8f445


[AIRFLOW-2542][AIRFLOW-1790] Rename AWS Batch Operator queue to job_queue

- Improved the retries times to jobs below 60s
- Renamed property queue to job_queue to prevent
AWS Batch and CeleryExecutor queue conflict
- Added Breaking Chain note for the UPDATING.md
master
- Fixed operator infinit loop
- Added documentation warning about the Breaking
chain
- Fixed the commit parameter to keep it on Airflow
guidelines
- Fixed logging typo
- rebased with master

Changes to be committed:
        modified:   ../../../UPDATING.md
        modified:   awsbatch_operator.py
        modified:   ../../../tests/contrib/operators/test_
awsbatch_operator.py

Closes #3436 from hprudent/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6c3c8f44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6c3c8f44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6c3c8f44

Branch: refs/heads/master
Commit: 6c3c8f445959dfec1a7ebea84b8941c4c2265e9d
Parents: 2b93f88
Author: Hugo Prudente <[email protected]>
Authored: Tue Jun 19 10:00:42 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Jun 19 10:00:47 2018 +0200

----------------------------------------------------------------------
 UPDATING.md                                     |  1 +
 airflow/contrib/operators/awsbatch_operator.py  | 23 +++++++++++---------
 .../contrib/operators/test_awsbatch_operator.py |  4 ++--
 3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c3c8f44/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 1f2eb83..3a66e73 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -35,6 +35,7 @@ Run `airflow webserver` to start the new UI. This will bring 
up a log in page, e
 There are five roles created for Airflow by default: Admin, User, Op, Viewer, 
and Public. To configure roles/permissions, go to the `Security` tab and click 
`List Roles` in the new UI.
 
 #### Breaking changes
+- AWS Batch Operator renamed property queue to job_queue to prevent conflict 
with the internal queue from CeleryExecutor - AIRFLOW-2542
 - Users created and stored in the old users table will not be migrated 
automatically. FAB's built-in authentication support must be reconfigured.
 - Airflow dag home page is now `/home` (instead of `/admin`).
 - All ModelViews in Flask-AppBuilder follow a different pattern from 
Flask-Admin. The `/admin` part of the url path will no longer exist. For 
example: `/admin/connection` becomes `/connection/list`, 
`/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit` 
becomes `/connection/edit`, etc.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c3c8f44/airflow/contrib/operators/awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/awsbatch_operator.py 
b/airflow/contrib/operators/awsbatch_operator.py
index 6e8e2a4..d23b44e 100644
--- a/airflow/contrib/operators/awsbatch_operator.py
+++ b/airflow/contrib/operators/awsbatch_operator.py
@@ -33,17 +33,20 @@ class AWSBatchOperator(BaseOperator):
     """
     Execute a job on AWS Batch Service
 
+    .. warning: the queue parameter was renamed to job_queue to segreggate the
+                internal CeleryExecutor queue from the AWS Batch internal 
queue.
+
     :param job_name: the name for the job that will run on AWS Batch
     :type job_name: str
     :param job_definition: the job definition name on AWS Batch
     :type job_definition: str
-    :param queue: the queue name on AWS Batch
-    :type queue: str
+    :param job_queue: the queue name on AWS Batch
+    :type job_queue: str
     :param: overrides: the same parameter that boto3 will receive on
             containerOverrides (templated):
             
http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job
     :type: overrides: dict
-    :param max_retries: exponential backoff retries while waiter is not merged
+    :param max_retries: exponential backoff retries while waiter is not 
merged, 4200 = 48 hours
     :type max_retries: int
     :param aws_conn_id: connection id of AWS credentials / region name. If 
None,
             credential boto3 strategy will be used
@@ -59,7 +62,7 @@ class AWSBatchOperator(BaseOperator):
     template_fields = ('overrides',)
 
     @apply_defaults
-    def __init__(self, job_name, job_definition, queue, overrides, 
max_retries=288,
+    def __init__(self, job_name, job_definition, job_queue, overrides, 
max_retries=4200,
                  aws_conn_id=None, region_name=None, **kwargs):
         super(AWSBatchOperator, self).__init__(**kwargs)
 
@@ -67,7 +70,7 @@ class AWSBatchOperator(BaseOperator):
         self.aws_conn_id = aws_conn_id
         self.region_name = region_name
         self.job_definition = job_definition
-        self.queue = queue
+        self.job_queue = job_queue
         self.overrides = overrides
         self.max_retries = max_retries
 
@@ -79,7 +82,7 @@ class AWSBatchOperator(BaseOperator):
     def execute(self, context):
         self.log.info(
             'Running AWS Batch Job - Job definition: %s - on queue %s',
-            self.job_definition, self.queue
+            self.job_definition, self.job_queue
         )
         self.log.info('AWSBatchOperator overrides: %s', self.overrides)
 
@@ -91,7 +94,7 @@ class AWSBatchOperator(BaseOperator):
         try:
             response = self.client.submit_job(
                 jobName=self.job_name,
-                jobQueue=self.queue,
+                jobQueue=self.job_queue,
                 jobDefinition=self.job_definition,
                 containerOverrides=self.overrides)
 
@@ -128,14 +131,15 @@ class AWSBatchOperator(BaseOperator):
             retry = True
             retries = 0
 
-            while retries < self.max_retries or retry:
+            while retries < self.max_retries and retry:
+                self.log.info('AWS Batch retry in the next %s seconds', 
retries)
                 response = self.client.describe_jobs(
                     jobs=[self.jobId]
                 )
                 if response['jobs'][-1]['status'] in ['SUCCEEDED', 'FAILED']:
                     retry = False
 
-                sleep(pow(2, retries) * 100)
+                sleep( 1 + pow(retries * 0.1, 2))
                 retries += 1
 
     def _check_success_task(self):
@@ -153,7 +157,6 @@ class AWSBatchOperator(BaseOperator):
                 for container in containers:
                     if (job['status'] == 'FAILED' or
                             container['container']['exitCode'] != 0):
-                        print("@@@@")
                         raise AirflowException(
                             'This containers encounter an error during '
                             'execution {}'.format(job))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c3c8f44/tests/contrib/operators/test_awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_awsbatch_operator.py 
b/tests/contrib/operators/test_awsbatch_operator.py
index f7ee8fa..66540bc 100644
--- a/tests/contrib/operators/test_awsbatch_operator.py
+++ b/tests/contrib/operators/test_awsbatch_operator.py
@@ -50,7 +50,7 @@ class TestAWSBatchOperator(unittest.TestCase):
         self.batch = AWSBatchOperator(
             task_id='task',
             job_name='51455483-c62c-48ac-9b88-53a6a725baa3',
-            queue='queue',
+            job_queue='queue',
             job_definition='hello-world',
             max_retries=5,
             overrides={},
@@ -60,7 +60,7 @@ class TestAWSBatchOperator(unittest.TestCase):
     def test_init(self):
 
         self.assertEqual(self.batch.job_name, 
'51455483-c62c-48ac-9b88-53a6a725baa3')
-        self.assertEqual(self.batch.queue, 'queue')
+        self.assertEqual(self.batch.job_queue, 'queue')
         self.assertEqual(self.batch.job_definition, 'hello-world')
         self.assertEqual(self.batch.max_retries, 5)
         self.assertEqual(self.batch.overrides, {})

Reply via email to