This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e0cc087f379cb5534209c6c0d2ce16af05591d5a Author: yujun <[email protected]> AuthorDate: Mon Feb 26 17:26:02 2024 +0800 [feature](doris compose) Add create cloud cluster (#31315) --- docker/runtime/doris-compose/Dockerfile | 19 +- docker/runtime/doris-compose/Readme.md | 31 +- docker/runtime/doris-compose/cluster.py | 326 ++++++++++++++---- docker/runtime/doris-compose/command.py | 365 +++++++++++++++++---- docker/runtime/doris-compose/doris-compose.py | 2 +- .../doris-compose/resource/cloud.ini.example | 25 ++ docker/runtime/doris-compose/resource/common.sh | 95 +++++- docker/runtime/doris-compose/resource/fdb.conf | 45 +++ docker/runtime/doris-compose/resource/init_be.sh | 121 +++++-- .../runtime/doris-compose/resource/init_cloud.sh | 104 ++++++ .../resource/{common.sh => init_fdb.sh} | 65 ++-- docker/runtime/doris-compose/resource/init_fe.sh | 104 +++++- .../org/apache/doris/regression/suite/Suite.groovy | 17 +- .../doris/regression/suite/SuiteCluster.groovy | 16 +- .../suites/demo_p0/docker_action.groovy | 15 + 15 files changed, 1151 insertions(+), 199 deletions(-) diff --git a/docker/runtime/doris-compose/Dockerfile b/docker/runtime/doris-compose/Dockerfile index a524ce8c189..2306bf67cd2 100644 --- a/docker/runtime/doris-compose/Dockerfile +++ b/docker/runtime/doris-compose/Dockerfile @@ -25,12 +25,13 @@ ARG OUT_DIRECTORY=output ENV JAVA_HOME="/usr/local/openjdk-8/" ENV jacoco_version 0.8.8 +RUN mkdir -p /opt/apache-doris/coverage + RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list RUN apt-get clean - RUN apt-get update && \ - apt-get install -y default-mysql-client python lsof tzdata curl unzip && \ + apt-get install -y default-mysql-client python lsof tzdata curl unzip patchelf jq && \ ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ dpkg-reconfigure -f noninteractive tzdata && \ apt-get clean @@ -39,7 +40,17 @@ RUN curl -f https://repo1.maven.org/maven2/org/jacoco/jacoco/$jacoco_version/jac mkdir /jacoco && \ unzip jacoco.zip -d /jacoco -ADD $OUT_DIRECTORY /opt/apache-doris/ -RUN mkdir -p /opt/apache-doris/coverage +# cloud +COPY ${OUT_DIRECTORY}/../cloud/CMakeLists.txt ${OUT_DIRECTORY}/../cloud/output* /opt/apache-doris/cloud/ +RUN <<EOF + mkdir /opt/apache-doris/fdb + if [ -d /opt/apache-doris/cloud/bin ]; then + sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/cloud/bin/start.sh + fi +EOF + +# fe and be +COPY $OUT_DIRECTORY /opt/apache-doris/ # in docker, run 'chmod 755 doris_be' first time cost 1min, remove it. RUN sed -i 's/\<chmod\>/echo/g' /opt/apache-doris/be/bin/start_be.sh + diff --git a/docker/runtime/doris-compose/Readme.md b/docker/runtime/doris-compose/Readme.md index 7e7173e7d28..cd3d7805fe8 100644 --- a/docker/runtime/doris-compose/Readme.md +++ b/docker/runtime/doris-compose/Readme.md @@ -26,10 +26,13 @@ Use doris compose to create doris docker compose clusters. 1. The doris image should contains: ``` -/opt/apache-doris/{fe, be} +/opt/apache-doris/{fe, be, cloud} ``` -if build doris use `sh build.sh`, then its output satisfy with this, then run command in doris root +if don't create cloud cluster, the image no need to contains the cloud pkg. + + +if build doris use `sh build.sh --fe --be --cloud`, then its output satisfy with all above, then run command in doris root ``` docker build -f docker/runtime/doris-compose/Dockerfile -t <image> . @@ -52,13 +55,29 @@ python -m pip install --user -r docker/runtime/doris-compose/requirements.txt python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image?> --add-fe-num <add-fe-num> --add-be-num <add-be-num> --fe-id <fd-id> --be-id <be-id> - + ... + [ --cloud ] ``` if it's a new cluster, must specific the image. add fe/be nodes with the specific image, or update existing nodes with `--fe-id`, `--be-id` + +For create a cloud cluster, steps are as below: +1. Write cloud s3 store config file, its default path is '/tmp/doris/cloud.ini'. + It's defined in environment variable DORIS_CLOUD_CFG_FILE, user can change this env var to change its path. + A Example file is locate in 'docker/runtime/doris-compose/resource/cloud.ini.example'. +2. Use doris compose up command with option '--cloud' to create a new cloud cluster. + +The simplest way to create a cloud cluster: + +``` +python docker/runtime/doris-compose/doris-compose.py up <cluster-name> <image> --cloud +``` + +It will create 1 fdb, 1 meta service server, 1 recycler, 3 fe and 3 be. + ### Remove node from the cluster ``` @@ -96,5 +115,11 @@ There are more options about doris-compose. Just try python docker/runtime/doris-compose/doris-compose.py <command> -h ``` +### Generate regression custom conf file + +``` +python docker/runtime/doris-compose/doris-compose.py config <cluster-name> +``` +Generate regression-conf-custom.groovy to connect to the specific docker cluster. diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index 23016bc27ba..660a243f5c4 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -28,6 +28,8 @@ DORIS_SUBNET_START = int(os.getenv("DORIS_SUBNET_START", 128)) LOCAL_RESOURCE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "resource") DOCKER_RESOURCE_PATH = os.path.join(DOCKER_DORIS_PATH, "resource") +CLOUD_CFG_FILE = os.getenv("DORIS_CLOUD_CFG_FILE", + os.path.join(LOCAL_DORIS_PATH, 'cloud.ini')) FE_HTTP_PORT = 8030 FE_RPC_PORT = 9020 @@ -39,6 +41,10 @@ BE_WEBSVR_PORT = 8040 BE_HEARTBEAT_PORT = 9050 BE_BRPC_PORT = 8060 +FDB_PORT = 4500 + +MS_PORT = 5000 + ID_LIMIT = 10000 IP_PART4_SIZE = 200 @@ -99,6 +105,24 @@ def get_master_fe_endpoint(cluster_name): return "" +def get_node_seq(node_type, id): + seq = id + seq += IP_PART4_SIZE + if node_type == Node.TYPE_FE: + seq += 0 * ID_LIMIT + elif node_type == Node.TYPE_BE: + seq += 1 * ID_LIMIT + elif node_type == Node.TYPE_MS: + seq += 2 * ID_LIMIT + elif node_type == Node.TYPE_RECYCLE: + seq += 3 * ID_LIMIT + elif node_type == Node.TYPE_FDB: + seq += 4 * ID_LIMIT + else: + seq += 5 * ID_LIMIT + return seq + + class NodeMeta(object): def __init__(self, image): @@ -151,46 +175,61 @@ class Group(object): class Node(object): TYPE_FE = "fe" TYPE_BE = "be" - TYPE_ALL = [TYPE_FE, TYPE_BE] + TYPE_MS = "ms" + TYPE_RECYCLE = "recycle" + TYPE_FDB = "fdb" + TYPE_ALL = [TYPE_FE, TYPE_BE, TYPE_MS, TYPE_RECYCLE, TYPE_FDB] - def __init__(self, cluster_name, coverage_dir, id, subnet, meta): - self.cluster_name = cluster_name - self.coverage_dir = coverage_dir + def __init__(self, cluster, id, meta): + self.cluster = cluster self.id = id - self.subnet = subnet self.meta = meta @staticmethod - def new(cluster_name, coverage_dir, node_type, id, subnet, meta): + def new(cluster, node_type, id, meta): if node_type == Node.TYPE_FE: - return FE(cluster_name, coverage_dir, id, subnet, meta) + return FE(cluster, id, meta) elif node_type == Node.TYPE_BE: - return BE(cluster_name, coverage_dir, id, subnet, meta) + return BE(cluster, id, meta) + elif node_type == Node.TYPE_MS: + return MS(cluster, id, meta) + elif node_type == Node.TYPE_RECYCLE: + return RECYCLE(cluster, id, meta) + elif node_type == Node.TYPE_FDB: + return FDB(cluster, id, meta) else: raise Exception("Unknown node type {}".format(node_type)) - def init_conf(self, config): + def init(self): + self.init_conf() + + def init_conf(self): path = self.get_path() os.makedirs(path, exist_ok=True) + config = self.get_add_init_config() + # copy config to local conf_dir = os.path.join(path, "conf") if not os.path.exists(conf_dir) or utils.is_dir_empty(conf_dir): - utils.copy_image_directory( - self.get_image(), "{}/{}/conf".format(DOCKER_DORIS_PATH, - self.node_type()), - conf_dir) + self.copy_conf_to_local(conf_dir) assert not utils.is_dir_empty(conf_dir), "conf directory {} is empty, " \ "check doris path in image is correct".format(conf_dir) utils.enable_dir_with_rw_perm(conf_dir) if config: with open(os.path.join(conf_dir, self.conf_file_name()), "a") as f: + f.write("\n") for item in config: f.write(item + "\n") for sub_dir in self.expose_sub_dirs(): os.makedirs(os.path.join(path, sub_dir), exist_ok=True) + def copy_conf_to_local(self, local_conf_dir): + utils.copy_image_directory(self.get_image(), + "{}/conf".format(self.docker_home_dir()), + local_conf_dir) + def is_fe(self): return self.node_type() == Node.TYPE_FE @@ -210,7 +249,7 @@ class Node(object): return "{}-{}".format(self.node_type(), self.id) def get_path(self): - return os.path.join(get_cluster_path(self.cluster_name), + return os.path.join(get_cluster_path(self.cluster.name), self.get_name()) def get_image(self): @@ -220,15 +259,8 @@ class Node(object): self.meta.image = image def get_ip(self): - seq = self.id - seq += IP_PART4_SIZE - if self.node_type() == Node.TYPE_FE: - seq += 0 * ID_LIMIT - elif self.node_type() == Node.TYPE_BE: - seq += 1 * ID_LIMIT - else: - seq += 2 * ID_LIMIT - return "{}.{}.{}".format(self.subnet, int(seq / IP_PART4_SIZE), + seq = get_node_seq(self.node_type(), self.id) + return "{}.{}.{}".format(self.cluster.subnet, int(seq / IP_PART4_SIZE), seq % IP_PART4_SIZE) @staticmethod @@ -244,25 +276,30 @@ class Node(object): return seq def service_name(self): - return utils.with_doris_prefix("{}-{}".format(self.cluster_name, + return utils.with_doris_prefix("{}-{}".format(self.cluster.name, self.get_name())) def docker_env(self): - enable_coverage = self.coverage_dir + enable_coverage = self.cluster.coverage_dir envs = { "MY_IP": self.get_ip(), "MY_ID": self.id, + "MY_TYPE": self.node_type(), "FE_QUERY_PORT": FE_QUERY_PORT, "FE_EDITLOG_PORT": FE_EDITLOG_PORT, "BE_HEARTBEAT_PORT": BE_HEARTBEAT_PORT, - "DORIS_HOME": os.path.join(DOCKER_DORIS_PATH, self.node_type()), + "DORIS_HOME": os.path.join(self.docker_home_dir()), "STOP_GRACE": 1 if enable_coverage else 0, + "IS_CLOUD": 1 if self.cluster.is_cloud else 0, } + if self.cluster.is_cloud: + envs["META_SERVICE_ENDPOINT"] = self.cluster.get_meta_server_addr() + if enable_coverage: outfile = "{}/coverage/{}-coverage-{}-{}".format( - DOCKER_DORIS_PATH, self.node_type(), self.cluster_name, + DOCKER_DORIS_PATH, self.node_type(), self.cluster.name, self.id) if self.node_type() == Node.TYPE_FE: envs["JACOCO_COVERAGE_OPT"] = "-javaagent:/jacoco/lib/jacocoagent.jar" \ @@ -274,36 +311,41 @@ class Node(object): return envs + def get_add_init_config(self): + return [] + def docker_ports(self): raise Exception("No implemented") + def docker_home_dir(self): + raise Exception("No implemented") + def compose(self): volumes = [ - "{}:{}/{}/{}".format(os.path.join(self.get_path(), sub_dir), - DOCKER_DORIS_PATH, self.node_type(), sub_dir) + "{}:{}/{}".format(os.path.join(self.get_path(), sub_dir), + self.docker_home_dir(), sub_dir) for sub_dir in self.expose_sub_dirs() ] + [ "{}:{}:ro".format(LOCAL_RESOURCE_PATH, DOCKER_RESOURCE_PATH), - "{}:{}/{}/status".format(get_status_path(self.cluster_name), - DOCKER_DORIS_PATH, self.node_type()), + "{}:{}/status".format(get_status_path(self.cluster.name), + self.docker_home_dir()), ] + [ "{0}:{0}:ro".format(path) for path in ("/etc/localtime", "/etc/timezone", "/usr/share/zoneinfo") if os.path.exists(path) ] - if self.coverage_dir: - volumes.append("{}:{}/coverage".format(self.coverage_dir, + if self.cluster.coverage_dir: + volumes.append("{}:{}/coverage".format(self.cluster.coverage_dir, DOCKER_DORIS_PATH)) - return { + content = { "cap_add": ["SYS_PTRACE"], "hostname": self.get_name(), "container_name": self.service_name(), - "entrypoint": self.entrypoint(), "environment": self.docker_env(), "image": self.get_image(), "networks": { - utils.with_doris_prefix(self.cluster_name): { + utils.with_doris_prefix(self.cluster.name): { "ipv4_address": self.get_ip(), } }, @@ -315,15 +357,49 @@ class Node(object): "volumes": volumes, } + if self.entrypoint(): + content["entrypoint"] = self.entrypoint() + + return content + class FE(Node): + def get_add_init_config(self): + cfg = [] + if self.cluster.fe_config: + cfg += self.cluster.fe_config + if self.cluster.is_cloud: + cfg += [ + "cloud_unique_id = " + self.cloud_unique_id(), + "meta_service_endpoint = {}".format( + self.cluster.get_meta_server_addr()), + "", + "# For regression-test", + "ignore_unsupported_properties_in_cloud_mode = true", + "merge_on_write_forced_to_false = true", + ] + + return cfg + + def docker_env(self): + envs = super().docker_env() + if self.cluster.is_cloud: + envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() + return envs + + def cloud_unique_id(self): + return "sql_server_{}".format(self.id) + def entrypoint(self): return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fe.sh")] def docker_ports(self): return [FE_HTTP_PORT, FE_EDITLOG_PORT, FE_RPC_PORT, FE_QUERY_PORT] + def docker_home_dir(self): + return os.path.join(DOCKER_DORIS_PATH, "fe") + def node_type(self): return Node.TYPE_FE @@ -333,17 +409,28 @@ class FE(Node): class BE(Node): - def entrypoint(self): - return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh")] - - def docker_ports(self): - return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT] - - def node_type(self): - return Node.TYPE_BE - - def expose_sub_dirs(self): - return super().expose_sub_dirs() + ["storage"] + def init(self): + super().init() + if self.cluster.is_cloud: + self.init_cluster_name() + self.init_disk(self.cluster.be_disks) + + def get_add_init_config(self): + cfg = [] + if self.cluster.be_config: + cfg += self.cluster.be_config + if self.cluster.is_cloud: + cfg += [ + "cloud_unique_id = " + self.cloud_unique_id(), + "meta_service_endpoint = {}".format( + self.cluster.get_meta_server_addr()), + 'tmp_file_dirs = [ {"path":"./storage/tmp","max_cache_bytes":10240000," "max_upload_bytes":10240000}]', + ] + return cfg + + def init_cluster_name(self): + with open("{}/conf/CLUSTER_NAME".format(self.get_path()), "w") as f: + f.write(self.cluster.be_cluster) def init_disk(self, be_disks): path = self.get_path() @@ -379,32 +466,138 @@ class BE(Node): with open("{}/conf/{}".format(path, self.conf_file_name()), "a") as f: storage_root_path = ";".join(dir_descs) if dir_descs else '""' - f.write("storage_root_path = {}\n".format(storage_root_path)) + f.write("\nstorage_root_path = {}\n".format(storage_root_path)) + + def entrypoint(self): + return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_be.sh")] + + def docker_env(self): + envs = super().docker_env() + if self.cluster.is_cloud: + envs["CLOUD_UNIQUE_ID"] = self.cloud_unique_id() + return envs + + def cloud_unique_id(self): + return "compute_node_{}".format(self.id) + + def docker_home_dir(self): + return os.path.join(DOCKER_DORIS_PATH, "be") + + def docker_ports(self): + return [BE_WEBSVR_PORT, BE_BRPC_PORT, BE_HEARTBEAT_PORT, BE_PORT] + + def node_type(self): + return Node.TYPE_BE + + def expose_sub_dirs(self): + return super().expose_sub_dirs() + ["storage"] + + +class CLOUD(Node): + + def get_add_init_config(self): + return ["fdb_cluster = " + self.cluster.get_fdb_cluster()] + + def docker_home_dir(self): + return os.path.join(DOCKER_DORIS_PATH, "cloud") + + def docker_ports(self): + return [MS_PORT] + + def conf_file_name(self): + return "doris_cloud.conf" + + +class MS(CLOUD): + + def entrypoint(self): + return [ + "bash", + os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"), + "--meta-service" + ] + + def node_type(self): + return Node.TYPE_MS + + def docker_env(self): + envs = super().docker_env() + for key, value in self.cluster.cloud_store_config.items(): + envs[key] = value + return envs + + +class RECYCLE(CLOUD): + + def entrypoint(self): + return [ + "bash", + os.path.join(DOCKER_RESOURCE_PATH, "init_cloud.sh"), "--recycler" + ] + + def node_type(self): + return Node.TYPE_RECYCLE + + +class FDB(Node): + + def copy_conf_to_local(self, local_conf_dir): + os.makedirs(local_conf_dir, exist_ok=True) + with open(os.path.join(LOCAL_RESOURCE_PATH, "fdb.conf"), + "r") as read_file: + with open(os.path.join(local_conf_dir, self.conf_file_name()), + "w") as f: + publish_addr = "{}:{}".format(self.get_ip(), FDB_PORT) + f.write(read_file.read().replace("${PUBLISH-ADDRESS}", + publish_addr)) + + with open(os.path.join(local_conf_dir, "fdb.cluster"), "w") as f: + f.write(self.cluster.get_fdb_cluster()) + + def entrypoint(self): + return ["bash", os.path.join(DOCKER_RESOURCE_PATH, "init_fdb.sh")] + + def docker_home_dir(self): + return os.path.join(DOCKER_DORIS_PATH, "fdb") + + def docker_ports(self): + return [FDB_PORT] + + def node_type(self): + return Node.TYPE_FDB + + def expose_sub_dirs(self): + return super().expose_sub_dirs() + ["data"] class Cluster(object): - def __init__(self, name, subnet, image, fe_config, be_config, be_disks, - coverage_dir): + def __init__(self, name, subnet, image, is_cloud, fe_config, be_config, + be_disks, be_cluster, coverage_dir, cloud_store_config): self.name = name self.subnet = subnet self.image = image + self.is_cloud = is_cloud self.fe_config = fe_config self.be_config = be_config self.be_disks = be_disks + self.be_cluster = be_cluster self.coverage_dir = coverage_dir + self.cloud_store_config = cloud_store_config self.groups = { node_type: Group(node_type) for node_type in Node.TYPE_ALL } @staticmethod - def new(name, image, fe_config, be_config, be_disks, coverage_dir): + def new(name, image, is_cloud, fe_config, be_config, be_disks, be_cluster, + coverage_dir, cloud_store_config): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) with filelock.FileLock(os.path.join(LOCAL_DORIS_PATH, "lock")): subnet = gen_subnet_prefix16() - cluster = Cluster(name, subnet, image, fe_config, be_config, - be_disks, coverage_dir) + cluster = Cluster(name, subnet, image, is_cloud, fe_config, + be_config, be_disks, be_cluster, coverage_dir, + cloud_store_config) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() @@ -440,7 +633,9 @@ class Cluster(object): # cluster's nodes will update image too if cluster update. def set_image(self, image): self.image = image - for _, group in self.groups.items(): + for node_type, group in self.groups.items(): + if node_type == Node.TYPE_FDB: + continue for _, node_meta in group.nodes.items(): node_meta.image = image @@ -458,16 +653,15 @@ class Cluster(object): meta = group.get_node(id) if not meta: raise Exception("No found {} with id {}".format(node_type, id)) - return Node.new(self.name, self.coverage_dir, node_type, id, - self.subnet, meta) + return Node.new(self, node_type, id, meta) def get_all_nodes(self, node_type): group = self.groups.get(node_type, None) if not group: raise Exception("Unknown node_type: {}".format(node_type)) return [ - Node.new(self.name, self.coverage_dir, node_type, id, self.subnet, - meta) for id, meta in group.get_all_nodes().items() + Node.new(self, node_type, id, meta) + for id, meta in group.get_all_nodes().items() ] def get_all_nodes_num(self): @@ -481,15 +675,17 @@ class Cluster(object): id = self.get_group(node_type).add(id, node_meta) node = self.get_node(node_type, id) if not os.path.exists(node.get_path()): - if node.is_fe(): - node.init_conf(self.fe_config) - elif node.is_be(): - node.init_conf(self.be_config) - node.init_disk(self.be_disks) - else: - node.init_conf([]) + node.init() + return node + def get_fdb_cluster(self): + return "123456:123456@{}:{}".format( + self.get_node(Node.TYPE_FDB, 1).get_ip(), FDB_PORT) + + def get_meta_server_addr(self): + return "{}:{}".format(self.get_node(Node.TYPE_MS, 1).get_ip(), MS_PORT) + def remove(self, node_type, id): group = self.get_group(node_type) group.remove(id) diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 261d63ee803..7e1a4ef695a 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -31,8 +31,14 @@ LOG = utils.get_logger() # return for_all, related_nodes, related_node_num -def get_ids_related_nodes(cluster, fe_ids, be_ids, ignore_not_exists=False): - if fe_ids is None and be_ids is None: +def get_ids_related_nodes(cluster, + fe_ids, + be_ids, + ms_ids, + recycle_ids, + fdb_ids, + ignore_not_exists=False): + if fe_ids is None and be_ids is None and ms_ids is None and recycle_ids is None and fdb_ids is None: return True, None, cluster.get_all_nodes_num() def get_ids_related_nodes_with_type(node_type, ids): @@ -55,9 +61,17 @@ def get_ids_related_nodes(cluster, fe_ids, be_ids, ignore_not_exists=False): raise e return nodes - nodes = get_ids_related_nodes_with_type( - CLUSTER.Node.TYPE_FE, fe_ids) + get_ids_related_nodes_with_type( - CLUSTER.Node.TYPE_BE, be_ids) + type_ids = [ + (CLUSTER.Node.TYPE_FE, fe_ids), + (CLUSTER.Node.TYPE_BE, be_ids), + (CLUSTER.Node.TYPE_MS, ms_ids), + (CLUSTER.Node.TYPE_RECYCLE, recycle_ids), + (CLUSTER.Node.TYPE_FDB, fdb_ids), + ] + + nodes = [] + for node_type, ids in type_ids: + nodes.extend(get_ids_related_nodes_with_type(node_type, ids)) related_node_num = len(nodes) @@ -69,6 +83,9 @@ class Command(object): def __init__(self, name): self.name = name + def print_use_time(self): + return True + def add_parser(self, args_parsers): raise Exception("No implemented") @@ -88,13 +105,37 @@ class Command(object): "if specific --fe-id but not specific ids, apply to all fe. Example: '--fe-id 2 3' will select fe-2 and fe-3.") group.add_argument("--be-id", nargs="*", type=int, help="Specify up be ids, support multiple ids, " \ "if specific --be-id but not specific ids, apply to all be. Example: '--be-id' will select all backends.") + group.add_argument( + "--ms-id", + nargs="*", + type=int, + help= + "Specify up ms ids, support multiple ids. Only use in cloud cluster." + ) + group.add_argument( + "--recycle-id", + nargs="*", + type=int, + help= + "Specify up recycle ids, support multiple ids. Only use in cloud cluster." + ) + group.add_argument( + "--fdb-id", + nargs="*", + type=int, + help= + "Specify up fdb ids, support multiple ids. Only use in cloud cluster." + ) def _get_parser_bool_action(self, is_store_true): - if sys.version_info.major == 3 and sys.version_info.minor >= 9: + if self._support_boolean_action(): return argparse.BooleanOptionalAction else: return "store_true" if is_store_true else "store_false" + def _support_boolean_action(self): + return sys.version_info.major == 3 and sys.version_info.minor >= 9 + class SimpleCommand(Command): @@ -104,7 +145,8 @@ class SimpleCommand(Command): self.help = help def add_parser(self, args_parsers): - help = self.help + " If none of --fe-id, --be-id is specific, then apply to all containers." + help = self.help + " If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "\ + "then apply to all containers." parser = args_parsers.add_parser(self.command, help=help) parser.add_argument("NAME", help="Specify cluster name.") self._add_parser_ids_args(parser) @@ -113,7 +155,8 @@ class SimpleCommand(Command): def run(self, args): cluster = CLUSTER.Cluster.load(args.NAME) _, related_nodes, related_node_num = get_ids_related_nodes( - cluster, args.fe_id, args.be_id) + cluster, args.fe_id, args.be_id, args.ms_id, args.recycle_id, + args.fdb_id) utils.exec_docker_compose_command(cluster.get_compose_file(), self.command, nodes=related_nodes) @@ -128,7 +171,8 @@ class UpCommand(Command): def add_parser(self, args_parsers): parser = args_parsers.add_parser("up", help="Create and upgrade doris containers, "\ "or add new containers. " \ - "If none of --add-fe-num, --add-be-num, --fe-id, --be-id is specific, " \ + "If none of --add-fe-num, --add-be-num, --add-ms-num, --add-recycle-num, "\ + "--fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, " \ "then apply to all containers.") parser.add_argument("NAME", default="", help="Specific cluster name.") parser.add_argument("IMAGE", @@ -136,6 +180,14 @@ class UpCommand(Command): nargs="?", help="Specify docker image.") + parser.add_argument( + "--cloud", + default=False, + action=self._get_parser_bool_action(True), + help= + "Create cloud cluster, default is false. Only use when creating new cluster." + ) + parser.add_argument( "--wait-timeout", type=int, @@ -161,6 +213,18 @@ class UpCommand(Command): help= "Specify add be num, default: 3 for a new cluster, 0 for a existing cluster." ) + group1.add_argument( + "--add-ms-num", + type=int, + help= + "Specify add ms num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster" + ) + group1.add_argument( + "--add-recycle-num", + type=int, + help= + "Specify add recycle num, default: 1 for a new cloud cluster, 0 for a existing cluster. Only use in cloud cluster" + ) group1.add_argument("--fe-config", nargs="*", type=str, @@ -180,16 +244,32 @@ class UpCommand(Command): "Example: --be-disks \"HDD=1\", \"SSD=1,10\", \"SSD=2,100\""\ "means each be has 1 HDD without capactity limit, 1 SSD with 10GB capactity limit, "\ "2 SSD with 100GB capactity limit") + group1.add_argument( + "--be-cluster", + type=str, + help= + "be cluster name, if not specific, will use compute_cluster. Only use in cloud cluster." + ) self._add_parser_ids_args(parser) group2 = parser.add_mutually_exclusive_group() - group2.add_argument( - "--start", - default=True, - action=self._get_parser_bool_action(False), - help="Start containers, default is true. If specific --no-start, "\ - "will create or update config image only but not start containers.") + if self._support_boolean_action(): + group2.add_argument( + "--start", + default=True, + action=self._get_parser_bool_action(False), + help="Start containers, default is true. If specific --no-start, "\ + "will create or update config image only but not start containers.") + else: + group2.add_argument( + "--no-start", + dest='start', + default=True, + action=self._get_parser_bool_action(False), + help= + "Create or update config image only and don't start containers." + ) group2.add_argument("--force-recreate", default=False, action=self._get_parser_bool_action(True), @@ -200,13 +280,31 @@ class UpCommand(Command): default="", help="code coverage output directory") + parser.add_argument( + "--fdb-version", + type=str, + default="7.1.26", + help="fdb image version. Only use in cloud cluster.") + def run(self, args): if not args.NAME: raise Exception("Need specific not empty cluster name") for_all = True + add_fdb_num = 0 try: cluster = CLUSTER.Cluster.load(args.NAME) - if args.fe_id != None or args.be_id != None or args.add_fe_num or args.add_be_num: + + if not cluster.is_cloud: + args.add_ms_num = None + args.add_recycle_num = None + args.ms_id = None + args.recycle_id = None + args.fdb_id = None + + if args.fe_id != None or args.be_id != None \ + or args.ms_id != None or args.recycle_id != None or args.fdb_id != None \ + or args.add_fe_num or args.add_be_num \ + or args.add_ms_num or args.add_recycle_num: for_all = False except: # a new cluster @@ -220,37 +318,83 @@ class UpCommand(Command): args.be_id = None LOG.warning( utils.render_yellow("Ignore --be-id for new cluster")) - cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, + + args.fdb_id = None + args.ms_id = None + args.recycle_id = None + + if args.add_fe_num is None: + args.add_fe_num = 3 + if args.add_be_num is None: + args.add_be_num = 3 + + cloud_store_config = {} + if args.cloud: + add_fdb_num = 1 + if not args.add_ms_num: + args.add_ms_num = 1 + if not args.add_recycle_num: + args.add_recycle_num = 1 + if not args.be_cluster: + args.be_cluster = "compute_cluster" + cloud_store_config = self._get_cloud_store_config() + else: + args.add_ms_num = 0 + args.add_recycle_num = 0 + + cluster = CLUSTER.Cluster.new(args.NAME, args.IMAGE, args.cloud, args.fe_config, args.be_config, - args.be_disks, args.coverage_dir) + args.be_disks, args.be_cluster, + args.coverage_dir, + cloud_store_config) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) - if not args.add_fe_num: - args.add_fe_num = 3 - if not args.add_be_num: - args.add_be_num = 3 + + if args.be_cluster and cluster.is_cloud: + cluster.be_cluster = args.be_cluster _, related_nodes, _ = get_ids_related_nodes(cluster, args.fe_id, - args.be_id) - add_be_ids = [] + args.be_id, args.ms_id, + args.recycle_id, + args.fdb_id) add_fe_ids = [] + add_be_ids = [] + add_ms_ids = [] + add_recycle_ids = [] + add_fdb_ids = [] + + add_type_nums = [ + (CLUSTER.Node.TYPE_FDB, add_fdb_num, add_fdb_ids), + (CLUSTER.Node.TYPE_MS, args.add_ms_num, add_ms_ids), + (CLUSTER.Node.TYPE_RECYCLE, args.add_recycle_num, add_recycle_ids), + (CLUSTER.Node.TYPE_FE, args.add_fe_num, add_fe_ids), + (CLUSTER.Node.TYPE_BE, args.add_be_num, add_be_ids), + ] + if not related_nodes: related_nodes = [] - if args.add_fe_num: - for i in range(args.add_fe_num): - fe = cluster.add(CLUSTER.Node.TYPE_FE) - related_nodes.append(fe) - add_fe_ids.append(fe.id) - if args.add_be_num: - for i in range(args.add_be_num): - be = cluster.add(CLUSTER.Node.TYPE_BE) - related_nodes.append(be) - add_be_ids.append(be.id) + + def do_add_node(node_type, add_num, add_ids): + if not add_num: + return + for i in range(add_num): + node = cluster.add(node_type) + related_nodes.append(node) + add_ids.append(node.id) + + for node_type, add_num, add_ids in add_type_nums: + do_add_node(node_type, add_num, add_ids) + if args.IMAGE: for node in related_nodes: node.set_image(args.IMAGE) - if for_all and args.IMAGE: - cluster.set_image(args.IMAGE) + if for_all: + cluster.set_image(args.IMAGE) + + for node in cluster.get_all_nodes(CLUSTER.Node.TYPE_FDB): + node.set_image("foundationdb/foundationdb:{}".format( + args.fdb_version)) + cluster.save() options = [] @@ -269,6 +413,8 @@ class UpCommand(Command): utils.exec_docker_compose_command(cluster.get_compose_file(), "up", options, related_nodes) + ls_cmd = "python docker/runtime/doris-compose/doris-compose.py ls " + cluster.name + LOG.info("Inspect command: " + utils.render_green(ls_cmd) + "\n") LOG.info( "Master fe query address: " + utils.render_green(CLUSTER.get_master_fe_endpoint(cluster.name)) + @@ -318,8 +464,59 @@ class UpCommand(Command): "be": { "add_list": add_be_ids, }, + "ms": { + "add_list": add_ms_ids, + }, + "recycle": { + "add_list": add_recycle_ids, + }, + "fdb": { + "add_list": add_fdb_ids, + }, } + def _get_cloud_store_config(self): + example_cfg_file = os.path.join(CLUSTER.LOCAL_RESOURCE_PATH, + "cloud.ini.example") + if not CLUSTER.CLOUD_CFG_FILE: + raise Exception("Cloud cluster need S3 store, specific its config in a file.\n" \ + "A example file is " + example_cfg_file + ".\n" \ + "Then setting the env variable `export DORIS_CLOUD_CFG_FILE=<cfg-file-path>`.") + + if not os.path.exists(CLUSTER.CLOUD_CFG_FILE): + raise Exception("Cloud store config file '" + + CLUSTER.CLOUD_CFG_FILE + "' not exists.") + + config = {} + with open(example_cfg_file, "r") as f: + for line in f.readlines(): + if line.startswith('#'): + continue + pos = line.find('=') + if pos <= 0: + continue + key = line[0:pos].strip() + if key: + config[key] = "" + + with open(CLUSTER.CLOUD_CFG_FILE, "r") as f: + for line in f.readlines(): + if line.startswith('#'): + continue + pos = line.find('=') + if pos <= 0: + continue + key = line[0:pos].strip() + if key and config.get(key, None) != None: + config[key] = line[line.find('=') + 1:].strip() + + for key, value in config.items(): + if not value: + raise Exception( + "Should provide none empty property '{}' in file {}". + format(key, CLUSTER.CLOUD_CFG_FILE)) + return config + class DownCommand(Command): @@ -327,7 +524,7 @@ class DownCommand(Command): parser = args_parsers.add_parser("down", help="Down doris containers, networks. "\ "It will also remove node from DB. " \ - "If none of --fe-id, --be-id is specific, "\ + "If none of --fe-id, --be-id, --ms-id, --recycle-id, --fdb-id is specific, "\ "then apply to all containers.") parser.add_argument("NAME", help="Specify cluster name") self._add_parser_ids_args(parser) @@ -353,7 +550,13 @@ class DownCommand(Command): except: return "Cluster not exists or load failed" for_all, related_nodes, related_node_num = get_ids_related_nodes( - cluster, args.fe_id, args.be_id, ignore_not_exists=True) + cluster, + args.fe_id, + args.be_id, + args.ms_id, + args.recycle_id, + args.fdb_id, + ignore_not_exists=True) if for_all: if os.path.exists(cluster.get_compose_file()): @@ -401,8 +604,9 @@ class DownCommand(Command): if args.clean: utils.enable_dir_with_rw_perm(node.get_path()) shutil.rmtree(node.get_path()) - register_file = "{}/{}-register".format( - CLUSTER.get_status_path(cluster.name), node.get_ip()) + register_file = "{}/{}-{}-register".format( + CLUSTER.get_status_path(cluster.name), + node.node_type(), node.get_ip()) if os.path.exists(register_file): os.remove(register_file) LOG.info( @@ -444,8 +648,8 @@ class ListNode(object): result = [ self.cluster_name, "{}-{}".format(self.node_type, self.id), self.ip, self.status, self.container_id, self.image, self.created, - self.alive, self.is_master, self.query_port, self.backend_id, - self.tablet_num, self.last_heartbeat, self.err_msg + self.alive, self.is_master, self.backend_id, self.tablet_num, + self.last_heartbeat, self.err_msg ] if detail: query_port = "" @@ -483,6 +687,48 @@ class ListNode(object): self.err_msg = be.err_msg +class GenConfCommand(Command): + + def print_use_time(self): + return False + + def add_parser(self, args_parsers): + parser = args_parsers.add_parser( + "config", + help="Generate regression-conf-custom.groovy for regression test.") + parser.add_argument("NAME", default="", help="Specific cluster name.") + + return parser + + def run(self, args): + content = ''' +jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" +targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" +feSourceThriftAddress = "127.0.0.1:9020" +feTargetThriftAddress = "127.0.0.1:9020" +syncerAddress = "127.0.0.1:9190" +feHttpAddress = "127.0.0.1:8030" +''' + master_fe_ip = CLUSTER.get_master_fe_endpoint(args.NAME) + if not master_fe_ip: + print("Not found cluster with name {} in directory {}".format( + args.NAME, CLUSTER.LOCAL_DORIS_PATH)) + return + doris_root_dir = os.path.abspath(__file__) + for i in range(4): + doris_root_dir = os.path.dirname(doris_root_dir) + regression_conf_custom = doris_root_dir + "/regression-test/conf/regression-conf-custom.groovy" + if input("write file {} ?\n y/N: ".format( + regression_conf_custom)) != 'y': + print("No write regression custom file.") + return + with open(regression_conf_custom, "w") as f: + f.write( + content.replace("127.0.0.1", + master_fe_ip[:master_fe_ip.find(':')])) + print("Write succ: " + regression_conf_custom) + + class ListCommand(Command): def add_parser(self, args_parsers): @@ -574,7 +820,7 @@ class ListCommand(Command): TYPE_COMPOSESERVICE = type(ComposeService("", "", "")) if not args.NAME: - header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", + header = ("CLUSTER", "OWNER", "STATUS", "MASTER FE", "CLOUD", "CONFIG FILES") rows = [] for name in sorted(clusters.keys()): @@ -591,16 +837,24 @@ class ListCommand(Command): ]) owner = utils.get_path_owner(CLUSTER.get_cluster_path(name)) compose_file = CLUSTER.get_compose_file(name) + + is_cloud = "" + try: + cluster = CLUSTER.Cluster.load(name) + is_cloud = "true" if cluster.is_cloud else "false" + except: + pass + rows.append((name, owner, show_status, - CLUSTER.get_master_fe_endpoint(name), + CLUSTER.get_master_fe_endpoint(name), is_cloud, "{}{}".format(compose_file, cluster_info["status"]))) return self._handle_data(header, rows) header = [ "CLUSTER", "NAME", "IP", "STATUS", "CONTAINER ID", "IMAGE", - "CREATED", "alive", "is_master", "query_port", "backend_id", - "tablet_num", "last_heartbeat", "err_msg" + "CREATED", "alive", "is_master", "backend_id", "tablet_num", + "last_heartbeat", "err_msg" ] if args.detail: header += [ @@ -644,6 +898,11 @@ class ListCommand(Command): node.image = ",".join(container.image.tags) node.container_id = container.short_id node.status = container.status + if node.container_id and \ + node_type in (CLUSTER.Node.TYPE_FDB, + CLUSTER.Node.TYPE_MS, + CLUSTER.Node.TYPE_RECYCLE): + node.alive = "true" for id, fe in db_mgr.fe_states.items(): if fe_ids.get(id, False): @@ -666,17 +925,10 @@ class ListCommand(Command): node.update_db_info(db_mgr) nodes.append(node) - def get_key(node): - key = node.id - if node.node_type == CLUSTER.Node.TYPE_FE: - key += 0 * CLUSTER.ID_LIMIT - elif node.node_type == CLUSTER.Node.TYPE_BE: - key += 1 * CLUSTER.ID_LIMIT - else: - key += 2 * CLUSTER.ID_LIMIT - return key + def get_node_seq(node): + return CLUSTER.get_node_seq(node.node_type, node.id) - for node in sorted(nodes, key=get_key): + for node in sorted(nodes, key=get_node_seq): rows.append(node.info(args.detail)) return self._handle_data(header, rows) @@ -690,5 +942,6 @@ ALL_COMMANDS = [ SimpleCommand("restart", "Restart the doris containers. "), SimpleCommand("pause", "Pause the doris containers. "), SimpleCommand("unpause", "Unpause the doris containers. "), + GenConfCommand("config"), ListCommand("ls"), ] diff --git a/docker/runtime/doris-compose/doris-compose.py b/docker/runtime/doris-compose/doris-compose.py index c29449b116d..0091b70eae9 100644 --- a/docker/runtime/doris-compose/doris-compose.py +++ b/docker/runtime/doris-compose/doris-compose.py @@ -36,7 +36,7 @@ def run(args, disable_log, help): if args.command == cmd.name: timer = utils.Timer() result = cmd.run(args) - if not disable_log: + if cmd.print_use_time() and not disable_log: timer.show() return result print(help) diff --git a/docker/runtime/doris-compose/resource/cloud.ini.example b/docker/runtime/doris-compose/resource/cloud.ini.example new file mode 100644 index 00000000000..2ce113a2b53 --- /dev/null +++ b/docker/runtime/doris-compose/resource/cloud.ini.example @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +DORIS_CLOUD_USER=xxxx +DORIS_CLOUD_AK=xxxx +DORIS_CLOUD_SK=xxxx +DORIS_CLOUD_BUCKET=xxxx +DORIS_CLOUD_ENDPOINT=cos.ap-hongkong.myqcloud.com +DORIS_CLOUD_EXTERNAL_ENDPOINT=cos.ap-hongkong.myqcloud.com +DORIS_CLOUD_REGION=ap-hongkong +DORIS_CLOUD_PROVIDER=COS diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/common.sh index 5342eb0e54b..de6ba29865a 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/common.sh @@ -17,21 +17,102 @@ export MASTER_FE_IP="" export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip +export HAS_INIT_FDB_FILE=${DORIS_HOME}/status/has_init_fdb +export HAS_CREATE_INSTANCE_FILE=$DORIS_HOME/status/has_create_instance export LOG_FILE=$DORIS_HOME/log/health.out +export LOCK_FILE=$DORIS_HOME/status/token health_log() { echo "$(date +'%Y-%m-%d %H:%M:%S') $@" >>$LOG_FILE } -read_master_fe_ip() { - MASTER_FE_IP=$(cat $MASTER_FE_IP_FILE) - if [ $? -eq 0 ]; then - health_log "master fe ${MASTER_FE_IP} has ready." - return 0 - else +# concurrent write meta service server will failed due to fdb txn conflict. +# so add lock to protect writing ms txns. +lock_cluster() { + health_log "start acquire token" + while true; do + if [ -f $LOCK_FILE ]; then + if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + health_log "rm $LOCK_FILE generate by myself" + rm $LOCK_FILE + continue + fi + + mt=$(stat -c %Y $LOCK_FILE) + if [ -z "$mt" ]; then + health_log "get $LOCK_FILE modify time failed" + sleep 0.1 + continue + fi + + now=$(date '+%s') + diff=$(expr $now - $mt) + if [ $diff -lt 10 ]; then + sleep 0.1 + continue + fi + + rm $LOCK_FILE + health_log "rm $LOCK_FILE due to exceeds $diff seconds." + fi + + if [ ! -f $LOCK_FILE ]; then + echo $MY_IP >$LOCK_FILE + fi + + sleep 0.1 + + if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + break + fi + + sleep 0.1 + done + + health_log "now got token" +} + +unlock_cluster() { + if [ ! -f $LOCK_FILE ]; then + return + fi + + if [ "a$(cat $LOCK_FILE)" == "a${MY_IP}" ]; then + rm $LOCK_FILE + fi +} + +wait_master_fe_ready() { + while true; do + MASTER_FE_IP=$(cat $MASTER_FE_IP_FILE) + if [ -n "$MASTER_FE_IP" ]; then + health_log "master fe ${MASTER_FE_IP} has ready." + break + fi health_log "master fe has not ready." - return 1 + sleep 1 + done +} + +wait_create_instance() { + ok=0 + for ((i = 0; i < 30; i++)); do + if [ -f $HAS_CREATE_INSTANCE_FILE ]; then + ok=1 + break + fi + + health_log "has not create instance, not found file $HAS_CREATE_INSTANCE_FILE" + + sleep 1 + done + + if [ $ok -eq 0 ]; then + health_log "wait create instance file too long, exit" + exit 1 fi + + health_log "check has create instance ok" } wait_pid() { diff --git a/docker/runtime/doris-compose/resource/fdb.conf b/docker/runtime/doris-compose/resource/fdb.conf new file mode 100644 index 00000000000..14c9976e4fc --- /dev/null +++ b/docker/runtime/doris-compose/resource/fdb.conf @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[fdbmonitor] +user = root +group = root + +[general] +restart-delay = 60 +cluster-file = /opt/apache-doris/fdb/conf/fdb.cluster + +## Default parameters for individual fdbserver processes +[fdbserver] +command = /usr/bin/fdbserver +public-address = ${PUBLISH-ADDRESS} +listen-address = public +logdir = /opt/apache-doris/fdb/log +datadir = /opt/apache-doris/fdb/data +logsize = 10MiB +maxlogssize = 100MiB +memory = 1GiB +storage-memory = 200MiB +cache-memory = 200MiB + +## An individual fdbserver process with id 4500 +## Parameters set here override defaults from the [fdbserver] section +[fdbserver.4500] + +[backup_agent] +command = /usr/bin/backup_agent +logdir = /opt/apache-doris/fdb/log diff --git a/docker/runtime/doris-compose/resource/init_be.sh b/docker/runtime/doris-compose/resource/init_be.sh index 4d7b9780c30..0df464c625c 100755 --- a/docker/runtime/doris-compose/resource/init_be.sh +++ b/docker/runtime/doris-compose/resource/init_be.sh @@ -22,20 +22,17 @@ DIR=$( source $DIR/common.sh -REGISTER_FILE=$DORIS_HOME/status/$MY_IP-register +REGISTER_FILE=$DORIS_HOME/status/be-$MY_IP-register + +add_local_be() { + wait_master_fe_ready -add_backend() { while true; do - read_master_fe_ip - if [ $? -ne 0 ]; then - sleep 1 - continue - fi - lsof -i:$BE_HEARTBEAT_PORT - if [ $? -ne 0 ]; then - sleep 1 - continue - fi + #lsof -i:$BE_HEARTBEAT_PORT + #if [ $? -ne 0 ]; then + # sleep 1 + # continue + #fi output=$(mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD BACKEND '$MY_IP:$BE_HEARTBEAT_PORT';" 2>&1) res=$? @@ -44,8 +41,80 @@ add_backend() { (echo $output | grep "Same backend already exists") && break sleep 1 done +} - touch $REGISTER_FILE +add_cloud_be() { + if [ -f "${DORIS_HOME}/log/be.out" ]; then + return + fi + + cluster_file_name="${DORIS_HOME}/conf/CLUSTER_NAME" + cluster_name=$(cat $cluster_file_name) + if [ -z $cluster_name ]; then + health_log "Empty cluster name, it should specific in file ${cluster_file_name}" + exit 1 + fi + + cluster_id="${cluster_name}_id" + + wait_create_instance + + nodes='{ + "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", + "ip": "'"${MY_IP}"'", + "heartbeat_port": "'"${BE_HEARTBEAT_PORT}"'" + }' + + lock_cluster + + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/add_cluster?token=greedisgood9999" \ + -d '{"instance_id": "default_instance_id", + "cluster": { + "type": "COMPUTE", + "cluster_name": "'"${cluster_name}"'", + "cluster_id": "'"${cluster_id}"'", + "nodes": ['"${nodes}"'] + }}') + + health_log "add cluster. output: $output" + code=$(jq -r '.code' <<<$output) + + # cluster has exists + if [ "$code" == "ALREADY_EXISTED" ]; then + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/add_node?token=greedisgood9999" \ + -d '{"instance_id": "default_instance_id", + "cluster": { + "type": "COMPUTE", + "cluster_name": "'"${cluster_name}"'", + "cluster_id": "'"${cluster_id}"'", + "nodes": ['"${nodes}"'] + }}') + fi + + unlock_cluster + + health_log "add cluster. output: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "add cluster failed, exit." + exit 1 + fi + + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/get_cluster?token=greedisgood9999" \ + -d '{"instance_id": "default_instance_id", + "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", + "cluster_name": "'"${cluster_name}"'", + "cluster_id": "'"${cluster_id}"'" + }') + + health_log "get cluster is: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "get cluster failed, exit." + exit 1 + fi } stop_backend() { @@ -63,7 +132,7 @@ wait_process() { for ((i = 0; i < 5; i++)); do sleep 1s pid=$(pgrep doris_be) - if [ -n $pid ]; then + if [ -n "$pid" ]; then break fi done @@ -71,16 +140,30 @@ wait_process() { wait_pid $pid } -main() { - trap stop_backend SIGTERM +add_be_to_cluster() { + if [ -f $REGISTER_FILE ]; then + return + fi - if [ ! -f $REGISTER_FILE ]; then - add_backend & + if [ "${IS_CLOUD}" == "1" ]; then + add_cloud_be + else + add_local_be fi - if [ -n $LLVM_PROFILE_FILE_PREFIX ]; then + touch $REGISTER_FILE + health_log "register be" +} + +main() { + trap stop_backend SIGTERM + + if [ -n "$LLVM_PROFILE_FILE_PREFIX" ]; then export LLVM_PROFILE_FILE="${LLVM_PROFILE_FILE_PREFIX}-$(date +%s)" fi + + add_be_to_cluster + health_log "run start_be.sh" bash $DORIS_HOME/bin/start_be.sh --daemon diff --git a/docker/runtime/doris-compose/resource/init_cloud.sh b/docker/runtime/doris-compose/resource/init_cloud.sh new file mode 100644 index 00000000000..baa983aee95 --- /dev/null +++ b/docker/runtime/doris-compose/resource/init_cloud.sh @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ARGS=$@ + +DIR=$( + cd $(dirname $0) + pwd +) + +source $DIR/common.sh + +wait_fdb_ready() { + while true; do + if [ -f $HAS_INIT_FDB_FILE ]; then + health_log "has init fdb" + return + fi + + health_log "hasn't init fdb, waiting" + + sleep 1 + done +} + +check_init_cloud() { + if [ -f $HAS_CREATE_INSTANCE_FILE ]; then + return + fi + + if [ "$MY_TYPE" != "ms" -o "$MY_ID" != "1" ]; then + return + fi + + while true; do + + lock_cluster + + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/create_instance?token=greedisgood9999" \ + -d '{"instance_id":"default_instance_id", + "name": "default_instance", + "user_id": "'"${DORIS_CLOUD_USER}"'", + "obj_info": { + "ak": "'"${DORIS_CLOUD_AK}"'", + "sk": "'"${DORIS_CLOUD_SK}"'", + "bucket": "'"${DORIS_CLOUD_BUCKET}"'", + "endpoint": "'"${DORIS_CLOUD_ENDPOINT}"'", + "external_endpoint": "'"${DORIS_CLOUD_EXTERNAL_ENDPOINT}"'", + "prefix": "'"${DORIS_CLOUD_PREFIX}"'", + "region": "'"${DORIS_CLOUD_REGION}"'", + "provider": "'"${DORIS_CLOUD_PROVIDER}"'" + }}') + + unlock_cluster + + health_log "create instance output: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "create instance failed" + sleep 1 + continue + fi + + health_log "create doris instance succ, output: $output" + touch $HAS_CREATE_INSTANCE_FILE + break + done +} + +stop_cloud() { + cd $DORIS_HOME + bash bin/stop.sh +} + +main() { + trap stop_cloud SIGTERM + + cd $DORIS_HOME + + wait_fdb_ready + + check_init_cloud & + + health_log "input args: $ARGS" + + bash bin/start.sh $ARGS +} + +main diff --git a/docker/runtime/doris-compose/resource/common.sh b/docker/runtime/doris-compose/resource/init_fdb.sh similarity index 54% copy from docker/runtime/doris-compose/resource/common.sh copy to docker/runtime/doris-compose/resource/init_fdb.sh index 5342eb0e54b..b3b22f44ee4 100644 --- a/docker/runtime/doris-compose/resource/common.sh +++ b/docker/runtime/doris-compose/resource/init_fdb.sh @@ -15,41 +15,46 @@ # specific language governing permissions and limitations # under the License. -export MASTER_FE_IP="" -export MASTER_FE_IP_FILE=$DORIS_HOME/status/master_fe_ip -export LOG_FILE=$DORIS_HOME/log/health.out +DIR=$( + cd $(dirname $0) + pwd +) -health_log() { - echo "$(date +'%Y-%m-%d %H:%M:%S') $@" >>$LOG_FILE -} - -read_master_fe_ip() { - MASTER_FE_IP=$(cat $MASTER_FE_IP_FILE) - if [ $? -eq 0 ]; then - health_log "master fe ${MASTER_FE_IP} has ready." - return 0 - else - health_log "master fe has not ready." - return 1 - fi -} +source $DIR/common.sh -wait_pid() { - pid=$1 - health_log "" - health_log "ps -elf\n$(ps -elf)\n" - if [ -z $pid ]; then - health_log "pid not exist" - exit 1 +init_db() { + if [ -f $HAS_INIT_FDB_FILE ]; then + return fi - health_log "wait process $pid" - while true; do - ps -p $pid >/dev/null - if [ $? -ne 0 ]; then + for ((i = 0; i < 10; i++)); do + /usr/bin/fdbcli -C ${DORIS_HOME}/conf/fdb.cluster --exec 'configure new single ssd' + if [ $? -eq 0 ]; then + touch $HAS_INIT_FDB_FILE + health_log "fdbcli init cluster succ" break fi - sleep 1s + + health_log "fdbcli init cluster failed" + sleep 1 done - health_log "wait end" + + if [ ! -f $HAS_INIT_FDB_FILE ]; then + health_log "fdbcli init cluster failed, exit" + exit 1 + fi +} + +stop_fdb() { + exit 0 +} + +main() { + trap stop_fdb SIGTERM + + init_db & + + fdbmonitor --conffile ${DORIS_HOME}/conf/fdb.conf --lockfile ${DORIS_HOME}/fdbmonitor.pid } + +main diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index 1762f10d1fa..e532d0d56e1 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -22,14 +22,12 @@ DIR=$( source $DIR/common.sh -add_frontend() { - while true; do - read_master_fe_ip - if [ $? -ne 0 ]; then - sleep 1 - continue - fi +REGISTER_FILE=$DORIS_HOME/status/fe-$MY_IP-register + +add_local_fe() { + wait_master_fe_ready + while true; do output=$(mysql -P $FE_QUERY_PORT -h $MASTER_FE_IP -u root --execute "ALTER SYSTEM ADD FOLLOWER '$MY_IP:$FE_EDITLOG_PORT';" 2>&1) res=$? health_log "${output}\n" @@ -37,6 +35,8 @@ add_frontend() { (echo $output | grep "frontend already exists") && break sleep 1 done + + touch $REGISTER_FILE } fe_daemon() { @@ -81,6 +81,69 @@ fe_daemon() { done } +add_cloud_fe() { + if [ -f "$REGISTER_FILE" ]; then + return + fi + + wait_create_instance + + action=add_cluster + node_type=FE_MASTER + if [ "$MY_ID" != "1" ]; then + wait_master_fe_ready + action=add_node + node_type=FE_OBSERVER + fi + + nodes='{ + "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", + "ip": "'"${MY_IP}"'", + "edit_log_port": "'"${FE_EDITLOG_PORT}"'", + "node_type": "'"${node_type}"'" + }' + + lock_cluster + + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/${action}?token=greedisgood9999" \ + -d '{"instance_id": "default_instance_id", + "cluster": { + "type": "SQL", + "cluster_name": "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER", + "cluster_id": "RESERVED_CLUSTER_ID_FOR_SQL_SERVER", + "nodes": ['"${nodes}"'] + }}') + + unlock_cluster + + health_log "add cluster. output: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "add cluster failed, exit." + exit 1 + fi + + output=$(curl -s "${META_SERVICE_ENDPOINT}/MetaService/http/get_cluster?token=greedisgood9999" \ + -d '{"instance_id": "default_instance_id", + "cloud_unique_id": "'"${CLOUD_UNIQUE_ID}"'", + "cluster_name": "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER", + "cluster_id": "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"}') + + health_log "get cluster is: $output" + code=$(jq -r '.code' <<<$output) + + if [ "$code" != "OK" ]; then + health_log "get cluster failed, exit." + exit 1 + fi + + touch $REGISTER_FILE + if [ "$MY_ID" == "1" ]; then + echo $MY_IP >$MASTER_FE_IP_FILE + fi +} + stop_frontend() { if [ "$STOP_GRACE" = "1" ]; then bash $DORIS_HOME/bin/stop_fe.sh --grace @@ -96,7 +159,7 @@ wait_process() { for ((i = 0; i < 5; i++)); do sleep 1s pid=$(ps -elf | grep java | grep org.apache.doris.DorisFE | grep -v grep | awk '{print $4}') - if [ -n $pid ]; then + if [ -n "$pid" ]; then break fi done @@ -104,17 +167,34 @@ wait_process() { wait_pid $pid } -main() { - trap stop_frontend SIGTERM +start_local_fe() { + if [ "$MY_ID" = "1" -a ! -f $REGISTER_FILE ]; then + touch $REGISTER_FILE + fi - if [ "$MY_ID" = "1" -o -d "${DORIS_HOME}/doris-meta/image" ]; then + if [ -f $REGISTER_FILE ]; then fe_daemon & bash $DORIS_HOME/bin/start_fe.sh --daemon else - add_frontend + add_local_fe fe_daemon & bash $DORIS_HOME/bin/start_fe.sh --helper $MASTER_FE_IP:$FE_EDITLOG_PORT --daemon fi +} + +start_cloud_fe() { + add_cloud_fe + bash $DORIS_HOME/bin/start_fe.sh --daemon +} + +main() { + trap stop_frontend SIGTERM + + if [ "$IS_CLOUD" == "1" ]; then + start_cloud_fe + else + start_local_fe + fi wait_process } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 27632326a3b..a83722be2ea 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -216,9 +216,20 @@ class Suite implements GroovyInterceptable { return } + boolean pipelineIsCloud = isCloudCluster() + boolean dockerIsCloud = false + if (options.cloudMode == null) { + dockerIsCloud = pipelineIsCloud + } else { + dockerIsCloud = options.cloudMode + if (dockerIsCloud != pipelineIsCloud && options.skipRunWhenPipelineDiff) { + return + } + } + try { cluster.destroy(true) - cluster.init(options) + cluster.init(options, dockerIsCloud) def user = context.config.jdbcUser def password = context.config.jdbcPassword @@ -971,6 +982,10 @@ class Suite implements GroovyInterceptable { return result.last().get(0); } + boolean isCloudCluster() { + return !getFeConfig("cloud_unique_id").isEmpty() + } + String getFeConfig(String key) { return sql_return_maparray("SHOW FRONTEND CONFIG LIKE '${key}'")[0].Value } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index 6d4ae4be0ad..e556c1896bb 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -38,6 +38,15 @@ class ClusterOptions { List<String> beConfigs = [] boolean connectToFollower = false + // 1. cloudMode = true, only create cloud cluster. + // 2. cloudMode = false, only create none-cloud cluster. + // 3. cloudMode = null, create both cloud and none-cloud cluster, depend on the running pipeline mode. + Boolean cloudMode = false + + // when cloudMode = true/false, but the running pipeline is diff with cloudMode, + // skip run this docker test or not. + boolean skipRunWhenPipelineDiff = true + // each be disks, a disks format is: disk_type=disk_num[,disk_capacity] // here disk_type=HDD or SSD, disk capacity is in gb unit. // for example: beDisks = ["HDD=1", "SSD=2,10", "SSD=10,3"] means: @@ -169,7 +178,7 @@ class SuiteCluster { this.running = false } - void init(ClusterOptions options) { + void init(ClusterOptions options, boolean isCloud) { assert name != null && name != '' assert options.feNum > 0 || options.beNum > 0 assert config.image != null && config.image != '' @@ -200,6 +209,9 @@ class SuiteCluster { if (config.dockerCoverageOutputDir != null && config.dockerCoverageOutputDir != '') { cmd += ['--coverage-dir', config.dockerCoverageOutputDir] } + if (isCloud) { + cmd += ['--cloud'] + } cmd += ['--wait-timeout', String.valueOf(180)] runCmd(cmd.join(' '), -1) @@ -296,6 +308,8 @@ class SuiteCluster { } else if (name.startsWith('fe-')) { int index = name.substring('fe-'.length()) as int frontends.add(Frontend.fromCompose(header, index, row)) + } else if (name.startsWith('ms-') || name.startsWith('recycle-') || name.startsWith('fdb-')) { + // TODO: handle these nodes } else { assert false : 'Unknown node type with name: ' + name } diff --git a/regression-test/suites/demo_p0/docker_action.groovy b/regression-test/suites/demo_p0/docker_action.groovy index 8c827aa40f4..6d62d6ea7be 100644 --- a/regression-test/suites/demo_p0/docker_action.groovy +++ b/regression-test/suites/demo_p0/docker_action.groovy @@ -50,4 +50,19 @@ suite('docker_action') { docker(options) { sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="5")''' } + + def options2 = new ClusterOptions() + options2.beNum = 1 + // create cloud cluster + options2.cloudMode = true + //// cloud docker only run in cloud pipeline, but enable it run in none-cloud pipeline + // options2.skipRunWhenPipelineDiff = false + // run another docker, create a cloud cluster + docker(options2) { + // cloud cluster will ignore replication_num, always set to 1. so create table succ even has 1 be. + sql '''create table tb1 (k int) DISTRIBUTED BY HASH(k) BUCKETS 10 properties ("replication_num"="1000")''' + + sql 'set global enable_memtable_on_sink_node = false' + sql 'insert into tb1 values (2)' + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
