This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2c270db714 support requirepass redis sentinel (#30352)
2c270db714 is described below
commit 2c270db714b7693a624ce70d178744ccc5f9e73e
Author: ORuteMa <[email protected]>
AuthorDate: Fri May 5 19:55:38 2023 +0800
support requirepass redis sentinel (#30352)
---
airflow/config_templates/config.yml | 14 +++++++++++++-
airflow/config_templates/default_airflow.cfg | 11 ++++++++++-
airflow/config_templates/default_celery.py | 16 +++++++++++++---
docs/apache-airflow/core-concepts/executor/celery.rst | 3 ++-
4 files changed, 38 insertions(+), 6 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 8b038e2274..4925da3b43 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2146,11 +2146,23 @@ celery_broker_transport_options:
ETA you're planning to use.
visibility_timeout is only supported for Redis and SQS celery brokers.
See:
-
http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
version_added: ~
type: string
example: "21600"
default: ~
+ sentinel_kwargs:
+ description: |
+ The sentinel_kwargs parameter allows passing additional options to the
Sentinel client.
+ In a typical scenario where Redis Sentinel is used as the broker and
Redis servers are
+ password-protected, the password needs to be passed through this
parameter. Although its
+ type is string, the value must be a string containing a valid JSON
dictionary.
+ See:
+
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+ version_added: 2.7.0
+ type: string
+ example: '{"password": "password_for_redis_server"}'
+ default: ~
dask:
description: |
This section only applies if you are using the DaskExecutor in
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 582b10f09c..e8a471fe45 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1105,10 +1105,19 @@ worker_precheck = False
# ETA you're planning to use.
# visibility_timeout is only supported for Redis and SQS celery brokers.
# See:
-#
http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+#
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
# Example: visibility_timeout = 21600
# visibility_timeout =
+# The sentinel_kwargs parameter allows passing additional options to the
Sentinel client.
+# In a typical scenario where Redis Sentinel is used as the broker and Redis
servers are
+# password-protected, the password needs to be passed through this parameter.
Although its
+# type is string, the value must be a string containing a valid JSON
dictionary.
+# See:
+#
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+# Example: sentinel_kwargs = {{"password": "password_for_redis_server"}}
+# sentinel_kwargs =
+
[dask]
# This section only applies if you are using the DaskExecutor in
diff --git a/airflow/config_templates/default_celery.py
b/airflow/config_templates/default_celery.py
index d3d5a4adf1..9ec320097e 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -26,7 +26,7 @@ from airflow.exceptions import AirflowConfigException,
AirflowException
def _broker_supports_visibility_timeout(url):
- return url.startswith("redis://") or url.startswith("sqs://")
+ return url.startswith("redis://") or url.startswith("sqs://") or
url.startswith("sentinel://")
log = logging.getLogger(__name__)
@@ -38,6 +38,16 @@ if "visibility_timeout" not in broker_transport_options:
if _broker_supports_visibility_timeout(broker_url):
broker_transport_options["visibility_timeout"] = 21600
+broker_transport_options_for_celery: dict = dict.copy(broker_transport_options)
+if "sentinel_kwargs" in broker_transport_options:
+ try:
+ sentinel_kwargs = conf.getjson("celery_broker_transport_options",
"sentinel_kwargs")
+ if not isinstance(sentinel_kwargs, dict):
+ raise ValueError
+ broker_transport_options_for_celery["sentinel_kwargs"] =
sentinel_kwargs
+ except Exception:
+ raise AirflowException("sentinel_kwargs should be written in the
correct dictionary format.")
+
if conf.has_option("celery", "RESULT_BACKEND"):
result_backend = conf.get_mandatory_value("celery", "RESULT_BACKEND")
else:
@@ -53,7 +63,7 @@ DEFAULT_CELERY_CONFIG = {
"task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
"task_track_started": conf.getboolean("celery", "task_track_started"),
"broker_url": broker_url,
- "broker_transport_options": broker_transport_options,
+ "broker_transport_options": broker_transport_options_for_celery,
"result_backend": result_backend,
"worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"),
"worker_enable_remote_control": conf.getboolean("celery",
"worker_enable_remote_control"),
@@ -74,7 +84,7 @@ try:
"ca_certs": conf.get("celery", "SSL_CACERT"),
"cert_reqs": ssl.CERT_REQUIRED,
}
- elif broker_url and "redis://" in broker_url:
+ elif broker_url and ("redis://" in broker_url or "sentinel://" in
broker_url):
broker_use_ssl = {
"ssl_keyfile": conf.get("celery", "SSL_KEY"),
"ssl_certfile": conf.get("celery", "SSL_CERT"),
diff --git a/docs/apache-airflow/core-concepts/executor/celery.rst
b/docs/apache-airflow/core-concepts/executor/celery.rst
index e16cc7e3f8..ca137fa8e3 100644
--- a/docs/apache-airflow/core-concepts/executor/celery.rst
+++ b/docs/apache-airflow/core-concepts/executor/celery.rst
@@ -22,7 +22,7 @@ Celery Executor
===============
``CeleryExecutor`` is one of the ways you can scale out the number of workers.
For this
-to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, ...) and
+to work, you need to set up a Celery backend (**RabbitMQ**, **Redis**, **Redis
Sentinel**, ...) and
change your ``airflow.cfg`` to point the executor parameter to
``CeleryExecutor`` and provide the related Celery settings.
@@ -83,6 +83,7 @@ Some caveats:
- Make sure to use a database backed result backend
- Make sure to set a visibility timeout in
``[celery_broker_transport_options]`` that exceeds the ETA of your longest
running task
+- Make sure to specify the password for the Redis servers via ``sentinel_kwargs`` in the
``[celery_broker_transport_options]`` section if you are using Redis Sentinel
as your broker and the Redis servers are password-protected
- Make sure to set umask in ``[worker_umask]`` to set permissions for newly
created files by workers.
- Tasks can consume resources. Make sure your worker has enough resources to
run ``worker_concurrency`` tasks
- Queue names are limited to 256 characters, but each broker backend might
have its own restrictions