Repository: incubator-airflow
Updated Branches:
refs/heads/master 2b93f8888 -> 6c3c8f445
[AIRFLOW-2542][AIRFLOW-1790] Rename AWS Batch Operator queue to job_queue
- Improved the retries times to jobs below 60s
- Renamed property queue to job_queue to prevent
AWS Batch and CeleryExecutor queue conflict
- Added Breaking Chain note for the UPDATING.md
master
- Fixed operator infinit loop
- Added documentation warning about the Breaking
chain
- Fixed the commit parameter to keep it on Airflow
guidelines
- Fixed logging typo
- rebased with master
Changes to be committed:
modified: ../../../UPDATING.md
modified: awsbatch_operator.py
modified: ../../../tests/contrib/operators/test_
awsbatch_operator.py
Closes #3436 from hprudent/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6c3c8f44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6c3c8f44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6c3c8f44
Branch: refs/heads/master
Commit: 6c3c8f445959dfec1a7ebea84b8941c4c2265e9d
Parents: 2b93f88
Author: Hugo Prudente <[email protected]>
Authored: Tue Jun 19 10:00:42 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Jun 19 10:00:47 2018 +0200
----------------------------------------------------------------------
UPDATING.md | 1 +
airflow/contrib/operators/awsbatch_operator.py | 23 +++++++++++---------
.../contrib/operators/test_awsbatch_operator.py | 4 ++--
3 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c3c8f44/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 1f2eb83..3a66e73 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -35,6 +35,7 @@ Run `airflow webserver` to start the new UI. This will bring
up a log in page, e
There are five roles created for Airflow by default: Admin, User, Op, Viewer,
and Public. To configure roles/permissions, go to the `Security` tab and click
`List Roles` in the new UI.
#### Breaking changes
+- AWS Batch Operator renamed property queue to job_queue to prevent conflict
with the internal queue from CeleryExecutor - AIRFLOW-2542
- Users created and stored in the old users table will not be migrated
automatically. FAB's built-in authentication support must be reconfigured.
- Airflow dag home page is now `/home` (instead of `/admin`).
- All ModelViews in Flask-AppBuilder follow a different pattern from
Flask-Admin. The `/admin` part of the url path will no longer exist. For
example: `/admin/connection` becomes `/connection/list`,
`/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit`
becomes `/connection/edit`, etc.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c3c8f44/airflow/contrib/operators/awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/awsbatch_operator.py
b/airflow/contrib/operators/awsbatch_operator.py
index 6e8e2a4..d23b44e 100644
--- a/airflow/contrib/operators/awsbatch_operator.py
+++ b/airflow/contrib/operators/awsbatch_operator.py
@@ -33,17 +33,20 @@ class AWSBatchOperator(BaseOperator):
"""
Execute a job on AWS Batch Service
+ .. warning: the queue parameter was renamed to job_queue to segreggate the
+ internal CeleryExecutor queue from the AWS Batch internal
queue.
+
:param job_name: the name for the job that will run on AWS Batch
:type job_name: str
:param job_definition: the job definition name on AWS Batch
:type job_definition: str
- :param queue: the queue name on AWS Batch
- :type queue: str
+ :param job_queue: the queue name on AWS Batch
+ :type job_queue: str
:param: overrides: the same parameter that boto3 will receive on
containerOverrides (templated):
http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job
:type: overrides: dict
- :param max_retries: exponential backoff retries while waiter is not merged
+ :param max_retries: exponential backoff retries while waiter is not
merged, 4200 = 48 hours
:type max_retries: int
:param aws_conn_id: connection id of AWS credentials / region name. If
None,
credential boto3 strategy will be used
@@ -59,7 +62,7 @@ class AWSBatchOperator(BaseOperator):
template_fields = ('overrides',)
@apply_defaults
- def __init__(self, job_name, job_definition, queue, overrides,
max_retries=288,
+ def __init__(self, job_name, job_definition, job_queue, overrides,
max_retries=4200,
aws_conn_id=None, region_name=None, **kwargs):
super(AWSBatchOperator, self).__init__(**kwargs)
@@ -67,7 +70,7 @@ class AWSBatchOperator(BaseOperator):
self.aws_conn_id = aws_conn_id
self.region_name = region_name
self.job_definition = job_definition
- self.queue = queue
+ self.job_queue = job_queue
self.overrides = overrides
self.max_retries = max_retries
@@ -79,7 +82,7 @@ class AWSBatchOperator(BaseOperator):
def execute(self, context):
self.log.info(
'Running AWS Batch Job - Job definition: %s - on queue %s',
- self.job_definition, self.queue
+ self.job_definition, self.job_queue
)
self.log.info('AWSBatchOperator overrides: %s', self.overrides)
@@ -91,7 +94,7 @@ class AWSBatchOperator(BaseOperator):
try:
response = self.client.submit_job(
jobName=self.job_name,
- jobQueue=self.queue,
+ jobQueue=self.job_queue,
jobDefinition=self.job_definition,
containerOverrides=self.overrides)
@@ -128,14 +131,15 @@ class AWSBatchOperator(BaseOperator):
retry = True
retries = 0
- while retries < self.max_retries or retry:
+ while retries < self.max_retries and retry:
+ self.log.info('AWS Batch retry in the next %s seconds',
retries)
response = self.client.describe_jobs(
jobs=[self.jobId]
)
if response['jobs'][-1]['status'] in ['SUCCEEDED', 'FAILED']:
retry = False
- sleep(pow(2, retries) * 100)
+ sleep( 1 + pow(retries * 0.1, 2))
retries += 1
def _check_success_task(self):
@@ -153,7 +157,6 @@ class AWSBatchOperator(BaseOperator):
for container in containers:
if (job['status'] == 'FAILED' or
container['container']['exitCode'] != 0):
- print("@@@@")
raise AirflowException(
'This containers encounter an error during '
'execution {}'.format(job))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6c3c8f44/tests/contrib/operators/test_awsbatch_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_awsbatch_operator.py
b/tests/contrib/operators/test_awsbatch_operator.py
index f7ee8fa..66540bc 100644
--- a/tests/contrib/operators/test_awsbatch_operator.py
+++ b/tests/contrib/operators/test_awsbatch_operator.py
@@ -50,7 +50,7 @@ class TestAWSBatchOperator(unittest.TestCase):
self.batch = AWSBatchOperator(
task_id='task',
job_name='51455483-c62c-48ac-9b88-53a6a725baa3',
- queue='queue',
+ job_queue='queue',
job_definition='hello-world',
max_retries=5,
overrides={},
@@ -60,7 +60,7 @@ class TestAWSBatchOperator(unittest.TestCase):
def test_init(self):
self.assertEqual(self.batch.job_name,
'51455483-c62c-48ac-9b88-53a6a725baa3')
- self.assertEqual(self.batch.queue, 'queue')
+ self.assertEqual(self.batch.job_queue, 'queue')
self.assertEqual(self.batch.job_definition, 'hello-world')
self.assertEqual(self.batch.max_retries, 5)
self.assertEqual(self.batch.overrides, {})