Repository: incubator-airflow
Updated Branches:
  refs/heads/master aedf8de61 -> 65f3b468a


[AIRFLOW-1527] Refactor celery config

The Celery config is currently hard-coded in the Celery executor definition,
which makes it inflexible for users who want to change it. In addition,
Celery 4 is moving to lowercase setting names.

Closes #2542 from bolkedebruin/upgrade_celery


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/65f3b468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/65f3b468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/65f3b468

Branch: refs/heads/master
Commit: 65f3b468a28a3232618558e2aebf6239dc87f3c5
Parents: aedf8de
Author: Bolke de Bruin <[email protected]>
Authored: Mon Sep 25 11:19:14 2017 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Mon Sep 25 11:19:16 2017 -0700

----------------------------------------------------------------------
 .travis.yml                                  |  1 +
 airflow/config_templates/default_airflow.cfg |  2 +
 airflow/config_templates/default_celery.py   | 58 +++++++++++++++++++++++
 airflow/executors/celery_executor.py         | 54 +++++----------------
 scripts/ci/airflow_travis.cfg                |  4 +-
 setup.py                                     |  2 +-
 tests/executors/test_celery_executor.py      | 55 +++++++++++++++++++++
 7 files changed, 131 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 90f33e3..d3cd216 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,6 +19,7 @@ jdk:
 services:
   - mysql
   - postgresql
+  - rabbitmq
 addons:
   apt:
     packages:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 94efe60..ef586b8 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -296,6 +296,8 @@ flower_port = 5555
 # Default queue that tasks get assigned to and that worker listen on.
 default_queue = default
 
+# Import path for celery configuration options
+celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
 
 [dask]
 # This section only applies if you are using the DaskExecutor in

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
new file mode 100644
index 0000000..48611cb
--- /dev/null
+++ b/airflow/config_templates/default_celery.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import ssl
+
+from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow import configuration
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+DEFAULT_CELERY_CONFIG = {  # Celery 4 lowercase setting names
+    'accept_content': ['json', 'pickle'],
+    'event_serializer': 'json',
+    'result_serializer': 'pickle',
+    'worker_prefetch_multiplier': 1,
+    'task_acks_late': True,
+    'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
+    'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
+    'broker_url': configuration.get('celery', 'BROKER_URL'),
+    'broker_transport_options': {'visibility_timeout': 21600},  # 6 hours
+    'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
+    'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),
+}
+
+celery_ssl_active = False  # stays False when CELERY_SSL_ACTIVE is unset
+try:
+    celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
+except AirflowConfigException:
+    log = LoggingMixin().log
+    log.warning("Celery Executor will run without SSL")
+
+try:
+    if celery_ssl_active:
+        broker_use_ssl = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'),
+                          'certfile': configuration.get('celery', 'CELERY_SSL_CERT'),
+                          'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'),
+                          'cert_reqs': ssl.CERT_REQUIRED}
+        DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl
+except AirflowConfigException:
+    raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, '
+                           'please ensure CELERY_SSL_KEY, '
+                           'CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
+except Exception as e:
+    raise AirflowException('Exception: There was an unknown Celery SSL Error. '
+                           'Please ensure you want to use '
+                           'SSL and/or have all necessary certs and key ({}).'.format(e))
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 360a276..7e363db 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -12,20 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from builtins import object
 import subprocess
-import ssl
 import time
-import traceback
 
 from celery import Celery
 from celery import states as celery_states
 
-from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
+from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+
 PARALLELISM = configuration.get('core', 'PARALLELISM')
 
 '''
@@ -33,45 +32,14 @@ To start the celery worker, run the command:
 airflow worker
 '''
 
