This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f42722e627 Set monotonically increasing worker capacity in start-druid-main (#13581)
f42722e627 is described below
commit f42722e62725c3829c295fa815c6e48aac1e0600
Author: Rishabh Singh <[email protected]>
AuthorDate: Fri Dec 16 15:34:30 2022 +0530
Set monotonically increasing worker capacity in start-druid-main (#13581)
This commit updates the task memory allocation logic.
- the minimum task count is 2 and the maximum task count is the number of CPUs on the machine
- the task count increases with the total task memory
- per-task memory increases from 512m to 2g
---
examples/bin/start-druid-main.py | 46 +++++++++++++++++-----------------------
1 file changed, 19 insertions(+), 27 deletions(-)
diff --git a/examples/bin/start-druid-main.py b/examples/bin/start-druid-main.py
index 5b7e293ccc..1bbcc3542d 100644
--- a/examples/bin/start-druid-main.py
+++ b/examples/bin/start-druid-main.py
@@ -36,14 +36,6 @@ TASK_JAVA_OPTS_ARRAY = ["-server", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8
TASK_JAVA_OPTS_PROPERTY = "druid.indexer.runner.javaOptsArray"
TASK_WORKER_CAPACITY_PROPERTY = "druid.worker.capacity"
TASK_COUNT = "task-count"
-TASK_MEM_TYPE_LOW = "low"
-TASK_MEM_TYPE_MEDIUM = "medium"
-TASK_MEM_TYPE_HIGH = "high"
-TASK_MEM_MAP = {
- TASK_MEM_TYPE_LOW: ["-Xms256m", "-Xmx256m", "-XX:MaxDirectMemorySize=256m"],
- TASK_MEM_TYPE_MEDIUM: ["-Xms512m", "-Xmx512m", "-XX:MaxDirectMemorySize=512m"],
- TASK_MEM_TYPE_HIGH: ["-Xms1g", "-Xmx1g", "-XX:MaxDirectMemorySize=1g"]
-}
BROKER = "broker"
ROUTER = "router"
@@ -436,43 +428,43 @@ def check_memory_constraint(total_memory, services):
return int(total_memory * 0.8)
-def build_mm_task_java_opts_array(memory_type):
- task_memory = '-D{0}=['.format(TASK_JAVA_OPTS_PROPERTY)
-
- mem_array = TASK_MEM_MAP.get(memory_type)
+def build_mm_task_java_opts_array(task_memory):
+ memory = int(task_memory / 2)
+ mem_array = ["-Xms{0}m".format(memory), "-Xmx{0}m".format(memory), "-XX:MaxDirectMemorySize={0}m".format(memory)]
java_opts_list = TASK_JAVA_OPTS_ARRAY + mem_array
+ task_java_opts_value = ''
+
for item in java_opts_list:
- task_memory += '\"{0}\",'.format(item)
+ task_java_opts_value += '\"{0}\",'.format(item)
- task_memory = task_memory[:-1]
- task_memory += ']'
- return task_memory
+ task_java_opts_value = task_java_opts_value[:-1]
+ task_memory_config = '-D{0}=[{1}]'.format(TASK_JAVA_OPTS_PROPERTY, task_java_opts_value)
+
+ return task_memory_config
def compute_tasks_memory(allocated_memory):
- if allocated_memory >= 4096:
- task_count = int(allocated_memory / 2048)
- memory_type = TASK_MEM_TYPE_HIGH
- task_memory_mb = 2048
+ cpu_count = multiprocessing.cpu_count()
+
+ if allocated_memory >= cpu_count * 1024:
+ task_count = cpu_count
+ task_memory_mb = min(2048, int(allocated_memory / cpu_count))
elif allocated_memory >= 2048:
task_count = int(allocated_memory / 1024)
- memory_type = TASK_MEM_TYPE_MEDIUM
task_memory_mb = 1024
else:
task_count = 2
- memory_type = TASK_MEM_TYPE_LOW
- task_memory_mb = 512
- task_count = min(task_count, multiprocessing.cpu_count())
+ task_memory_mb = int(allocated_memory / task_count)
- return memory_type, task_count, task_memory_mb
+ return task_count, task_memory_mb
def build_memory_config(service, allocated_memory):
if service == TASKS:
- memory_type, task_count, task_memory = compute_tasks_memory(allocated_memory)
- java_opts_array = build_mm_task_java_opts_array(memory_type)
+ task_count, task_memory = compute_tasks_memory(allocated_memory)
+ java_opts_array = build_mm_task_java_opts_array(task_memory)
return ['-D{0}={1}'.format(TASK_WORKER_CAPACITY_PROPERTY, task_count),
java_opts_array], task_memory * task_count
elif service == INDEXER:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]