This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0ce5349610b48f2d24c33be1877fabd253564e3e
Author: ORuteMa <[email protected]>
AuthorDate: Fri May 5 19:55:38 2023 +0800

    support requirepass redis sentinel (#30352)
    
    (cherry picked from commit 2c270db714b7693a624ce70d178744ccc5f9e73e)
---
 airflow/config_templates/config.yml                   | 14 +++++++++++++-
 airflow/config_templates/default_airflow.cfg          | 11 ++++++++++-
 airflow/config_templates/default_celery.py            | 16 +++++++++++++---
 docs/apache-airflow/core-concepts/executor/celery.rst |  3 ++-
 4 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index a65012c7fa..1c08f7388d 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2083,11 +2083,23 @@ celery_broker_transport_options:
         ETA you're planning to use.
         visibility_timeout is only supported for Redis and SQS celery brokers.
         See:
-        
http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+        
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
       version_added: ~
       type: string
       example: "21600"
       default: ~
+    sentinel_kwargs:
+      description: |
+        The sentinel_kwargs parameter allows passing additional options to the 
Sentinel client.
+        In a typical scenario where Redis Sentinel is used as the broker and 
Redis servers are
+        password-protected, the password needs to be passed through this 
parameter. Although its
+        type is string, it is required to pass a string that conforms to the 
dictionary format.
+        See:
+        
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+      version_added: 2.7.0
+      type: string
+      example: '{"password": "password_for_redis_server"}'
+      default: ~
 dask:
   description: |
     This section only applies if you are using the DaskExecutor in
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 684a78992d..26d8cb99b0 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1055,10 +1055,19 @@ worker_precheck = False
 # ETA you're planning to use.
 # visibility_timeout is only supported for Redis and SQS celery brokers.
 # See:
-# 
http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options
+# 
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
 # Example: visibility_timeout = 21600
 # visibility_timeout =
 
+# The sentinel_kwargs parameter allows passing additional options to the 
Sentinel client.
+# In a typical scenario where Redis Sentinel is used as the broker and Redis 
servers are
+# password-protected, the password needs to be passed through this parameter. 
Although its
+# type is string, it is required to pass a string that conforms to the 
dictionary format.
+# See:
+# 
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+# Example: sentinel_kwargs = {{"password": "password_for_redis_server"}}
+# sentinel_kwargs =
+
 [dask]
 
 # This section only applies if you are using the DaskExecutor in
diff --git a/airflow/config_templates/default_celery.py 
b/airflow/config_templates/default_celery.py
index d3d5a4adf1..9ec320097e 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -26,7 +26,7 @@ from airflow.exceptions import AirflowConfigException, 
AirflowException
 
 
 def _broker_supports_visibility_timeout(url):
-    return url.startswith("redis://") or url.startswith("sqs://")
+    return url.startswith("redis://") or url.startswith("sqs://") or 
url.startswith("sentinel://")
 
 
 log = logging.getLogger(__name__)
@@ -38,6 +38,16 @@ if "visibility_timeout" not in broker_transport_options:
     if _broker_supports_visibility_timeout(broker_url):
         broker_transport_options["visibility_timeout"] = 21600
 
+broker_transport_options_for_celery: dict = dict.copy(broker_transport_options)
+if "sentinel_kwargs" in broker_transport_options:
+    try:
+        sentinel_kwargs = conf.getjson("celery_broker_transport_options", 
"sentinel_kwargs")
+        if not isinstance(sentinel_kwargs, dict):
+            raise ValueError
+        broker_transport_options_for_celery["sentinel_kwargs"] = 
sentinel_kwargs
+    except Exception:
+        raise AirflowException("sentinel_kwargs should be written in the 
correct dictionary format.")
+
 if conf.has_option("celery", "RESULT_BACKEND"):
     result_backend = conf.get_mandatory_value("celery", "RESULT_BACKEND")
 else:
@@ -53,7 +63,7 @@ DEFAULT_CELERY_CONFIG = {
     "task_default_exchange": conf.get("operators", "DEFAULT_QUEUE"),
     "task_track_started": conf.getboolean("celery", "task_track_started"),
     "broker_url": broker_url,
-    "broker_transport_options": broker_transport_options,
+    "broker_transport_options": broker_transport_options_for_celery,
     "result_backend": result_backend,
     "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY"),
     "worker_enable_remote_control": conf.getboolean("celery", 
"worker_enable_remote_control"),
@@ -74,7 +84,7 @@ try:
                 "ca_certs": conf.get("celery", "SSL_CACERT"),
                 "cert_reqs": ssl.CERT_REQUIRED,
             }
-        elif broker_url and "redis://" in broker_url:
+        elif broker_url and ("redis://" in broker_url or "sentinel://" in 
broker_url):
             broker_use_ssl = {
                 "ssl_keyfile": conf.get("celery", "SSL_KEY"),
                 "ssl_certfile": conf.get("celery", "SSL_CERT"),
diff --git a/docs/apache-airflow/core-concepts/executor/celery.rst 
b/docs/apache-airflow/core-concepts/executor/celery.rst
index e16cc7e3f8..ca137fa8e3 100644
--- a/docs/apache-airflow/core-concepts/executor/celery.rst
+++ b/docs/apache-airflow/core-concepts/executor/celery.rst
@@ -22,7 +22,7 @@ Celery Executor
 ===============
 
 ``CeleryExecutor`` is one of the ways you can scale out the number of workers. 
For this
-to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, ...) and
+to work, you need to setup a Celery backend (**RabbitMQ**, **Redis**, **Redis 
Sentinel** ...) and
 change your ``airflow.cfg`` to point the executor parameter to
 ``CeleryExecutor`` and provide the related Celery settings.
 
@@ -83,6 +83,7 @@ Some caveats:
 
 - Make sure to use a database backed result backend
 - Make sure to set a visibility timeout in 
``[celery_broker_transport_options]`` that exceeds the ETA of your longest 
running task
+- Make sure to specify the password for Redis Server in the 
``[celery_broker_transport_options]`` section if you are using Redis Sentinel 
as your broker and the Redis servers are password-protected
 - Make sure to set umask in ``[worker_umask]`` to set permissions for newly 
created files by workers.
 - Tasks can consume resources. Make sure your worker has enough resources to 
run ``worker_concurrency`` tasks
 - Queue names are limited to 256 characters, but each broker backend might 
have its own restrictions

Reply via email to