findingrish commented on code in PR #13365:
URL: https://github.com/apache/druid/pull/13365#discussion_r1030019478


##########
examples/bin/start-druid-main:
##########
@@ -0,0 +1,436 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import os
+import psutil
+import pathlib
+import multiprocessing
+import argparse
+
+QUICKSTART_ROOT_CONFIG_PATH = "conf/druid/single-server/quickstart"
+
+MEMORY_GIGABYTES_IDENTIFIER = "g"
+MEMORY_MEGABYTES_IDENTIFIER = "m"
+SERVICE_SEPARATOR = ","
+
+MM_TASK_JAVAOPTS_ARRAY = ["-server", 
"-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+MM_TASK_MEMORY_TYPE_LOW = "low"
+MM_TASK_MEMORY_TYPE_HIGH = "high"
+MM_TASK_MEM_MAP = {
+    MM_TASK_MEMORY_TYPE_LOW: ["-Xms256m", "-Xmx256m", 
"-XX:MaxDirectMemorySize=256g"],
+    MM_TASK_MEMORY_TYPE_HIGH: ["-Xms1g", "-Xmx1g", 
"-XX:MaxDirectMemorySize=1g"]
+}
+
+MM_TASK_JAVAOPTS_PROP = "-Ddruid.indexer.runner.javaOptsArray"
+MM_TASK_WORKER_CAPACITY_PROP = "-Ddruid.worker.capacity"
+
+BROKER = "broker"
+ROUTER = "router"
+COORDINATOR = "coordinator-overlord"
+HISTORICAL = "historical"
+MM = "middleManager"
+MM_TASK = "middleManager-task"
+MM_TASK_COUNT = "task-count"
+
+DEFAULT_SERVICES = [
+    BROKER,
+    ROUTER,
+    COORDINATOR,
+    HISTORICAL,
+    MM
+]
+
+SERVICE_MEMORY_DISTRIBUTION_WEIGHT = {
+    MM: 0.5,
+    ROUTER: 1,
+    COORDINATOR: 15,
+    BROKER: 23,
+    HISTORICAL: 40,
+    MM_TASK: 15
+}
+
+SERVICE_MEMORY_LOWER_BOUND = {
+    MM: 64,
+    ROUTER: 128,
+    MM_TASK: 1024,
+    BROKER: 900,
+    COORDINATOR: 256,
+    HISTORICAL: 900
+}
+
+SERVICE_MEMORY_HEAP_PERCENTAGE = {
+    MM: 1,
+    ROUTER: 1,
+    COORDINATOR: 1,
+    BROKER: 0.60,
+    HISTORICAL: 0.40,
+    MM_TASK: 0.50
+}
+
+LOGGING_ENABLED = False
+
+def custom_print(message):
+    if LOGGING_ENABLED:
+        print(message)
+
+def error_and_exit(message):
+    sys.stderr.write(message + '\n')
+    sys.exit(1)
+
+def configure_parser():
+    parser = argparse.ArgumentParser(
+        prog='Druid quickstart',
+        formatter_class=argparse.RawTextHelpFormatter,
+        epilog=
+"""
+sample usage:
+    start-druid
+            Start up all the services (including zk).
+            50 - 80 percent of system memory is used.
+    start-druid -m=100g
+            Start up all the services (including zk)
+            using the given memory.
+    start-druid -m=100g --compute_only
+            Compute memory distribution and validate
+            arguments.
+    start-druid -m=100g -sl=broker,router
+            Start broker & router service, using 100g of memory.
+            Read config from conf/druid/single-server/quickstart.
+    start-druid -m=100g --sl=broker,router \\
+    -cp=conf/druid/single-server/custom
+            Start broker & router service, using 100g of memory.
+            Read config from <config_path>.
+            Since <memory> is specified, exception is thrown if
+            jvm.config is present for any of the services.
+    start-druid -sl=broker,router \\
+    -cp=conf/druid/single-server/custom
+            Start broker & router service, using system memory.
+            If jvm.config is specified for both the
+            services within <config_path>/<service>,
+            memory distribution is not calculated.
+            If jvm.config is present for either of the services,
+            exception is thrown.
+            If jvm.config is not present for both of the services,
+            memory distribution is calculated.
+    start-druid -m=100g \\
+    -cp=conf/druid/single-server/custom \\
+    -sl=broker,router \\
+    --run_zk
+            Start broker, router and zookeeper.
+            zk config is read from conf/zk.
+"""
+    )
+    parser.add_argument('--memory', '-m', type=str, required=False,
+                        help='Total memory for all processes (services and 
tasks, if any). \n'
+                             'This parameter is ignored if each service 
already has a jvm.config \n'
+                             'in the given conf directory. e.g. 500m, 4g, 
6g\n')
+    parser.add_argument('--service_list', '-sl', type=str, required=False,
+                        help='List of services to be started, subset of \n'
+                             '{broker, router, middleManager, historical, 
coordinator-overlord}. \n'
+                             'If the argument is not given, all services \n'
+                             'and zookeeper is started. e.g. 
-sl=broker,historical')
+    parser.add_argument('--config_path', '-cp', type=str, required=False,
+                        help='Relative path to the directory containing common 
and service \n'
+                             'specific properties to be overridden. \n'
+                             'This directory must contain \'_common\' 
directory with \n'
+                             '\'common.jvm.config\' & 
\'common.runtime.properties\' files. \n'
+                             'If this argument is not given, config from \n'
+                             '\'conf/druid/single-server/quickstart\' 
directory is used.\n')
+    parser.add_argument('--compute_only', action='store_true',
+                        help='Validate the arguments and display memory 
distribution')
+    parser.add_argument('--run_zk',  action='store_true',
+                        help='Specification to run zookeeper, \n'
+                             'zk config is picked up from conf/zk.')
+    parser.add_argument('--verbose', action='store_true', help='Log details')
+
+    parser.set_defaults(run_zk=False)
+    parser.set_defaults(compute_only=False)
+    parser.set_defaults(verbose=False)
+
+    return parser
+
+def parse_arguments(args):
+    service_list = []
+    config_path = ""
+    total_memory = ""
+    compute_only = False
+    run_zk = False
+
+    if args.compute_only:
+        compute_only = True
+    if args.run_zk:
+        run_zk = True
+    if args.config_path is not None:
+        config_path = pathlib.Path(os.path.join(os.getcwd(), 
args.config_path)).resolve()
+        if os.path.exists(config_path) is False:
+            error_and_exit(f'config_path {config_path} doesn\'t exist')
+    if args.memory is not None:
+        total_memory = args.memory
+    if args.service_list is not None:
+        services = args.service_list.split(SERVICE_SEPARATOR)
+
+        for service in services:
+            if service not in DEFAULT_SERVICES:
+                error_and_exit(f'Invalid service name {service}, should be one 
of {DEFAULT_SERVICES}')
+
+            if service in service_list:
+                error_and_exit(f'{service} is specified multiple times')
+
+            service_list.append(service)
+
+    if len(service_list) == 0:
+        # start all services
+        service_list = DEFAULT_SERVICES
+        run_zk = True
+
+    return config_path, total_memory, service_list, run_zk, compute_only
+
+def print_startup_config(service_list, config_path, run_zk):
+    custom_print(f'starting {service_list}')
+    custom_print(f'reading config from {config_path}')
+    if run_zk:
+        zk_config_path = pathlib.Path(f'{os.getcwd()}/../conf/zk').resolve()
+        custom_print(f'starting zk, reading default config from 
{zk_config_path}')
+    custom_print('\n')
+
+def should_compute_memory(config_path, total_memory, service_list):
+    # if jvm file is present for any of the services
+    # it should be present for all services and memory should not be specified
+    # if memory is given, jvm file shouldn't be present for any service
+    jvm_config_count = 0
+    for service in service_list:
+        if pathlib.Path(f'{config_path}/{service}/jvm.config').is_file():
+            jvm_config_count += 1
+        elif jvm_config_count > 0:
+            error_and_exit(f'jvm.config file is missing for service {service}, 
jvm.config should be specified for all the services or none')
+
+    if jvm_config_count > 0 and (jvm_config_count != len(service_list) or 
total_memory != ""):
+        if jvm_config_count != len(service_list):
+            error_and_exit("jvm.config file should be present for all services 
or none")
+        if total_memory != "":
+            error_and_exit("If jvm.config is given for services, memory 
argument shouldn't be specified")
+
+    return jvm_config_count == 0
+
+def compute_system_memory():
+    system_memory = psutil.virtual_memory().total # mem in bytes
+    memory_for_druid = int(system_memory / (1024 * 1024))
+    return memory_for_druid
+
+def convert_total_memory_string(memory):
+    try:
+        if memory == "":
+            computed_memory = compute_system_memory()
+            return computed_memory
+        elif memory.endswith(MEMORY_MEGABYTES_IDENTIFIER):
+            return int(memory[:-1])
+        elif memory.endswith(MEMORY_GIGABYTES_IDENTIFIER):
+            return 1024 * int(memory[:-1])
+        else:
+            error_and_exit('Incorrect format for memory argument, expected 
format is <integer_value><m/g>')
+    except Exception:
+        error_and_exit('Incorrect format for memory argument, expected format 
is <integer_value><m/g>')
+
+def check_memory_constraint(total_memory, service_list):
+    # 80% of total memory >= sum of lower bound service memory should be
+    lower_bound_memory = 0
+
+    for service in service_list:
+        lower_bound_memory += SERVICE_MEMORY_LOWER_BOUND.get(service)
+
+    required_memory = int(lower_bound_memory / 0.8)
+
+    if total_memory < required_memory:
+        error_and_exit(f'Minimum memory required for starting services is 
{required_memory}m')
+
+    if total_memory >= 2 * lower_bound_memory:
+        return int(total_memory / 2)
+    else:
+        return lower_bound_memory
+
+def build_mm_task_javaopts_array(memory_type):
+    task_memory = f'{MM_TASK_JAVAOPTS_PROP}=['
+
+    MEM_ARRAY = MM_TASK_MEM_MAP.get(memory_type)
+
+    javaopts_list = MM_TASK_JAVAOPTS_ARRAY + MEM_ARRAY
+
+    for item in javaopts_list:
+        task_memory += f'\"{item}\";'
+
+    task_memory = task_memory[:-1]
+    task_memory += ']'
+    return task_memory
+
+def build_memory_config_string(service, allocated_memory):
+    if service == MM_TASK:
+        if allocated_memory >= 2048:
+            task_count = int(allocated_memory / 2048)
+            memory_type = MM_TASK_MEMORY_TYPE_HIGH
+            task_memory = 2048
+        else:
+            task_count = int(allocated_memory / 512)
+            memory_type = MM_TASK_MEMORY_TYPE_LOW
+            task_memory = 512
+        task_count = min(task_count, multiprocessing.cpu_count())
+
+        javaopts_array = build_mm_task_javaopts_array(memory_type)
+        return [f'{MM_TASK_WORKER_CAPACITY_PROP}={task_count}', 
javaopts_array], task_memory * task_count
+    else:
+        heap_memory = SERVICE_MEMORY_HEAP_PERCENTAGE.get(service) * 
allocated_memory
+        direct_memory = int(allocated_memory - heap_memory)
+        heap_memory = int(heap_memory)
+
+        if direct_memory == 0:
+            return f'-Xms{heap_memory}m -Xmx{heap_memory}m', allocated_memory
+
+        return f'-Xms{heap_memory}m -Xmx{heap_memory}m 
-XX:MaxDirectMemorySize={direct_memory}m', allocated_memory
+
+def distribute_memory_over_services(services, total_memory):
+    service_memory_config = {}
+
+    memory_weight_sum = 0
+
+    service_list = services.copy()
+    if MM in services:
+        service_list.append(MM_TASK)
+
+    for service in service_list:
+        memory_weight_sum += SERVICE_MEMORY_DISTRIBUTION_WEIGHT.get(service)
+
+    multiplier = total_memory / memory_weight_sum
+
+    lower_bound_memory_allocation = 0
+    allocated_services = set()
+
+    for service in service_list:
+        allocated_memory = SERVICE_MEMORY_DISTRIBUTION_WEIGHT.get(service) * 
multiplier
+        if service in SERVICE_MEMORY_LOWER_BOUND and allocated_memory < 
SERVICE_MEMORY_LOWER_BOUND.get(service):
+            allocated_memory = SERVICE_MEMORY_LOWER_BOUND.get(service)
+            service_memory_config[service], allocated_memory = 
build_memory_config_string(service, allocated_memory)
+            lower_bound_memory_allocation += allocated_memory
+            allocated_services.add(service)
+
+    if lower_bound_memory_allocation > 0:
+        # compute the multiplier again for remaining services
+        memory_weight_sum = 0
+        for service in service_list:
+            if service in allocated_services:
+                continue
+            memory_weight_sum += 
SERVICE_MEMORY_DISTRIBUTION_WEIGHT.get(service)
+        multiplier = (total_memory - lower_bound_memory_allocation) / 
memory_weight_sum

Review Comment:
   No, there is already a check in the `check_memory_constraint` method that
   ensures the given memory is greater than the sum of the lower-bound memory
   for the services:
   
   ```
   def check_memory_constraint(total_memory, service_list):
       # 80% of total memory >= sum of lower bound service memory
       lower_bound_memory = 0
   
       for service in service_list:
           lower_bound_memory += MINIMUM_MEMORY_MB.get(service)
   
       required_memory = int(lower_bound_memory / 0.8)
   
       if total_memory < required_memory:
           raise ValueError(f'Minimum memory required for starting services is 
{required_memory}m')
   
       if total_memory >= 2 * lower_bound_memory:
           return int(total_memory / 2)
       else:
           return lower_bound_memory
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to