findingrish commented on code in PR #13365: URL: https://github.com/apache/druid/pull/13365#discussion_r1034310172
########## examples/bin/start-druid: ########## @@ -0,0 +1,545 @@ +#!/usr/bin/env python3 + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import sys +import os +import psutil +import pathlib +import multiprocessing +import argparse + +QUICKSTART_ROOT_CONFIG_PATH = "conf/druid/single-server/quickstart" + +MEM_GB_SUFFIX = "g" +MEM_MB_SUFFIX = "m" +XMX_PARAMETER = "-Xmx" +XMS_PARAMETER = "-Xms" +DIRECT_MEM_PARAMETER = "-XX:MaxDirectMemorySize" +SERVICE_SEPARATOR = "," + +TASK_JAVA_OPTS_ARRAY = ["-server", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-XX:+ExitOnOutOfMemoryError", + "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"] +TASK_JAVA_OPTS_PROPERTY = "druid.indexer.runner.javaOptsArray" +TASK_WORKER_CAPACITY_PROPERTY = "druid.worker.capacity" +TASK_COUNT = "task-count" +TASK_MEM_TYPE_LOW = "low" +TASK_MEM_TYPE_HIGH = "high" +TASK_MEM_MAP = { + TASK_MEM_TYPE_LOW: ["-Xms256m", "-Xmx256m", "-XX:MaxDirectMemorySize=256g"], + TASK_MEM_TYPE_HIGH: ["-Xms1g", "-Xmx1g", "-XX:MaxDirectMemorySize=1g"] +} + +BROKER = "broker" +ROUTER = "router" +COORDINATOR = "coordinator-overlord" +HISTORICAL = "historical" +MIDDLE_MANAGER = "middleManager" +TASKS = "tasks" + +DEFAULT_SERVICES = [ + BROKER, + ROUTER, + COORDINATOR, + HISTORICAL, + MIDDLE_MANAGER +] + +SERVICE_MEMORY_RATIO = { + MIDDLE_MANAGER: 1, + ROUTER: 2, + COORDINATOR: 30, + BROKER: 46, + HISTORICAL: 80, + TASKS: 30 +} + +MINIMUM_MEMORY_MB = { + MIDDLE_MANAGER: 64, + ROUTER: 128, + TASKS: 1024, + BROKER: 900, + COORDINATOR: 256, + HISTORICAL: 900 +} + +HEAP_TO_TOTAL_MEM_RATIO = { + MIDDLE_MANAGER: 1, + ROUTER: 1, + COORDINATOR: 1, + BROKER: 0.60, + HISTORICAL: 0.40, + TASKS: 0.50 +} + +LOGGING_ENABLED = False + + +def print_if_verbose(message): + if LOGGING_ENABLED: + print(message) + + +def configure_parser(): + parser = argparse.ArgumentParser( + prog='Druid quickstart', + formatter_class=argparse.RawTextHelpFormatter, + epilog= + """ +sample usage: + start-druid + Start up all the services (including zk). + start-druid -m=100g + Start up all the services (including zk) + using a total memory of 100GB. + start-druid -m=100g --compute + Compute memory distribution and validate arguments. + start-druid -m=100g -s=broker,router + Starts a broker and a router, using a total memory of 100GB. + start-druid -m=100g --s=broker,router \\ + -c=conf/druid/single-server/custom + Starts a broker and a router, using a total memory of 100GB. + Reads configs for each service (jvm.config, runtime.properties) + from respective folders inside the given root config path. + start-druid -s=broker,router \\ + -c=conf/druid/single-server/custom + Starts a broker and a router service, reading service configs + from the given root directory. Calculates memory requirements for + each service, if required, using upto 80% of the total system memory. + start-druid -m=100g \\ + -s=broker,router \\ + -c=conf/druid/single-server/custom \\ + --zk + Starts broker, router and zookeeper. + zookeeper config is read from conf/zk. +""" + ) + parser.add_argument('--memory', '-m', type=str, required=False, + help='Total memory for all processes (services and tasks, if any). \n' + 'This parameter is ignored if each service already has a jvm.config \n' + 'in the given conf directory. e.g. 500m, 4g, 6g\n') + parser.add_argument('--services', '-s', type=str, required=False, + help='List of services to be started, subset of \n' + '{broker, router, middleManager, historical, coordinator-overlord}. \n' + 'If the argument is not given, all services \n' + 'and zookeeper is started. e.g. -sl=broker,historical') + parser.add_argument('--config', '-c', type=str, required=False, + help='Relative path to the directory containing common and service \n' + 'specific properties to be overridden. \n' + 'This directory must contain \'_common\' directory with \n' + '\'common.jvm.config\' & \'common.runtime.properties\' files. \n' + 'If this argument is not given, config from \n' + 'conf/druid/single-server/quickstart directory is used.\n') + parser.add_argument('--compute', action='store_true', + help='Does not start Druid, only displays the memory allocated \n' + 'to each service if started with the given total memory.\n') + parser.add_argument('--zk', '-zk', action='store_true', + help='Specification to run zookeeper, \n' + 'zk config is picked up from conf/zk.') + parser.add_argument('--verbose', action='store_true', help='Log details') + + parser.set_defaults(zk=False) + parser.set_defaults(compute=False) + parser.set_defaults(verbose=False) + + return parser + + +def validate_common_jvm_args(config): + if pathlib.Path(f'{config}/_common/common.jvm.config').is_file() is False: + raise ValueError(f'_common/common.jvm.config file is missing in the root config, ' + f'check {QUICKSTART_ROOT_CONFIG_PATH}/_common directory') + + +def validate_common_directory(config): + if pathlib.Path(f'{config}/_common').is_dir() is False: + raise ValueError( + f'_common directory is missing in the root config, check {QUICKSTART_ROOT_CONFIG_PATH}/_common directory') + + if pathlib.Path(f'{config}/_common/common.runtime.properties').is_file() is False: + raise ValueError(f'_common/common.runtime.properties file is missing in the root config, ' + f'check {QUICKSTART_ROOT_CONFIG_PATH}/_common directory') + + +def parse_arguments(args): + service_list = [] + config = "" + total_memory = "" + compute = False + zk = False + + if args.compute: + compute = True + if args.zk: + zk = True + if args.config is not None: + config = pathlib.Path(os.path.join(os.getcwd(), args.config)).resolve() + if os.path.exists(config) is False: + raise ValueError(f'config {config} not found') + if args.memory is not None: + total_memory = args.memory + if args.services is not None: + services = args.services.split(SERVICE_SEPARATOR) + + for service in services: + if service not in DEFAULT_SERVICES: + raise ValueError(f'Invalid service name {service}, should be one of {DEFAULT_SERVICES}') + + if service in service_list: + raise ValueError(f'{service} is specified multiple times') + + service_list.append(service) + + if len(service_list) == 0: + # start all services + service_list = DEFAULT_SERVICES + zk = True + + return config, total_memory, service_list, zk, compute + + +def print_startup_config(service_list, config, zk): + print_if_verbose(f'Starting {service_list}') + print_if_verbose(f'Reading config from {config}') + if zk: + zk_config = pathlib.Path(f'{os.getcwd()}/../conf/zk').resolve() + print_if_verbose(f'Starting zk, reading default config from {zk_config}') + print_if_verbose('\n') + + +def middle_manager_task_memory_params_present(config): + java_opts_property_present = False + worker_capacity_property_present = False + + if pathlib.Path(f'{config}/middleManager/runtime.properties').is_file(): + with open(f'{config}/middleManager/runtime.properties') as file: + for line in file: + if line.startswith(TASK_JAVA_OPTS_PROPERTY): + java_opts_property_present = True + elif line.startswith(TASK_WORKER_CAPACITY_PROPERTY): + worker_capacity_property_present = True + + return java_opts_property_present, worker_capacity_property_present + + +def verify_service_config(service, config): + path = f'{config}/{service}/jvm.config' + + required_parameters = [XMX_PARAMETER, XMS_PARAMETER] + + if HEAP_TO_TOTAL_MEM_RATIO.get(service) != 1: + required_parameters.append(DIRECT_MEM_PARAMETER) + + with open(path) as file: + for line in file: + if line.startswith(XMX_PARAMETER) and XMX_PARAMETER in required_parameters: + required_parameters.remove(XMX_PARAMETER) + if line.startswith(XMS_PARAMETER) and XMS_PARAMETER in required_parameters: + required_parameters.remove(XMS_PARAMETER) + if line.startswith(DIRECT_MEM_PARAMETER) and DIRECT_MEM_PARAMETER in required_parameters: + required_parameters.remove(DIRECT_MEM_PARAMETER) + + if len(required_parameters) > 0: + params = ",".join(required_parameters) + raise ValueError(f'{params} missing in {service}/jvm.config') + + if service == MIDDLE_MANAGER: + if pathlib.Path(f'{config}/{service}/runtime.properties').is_file() is False: + raise ValueError(f'{service}/runtime.properties file is missing in the root config') + + mm_task_java_opts_property, mm_task_worker_capacity_prop = middle_manager_task_memory_params_present(config) + + if mm_task_java_opts_property is False: + raise ValueError(f'{TASK_JAVA_OPTS_PROPERTY} property missing in {service}/runtime.properties') + + +def should_compute_memory(config, total_memory, service_list): + """ + if memory argument is given, memory for services and tasks is computed, jvm.config file + or runtime.properties with task memory specification shouldn't be present + Alternatively, all memory related parameters are specified + which implies following should be present: + jvm.config file for all services with -Xmx=***, Xms=*** parameters + -XX:MaxDirectMemorySize=** in jvm.config for broker and historical + druid.indexer.runner.javaOptsArray (optionally druid.worker.capacity) in + rootDirectory/middleManager/runtime.properties + """ + + jvm_config_count = 0 + for service in service_list: + if pathlib.Path(f'{config}/{service}/jvm.config').is_file(): + jvm_config_count += 1 + + mm_task_property_present = False + if MIDDLE_MANAGER in service_list: + mm_task_java_opts_property, mm_task_worker_capacity_prop = middle_manager_task_memory_params_present(config) + mm_task_property_present = mm_task_java_opts_property or mm_task_worker_capacity_prop + + # possible error states + # 1. memory argument is specified, also jvm.config or middleManger/runtime.properties having + # druid.indexer.runner.javaOptsArray or druid.worker.capacity parameters is present + # 2. jvm.config is not present for any service, but middleManger/runtime.properties has + # druid.indexer.runner.javaOptsArray or druid.worker.capacity parameters + # 3. jvm.config present for some but not all services + # 4. jvm.config file is present for all services, but it doesn't contain required parameters + # 5. lastly, if middleManager is to be started, and it is missing task memory properties + if jvm_config_count > 0 or mm_task_property_present: + if total_memory != "": + raise ValueError("If jvm.config is given for services, memory argument shouldn't be specified") + if jvm_config_count == 0: + raise ValueError("druid.indexer.runner.javaOptsArray or druid.worker.capacity is present in " + "middleManager/runtime.properties, \n " + "add jvm.config for all other services") + if jvm_config_count != len(service_list): + raise ValueError("jvm.config file should be present for all services or none") + for service in service_list: + verify_service_config(service, config) + + # compute memory only when none of the specified services contains jvm.config, + # if middleManager is to be started it doesn't contain task memory properties + return jvm_config_count == 0 and mm_task_property_present is False + + +def compute_system_memory(): + system_memory = psutil.virtual_memory().total # mem in bytes + memory_for_druid = int(system_memory / (1024 * 1024)) + return memory_for_druid + + +def convert_total_memory_string(memory): + try: + if memory == "": + computed_memory = compute_system_memory() + return computed_memory + elif memory.endswith(MEM_MB_SUFFIX): + return int(memory[:-1]) + elif memory.endswith(MEM_GB_SUFFIX): + return 1024 * int(memory[:-1]) + else: + raise ValueError('Incorrect format for memory argument, expected format is <integer_value><m/g>') + except Exception: + raise ValueError('Incorrect format for memory argument, expected format is <integer_value><m/g>') + + +def check_memory_constraint(total_memory, services): + # 80% of total memory >= sum of lower bound service memory should be + lower_bound_memory = 0 + + service_list = services.copy() + if MIDDLE_MANAGER in services: + service_list.append(TASKS) + + for service in service_list: + lower_bound_memory += MINIMUM_MEMORY_MB.get(service) + + required_memory = int(lower_bound_memory / 0.8) + + if total_memory < required_memory: + raise ValueError(f'Minimum memory required for starting services is {required_memory}m') + + if total_memory >= 2 * lower_bound_memory: + return int(total_memory / 2) + else: + return lower_bound_memory + + +def build_mm_task_java_opts_array(memory_type): + task_memory = f'-D{TASK_JAVA_OPTS_PROPERTY}=[' + + mem_array = TASK_MEM_MAP.get(memory_type) + + java_opts_list = TASK_JAVA_OPTS_ARRAY + mem_array + + for item in java_opts_list: + task_memory += f'\"{item}\";' Review Comment: That was the very first thing I tried, but the perl script wasn't able to parse the args correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
