findingrish commented on code in PR #13365:
URL: https://github.com/apache/druid/pull/13365#discussion_r1033847155


##########
examples/bin/start-druid:
##########
@@ -0,0 +1,545 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+import os
+import psutil
+import pathlib
+import multiprocessing
+import argparse
+
# Root of the bundled single-server quickstart configuration; error messages
# point users at its _common directory as a reference.
QUICKSTART_ROOT_CONFIG_PATH = "conf/druid/single-server/quickstart"

# Unit suffixes accepted by the --memory argument (megabytes / gigabytes).
MEM_GB_SUFFIX = "g"
MEM_MB_SUFFIX = "m"
# JVM flags looked for in a service's jvm.config when memory is not computed.
XMX_PARAMETER = "-Xmx"
XMS_PARAMETER = "-Xms"
DIRECT_MEM_PARAMETER = "-XX:MaxDirectMemorySize"
# Separator between names in the --services argument.
SERVICE_SEPARATOR = ","

# Base JVM options for task JVMs; the memory flags of a TASK_MEM_MAP profile
# are appended to these when building the task javaOptsArray.
TASK_JAVA_OPTS_ARRAY = ["-server", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-XX:+ExitOnOutOfMemoryError",
                        "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
# middleManager runtime.properties keys that carry task memory settings.
TASK_JAVA_OPTS_PROPERTY = "druid.indexer.runner.javaOptsArray"
TASK_WORKER_CAPACITY_PROPERTY = "druid.worker.capacity"
TASK_COUNT = "task-count"
TASK_MEM_TYPE_LOW = "low"
TASK_MEM_TYPE_HIGH = "high"
# Heap and direct-memory flags per task profile; in both profiles the direct
# memory limit matches the heap size.
TASK_MEM_MAP = {
    TASK_MEM_TYPE_LOW: ["-Xms256m", "-Xmx256m", "-XX:MaxDirectMemorySize=256m"],
    TASK_MEM_TYPE_HIGH: ["-Xms1g", "-Xmx1g", "-XX:MaxDirectMemorySize=1g"]
}

BROKER = "broker"
ROUTER = "router"
COORDINATOR = "coordinator-overlord"
HISTORICAL = "historical"
MIDDLE_MANAGER = "middleManager"
TASKS = "tasks"

# Services started when --services is not given (zookeeper is started too).
DEFAULT_SERVICES = [
    BROKER,
    ROUTER,
    COORDINATOR,
    HISTORICAL,
    MIDDLE_MANAGER
]

# NOTE(review): presumably the relative weight of each service (and of the
# middleManager's tasks) when the memory budget is divided; the consuming
# code is not visible here — confirm.
SERVICE_MEMORY_RATIO = {
    MIDDLE_MANAGER: 1,
    ROUTER: 2,
    COORDINATOR: 30,
    BROKER: 46,
    HISTORICAL: 80,
    TASKS: 30
}

# Per-service lower bound in MB; check_memory_constraint() sums these for the
# requested services (plus TASKS when a middleManager is requested).
MINIMUM_MEMORY_MB = {
    MIDDLE_MANAGER: 64,
    ROUTER: 128,
    TASKS: 1024,
    BROKER: 900,
    COORDINATOR: 256,
    HISTORICAL: 900
}

# Presumably the heap share of each service's memory. verify_service_config()
# requires -XX:MaxDirectMemorySize in jvm.config only for services whose
# ratio here is not 1.
HEAP_TO_TOTAL_MEM_RATIO = {
    MIDDLE_MANAGER: 1,
    ROUTER: 1,
    COORDINATOR: 1,
    BROKER: 0.60,
    HISTORICAL: 0.40,
    TASKS: 0.50
}

# Gates print_if_verbose() output.
LOGGING_ENABLED = False
+
+
def print_if_verbose(message):
    """Write ``message`` to stdout, but only when LOGGING_ENABLED is set."""
    if not LOGGING_ENABLED:
        return
    print(message)
+
+
def configure_parser():
    """Build the command-line parser for the start-druid script.

    Options: ``--memory/-m``, ``--services/-s`` and ``--config/-c`` take
    optional string values; ``--compute``, ``--zk/-zk`` and ``--verbose`` are
    flags. The epilog lists sample invocations.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser(
        prog='Druid quickstart',
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=
        """
sample usage:
    start-druid
            Start up all the services (including zk).
    start-druid -m=100g
            Start up all the services (including zk)
            using a total memory of 100GB.
    start-druid -m=100g --compute
            Compute memory distribution and validate arguments.
    start-druid -m=100g -s=broker,router
            Starts a broker and a router, using a total memory of 100GB.
    start-druid -m=100g -s=broker,router \\
    -c=conf/druid/single-server/custom
            Starts a broker and a router, using a total memory of 100GB.
            Reads each service's runtime.properties from respective folders
            inside the given root config path. Memory is computed, so the
            services must not have a jvm.config there.
    start-druid -s=broker,router \\
    -c=conf/druid/single-server/custom
            Starts a broker and a router service, reading service configs
            from the given root directory. Calculates memory requirements for
            each service, if required, using up to 80% of the total system memory.
    start-druid -m=100g \\
    -s=broker,router \\
    -c=conf/druid/single-server/custom \\
    --zk
            Starts broker, router and zookeeper.
            zookeeper config is read from conf/zk.
"""
    )
    parser.add_argument('--memory', '-m', type=str, required=False,
                        help='Total memory for all processes (services and tasks, if any). \n'
                             'Must not be given when services already have a jvm.config \n'
                             '(or middleManager task memory properties) in the given conf \n'
                             'directory. e.g. 500m, 4g, 6g\n')
    parser.add_argument('--services', '-s', type=str, required=False,
                        help='List of services to be started, subset of \n'
                             '{broker, router, middleManager, historical, coordinator-overlord}. \n'
                             'If the argument is not given, all services \n'
                             'and zookeeper are started. e.g. -s=broker,historical')
    parser.add_argument('--config', '-c', type=str, required=False,
                        help='Relative path to the directory containing common and service \n'
                             'specific properties to be overridden. \n'
                             'This directory must contain \'_common\' directory with \n'
                             '\'common.jvm.config\' & \'common.runtime.properties\' files. \n'
                             'If this argument is not given, config from \n'
                             'conf/druid/single-server/quickstart directory is used.\n')
    # store_true flags already default to False, so no set_defaults() is needed.
    parser.add_argument('--compute', action='store_true',
                        help='Does not start Druid, only displays the memory allocated \n'
                             'to each service if started with the given total memory.\n')
    parser.add_argument('--zk', '-zk', action='store_true',
                        help='Specification to run zookeeper, \n'
                             'zk config is picked up from conf/zk.')
    parser.add_argument('--verbose', action='store_true', help='Log details')

    return parser
+
+
def validate_common_jvm_args(config):
    """Ensure the root config directory provides ``_common/common.jvm.config``.

    Args:
        config: root config directory (path or string).

    Raises:
        ValueError: if ``<config>/_common/common.jvm.config`` is not a regular file.
    """
    common_jvm_config = pathlib.Path(f'{config}/_common/common.jvm.config')
    if common_jvm_config.is_file():
        return
    raise ValueError(f'_common/common.jvm.config file is missing in the root config, '
                     f'check {QUICKSTART_ROOT_CONFIG_PATH}/_common directory')
+
+
def validate_common_directory(config):
    """Ensure the root config has a ``_common`` directory with runtime properties.

    Args:
        config: root config directory (path or string).

    Raises:
        ValueError: if ``<config>/_common`` is not a directory, or if
            ``<config>/_common/common.runtime.properties`` is not a regular file.
    """
    common_dir = pathlib.Path(f'{config}/_common')
    if not common_dir.is_dir():
        raise ValueError(
            f'_common directory is missing in the root config, check {QUICKSTART_ROOT_CONFIG_PATH}/_common directory')

    common_runtime_properties = pathlib.Path(f'{config}/_common/common.runtime.properties')
    if not common_runtime_properties.is_file():
        raise ValueError(f'_common/common.runtime.properties file is missing in the root config, '
                         f'check {QUICKSTART_ROOT_CONFIG_PATH}/_common directory')
+
+
def parse_arguments(args):
    """Normalise the parsed command-line namespace into start-up settings.

    Args:
        args: namespace with ``compute``, ``zk``, ``config``, ``memory`` and
            ``services`` attributes (as produced by configure_parser()).

    Returns:
        tuple ``(config, total_memory, service_list, zk, compute)``:
        ``config`` is the resolved config directory as a ``pathlib.Path``, or
        ``""`` when no config was given; ``total_memory`` is the raw memory
        string, or ``""``; ``service_list`` is a new list of service names;
        ``zk`` and ``compute`` are booleans. When no services are named, all
        DEFAULT_SERVICES are returned and ``zk`` is forced to True.

    Raises:
        ValueError: if the config directory does not exist, or a service name
            is unknown or given more than once.
    """
    compute = bool(args.compute)
    zk = bool(args.zk)

    config = ""
    if args.config is not None:
        config = pathlib.Path(os.path.join(os.getcwd(), args.config)).resolve()
        if not os.path.exists(config):
            raise ValueError(f'config {config} not found')

    total_memory = args.memory if args.memory is not None else ""

    service_list = []
    if args.services is not None:
        for service in args.services.split(SERVICE_SEPARATOR):
            # Accept "broker, router" as well as "broker,router".
            service = service.strip()
            if service not in DEFAULT_SERVICES:
                raise ValueError(f'Invalid service name {service}, should be one of {DEFAULT_SERVICES}')

            if service in service_list:
                raise ValueError(f'{service} is specified multiple times')

            service_list.append(service)

    if not service_list:
        # Start all services plus zookeeper. Copy the defaults so callers that
        # modify the returned list cannot alter the module-level DEFAULT_SERVICES.
        service_list = list(DEFAULT_SERVICES)
        zk = True

    return config, total_memory, service_list, zk, compute
+
+
def print_startup_config(service_list, config, zk):
    """Report, via print_if_verbose(), what is about to be started.

    Logs the service list and config location. When ``zk`` is true, it also
    logs the resolved ``<cwd>/../conf/zk`` directory. It ends with a '\\n'
    separator message.
    """
    for summary_line in (f'Starting {service_list}', f'Reading config from {config}'):
        print_if_verbose(summary_line)
    if zk:
        zk_config_dir = pathlib.Path(f'{os.getcwd()}/../conf/zk').resolve()
        print_if_verbose(f'Starting zk, reading default config from {zk_config_dir}')
    print_if_verbose('\n')
+
+
def middle_manager_task_memory_params_present(config):
    """Report which task-memory properties middleManager/runtime.properties sets.

    Args:
        config: root config directory.

    Returns:
        tuple ``(java_opts_present, worker_capacity_present)``. Each element is
        True when a line of ``<config>/middleManager/runtime.properties``
        starts with TASK_JAVA_OPTS_PROPERTY or TASK_WORKER_CAPACITY_PROPERTY,
        respectively. Both are False when the file does not exist.
    """
    java_opts_property_present = False
    worker_capacity_property_present = False

    properties_file = pathlib.Path(f'{config}/middleManager/runtime.properties')
    if properties_file.is_file():
        with open(properties_file) as file:
            for line in file:
                # Java .properties files ignore leading whitespace, so an
                # indented key is still set. Comment lines start with '#'
                # and therefore never match.
                key_line = line.lstrip()
                if key_line.startswith(TASK_JAVA_OPTS_PROPERTY):
                    java_opts_property_present = True
                elif key_line.startswith(TASK_WORKER_CAPACITY_PROPERTY):
                    worker_capacity_property_present = True

    return java_opts_property_present, worker_capacity_property_present
+
+
def verify_service_config(service, config):
    """Check that a service's hand-written memory settings are complete.

    ``<config>/<service>/jvm.config`` must contain lines starting with -Xmx
    and -Xms. It must also contain -XX:MaxDirectMemorySize for every service
    whose HEAP_TO_TOTAL_MEM_RATIO entry is not 1. For the middleManager,
    ``<config>/middleManager/runtime.properties`` must also exist and set
    TASK_JAVA_OPTS_PROPERTY.

    Args:
        service: service name, e.g. "broker".
        config: root config directory.

    Raises:
        ValueError: naming the missing JVM parameter(s), file or property.
        OSError: if the service's jvm.config cannot be opened.
    """
    required_parameters = [XMX_PARAMETER, XMS_PARAMETER]
    if HEAP_TO_TOTAL_MEM_RATIO.get(service) != 1:
        required_parameters.append(DIRECT_MEM_PARAMETER)

    with open(f'{config}/{service}/jvm.config') as file:
        jvm_lines = file.readlines()

    # Preserve the declared order so the error message lists flags consistently.
    missing_parameters = [
        parameter for parameter in required_parameters
        if not any(line.startswith(parameter) for line in jvm_lines)
    ]
    if missing_parameters:
        params = ",".join(missing_parameters)
        raise ValueError(f'{params} missing in {service}/jvm.config')

    if service == MIDDLE_MANAGER:
        if not pathlib.Path(f'{config}/{service}/runtime.properties').is_file():
            raise ValueError(f'{service}/runtime.properties file is missing in the root config')

        # Only the javaOptsArray property is mandatory; worker capacity is optional.
        java_opts_present, _ = middle_manager_task_memory_params_present(config)
        if not java_opts_present:
            raise ValueError(f'{TASK_JAVA_OPTS_PROPERTY} property missing in {service}/runtime.properties')
+
+
def should_compute_memory(config, total_memory, service_list):
    """Decide whether memory must be computed for ``service_list``.

    Two configurations are accepted:

    * computed: no requested service has a jvm.config and, when the
      middleManager is requested, its runtime.properties sets neither
      druid.indexer.runner.javaOptsArray nor druid.worker.capacity. The
      memory argument may be given.
    * explicit: every requested service has a jvm.config with -Xmx and -Xms
      (plus -XX:MaxDirectMemorySize for broker and historical), and
      rootDirectory/middleManager/runtime.properties sets
      druid.indexer.runner.javaOptsArray (druid.worker.capacity is optional).
      The memory argument must not be given.

    Returns:
        True for the computed configuration, False for the explicit one.

    Raises:
        ValueError: for any mix of the two, or for an incomplete explicit
            configuration (checked by verify_service_config()).
    """
    services_with_jvm_config = sum(
        1 for service in service_list
        if pathlib.Path(f'{config}/{service}/jvm.config').is_file())

    task_memory_configured = False
    if MIDDLE_MANAGER in service_list:
        java_opts_set, worker_capacity_set = middle_manager_task_memory_params_present(config)
        task_memory_configured = java_opts_set or worker_capacity_set

    # Once any explicit memory setting is found, these error states are checked in order:
    #  1. the memory argument is given as well
    #  2. middleManager/runtime.properties has javaOptsArray or worker.capacity,
    #     but no service has a jvm.config
    #  3. jvm.config is present for some, but not all, services
    #  4. a jvm.config lacks required parameters (verify_service_config)
    #  5. the middleManager lacks its task memory property (verify_service_config)
    if services_with_jvm_config > 0 or task_memory_configured:
        if total_memory != "":
            raise ValueError("If jvm.config is given for services, memory argument shouldn't be specified")
        if services_with_jvm_config == 0:
            raise ValueError("druid.indexer.runner.javaOptsArray or druid.worker.capacity is present in "
                             "middleManager/runtime.properties, \n "
                             "add jvm.config for all other services")
        if services_with_jvm_config != len(service_list):
            raise ValueError("jvm.config file should be present for all services or none")
        for service in service_list:
            verify_service_config(service, config)

    # Compute memory only when no requested service has a jvm.config and no
    # middleManager task memory property is set.
    return services_with_jvm_config == 0 and not task_memory_configured
+
+
def compute_system_memory():
    """Return the machine's total physical memory in MB, truncated to an int."""
    bytes_per_mb = 1024 * 1024
    total_bytes = psutil.virtual_memory().total
    return int(total_bytes / bytes_per_mb)
+
+
def convert_total_memory_string(memory):
    """Convert the memory argument into megabytes.

    Args:
        memory: ``"<integer><m|g>"`` such as ``"500m"`` or ``"4g"``, or ``""``
            when no memory argument was given.

    Returns:
        int: the memory in MB. For ``""`` this is the machine's total
        physical memory in MB, as reported by compute_system_memory().

    Raises:
        ValueError: if ``memory`` is not a string of the expected format.
    """
    format_error = 'Incorrect format for memory argument, expected format is <integer_value><m/g>'

    if not isinstance(memory, str):
        raise ValueError(format_error)
    if memory == "":
        # Kept outside any try block: a failure to read system memory must
        # not be reported as a malformed memory argument.
        return compute_system_memory()

    if memory.endswith(MEM_MB_SUFFIX):
        megabytes_per_unit = 1
    elif memory.endswith(MEM_GB_SUFFIX):
        megabytes_per_unit = 1024
    else:
        raise ValueError(format_error)

    try:
        amount = int(memory[:-1])
    except ValueError as err:
        raise ValueError(format_error) from err
    return megabytes_per_unit * amount
+
+
def check_memory_constraint(total_memory, services):
    """Validate the memory budget and return how much of it Druid should use.

    Each requested service has a lower bound in MINIMUM_MEMORY_MB. TASKS is
    added when a middleManager is requested. The summed lower bounds may use
    at most 80% of ``total_memory``.

    Args:
        total_memory: available memory in MB.
        services: iterable of service names (list, tuple, ...).

    Returns:
        int: ``total_memory / 2`` (truncated) when that covers the summed lower
        bound; otherwise the summed lower bound itself, in MB.

    Raises:
        ValueError: if ``total_memory`` is below ``lower_bound / 0.8``.
    """
    # list() rather than .copy(): accepts tuples and other iterables, and the
    # caller's sequence is never modified when TASKS is appended.
    service_list = list(services)
    if MIDDLE_MANAGER in service_list:
        service_list.append(TASKS)

    lower_bound_memory = sum(MINIMUM_MEMORY_MB.get(service) for service in service_list)

    # The summed lower bounds must fit within 80% of the total memory.
    required_memory = int(lower_bound_memory / 0.8)
    if total_memory < required_memory:
        raise ValueError(f'Minimum memory required for starting services is {required_memory}m')

    if total_memory >= 2 * lower_bound_memory:
        return int(total_memory / 2)
    return lower_bound_memory
+
+
+def build_mm_task_java_opts_array(memory_type):
+    task_memory = f'-D{TASK_JAVA_OPTS_PROPERTY}=['
+
+    mem_array = TASK_MEM_MAP.get(memory_type)
+
+    java_opts_list = TASK_JAVA_OPTS_ARRAY + mem_array
+
+    for item in java_opts_list:
+        task_memory += f'\"{item}\";'

Review Comment:
   I tried that out, since it would save the extra array transformation, but the 
perl script seems to work only with comma-separated args.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to