This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0ce5349610b48f2d24c33be1877fabd253564e3e Author: ORuteMa <[email protected]> AuthorDate: Fri May 5 19:55:38 2023 +0800 support requirepass redis sentinel (#30352) (cherry picked from commit 2c270db714b7693a624ce70d178744ccc5f9e73e) --- airflow/config_templates/config.yml | 14 +++++++++++++- airflow/config_templates/default_airflow.cfg | 11 ++++++++++- airflow/config_templates/default_celery.py | 16 +++++++++++++--- docs/apache-airflow/core-concepts/executor/celery.rst | 3 ++- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a65012c7fa..1c08f7388d 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2083,11 +2083,23 @@ celery_broker_transport_options: ETA you're planning to use. visibility_timeout is only supported for Redis and SQS celery brokers. See: - http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options + https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout version_added: ~ type: string example: "21600" default: ~ + sentinel_kwargs: + description: | + The sentinel_kwargs parameter allows passing additional options to the Sentinel client. + In a typical scenario where Redis Sentinel is used as the broker and Redis servers are + password-protected, the password needs to be passed through this parameter. Although its + type is string, it is required to pass a string that conforms to the dictionary format. + See: + https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration + version_added: 2.7.0 + type: string + example: '{"password": "password_for_redis_server"}' + default: ~ dask: description: | This section only applies if you are using the DaskExecutor in diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 684a78992d..26d8cb99b0 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1055,10 +1055,19 @@ worker_precheck = False # ETA you're planning to use. # visibility_timeout is only supported for Redis and SQS celery brokers. # See: -# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout # Example: visibility_timeout = 21600 # visibility_timeout = +# The sentinel_kwargs parameter allows passing additional options to the Sentinel client. +# In a typical scenario where Redis Sentinel is used as the broker and Redis servers are +# password-protected, the password needs to be passed through this parameter. Although its +# type is string, it is required to pass a string that conforms to the dictionary format. +# See: +# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration +# Example: sentinel_kwargs = {{"password": "password_for_redis_server"}} +# sentinel_kwargs = + [dask] # This section only applies if you are using the DaskExecutor in diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index d3d5a4adf1..9ec320097e 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -26,7 +26,7 @@ from airflow.exceptions import AirflowConfigException, AirflowException def _broker_supports_visibility_timeout(url): - return url.startswith("redis://") or url.startswith("sqs://") + return url.startswith("redis://") or url.startswith("sqs://") or url.startswith("sentinel://") log = logging.getLogger(__name__) @@ -38,6 +38,16 @@ if "visibility_timeout" not in broker_transport_options: if _broker_supports_visibility_timeout(broker_url): broker_transport_options["visibility_timeout"] = 21600 +broker_transport_options_for_celery: dict = dict.copy(broker_transport_options) +if "sentinel_kwargs" in broker_transport_options: + try: + sentinel_kwargs = conf.getjson("celery_broker_transport_options", "sentinel_kwargs") + if not isinstance(sentinel_kwargs, dict): + raise ValueError + broker_transport_options_for_celery["sentinel_kwargs"] = sentinel_kwargs + except Exception: + raise AirflowException("sentinel_kwargs should be written in the correct dictionary format.") + if conf.has_option("celery", "RESULT_BACKEND"): result_backend = conf.get_mandatory_value("celery", "RESULT_BACKEND") else: @@ -53,7 +63,7 @@ DEFAULT_CELERY_CONFIG = { "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"), "task_track_started": conf.getboolean("celery", "task_track_started"), "broker_url": broker_url, - "broker_transport_options": broker_transport_options, + "broker_transport_options": broker_transport_options_for_celery, "result_backend": result_backend, "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"), "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control"), @@ -74,7 +84,7 @@ try: "ca_certs": conf.get("celery", "SSL_CACERT"), "cert_reqs": ssl.CERT_REQUIRED, } - elif broker_url and "redis://" in broker_url: + elif broker_url and ("redis://" in broker_url or "sentinel://" in broker_url): broker_use_ssl = { "ssl_keyfile": conf.get("celery", "SSL_KEY"), "ssl_certfile": conf.get("celery", "SSL_CERT"), diff --git a/docs/apache-airflow/core-concepts/executor/celery.rst b/docs/apache-airflow/core-concepts/executor/celery.rst index e16cc7e3f8..ca137fa8e3 100644 --- a/docs/apache-airflow/core-concepts/executor/celery.rst +++ b/docs/apache-airflow/core-concepts/executor/celery.rst @@ -22,7 +22,7 @@ Celery Executor =============== ``CeleryExecutor`` is one of the ways you can scale out the number of workers. For this -to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, ...) and +to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, **Redis Sentinel** ...) and change your ``airflow.cfg`` to point the executor parameter to ``CeleryExecutor`` and provide the related Celery settings. @@ -83,6 +83,7 @@ Some caveats: - Make sure to use a database backed result backend - Make sure to set a visibility timeout in ``[celery_broker_transport_options]`` that exceeds the ETA of your longest running task +- Make sure to specify the password for Redis Server in the ``[celery_broker_transport_options]`` section if you are using Redis Sentinel as your broker and the Redis servers are password-protected - Make sure to set umask in ``[worker_umask]`` to set permissions for newly created files by workers. - Tasks can consume resources. Make sure your worker has enough resources to run ``worker_concurrency`` tasks - Queue names are limited to 256 characters, but each broker backend might have its own restrictions
