This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new db1b51d  Make celery worker_prefetch_multiplier configurable (#8695)
db1b51d is described below

commit db1b51df54af76245a0e93d0b2229a96efc7d3d9
Author: Ben Nadler <[email protected]>
AuthorDate: Sat May 9 10:46:13 2020 -0700

    Make celery worker_prefetch_multiplier configurable (#8695)
---
 airflow/config_templates/config.yml          | 13 +++++++++++++
 airflow/config_templates/default_airflow.cfg | 10 ++++++++++
 airflow/config_templates/default_celery.py   |  2 +-
 3 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1fb204a..0543cbc 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1130,6 +1130,19 @@
       type: string
       example: 16,12
       default: ~
+    - name: worker_prefetch_multiplier
+      description: |
+        Used to increase the number of tasks that a worker prefetches which can improve performance.
+        The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
+        that are prefetched by a worker.  A value greater than 1 can result in tasks being unnecessarily
+        blocked if there are multiple workers and one worker prefetches tasks that sit behind long
+        running tasks while another worker has unutilized processes that are unable to process the already
+        claimed blocked tasks.
+        https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
+      version_added: ~
+      type: int
+      example: "1"
+      default: ~
     - name: worker_log_server_port
       description: |
         When you start an airflow worker, airflow starts a tiny web server
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index f133e1c..ff82613 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -550,6 +550,16 @@ worker_concurrency = 8
 # Example: worker_autoscale = 16,12
 # worker_autoscale =
 
+# Used to increase the number of tasks that a worker prefetches which can improve performance.
+# The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
+# that are prefetched by a worker.  A value greater than 1 can result in tasks being unnecessarily
+# blocked if there are multiple workers and one worker prefetches tasks that sit behind long
+# running tasks while another worker has unutilized processes that are unable to process the already
+# claimed blocked tasks.
+# https://docs.celeryproject.org/en/stable/userguide/optimizing.html#prefetch-limits
+# Example: worker_prefetch_multiplier = 1
+# worker_prefetch_multiplier =
+
 # When you start an airflow worker, airflow starts a tiny web server
 # subprocess to serve the workers local log files to the airflow main
 # web server, who then builds pages and sends them to users. This defines
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 1b88b31..bf0b03b 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -39,7 +39,7 @@ if 'visibility_timeout' not in broker_transport_options:
 DEFAULT_CELERY_CONFIG = {
     'accept_content': ['json'],
     'event_serializer': 'json',
-    'worker_prefetch_multiplier': 1,
+    'worker_prefetch_multiplier': conf.getint('celery', 'worker_prefetch_multiplier', fallback=1),
     'task_acks_late': True,
     'task_default_queue': conf.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': conf.get('celery', 'DEFAULT_QUEUE'),

Reply via email to