-DEFAULT_QUEUE = configuration.get('celery', 'DEFAULT_QUEUE')
-
-
-class CeleryConfig(object):
-    CELERY_ACCEPT_CONTENT = ['json', 'pickle']
-    CELERY_EVENT_SERIALIZER = 'json'
-    CELERY_RESULT_SERIALIZER = 'pickle'
-    CELERY_TASK_SERIALIZER = 'pickle'
-    CELERYD_PREFETCH_MULTIPLIER = 1
-    CELERY_ACKS_LATE = True
-    BROKER_URL = configuration.get('celery', 'BROKER_URL')
-    CELERY_RESULT_BACKEND = configuration.get('celery', 'CELERY_RESULT_BACKEND')
-    CELERYD_CONCURRENCY = configuration.getint('celery', 'CELERYD_CONCURRENCY')
-    CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE
-    CELERY_DEFAULT_EXCHANGE = DEFAULT_QUEUE
-
-    celery_ssl_active = False
-    try:
-        celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
-    except AirflowConfigException as e:
-        log = LoggingMixin().log
-        log.warning("Celery Executor will run without SSL")
-
-    try:
-        if celery_ssl_active:
-            BROKER_USE_SSL = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'),
-                              'certfile': configuration.get('celery', 'CELERY_SSL_CERT'),
-                              'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'),
-                              'cert_reqs': ssl.CERT_REQUIRED}
-    except AirflowConfigException as e:
-        raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, please ensure CELERY_SSL_KEY, '
-                               'CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
-    except Exception as e:
-        raise AirflowException('Exception: There was an unknown Celery SSL Error.  Please ensure you want to use '
-                               'SSL and/or have all necessary certs and key.')
+if configuration.has_option('celery', 'celery_config_options'):
+    celery_configuration = configuration.get('celery', 'celery_config_options')
+else:
+    celery_configuration = DEFAULT_CELERY_CONFIG
 
 app = Celery(
     configuration.get('celery', 'CELERY_APP_NAME'),
-    config_source=CeleryConfig)
+    config_source=celery_configuration)
 
 
 @app.task
@@ -98,8 +66,10 @@ class CeleryExecutor(BaseExecutor):
         self.tasks = {}
         self.last_state = {}
 
-    def execute_async(self, key, command, queue=DEFAULT_QUEUE):
-        self.log.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
+    def execute_async(self, key, command,
+                      queue=DEFAULT_CELERY_CONFIG['task_default_queue']):
+        self.log.info("[celery] queuing %s through celery, queue=%s",
+                      key, queue)
         self.tasks[key] = execute_command.apply_async(
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index 01bf3a4..6827138 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -44,8 +44,8 @@ smtp_mail_from = [email protected]
 celery_app_name = airflow.executors.celery_executor
 celeryd_concurrency = 16
 worker_log_server_port = 8793
-broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
-celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+broker_url = amqp://guest:guest@localhost:5672/
+celery_result_backend = db+mysql://root@localhost/airflow
 flower_port = 5555
 default_queue = default
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index a97abfd..d52bd3b 100644
--- a/setup.py
+++ b/setup.py
@@ -106,7 +106,7 @@ async = [
 ]
 azure = ['azure-storage>=0.34.0']
 celery = [
-    'celery>=3.1.17',
+    'celery>=4.0.0',
     'flower>=0.7.3'
 ]
 cgroups = [

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65f3b468/tests/executors/test_celery_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
new file mode 100644
index 0000000..1c411e7
--- /dev/null
+++ b/tests/executors/test_celery_executor.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+import sys
+
+from airflow.executors.celery_executor import app
+from airflow.executors.celery_executor import CeleryExecutor
+from airflow.utils.state import State
+from celery.contrib.testing.worker import start_worker
+
+# leave this import in place: it is used by the test worker
+import celery.contrib.testing.tasks
+
+
+class CeleryExecutorTest(unittest.TestCase):
+
+    def test_celery_integration(self):
+        executor = CeleryExecutor()
+        executor.start()
+        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+
+            success_command = 'echo 1'
+            fail_command = 'exit 1'
+
+            executor.execute_async(key='success', command=success_command)
+            # errors are propagated for some reason
+            try:
+                executor.execute_async(key='fail', command=fail_command)
+            except Exception:
+                pass
+            executor.running['success'] = True
+            executor.running['fail'] = True
+
+            executor.end(synchronous=True)
+
+        self.assertEqual(executor.event_buffer['success'], State.SUCCESS)
+        self.assertEqual(executor.event_buffer['fail'], State.FAILED)
+
+        self.assertNotIn('success', executor.tasks)
+        self.assertNotIn('fail', executor.tasks)
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to