Repository: incubator-airflow Updated Branches: refs/heads/master e92d6bf72 -> d02e8eb9d
[AIRFLOW-1265] Fix celery executor parsing CELERY_SSL_ACTIVE. Changed retrieval of celery/celery_ssl_active to use configuration.getboolean(). Add correct except block and log warning if celery/celery_ssl_active key is left undefined. Closes #2341 from holygits/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d02e8eb9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d02e8eb9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d02e8eb9 Branch: refs/heads/master Commit: d02e8eb9dbcfd78a0819177fe65189e1eed45673 Parents: e92d6bf Author: Jordan <[email protected]> Authored: Thu Jun 29 16:40:01 2017 -0700 Committer: Alex Guziel <[email protected]> Committed: Thu Jun 29 16:40:01 2017 -0700 ---------------------------------------------------------------------- airflow/executors/celery_executor.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d02e8eb9/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index d7f74c6..17c343b 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -22,7 +22,7 @@ import traceback from celery import Celery from celery import states as celery_states -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowConfigException, AirflowException from airflow.executors.base_executor import BaseExecutor from airflow import configuration @@ -48,18 +48,25 @@ class CeleryConfig(object): CELERYD_CONCURRENCY = configuration.getint('celery', 'CELERYD_CONCURRENCY') CELERY_DEFAULT_QUEUE = DEFAULT_QUEUE CELERY_DEFAULT_EXCHANGE = DEFAULT_QUEUE - if 
configuration.getboolean('celery', 'CELERY_SSL_ACTIVE'): - try: + + celery_ssl_active = False + try: + celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE') + except AirflowConfigException as e: + logging.warning("Celery Executor will run without SSL") + + try: + if celery_ssl_active: BROKER_USE_SSL = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'), 'certfile': configuration.get('celery', 'CELERY_SSL_CERT'), 'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'), 'cert_reqs': ssl.CERT_REQUIRED} - except ValueError: - raise AirflowException('ValueError: CELERY_SSL_ACTIVE is True, please ensure CELERY_SSL_KEY, ' - 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set') - except Exception as e: - raise AirflowException('Exception: There was an unknown Celery SSL Error. Please ensure you want to use ' - 'SSL and/or have all necessary certs and key.') + except AirflowConfigException as e: + raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, please ensure CELERY_SSL_KEY, ' + 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set') + except Exception as e: + raise AirflowException('Exception: There was an unknown Celery SSL Error. Please ensure you want to use ' + 'SSL and/or have all necessary certs and key.') app = Celery( configuration.get('celery', 'CELERY_APP_NAME'),
