This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3939c999004 [Fix](cloud) Cloud enable fe deploy mode from 
master-observers to multi-followers (#45255)
3939c999004 is described below

commit 3939c9990041eb9e83e45ce2883c18f0c93c96db
Author: deardeng <[email protected]>
AuthorDate: Fri Jan 3 17:06:56 2025 +0800

    [Fix](cloud) Cloud enable fe deploy mode from master-observers to 
multi-followers (#45255)
    
    IN CLOUD
    1. add some cloud metaservice api regression test
    2. prohibit one node add to multi cluster repeatedly
    3. supports switching from master observer mode to multi follower mode,
    and compatible with upgrading from existing master-observers deployment
    mode to multi-follwers mode
    4. when adding an FE node(add fe cluster, add fe node), there is a
    5-minute protection time and the FE node cannot be droped(drop fe
    cluster, drop fe node)
    5. drop the node. If the node cannot be found, an error will occur and
    the code will return 404 instead of the previous silent processing
    6. If the MS node starts with 127.0.0.1 IP and the
    `prohibit_use_loopback_addresses` is set to true(default), the MS node
    will not be able to start, ms log will find `enable check prohibit use
    loopback addr, but localhost=127.0.0.1, so exit(-1)`
---
 cloud/src/common/config.h                          |    3 +
 cloud/src/common/network_util.cpp                  |   11 +
 cloud/src/meta-service/injection_point_http.cpp    |  120 +-
 cloud/src/meta-service/meta_service_resource.cpp   |   27 +
 cloud/src/resource-manager/resource_manager.cpp    |  258 +++-
 cloud/test/fdb_injection_test.cpp                  |    3 +
 cloud/test/meta_service_http_test.cpp              |    4 +-
 cloud/test/resource_test.cpp                       |    4 +
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   13 +-
 gensrc/proto/cloud.proto                           |    2 +-
 .../doris/regression/suite/SuiteCluster.groovy     |   36 +
 .../suites/cloud_p0/node_mgr/test_ms_api.groovy    | 1553 ++++++++++++++++++++
 .../node_mgr/test_sql_mode_node_mgr.groovy         |   41 +
 13 files changed, 2015 insertions(+), 60 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 4f3c49ee98d..a3274479189 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -236,4 +236,7 @@ CONF_Int32(max_tablet_index_num_per_batch, "1000");
 CONF_mInt64(max_num_aborted_txn, "100");
 
 CONF_Bool(enable_check_instance_id, "true");
+
+// Check if ip eq 127.0.0.1, ms/recycler exit
+CONF_Bool(enable_loopback_address_for_ms, "false");
 } // namespace doris::cloud::config
diff --git a/cloud/src/common/network_util.cpp 
b/cloud/src/common/network_util.cpp
index 9f1b085163d..d31ca7ad937 100644
--- a/cloud/src/common/network_util.cpp
+++ b/cloud/src/common/network_util.cpp
@@ -29,6 +29,7 @@
 #include <sstream>
 #include <vector>
 
+#include "common/config.h"
 #include "common/logging.h"
 
 namespace doris::cloud {
@@ -160,6 +161,16 @@ static bool get_hosts_v4(std::vector<InetAddress>* hosts) {
 
 std::string get_local_ip(const std::string& priority_networks) {
     std::string localhost_str = butil::my_ip_cstr();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&localhost_str](int*) {
+        // Check if ip eq 127.0.0.1, ms/recycler exit
+        LOG(INFO) << "get the IP for ms is " << localhost_str;
+        if (config::enable_loopback_address_for_ms || localhost_str != 
"127.0.0.1") return;
+        LOG(WARNING) << "localhost IP is loopback address (127.0.0.1), "
+                     << "there may be multiple NICs for use, "
+                     << "please set priority_network with a CIDR expression in 
doris_cloud.conf "
+                     << "to choose a non-loopback address accordingly";
+        exit(-1);
+    });
     if (priority_networks == "") {
         LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << 
localhost_str;
         return localhost_str;
diff --git a/cloud/src/meta-service/injection_point_http.cpp 
b/cloud/src/meta-service/injection_point_http.cpp
index 80d1bcfdf2e..18b2ddcebb0 100644
--- a/cloud/src/meta-service/injection_point_http.cpp
+++ b/cloud/src/meta-service/injection_point_http.cpp
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -18,6 +17,7 @@
 
 #include <fmt/format.h>
 #include <gen_cpp/cloud.pb.h>
+#include <rapidjson/document.h>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -34,6 +34,11 @@ namespace doris::cloud {
 std::map<std::string, std::function<void()>> suite_map;
 std::once_flag register_suites_once;
 
+// define a struct to store value only
+struct TypedValue {
+    std::variant<int64_t, bool, std::string> value;
+};
+
 inline std::default_random_engine make_random_engine() {
     return std::default_random_engine(
             
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
@@ -88,6 +93,108 @@ static void register_suites() {
     });
 }
 
+bool url_decode(const std::string& in, std::string* out) {
+    out->clear();
+    out->reserve(in.size());
+
+    for (size_t i = 0; i < in.size(); ++i) {
+        if (in[i] == '%') {
+            if (i + 3 <= in.size()) {
+                int value = 0;
+                std::istringstream is(in.substr(i + 1, 2));
+
+                if (is >> std::hex >> value) {
+                    (*out) += static_cast<char>(value);
+                    i += 2;
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        } else if (in[i] == '+') {
+            (*out) += ' ';
+        } else {
+            (*out) += in[i];
+        }
+    }
+
+    return true;
+}
+
+HttpResponse set_value(const std::string& point, const brpc::URI& uri) {
+    std::string value_str(http_query(uri, "value"));
+    std::string decoded_value;
+    if (!url_decode(value_str, &decoded_value)) {
+        auto msg = fmt::format("failed to decode value: {}", value_str);
+        LOG(WARNING) << msg;
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+    }
+    rapidjson::Document doc;
+    if (doc.Parse(decoded_value.c_str()).HasParseError()) {
+        auto msg = fmt::format("invalid json value: {}", decoded_value);
+        LOG(WARNING) << msg;
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+    }
+
+    if (!doc.IsArray()) {
+        auto msg = "value must be a json array";
+        LOG(WARNING) << msg;
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+    }
+
+    // use vector to keep order
+    std::vector<TypedValue> parsed_values;
+
+    for (const auto& value : doc.GetArray()) {
+        TypedValue typed_value;
+        try {
+            if (value.IsBool()) {
+                typed_value.value = value.GetBool();
+            } else if (value.IsInt64()) {
+                typed_value.value = value.GetInt64();
+            } else if (value.IsString()) {
+                typed_value.value = value.GetString();
+            } else {
+                auto msg = "value must be boolean, integer or string";
+                LOG(WARNING) << msg;
+                return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+            }
+            parsed_values.push_back(std::move(typed_value));
+        } catch (const std::exception& e) {
+            auto msg = fmt::format("failed to parse value");
+            LOG(WARNING) << msg;
+            return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+        }
+    }
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back(point, [point, parsed_values = 
std::move(parsed_values)](auto&& args) {
+        LOG(INFO) << "injection point hit, point=" << point;
+        for (size_t i = 0; i < parsed_values.size(); i++) {
+            const auto& typed_value = parsed_values[i];
+            std::visit(
+                    [&](const auto& v) {
+                        LOG(INFO) << "index=" << i << " value=" << v
+                                  << " type=" << typeid(v).name();
+                        if constexpr 
(std::is_same_v<std::decay_t<decltype(v)>, int64_t>) {
+                            // process int64_t
+                            *try_any_cast<int64_t*>(args[i]) = v;
+                        } else if constexpr 
(std::is_same_v<std::decay_t<decltype(v)>, bool>) {
+                            // process bool
+                            *try_any_cast<bool*>(args[i]) = v;
+                        } else if constexpr 
(std::is_same_v<std::decay_t<decltype(v)>,
+                                                            std::string>) {
+                            // process string
+                            *try_any_cast<std::string*>(args[i]) = v;
+                        }
+                    },
+                    typed_value.value);
+        }
+    });
+    return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
 HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
     std::string duration_str(http_query(uri, "duration"));
     int64_t duration = 0;
@@ -136,6 +243,8 @@ HttpResponse handle_set(const brpc::URI& uri) {
         return set_sleep(point, uri);
     } else if (behavior == "return") {
         return set_return(point, uri);
+    } else if (behavior == "change_args") {
+        return set_value(point, uri);
     }
 
     return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown 
behavior: " + behavior);
@@ -202,6 +311,15 @@ HttpResponse handle_disable(const brpc::URI& uri) {
 
 // curl 
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
 //     &name=${injection_point_name}&behavior=return" # return void
+
+// ATTN: change_args use uri encode, see example test_ms_api.groovy
+// use inject point in cpp file
+// bool testBool = false;
+// std::string testString = "world";
+// TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", 
&exceed_time, &testBool, &testString);
+
+// curl 
http://175.40.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&o
+// 
p=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=%5B-1%2Ctrue%2C%22hello%22%5D
 // ```
 
 HttpResponse process_injection_point(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index d873dec7b21..23dc9d0b40c 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -2305,6 +2305,14 @@ void 
MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
         code = MetaServiceCode::UNDEFINED_ERR;
     }
 
+    // ugly but easy to repair
+    // not change cloud.proto add err_code
+    if (request->op() == AlterClusterRequest::DROP_NODE &&
+        msg.find("not found") != std::string::npos) {
+        // see convert_ms_code_to_http_code, reuse CLUSTER_NOT_FOUND, return 
http status code 404
+        code = MetaServiceCode::CLUSTER_NOT_FOUND;
+    }
+
     if (code != MetaServiceCode::OK) return;
 
     auto f = new std::function<void()>([instance_id = request->instance_id(), 
txn_kv = txn_kv_] {
@@ -2418,6 +2426,7 @@ void 
MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
         response->mutable_cluster()->CopyFrom(instance.clusters());
         LOG_EVERY_N(INFO, 100) << "get all cluster info, " << msg;
     } else {
+        bool is_instance_changed = false;
         for (int i = 0; i < instance.clusters_size(); ++i) {
             auto& c = instance.clusters(i);
             std::set<std::string> mysql_users;
@@ -2433,6 +2442,24 @@ void 
MetaServiceImpl::get_cluster(google::protobuf::RpcController* controller,
                                        << " cluster=" << msg;
             }
         }
+        if (is_instance_changed) {
+            val = instance.SerializeAsString();
+            if (val.empty()) {
+                msg = "failed to serialize";
+                code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+                return;
+            }
+
+            txn->put(key, val);
+            LOG(INFO) << "put instance_id=" << instance_id << " instance_key=" 
<< hex(key)
+                      << " json=" << proto_to_json(instance);
+            err = txn->commit();
+            if (err != TxnErrorCode::TXN_OK) {
+                code = cast_as<ErrCategory::COMMIT>(err);
+                msg = fmt::format("failed to commit kv txn, err={}", err);
+                LOG(WARNING) << msg;
+            }
+        }
     }
 
     if (response->cluster().empty()) {
diff --git a/cloud/src/resource-manager/resource_manager.cpp 
b/cloud/src/resource-manager/resource_manager.cpp
index 9c37d781765..3addfecdb85 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -183,7 +183,7 @@ bool ResourceManager::check_cluster_params_valid(const 
ClusterPB& cluster, std::
             continue;
         }
         ss << "check cluster params failed, edit_log_port is required for 
frontends while "
-              "heatbeat_port is required for banckens, node : "
+              "heatbeat_port is required for banckends, node : "
            << proto_to_json(n);
         *err = ss.str();
         no_err = false;
@@ -191,19 +191,11 @@ bool ResourceManager::check_cluster_params_valid(const 
ClusterPB& cluster, std::
     }
 
     if (check_master_num && ClusterPB::SQL == cluster.type()) {
-        no_err = false;
-        if (master_num > 0 && follower_num > 0) {
-            ss << "cluster is SQL type, and use multi follower mode, cant set 
master node, master "
-                  "count: "
-               << master_num << " follower count: " << follower_num;
-        } else if (!follower_num && master_num != 1) {
-            ss << "cluster is SQL type, must have only one master node, now 
master count: "
-               << master_num;
-        } else {
-            // followers mode
-            // 1. followers 2. observers + followers
-            no_err = true;
-            ss << "";
+        if (master_num == 0 && follower_num == 0) {
+            ss << "cluster is SQL type, but not set master and follower node, 
master count="
+               << master_num << " follower count=" << follower_num
+               << " so sql cluster can't get a Master node";
+            no_err = false;
         }
         *err = ss.str();
     }
@@ -231,6 +223,85 @@ bool ResourceManager::is_instance_id_registered(const 
std::string& instance_id)
     return c0 == TxnErrorCode::TXN_OK;
 }
 
+/**
+ * Gets addr and port from NodeInfoPB
+ * @param node
+ * @return <addr, port>
+ */
+static std::pair<std::string, int32_t> get_node_endpoint_from_cluster(const 
ClusterPB::Type& type,
+                                                                      const 
NodeInfoPB& node) {
+    std::string addr = node.has_host() ? node.host() : (node.has_ip() ? 
node.ip() : "");
+    int32_t port = (ClusterPB::SQL == type)
+                           ? node.edit_log_port()
+                           : (ClusterPB::COMPUTE == type ? 
node.heartbeat_port() : -1);
+    return std::make_pair(addr, port);
+}
+
+/**
+ * Gets nodes endpoint from InstanceInfoPB which are registered
+ * @param instance
+ * @return <fe_nodes, be_nodes>
+ */
+static std::pair<std::set<std::string>, std::set<std::string>> 
get_nodes_endpoint_registered(
+        const InstanceInfoPB& instance) {
+    std::set<std::string> instance_sql_node_endpoints;
+    std::set<std::string> instance_compute_node_endpoints;
+    for (auto& instance_cluster : instance.clusters()) {
+        for (auto& node : instance_cluster.nodes()) {
+            const auto& [addr, port] =
+                    get_node_endpoint_from_cluster(instance_cluster.type(), 
node);
+            if (ClusterPB::SQL == instance_cluster.type()) {
+                instance_sql_node_endpoints.insert(addr + ":" + 
std::to_string(port));
+            } else if (ClusterPB::COMPUTE == instance_cluster.type()) {
+                instance_compute_node_endpoints.insert(addr + ":" + 
std::to_string(port));
+            }
+        }
+    }
+    return std::make_pair(instance_sql_node_endpoints, 
instance_compute_node_endpoints);
+}
+
+/**
+ * When add_cluster or add_node, check its node has been registered
+ * @param cluster type, for check port
+ * @param node, which node to add
+ * @param registered_fes, that fes has been registered in kv
+ * @param registered_bes, that bes has been registered in kv
+ * @return <error_code, err_msg>, if error_code == OK, check pass
+ */
+static std::pair<MetaServiceCode, std::string> check_node_has_been_registered(
+        const ClusterPB::Type& type, const NodeInfoPB& node,
+        std::set<std::string> fe_endpoints_registered,
+        std::set<std::string> be_endpoints_registered) {
+    const auto& [addr, port] = get_node_endpoint_from_cluster(type, node);
+    std::stringstream ss;
+    std::string msg;
+    if (addr == "" || port == -1) {
+        ss << "add node input args node invalid, cluster=" << 
proto_to_json(node);
+        LOG(WARNING) << ss.str();
+        msg = ss.str();
+        return std::make_pair(MetaServiceCode::INVALID_ARGUMENT, msg);
+    }
+
+    std::string node_endpoint = addr + ":" + std::to_string(port);
+
+    if (type == ClusterPB::SQL) {
+        if (fe_endpoints_registered.count(node_endpoint)) {
+            ss << "sql node endpoint has been added, registered fe node=" << 
node_endpoint;
+            LOG(WARNING) << ss.str();
+            msg = ss.str();
+            return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
+        }
+    } else if (type == ClusterPB::COMPUTE) {
+        if (be_endpoints_registered.count(node_endpoint)) {
+            ss << "compute node endpoint has been added, registered be node=" 
<< node_endpoint;
+            LOG(WARNING) << ss.str();
+            msg = ss.str();
+            return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
+        }
+    }
+    return std::make_pair(MetaServiceCode::OK, "");
+}
+
 std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const 
std::string& instance_id,
                                                                      const 
ClusterInfo& cluster) {
     std::string msg;
@@ -294,35 +365,49 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::add_cluster(const std::
         return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
     }
 
-    LOG(INFO) << "cluster to add json=" << proto_to_json(cluster.cluster);
+    auto& req_cluster = cluster.cluster;
+    LOG(INFO) << "cluster to add json=" << proto_to_json(req_cluster);
     LOG(INFO) << "json=" << proto_to_json(instance);
 
     // Check id and name, they need to be unique
     // One cluster id per name, name is alias of cluster id
-    for (auto& i : instance.clusters()) {
-        if (i.cluster_id() == cluster.cluster.cluster_id()) {
+    for (auto& instance_cluster : instance.clusters()) {
+        if (instance_cluster.cluster_id() == req_cluster.cluster_id()) {
             ss << "try to add a existing cluster id,"
-               << " existing_cluster_id=" << i.cluster_id();
+               << " existing_cluster_id=" << instance_cluster.cluster_id();
             msg = ss.str();
             return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
         }
 
-        if (i.cluster_name() == cluster.cluster.cluster_name()) {
+        if (instance_cluster.cluster_name() == req_cluster.cluster_name()) {
             ss << "try to add a existing cluster name,"
-               << " existing_cluster_name=" << i.cluster_name();
+               << " existing_cluster_name=" << instance_cluster.cluster_name();
             msg = ss.str();
             return std::make_pair(MetaServiceCode::ALREADY_EXISTED, msg);
         }
     }
 
-    // TODO(gavin): Check duplicated nodes, one node cannot deploy on multiple 
clusters
+    // modify cluster's node info
     auto now_time = std::chrono::system_clock::now();
     uint64_t time =
             
std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
-    for (auto& n : cluster.cluster.nodes()) {
+
+    const auto& [fe_endpoints_registered, be_endpoints_registered] =
+            get_nodes_endpoint_registered(instance);
+
+    for (auto& n : req_cluster.nodes()) {
         auto& node = const_cast<std::decay_t<decltype(n)>&>(n);
         node.set_ctime(time);
         node.set_mtime(time);
+        // Check duplicated nodes, one node cannot deploy on multiple clusters
+        // diff instance_cluster's nodes and req_cluster's nodes
+        for (auto& n : req_cluster.nodes()) {
+            const auto& [c1, m1] = check_node_has_been_registered(
+                    req_cluster.type(), n, fe_endpoints_registered, 
be_endpoints_registered);
+            if (c1 != MetaServiceCode::OK) {
+                return std::make_pair(c1, m1);
+            }
+        }
     }
 
     auto to_add_cluster = instance.add_clusters();
@@ -358,6 +443,27 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::add_cluster(const std::
     return std::make_pair(MetaServiceCode::OK, "");
 }
 
+/**
+ * The current implementation is to add fe clusters through HTTP API, 
+ * such as follower nodes `ABC` in the cluster, and then immediately drop 
follower node `A`, while fe is not yet pulled up, 
+ * which may result in the formation of a multi master fe cluster
+ * This function provides a simple protection mechanism that does not allow 
dropping the fe node within 5 minutes after adding it through the 
API(add_cluster/add_node).
+ * If you bypass this protection and do the behavior described above, god 
bless you.
+ * @param node, which fe node to drop
+ * @return true, can drop. false , within ctime 5 mins, can't drop
+ */
+static bool is_sql_node_exceeded_safe_drop_time(const NodeInfoPB& node) {
+    int64_t ctime = node.ctime();
+    // protect time 5mins
+    int64_t exceed_time = 5 * 60;
+    TEST_SYNC_POINT_CALLBACK("resource_manager::set_safe_drop_time", 
&exceed_time);
+    exceed_time = ctime + exceed_time;
+    auto now_time = std::chrono::system_clock::now();
+    int64_t current_time =
+            
std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch()).count();
+    return current_time > exceed_time;
+}
+
 std::pair<MetaServiceCode, std::string> ResourceManager::drop_cluster(
         const std::string& instance_id, const ClusterInfo& cluster) {
     std::stringstream ss;
@@ -400,12 +506,27 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::drop_cluster(
 
     bool found = false;
     int idx = -1;
+    std::string cache_err_help_msg =
+            "Ms nodes memory cache may be inconsistent, pls check registry key 
may be contain "
+            "127.0.0.1, and call get_instance api get instance info from fdb";
     ClusterPB to_del;
-    // Check id and name, they need to be unique
+    // Check id, they need to be unique
     // One cluster id per name, name is alias of cluster id
     for (auto& i : instance.clusters()) {
         ++idx;
         if (i.cluster_id() == cluster.cluster.cluster_id()) {
+            if (i.type() == ClusterPB::SQL) {
+                for (auto& fe_node : i.nodes()) {
+                    // check drop fe cluster
+                    if (!is_sql_node_exceeded_safe_drop_time(fe_node)) {
+                        ss << "drop fe cluster not in safe time, try later, 
cluster="
+                           << i.DebugString();
+                        msg = ss.str();
+                        LOG(WARNING) << msg;
+                        return 
std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
+                    }
+                }
+            }
             to_del.CopyFrom(i);
             LOG(INFO) << "found a cluster to drop,"
                       << " instance_id=" << instance_id << " cluster_id=" << 
i.cluster_id()
@@ -418,7 +539,8 @@ std::pair<MetaServiceCode, std::string> 
ResourceManager::drop_cluster(
     if (!found) {
         ss << "failed to find cluster to drop,"
            << " instance_id=" << instance_id << " cluster_id=" << 
cluster.cluster.cluster_id()
-           << " cluster_name=" << cluster.cluster.cluster_name();
+           << " cluster_name=" << cluster.cluster.cluster_name()
+           << " help Msg=" << cache_err_help_msg;
         msg = ss.str();
         return std::make_pair(MetaServiceCode::CLUSTER_NOT_FOUND, msg);
     }
@@ -666,27 +788,26 @@ std::pair<TxnErrorCode, std::string> 
ResourceManager::get_instance(std::shared_p
 }
 
 // check instance pb is valid
+// check in drop nodes
+// In the mode of managing cluster nodes through SQL, FE knows the master node 
of the current system, so it cannot delete the current master node (SQL error)
+// However, in the mode of managing cluster nodes through HTTP, MS cannot know 
which node in the current FE cluster is the master node, so some SOP operations 
are required for cloud control
+// 1. First, check the status of the nodes in the current FE cluster
+// 2. Drop non master nodes
+// 3. If you want to drop the master node, you need to find a way to switch 
the master node to another follower node and then drop it again
 bool is_instance_valid(const InstanceInfoPB& instance) {
     // check has fe node
     for (auto& c : instance.clusters()) {
         if (c.has_type() && c.type() == ClusterPB::SQL) {
             int master = 0;
             int follower = 0;
-            std::string mode = "multi-followers";
             for (auto& n : c.nodes()) {
                 if (n.node_type() == NodeInfoPB::FE_MASTER) {
-                    mode = "master-observers";
                     master++;
                 } else if (n.node_type() == NodeInfoPB::FE_FOLLOWER) {
                     follower++;
                 }
             }
-            // if master/observers mode , not have master or have multi 
master, return false
-            if (mode == "master-observers" && master != 1) {
-                return false;
-            }
-            // if multi followers mode, not have follower, return false
-            if (mode == "multi-followers" && !follower) {
+            if (master > 1 || (master == 0 && follower == 0)) {
                 return false;
             }
             return true;
@@ -734,7 +855,24 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
     }
 
     LOG(INFO) << "instance json=" << proto_to_json(instance);
-    std::vector<std::pair<ClusterPB, ClusterPB>> vec;
+    if (!to_add.empty()) {
+        // add nodes
+        // Check duplicated nodes, one node cannot deploy on multiple clusters
+        // diff instance's nodes and to_add nodes
+        const auto& [fe_endpoints_registered, be_endpoints_registered] =
+                get_nodes_endpoint_registered(instance);
+        for (auto& add_node : to_add) {
+            const ClusterPB::Type type =
+                    add_node.role == Role::SQL_SERVER ? ClusterPB::SQL : 
ClusterPB::COMPUTE;
+            const auto& [c1, m1] = check_node_has_been_registered(
+                    type, add_node.node_info, fe_endpoints_registered, 
be_endpoints_registered);
+            if (c1 != MetaServiceCode::OK) {
+                return m1;
+            }
+        }
+    }
+    // a vector save (origin_cluster , changed_cluster), to update ms mem
+    std::vector<std::pair<ClusterPB, ClusterPB>> change_from_to_clusters;
     using modify_impl_func = std::function<std::string(const ClusterPB& c, 
const NodeInfo& n)>;
     using check_func = std::function<std::string(const NodeInfo& n)>;
     auto modify_func = [&](const NodeInfo& node, check_func check,
@@ -764,6 +902,7 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         return "";
     };
 
+    // check in ms mem cache
     check_func check_to_add = [&](const NodeInfo& n) -> std::string {
         std::string err;
         std::stringstream s;
@@ -787,6 +926,7 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         return "";
     };
 
+    // modify kv
     modify_impl_func modify_to_add = [&](const ClusterPB& c, const NodeInfo& 
n) -> std::string {
         std::string err;
         std::stringstream s;
@@ -838,7 +978,8 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         auto& change_cluster = const_cast<std::decay_t<decltype(c)>&>(c);
         change_cluster.add_nodes()->CopyFrom(node);
         copied_cluster.CopyFrom(change_cluster);
-        vec.emplace_back(std::move(copied_original_cluster), 
std::move(copied_cluster));
+        
change_from_to_clusters.emplace_back(std::move(copied_original_cluster),
+                                             std::move(copied_cluster));
         return "";
     };
 
@@ -850,18 +991,22 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         }
     }
 
+    std::string cache_err_help_msg =
+            "Ms nodes memory cache may be inconsistent, pls check registry key 
may be contain "
+            "127.0.0.1, and call get_instance api get instance info from fdb";
+
+    // check in ms mem cache
     check_func check_to_del = [&](const NodeInfo& n) -> std::string {
         std::string err;
         std::stringstream s;
         auto [start, end] = 
node_info_.equal_range(n.node_info.cloud_unique_id());
         if (start == node_info_.end() || start->first != 
n.node_info.cloud_unique_id()) {
-            s << "cloud_unique_id can not find to drop node,"
+            s << "can not find to drop nodes by cloud_unique_id=" << 
n.node_info.cloud_unique_id()
               << " instance_id=" << n.instance_id << " cluster_name=" << 
n.cluster_name
-              << " cluster_id=" << n.cluster_id
-              << " cloud_unique_id=" << n.node_info.cloud_unique_id();
+              << " cluster_id=" << n.cluster_id << " help Msg=" << 
cache_err_help_msg;
             err = s.str();
             LOG(WARNING) << err;
-            return err;
+            return std::string("not found ,") + err;
         }
 
         bool found = false;
@@ -905,15 +1050,16 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         if (!found) {
             s << "cloud_unique_id can not find to drop node,"
               << " instance_id=" << n.instance_id << " cluster_name=" << 
n.cluster_name
-              << " cluster_id=" << n.cluster_id
-              << " cloud_unique_id=" << n.node_info.cloud_unique_id();
+              << " cluster_id=" << n.cluster_id << " node_info=" << 
n.node_info.DebugString()
+              << " help Msg=" << cache_err_help_msg;
             err = s.str();
             LOG(WARNING) << err;
-            return err;
+            return std::string("not found ,") + err;
         }
         return "";
     };
 
+    // modify kv
     modify_impl_func modify_to_del = [&](const ClusterPB& c, const NodeInfo& 
n) -> std::string {
         std::string err;
         std::stringstream s;
@@ -922,7 +1068,10 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
 
         bool found = false;
         int idx = -1;
+        // ni: to drop node
         const auto& ni = n.node_info;
+        // c.nodes: cluster registered nodes
+        NodeInfoPB copy_node;
         for (auto& cn : c.nodes()) {
             idx++;
             if (cn.has_ip() && ni.has_ip()) {
@@ -937,6 +1086,7 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
                                                  : 
std::to_string(ni.edit_log_port()));
 
                 if (ni.cloud_unique_id() == cn.cloud_unique_id() && 
cn_endpoint == ni_endpoint) {
+                    copy_node.CopyFrom(cn);
                     found = true;
                     break;
                 }
@@ -955,6 +1105,7 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
 
                 if (ni.cloud_unique_id() == cn.cloud_unique_id() &&
                     cn_endpoint_host == ni_endpoint_host) {
+                    copy_node.CopyFrom(cn);
                     found = true;
                     break;
                 }
@@ -964,17 +1115,26 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         if (!found) {
             s << "failed to find node to drop,"
               << " instance_id=" << instance.instance_id() << " cluster_id=" 
<< c.cluster_id()
-              << " cluster_name=" << c.cluster_name() << " cluster=" << 
proto_to_json(c);
+              << " cluster_name=" << c.cluster_name() << " cluster=" << 
proto_to_json(c)
+              << " help Msg =" << cache_err_help_msg;
             err = s.str();
             LOG(WARNING) << err;
-            // not found return ok.
-            return "";
+            return std::string("not found ,") + err;
+        }
+
+        // check drop fe node
+        if (ClusterPB::SQL == c.type() && 
!is_sql_node_exceeded_safe_drop_time(copy_node)) {
+            s << "drop fe node not in safe time, try later, node=" << 
copy_node.DebugString();
+            err = s.str();
+            LOG(WARNING) << err;
+            return err;
         }
         copied_original_cluster.CopyFrom(c);
         auto& change_nodes = 
const_cast<std::decay_t<decltype(c.nodes())>&>(c.nodes());
         change_nodes.DeleteSubrange(idx, 1); // Remove it
         copied_cluster.CopyFrom(c);
-        vec.emplace_back(std::move(copied_original_cluster), 
std::move(copied_cluster));
+        
change_from_to_clusters.emplace_back(std::move(copied_original_cluster),
+                                             std::move(copied_cluster));
         return "";
     };
 
@@ -982,13 +1142,13 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         msg = modify_func(it, check_to_del, modify_to_del);
         if (msg != "") {
             LOG(WARNING) << msg;
-            // not found, just return OK to cloud control
-            return "";
+            return msg;
         }
     }
 
     LOG(INFO) << "instance " << instance_id << " info: " << 
instance.DebugString();
-    if (!to_del.empty() && !is_instance_valid(instance)) {
+    // here, instance has been changed, not save in fdb
+    if ((!to_add.empty() || !to_del.empty()) && !is_instance_valid(instance)) {
         msg = "instance invalid, cant modify, plz check";
         LOG(WARNING) << msg;
         return msg;
@@ -1014,7 +1174,7 @@ std::string ResourceManager::modify_nodes(const 
std::string& instance_id,
         return msg;
     }
 
-    for (auto& it : vec) {
+    for (auto& it : change_from_to_clusters) {
         update_cluster_to_index(instance_id, it.first, it.second);
     }
 
diff --git a/cloud/test/fdb_injection_test.cpp 
b/cloud/test/fdb_injection_test.cpp
index 08ba3e50e52..60226e7f952 100644
--- a/cloud/test/fdb_injection_test.cpp
+++ b/cloud/test/fdb_injection_test.cpp
@@ -71,6 +71,7 @@ int main(int argc, char** argv) {
     cloud::config::fdb_cluster_file_path = "fdb.cluster";
     cloud::config::write_schema_kv = true;
     cloud::config::enable_check_instance_id = false;
+    cloud::config::enable_loopback_address_for_ms = true;
 
     auto sp = SyncPoint::get_instance();
     sp->enable_processing();
@@ -92,6 +93,8 @@ int main(int argc, char** argv) {
                       [](auto&& args) { *try_any_cast<uint64_t*>(args[0]) = 0; 
});
     sp->set_call_back("put_schema_kv:schema_key_exists_return",
                       [](auto&& args) { *try_any_cast<bool*>(args.back()) = 
true; });
+    sp->set_call_back("resource_manager::set_safe_drop_time",
+                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = -1; 
});
 
     meta_service = create_meta_service();
 
diff --git a/cloud/test/meta_service_http_test.cpp 
b/cloud/test/meta_service_http_test.cpp
index 81c322303a5..9061a5c2c5d 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -810,11 +810,11 @@ TEST(MetaServiceHttpTest, AlterClusterTest) {
         req.mutable_cluster()->set_type(ClusterPB::COMPUTE);
         auto node = req.mutable_cluster()->add_nodes();
         node->set_ip("127.0.0.1");
-        node->set_heartbeat_port(9999);
+        node->set_heartbeat_port(9996);
         node->set_cloud_unique_id("cloud_unique_id");
         auto& meta_service = ctx.meta_service_;
         NodeInfoPB npb;
-        npb.set_heartbeat_port(9999);
+        npb.set_heartbeat_port(9996);
         npb.set_ip("127.0.0.1");
         npb.set_cloud_unique_id("cloud_unique_id");
         meta_service->resource_mgr()->node_info_.insert(
diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp
index 516a0b35c0f..c8d53408a04 100644
--- a/cloud/test/resource_test.cpp
+++ b/cloud/test/resource_test.cpp
@@ -227,9 +227,11 @@ TEST(ResourceTest, ModifyNodesIpTest) {
         auto* c = ins.mutable_clusters()->Add();
         c->set_cluster_name("cluster_name_1");
         c->set_cluster_id("cluster_id_1");
+        c->set_type(ClusterPB::COMPUTE);
         auto* c1 = ins.mutable_clusters()->Add();
         c1->set_cluster_name("cluster_name_2");
         c1->set_cluster_id("cluster_id_2");
+        c1->set_type(ClusterPB::COMPUTE);
         *try_any_cast<InstanceInfoPB*>(args[1]) = ins;
     });
     sp->enable_processing();
@@ -286,9 +288,11 @@ TEST(ResourceTest, ModifyNodesHostTest) {
         auto* c = ins.mutable_clusters()->Add();
         c->set_cluster_name("cluster_name_1");
         c->set_cluster_id("cluster_id_1");
+        c->set_type(ClusterPB::COMPUTE);
         auto* c1 = ins.mutable_clusters()->Add();
         c1->set_cluster_name("cluster_name_2");
         c1->set_cluster_id("cluster_id_2");
+        c1->set_type(ClusterPB::COMPUTE);
         *try_any_cast<InstanceInfoPB*>(args[1]) = ins;
     });
     sp->enable_processing();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 89338c228fc..7aeb35ede68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -180,13 +180,12 @@ public class CloudEnv extends Env {
                 
.stream().filter(NodeInfoPB::hasNodeType).collect(Collectors.toList());
 
         helperNodes.clear();
-        Optional<Cloud.NodeInfoPB> firstNonObserverNode = 
allNodes.stream().findFirst();
-        if (firstNonObserverNode.isPresent()) {
-            helperNodes.add(new HostInfo(
-                    Config.enable_fqdn_mode ? 
firstNonObserverNode.get().getHost()
-                            : firstNonObserverNode.get().getIp(),
-                    firstNonObserverNode.get().getEditLogPort()));
-        }
+        Optional<Cloud.NodeInfoPB> firstNonObserverNode = allNodes.stream()
+                .filter(nodeInfoPB -> nodeInfoPB.getNodeType() != 
NodeInfoPB.NodeType.FE_OBSERVER).findFirst();
+        firstNonObserverNode.ifPresent(nodeInfoPB -> helperNodes.add(new 
HostInfo(
+                Config.enable_fqdn_mode ? nodeInfoPB.getHost()
+                : nodeInfoPB.getIp(),
+                nodeInfoPB.getEditLogPort())));
         Preconditions.checkState(helperNodes.size() == 1);
 
         Optional<NodeInfoPB> local = allNodes.stream().filter(n -> 
((Config.enable_fqdn_mode ? n.getHost() : n.getIp())
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 4e00faa0c6f..58510c2f138 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -140,7 +140,7 @@ message ClusterPB {
 message NodeInfoPB {
     enum NodeType {
         UNKNOWN = 0;
-         // lagacy logic for one-master-multi-observer mode
+        // lagacy logic for one-master-multi-observer mode
         FE_MASTER = 1;
         FE_OBSERVER = 2;
         FE_FOLLOWER = 3;
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index e77658793fe..15802a02888 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -38,6 +38,7 @@ class ClusterOptions {
 
     int feNum = 1
     int beNum = 3
+    int msNum = 1
 
     Boolean sqlModeNodeMgr = false
     Boolean beMetaServiceEndpoint = true
@@ -59,6 +60,10 @@ class ClusterOptions {
         'report_random_wait=false',
     ]
 
+    List<String> msConfigs = []
+
+    List<String> recycleConfigs = []
+
     boolean connectToFollower = false
 
     // 1. cloudMode = true, only create cloud cluster.
@@ -297,6 +302,9 @@ class SuiteCluster {
         if (options.beNum > 0) {
             cmd += ['--add-be-num', String.valueOf(options.beNum)]
         }
+        if (options.msNum > 0) {
+            cmd += ['--add-ms-num', String.valueOf(options.msNum)]
+        }
         // TODO: need escape white space in config
         if (options.feConfigs != null && options.feConfigs.size() > 0) {
             cmd += ['--fe-config']
@@ -306,6 +314,14 @@ class SuiteCluster {
             cmd += ['--be-config']
             cmd += options.beConfigs
         }
+        if (options.msConfigs != null && options.msConfigs.size() > 0) {
+            cmd += ['--ms-config']
+            cmd += options.msConfigs
+        }
+        if (options.recycleConfigs != null && options.recycleConfigs.size() > 
0) {
+            cmd += ['--recycle-config']
+            cmd += options.recycleConfigs
+        }
         if (options.beDisks != null) {
             cmd += ['--be-disks']
             cmd += options.beDisks
@@ -561,6 +577,16 @@ class SuiteCluster {
         runBackendsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
     }
 
+    // if not specific ms indices, then restart all ms
+    void restartMs(int... indices) {
+        runMsCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
+    } 
+
+    // if not specific recycler indices, then restart all recyclers
+    void restartRecyclers(int... indices) {
+        runRecyclerCmd(START_WAIT_TIMEOUT + 5, "restart --wait-timeout 
${START_WAIT_TIMEOUT}".toString(), indices)
+    }
+
     // if not specific fe indices, then drop all frontends
     void dropFrontends(boolean clean=false, int... indices) {
         def cmd = 'down'
@@ -640,6 +666,16 @@ class SuiteCluster {
         runCmd(cmd, timeoutSecond)
     }
 
+    private void runMsCmd(int timeoutSecond, String op, int... indices) {
+        def cmd = op + ' ' + name + ' --ms-id ' + indices.join(' ')
+        runCmd(cmd, timeoutSecond)
+    }
+
+    private void runRecyclerCmd(int timeoutSecond, String op, int... indices) {
+        def cmd = op + ' ' + name + ' --recycle-id ' + indices.join(' ')
+        runCmd(cmd, timeoutSecond)
+    }
+
     private Object runCmd(String cmd, int timeoutSecond = 60) throws Exception 
{
         def fullCmd = String.format('python -W ignore %s %s -v --output-json', 
config.dorisComposePath, cmd)
         logger.info('Run doris compose cmd: {}', fullCmd)
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy 
b/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy
new file mode 100644
index 00000000000..62dfe5d39e2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/node_mgr/test_ms_api.groovy
@@ -0,0 +1,1553 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_ms_api', 'p0, docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+
+    def create_instance_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/create_instance?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def get_instance_api = { msHttpPort, instance_id, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri 
"/MetaService/http/get_instance?token=${token}&instance_id=${instance_id}"
+            check check_func
+        }
+    }
+
+
+    def enable_ms_inject_api = { msHttpPort, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri "/MetaService/http/v1/injection_point?token=${token}&op=enable"
+            check check_func
+        }
+    }
+    
+    // curl 
"175.43.101.1:5000/MetaService/http/v1/injection_point?token=greedisgood9999&op=set&name=resource_manager::set_safe_drop_time&behavior=change_args&value=[-1]"
+    def inject_to_ms_api = { msHttpPort, key, value, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri 
"/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}"
+            check check_func
+        }
+    }
+
+    // curl 
"ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear"
+    def clear_ms_inject_api = { msHttpPort, key, value, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri "/MetaService/http/v1/injection_point?token=${token}&op=clear"
+            check check_func
+        }
+    }
+
+    def drop_cluster_api = { msHttpPort, request_body, check_func ->
+            httpTest {
+                endpoint msHttpPort
+                uri "/MetaService/http/drop_cluster?token=$token"
+                body request_body
+                check check_func
+            }
+    }
+
+
+    // drop instance
+    def drop_instance_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/drop_instance?token=greedisgood9999"
+            body request_body
+            check check_func
+        }
+    }
+
+    def add_cluster_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/add_cluster?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def get_obj_store_info_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/get_obj_store_info?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def update_ak_sk_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/update_ak_sk?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def add_obj_info_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/add_obj_info?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def get_cluster_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/get_cluster?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def rename_node_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/rename_cluster?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def add_node_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/add_node?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    def drop_node_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/drop_node?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    // old case
+    docker(options) {
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        logger.info("ms1 addr={}, port={}, ms endpoint={}", ms.host, 
ms.httpPort, msHttpPort)
+
+        // Inventory function test
+        def token = "greedisgood9999"
+        def instance_id = "instance_id_test_in_docker"
+        def name = "user_1"
+        def user_id = "10000"
+
+        // create instance
+        /*
+        curl -X GET 
'127.0.0.1:5000/MetaService/http/create_instance?token=greedisgood9999' -d '{
+            "instance_id": "instance_id_deadbeef",
+            "name": "user_1",
+            "user_id": "10000",
+            "obj_info": {
+                "ak": "test-ak1",
+                "sk": "test-sk1",
+                "bucket": "test-bucket",
+                "prefix": "test-prefix",
+                "endpoint": "test-endpoint",
+                "region": "test-region",
+                "provider" : "BOS",
+                "external_endpoint" : "endpoint"
+            }
+        }'
+        */
+        def jsonOutput = new JsonOutput()
+        def s3 = [
+                    ak: "test-ak1",
+                    sk : "test-sk1",
+                    bucket : "test-bucket",
+                    prefix: "test-prefix",
+                    endpoint: "test-endpoint",
+                    region: "test-region",
+                    provider : "BOS",
+                    'external_endpoint': "test-external-endpoint"
+                ]
+        def map = [instance_id: "${instance_id}", name: "${name}", user_id: 
"${user_id}", obj_info: s3]
+        def instance_body = jsonOutput.toJson(map)
+
+        create_instance_api.call(msHttpPort, instance_body) {
+                respCode, body ->
+                    log.info("http cli result: ${body} ${respCode}".toString())
+                    def json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+        
+        def otherInstancemap = [instance_id: 
"instance_id_test_in_docker_other", name: "${name}", user_id: "${user_id}", 
obj_info: s3]
+        def otherInstanceBody = jsonOutput.toJson(otherInstancemap)
+
+        create_instance_api.call(msHttpPort, otherInstanceBody) {
+            respCode, body ->
+                log.info("create other instance http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // create again failed
+        create_instance_api.call(msHttpPort, otherInstanceBody) {
+            respCode, body ->
+                log.info("create other instance again http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED"))
+        }
+
+        jsonOutput = new JsonOutput()
+        def instance = [instance_id: "instance_id_test_in_docker_other"]
+        def dropInstanceBody = jsonOutput.toJson(instance)
+
+        drop_instance_api.call(msHttpPort, dropInstanceBody) {
+            respCode, body ->
+                log.info("drop instance http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // add compute group 1
+        def clusterName = "compute_name_1"
+        def clusterId = "compute_id_1"
+        def cloudUniqueId = "1:${instance_id}:xxxxx"
+        def compute_ip1 = "182.0.0.1"
+        def heartbeatPort = 9050
+
+        // add no be cluster
+        /*
+            curl -X GET 
http://127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999 -d '{
+                "instance_id": "instance_id_deadbeef",
+                "cluster": {
+                    "cluster_name": "cluster_name1",
+                    "cluster_id": "cluster_id1",
+                    "type": "COMPUTE",
+                    "nodes": []
+                }
+            }'
+        */
+        def nodeList = []
+        def clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def addEmptyComputeGroup  = jsonOutput.toJson(instance)
+        add_cluster_api.call(msHttpPort, addEmptyComputeGroup) {
+            respCode, body ->
+                log.info("add empty compute group http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        } 
+
+        jsonOutput = new JsonOutput()
+        def dropEmptyComputeGroup  = jsonOutput.toJson(instance)
+        // drop empty cluster
+        /*
+            curl -X GET 
http://127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999 -d '{
+                "instance_id": "instance_id_deadbeef",
+                "cluster": {
+                    "cluster_name": "cluster_name1",
+                    "cluster_id": "cluster_id1"
+                }
+            }'
+        */
+        drop_cluster_api.call(msHttpPort, dropEmptyComputeGroup) {
+            respCode, body ->
+                log.info("drop empty compute group http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        def nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: 
"${compute_ip1}", heartbeat_port: "${heartbeatPort}"]
+        nodeList = [nodeMap]
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        def addComputeGroupBody = jsonOutput.toJson(instance)
+        // add_cluster has one node
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999' -d '{
+                "instance_id":"instance_id_deadbeef",
+                "cluster":{
+                    "cluster_name":"cluster_name1",
+                    "cluster_id":"cluster_id1",
+                    "type" : "COMPUTE",
+                    "nodes":[
+                        {
+                            "cloud_unique_id":"cloud_unique_id_compute_node0",
+                            "ip":"172.0.0.10",
+                            "heartbeat_port":9050
+                        }
+                    ]
+                }
+            }'
+        */
+        add_cluster_api.call(msHttpPort, addComputeGroupBody) {
+            respCode, body ->
+                log.info("add one compute node http cli result: ${body} 
${respCode}".toString())
+                def json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        } 
+
+        // add sql group
+        // see Config.java 
+        // public static String cloud_sql_server_cluster_id = 
"RESERVED_CLUSTER_ID_FOR_SQL_SERVER";
+        // public static String cloud_sql_server_cluster_name = 
"RESERVED_CLUSTER_NAME_FOR_SQL_SERVER";
+        def feClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+        def feClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER"
+        def ip1 = "162.0.0.1"
+        def ip2 = "162.0.0.2"
+        def ip3 = "162.0.0.3"
+        def edit_log_port = 8050
+        def feNodeMap1 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip1}", 
edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"]
+        def feNodeMap2 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip2}", 
edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"]
+        def feNodeMap3 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip3}", 
edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"]
+        def feNodeList = [feNodeMap1, feNodeMap2, feNodeMap3]
+        def feClusterMap = [cluster_name: "${feClusterName}", 
cluster_id:"${feClusterId}", type:"SQL", nodes:feNodeList]
+        instance = [instance_id: "${instance_id}", cluster: feClusterMap]
+        jsonOutput = new JsonOutput()
+        def addSqlGroupBody = jsonOutput.toJson(instance) 
+
+        add_cluster_api.call(msHttpPort, addSqlGroupBody) {
+                respCode, body ->
+                    log.info("http cli result: ${body} ${respCode}".toString())
+                    def json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+        
+        def json
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+        json.result
+
+
+        // get instance's s3 info
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/get_obj_store_info?token=greedisgood9999' -d '{
+                "cloud_unique_id":"cloud_unique_id_compute_node0"
+            }'
+        */
+        def get_obj_store_info_api_body = [cloud_unique_id:"${cloudUniqueId}"]
+        jsonOutput = new JsonOutput()
+        def getObjStoreInfo = jsonOutput.toJson(get_obj_store_info_api_body)
+
+        get_obj_store_info_api.call(msHttpPort, getObjStoreInfo) {
+            respCode, body ->
+                log.info("http cli result: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // update instance's s3 info
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/update_ak_sk?token=greedisgood9999' -d '{
+                "instance_id": "cloud_unique_id_compute_node0",
+                "internal_bucket_user": [
+                    "user_id": "1-userid",
+                    "ak": "test-ak1-updated",
+                    "sk": "test-sk1-updated"
+                ],
+            }'
+        */
+        def internal_bucket_user = [[user_id:"1-userid", 
ak:"test-ak1-updated", sk:"test-sk1-updated"]]
+        def update_ak_sk_api_body = [instance_id:"${instance_id}", 
internal_bucket_user:internal_bucket_user]
+        jsonOutput = new JsonOutput()
+        upDateAKSKBody = jsonOutput.toJson(update_ak_sk_api_body)
+
+
+        update_ak_sk_api.call(msHttpPort, upDateAKSKBody) {
+            respCode, body ->
+                log.info("http cli result: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // add s3 info to instance
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/add_obj_info?token=greedisgood9999' -d '{
+                "cloud_unique_id": "cloud_unique_id_compute_node0",
+                "obj": {
+                    "ak": "test-ak2",
+                    "sk": "test-sk2",
+                    "bucket": "test-bucket",
+                    "prefix": "test-prefix",
+                    "endpoint": "test-endpoint",
+                    "region": "test-region",
+                    "provider": "COS"
+                }
+            }'
+        */
+
+        def add_obj_info_api_body = [cloud_unique_id:"${cloudUniqueId}",
+                                    obj:[ak:"test-ak2", sk:"test-sk2", 
bucket:"test-bucket",
+                                        prefix: "test-prefix", endpoint: 
"test-endpoint", region:"test-region", provider:"COS"]]
+        jsonOutput = new JsonOutput()
+        addObjInfoBody = jsonOutput.toJson(add_obj_info_api_body)
+
+
+        add_obj_info_api.call(msHttpPort, addObjInfoBody) {
+            respCode, body ->
+                log.info("http cli result: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // add again, failed
+        add_cluster_api.call(msHttpPort, addComputeGroupBody) {
+            respCode, body ->
+                log.info("add again http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED"))
+                assertTrue(json.msg.startsWith("try to add a existing cluster 
id"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        } 
+
+        // get_cluster by cluster name
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/get_cluster?token=greedisgood9999' -d '{
+                "cluster_name": "cluster_name1",
+                "cloud_unique_id": "cloud_unique_id_compute_node0"
+            }'
+        */
+        def get_cluster_by_name = [cluster_name: "${clusterName}", 
cloud_unique_id: "${cloudUniqueId}"]
+        jsonOutput = new JsonOutput()
+        def getClusterByNameBody = jsonOutput.toJson(get_cluster_by_name)
+
+        get_cluster_api.call(msHttpPort, getClusterByNameBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("http cli result: ${body} ${respCode} 
${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // get_cluster by cluster id
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/get_cluster?token=greedisgood9999' -d '{
+                "cluster_id": "cluster_id1",
+                "cloud_unique_id": "cloud_unique_id_compute_node0"
+            }'
+        */
+        def get_cluster_by_id = [cluster_id: "${clusterId}", cloud_unique_id: 
"${cloudUniqueId}"]
+        jsonOutput = new JsonOutput()
+        def getClusterByIdBody = jsonOutput.toJson(get_cluster_by_id)
+
+        get_cluster_api.call(msHttpPort, getClusterByIdBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("http cli result: ${body} ${respCode} 
${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // add nodes
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/add_node?token=greedisgood9999' -d '{
+                "instance_id": "instance_id_deadbeef",
+                "cluster": {
+                    "cluster_name": "cluster_name1",
+                    "cluster_id": "cluster_id1",
+                    "type": "COMPUTE",
+                    "nodes": [
+                        {
+                            "cloud_unique_id": "cloud_unique_id_compute_node1",
+                            "ip": "172.0.0.11",
+                            "heartbeat_port": 9050
+                        },
+                        {
+                            "cloud_unique_id": "cloud_unique_id_compute_node2",
+                            "ip": "172.0.0.12",
+                            "heartbeat_port": 9050
+                        }
+                    ]
+                }
+            }'
+        */
+        def compute_ip2 = "182.0.0.2"
+        def compute_ip3 = "182.0.0.3"
+        def node1 = [cloud_unique_id: "${cloudUniqueId}", ip : 
"${compute_ip2}", heartbeat_port: 9050]
+        def node2 = [cloud_unique_id: "${cloudUniqueId}", ip : 
"${compute_ip3}", heartbeat_port: 9050]
+        def add_nodes = [node1, node2]
+        def add_nodes_cluster = [cluster_name: "${clusterName}", cluster_id: 
"${clusterId}", type: "COMPUTE", nodes: add_nodes]
+        def add_nodes_body = [instance_id: "${instance_id}", cluster: 
add_nodes_cluster]
+        jsonOutput = new JsonOutput()
+        def addTwoComputeNodeBody = jsonOutput.toJson(add_nodes_body)
+
+        add_node_api.call(msHttpPort, addTwoComputeNodeBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("add two compute nodes http cli result: ${body} 
${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // drop nodes
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/drop_node?token=greedisgood9999' -d '{
+                "instance_id": "instance_id_deadbeef",
+                "cluster": {
+                    "cluster_name": "cluster_name1",
+                    "cluster_id": "cluster_id1",
+                    "type": "COMPUTE",
+                    "nodes": [
+                        {
+                            "cloud_unique_id": "cloud_unique_id_compute_node1",
+                            "ip": "172.0.0.11",
+                            "heartbeat_port": 9050
+                        },
+                        {
+                            "cloud_unique_id": "cloud_unique_id_compute_node2",
+                            "ip": "172.0.0.12",
+                            "heartbeat_port": 9050
+                        }
+                    ]
+                }
+            }'
+        */
+        def del_nodes = [node1, node2]
+        def del_nodes_cluster = [cluster_name: "${clusterName}", cluster_id: 
"${clusterId}", type: "COMPUTE", nodes: del_nodes]
+        def del_nodes_body = [instance_id: "${instance_id}", cluster: 
del_nodes_cluster]
+        jsonOutput = new JsonOutput()
+        def delTwoNodesBody = jsonOutput.toJson(del_nodes_body)
+
+
+        drop_node_api.call(msHttpPort, delTwoNodesBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("drop two nodes http cli result: ${body} ${respCode} 
${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // rename cluster
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/rename_cluster?token=greedisgood9999' -d '{
+                "instance_id":"instance_id_deadbeef",
+                "cluster":{
+                    "cluster_name":"compute_name_1",
+                    "cluster_id":"compute_id_1"
+                }
+            }'
+        */
+
+        clusterMap = [cluster_name: "compute_name_1_renamed", 
cluster_id:"${clusterId}"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def renmaeClusterBody = jsonOutput.toJson(instance)
+        rename_node_api.call(msHttpPort, renmaeClusterBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("rename cluster http cli result: ${body} ${respCode} 
${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def renameClusterBackBody = jsonOutput.toJson(instance)
+        rename_node_api.call(msHttpPort, renameClusterBackBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("rename cluster back http cli result: ${body} 
${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        def clusterName2 = "cluster_name2"
+        def clusterId2 = "cluster_id2"
+        def nodeList1 = [node1]
+        clusterMap1 = [cluster_name: "${clusterName2}", 
cluster_id:"${clusterId2}", type:"COMPUTE", nodes:nodeList1]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap1]
+        jsonOutput = new JsonOutput()
+        def addNewComputeGroupBody = jsonOutput.toJson(instance)
+        // add_cluster has one node
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999' -d '{
+                "instance_id":"instance_id_deadbeef",
+                "cluster":{
+                    "cluster_name":"cluster_name2",
+                    "cluster_id":"cluster_id2",
+                    "type" : "COMPUTE",
+                    "nodes":[
+                        {
+                            "cloud_unique_id":"cloud_unique_id_compute_node0",
+                            "ip":"172.0.0.11",
+                            "heartbeat_port":9050
+                        }
+                    ]
+                }
+            }'
+        */
+
+        add_cluster_api.call(msHttpPort, addNewComputeGroupBody) {
+            respCode, body ->
+                log.info("add new compute group http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // "code": "INVALID_ARGUMENT",
+        // "msg": "failed to drop instance, instance has clusters"
+        def dropInstanceFailedDueHasCuster = addNewComputeGroupBody
+        drop_instance_api.call(msHttpPort, dropInstanceFailedDueHasCuster) {
+            respCode, body ->
+                log.info("drop instance failed due to has cluster http cli 
result: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT"))
+        }
+
+        // failed, failed to rename cluster, a cluster with the same name 
already exists in this instance
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId2}"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def renameFailedBody = jsonOutput.toJson(instance)
+        rename_node_api.call(msHttpPort, renameFailedBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("rename failed http cli result: ${body} ${respCode} 
${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT"))
+                assertTrue(json.msg.contains("failed to rename cluster, a 
cluster with the same name already exists in this instance"))
+        }
+
+        // get cluster status
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/get_cluster_status?token=greedisgood9999' -d '{
+                "instance_ids":["instance_id_deadbeef"]
+            }'
+        */
+        def get_cluster_status = { request_body, check_func ->
+            httpTest {
+                endpoint msHttpPort
+                uri "/MetaService/http/get_cluster_status?token=$token"
+                body request_body
+                check check_func
+            }
+        }
+
+        // set cluster status
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/set_cluster_status?token=greedisgood9999' -d '{
+                "instance_id": "instance_id_deadbeef",
+                "cluster": {
+                    "cluster_id": "test_cluster_1_id1",
+                    "cluster_status":"STOPPED"
+                }
+            }'
+        */
+        def set_cluster_status = { request_body, check_func ->
+            httpTest {
+                endpoint msHttpPort
+                uri "/MetaService/http/set_cluster_status?token=$token"
+                body request_body
+                check check_func
+            }
+        }
+
+        def getClusterInstance = [instance_ids: ["${instance_id}"]]
+        jsonOutput = new JsonOutput()
+        def getClusterStatusBody = jsonOutput.toJson(getClusterInstance)
+        get_cluster_status.call(getClusterStatusBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("get cluster status http cli result: ${body} 
${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        clusterMap = [cluster_id:"${clusterId2}", cluster_status:"SUSPENDED"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def setClusterStatusBody = jsonOutput.toJson(instance)
+        set_cluster_status.call(setClusterStatusBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("set cluster status http cli result: ${body} 
${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // failed to set cluster status, status eq original status, original 
cluster is NORMAL
+        set_cluster_status.call(setClusterStatusBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("set cluster status again failed http cli result: 
${body} ${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT"))
+        }
+
+        // failed to set cluster status, original cluster is SUSPENDED and 
want set UNKNOWN
+        clusterMap = [cluster_id:"${clusterId2}", cluster_status:"UNKNOWN"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def setClusterInvalidStatusBody = jsonOutput.toJson(instance)
+        set_cluster_status.call(setClusterInvalidStatusBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("set cluster status invalid status http cli result: 
${body} ${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT"))
+        }
+
+        // drop cluster
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999' -d '{
+                "instance_id":"instance_id_deadbeef",
+                "cluster":{
+                    "cluster_name":"cluster_name1",
+                    "cluster_id":"cluster_id1"
+                }
+            }'
+        */
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def dropCluster1Body = jsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropCluster1Body) {
+            respCode, body ->
+                log.info("drop cluster 1 http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // drop cluster_id2
+        clusterMap = [cluster_id:"${clusterId2}"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def dropCluster2Body = jsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropCluster2Body) {
+            respCode, body ->
+                log.info("drop cluster 2 http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // drop not exist cluster, falied
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999' -d '{
+                "instance_id":"instance_id_deadbeef",
+                "cluster":{
+                    "cluster_name":"not_exist_cluster_name",
+                    "cluster_id":"not_exist_cluster_name"
+                }
+            }'
+        */
+        clusterMap = [cluster_name: "not_exist_cluster_name", 
cluster_id:"not_exist_cluster_id", nodes:nodeList]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def dropNotExistClusterBody = jsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropNotExistClusterBody) {
+            respCode, body ->
+                log.info("drop not exist cluster http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("NOT_FOUND"))
+        }
+
+        // after drop, get cluster again, failed
+        instance = [cloud_unique_id: "${cloudUniqueId}", cluster_id: 
"${clusterId}"]
+        jsonOutput = new JsonOutput()
+        def afterDropAndGetBody = jsonOutput.toJson(instance)
+        get_cluster_api.call(msHttpPort, afterDropAndGetBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("after drop and get again http cli result: ${body} 
${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("NOT_FOUND"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("get instance resp: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // add node to another cluster, compute_ip1 not owned by any cluster
+        /*
+            curl 
'127.0.0.1:5000/MetaService/http/add_cluster?token=greedisgood9999' -d '{
+                "instance_id":"instance_id_deadbeef",
+                "cluster":{
+                    "cluster_name":"cluster_name2",
+                    "cluster_id":"cluster_name2",
+                    "type" : "COMPUTE",
+                    "nodes":[
+                        {
+                            "cloud_unique_id":"cloud_unique_id_compute_node0",
+                            "ip":"172.0.0.10",
+                            "heartbeat_port":9050
+                        }
+                    ]
+                }
+            }'
+        */
+        clusterName = "compute_group_name2"
+        clusterId = "compute_group_id2"
+        nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: "${compute_ip1}", 
heartbeat_port: "${heartbeatPort}"]
+        nodeList = [nodeMap]
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}", nodes:nodeList, type:"COMPUTE"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def addNodeToOtherBody = jsonOutput.toJson(instance)
+        add_cluster_api.call(msHttpPort, addNodeToOtherBody) {
+            respCode, body ->
+                log.info("add node to other compute group http cli result: 
${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // drop cluster
+        /*
+            curl -X GET 
http://127.0.0.1:5000/MetaService/http/drop_cluster?token=greedisgood9999 -d '{
+                "instance_id": "instance_id_deadbeef",
+                "cluster": {
+                    "cluster_name": "cluster_name2",
+                    "cluster_id": "cluster_id2"
+                }
+            }'
+        */
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}"]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        jsonOutput = new JsonOutput()
+        def dropClusterErrBody = jsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropClusterErrBody) {
+            respCode, body ->
+                log.info("http cli result: ${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("after drop other cluster get instance resp: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // add back compute group
+        add_cluster_api.call(msHttpPort, addNewComputeGroupBody) {
+            respCode, body ->
+                log.info("add new compute group again again http cli result: 
${body} ${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        add_cluster_api.call(msHttpPort, addComputeGroupBody) {
+            respCode, body ->
+                log.info("add again again http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("after add back fe cluster get instance resp: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        // custom instance PB
+        /*
+        {
+            "code": "OK",
+            "msg": "",
+            "result": {
+                "user_id": "10000",
+                "instance_id": "instance_id_test_in_docker",
+                "name": "user_1",
+                "clusters": [
+                    {
+                        "cluster_id": "RESERVED_CLUSTER_ID_FOR_SQL_SERVER",
+                        "cluster_name": "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER",
+                        "type": "SQL",
+                            "nodes": [
+                            {
+                                "cloud_unique_id": 
"1:instance_id_test_in_docker:xxxxx",
+                                "ip": "162.0.0.1",
+                                "ctime": "1733650571",
+                                "mtime": "1733650571",
+                                "edit_log_port": 8050,
+                                "node_type": "FE_FOLLOWER"
+                            },
+                            {
+                                "cloud_unique_id": 
"1:instance_id_test_in_docker:xxxxx",
+                                "ip": "162.0.0.2",
+                                "ctime": "1733650571",
+                                "mtime": "1733650571",
+                                "edit_log_port": 8050,
+                                "node_type": "FE_OBSERVER"
+                            },
+                            {
+                                "cloud_unique_id": 
"1:instance_id_test_in_docker:xxxxx",
+                                "ip": "162.0.0.3",
+                                "ctime": "1733650571",
+                                "mtime": "1733650571",
+                                "edit_log_port": 8050,
+                                "node_type": "FE_OBSERVER"
+                            }
+                        ]
+                    },
+                    {
+                        "cluster_id": "cluster_id2",
+                        "cluster_name": "cluster_name2",
+                        "type": "COMPUTE",
+                        "nodes": [
+                            {
+                                "cloud_unique_id": 
"1:instance_id_test_in_docker:xxxxx",
+                                "ip": "182.0.0.2",
+                                "ctime": "1733653263",
+                                "mtime": "1733653263",
+                                "heartbeat_port": 9050
+                            }
+                        ],
+                        "cluster_status": "NORMAL"
+                    },
+                    {
+                        "cluster_id": "compute_id_1",
+                        "cluster_name": "compute_name_1",
+                        "type": "COMPUTE",
+                        "nodes": [
+                            {
+                                "cloud_unique_id": 
"1:instance_id_test_in_docker:xxxxx",
+                                "ip": "182.0.0.1",
+                                "ctime": "1733653263",
+                                "mtime": "1733653263",
+                                "heartbeat_port": 9050
+                            }
+                        ],
+                        "cluster_status": "NORMAL"
+                    }
+                ],
+                "obj_info": [
+                    {
+                        "ctime": "1733650571",
+                        "mtime": "1733650571",
+                        "id": "1",
+                        "ak": "test-ak1-updated",
+                        "sk": "test-sk1-updated",
+                        "bucket": "test-bucket",
+                        "prefix": "test-prefix",
+                        "endpoint": "test-endpoint",
+                        "region": "test-region",
+                        "provider": "BOS",
+                        "external_endpoint": "test-external-endpoint",
+                        "user_id": "1-userid",
+                        "encryption_info": {
+                            "encryption_method": "AES_256_ECB",
+                            "key_id": "1"
+                        },
+                        "sse_enabled": false
+                    },
+                    {
+                        "ctime": "1733650571",
+                        "mtime": "1733650571",
+                        "id": "2",
+                        "ak": "test-ak2",
+                        "sk": "test-sk2",
+                        "bucket": "test-bucket",
+                        "prefix": "test-prefix",
+                        "endpoint": "test-endpoint",
+                        "region": "test-region",
+                        "provider": "COS",
+                        "external_endpoint": "",
+                        "encryption_info": {
+                            "encryption_method": "AES_256_ECB",
+                            "key_id": "1"
+                        },
+                        "sse_enabled": false
+                    }
+                ],
+                        "status": "NORMAL",
+                "iam_user": {
+                    "user_id": "",
+                    "ak": "",
+                    "sk": "",
+                    "external_id": "instance_id_test_in_docker"
+                },
+                "sse_enabled": false,
+                "enable_storage_vault": false
+            }
+        }
+        */
+    }
+
+    def clusterOptions = [
+        new ClusterOptions(),
+        new ClusterOptions(),
+    ]
+
+    for (def cos in clusterOptions) {
+        cos.cloudMode = true
+        cos.feNum = 1
+        cos.beNum = 1
+        cos.feConfigs += [
+            'cloud_cluster_check_interval_second=1',
+            'sys_log_verbose_modules=org',
+            'heartbeat_interval_second=1',
+            ]
+    }
+    
+
+    for (def i = 0; i < clusterOptions.size(); i++) {
+        // 1. Test that a node cannot be repeatedly added to multiple clusters
+        // 1.1 compute node
+        // 2. Test API supports switching from master observer mode to multi 
follower mode
+        docker(clusterOptions[i]) {
+            def ms = cluster.getAllMetaservices().get(0)
+            def msHttpPort = ms.host + ":" + ms.httpPort
+            logger.info("ms2 addr={}, port={}, ms endpoint={}", ms.host, 
ms.httpPort, msHttpPort)
+
+            def token = "greedisgood9999"
+            def instance_id = "instance_id_test_in_docker_1"
+            def name = "user_1"
+            def user_id = "10000"
+            def clusterName = "compute_name_1"
+            def clusterId = "compute_id_1"
+            def cloudUniqueId = "1:${instance_id}:xxxxx"
+            // create instance
+            def jsonOutput = new JsonOutput()
+            def s3 = [
+                        ak: "test-ak1",
+                        sk : "test-sk1",
+                        bucket : "test-bucket",
+                        prefix: "test-prefix",
+                        endpoint: "test-endpoint",
+                        region: "test-region",
+                        provider : "BOS",
+                        'external_endpoint': "test-external-endpoint"
+                    ]
+            def map = [instance_id: "${instance_id}", name: "${name}", 
user_id: "${user_id}", obj_info: s3]
+            def instance_body = jsonOutput.toJson(map)
+
+            create_instance_api.call(msHttpPort, instance_body) {
+                    respCode, body ->
+                        log.info("http cli result: ${body} 
${respCode}".toString())
+                        def json = parseJson(body)
+                        assertTrue(json.code.equalsIgnoreCase("OK"))
+            }
+
+            def compute_ip1 = "182.0.0.1" 
+            def heartbeatPort = 9050
+            def nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: 
"${compute_ip1}", heartbeat_port: "${heartbeatPort}"]
+            nodeList = [nodeMap]
+            clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList]
+            instance = [instance_id: "${instance_id}", cluster: clusterMap]
+            def addComputeGroupBody = jsonOutput.toJson(instance)
+            add_cluster_api.call(msHttpPort, addComputeGroupBody) {
+                respCode, body ->
+                    log.info("add one compute node http cli result: ${body} 
${respCode}".toString())
+                    def json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+            } 
+            
+            // 1. Test that a node cannot be repeatedly added to multiple 
clusters
+            // 1.1 compute node
+            node1 = [cloud_unique_id: "${cloudUniqueId}", ip : 
"${compute_ip1}", heartbeat_port: 9050]
+            add_nodes = [node1]
+            def otherClusterName = "compute_name_1_other"
+            def otherClusterId = "compute_id_1_other"
+            add_nodes_cluster = [cluster_name: "${otherClusterName}", 
cluster_id: "${otherClusterId}", type: "COMPUTE", nodes: add_nodes]
+            def addNodeToOtherCluster = [instance_id: "${instance_id}", 
cluster: add_nodes_cluster]
+            jsonOutput = new JsonOutput()
+            addNodeToOtherClusterbody = 
jsonOutput.toJson(addNodeToOtherCluster)
+            add_cluster_api.call(msHttpPort, addNodeToOtherClusterbody) {
+                respCode, body ->
+                    log.info("add node to other compute group http cli result: 
${body} ${respCode}".toString())
+                    json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED"))
+                    assertTrue(json.msg.contains("compute node endpoint has 
been added"))
+            }
+
+            // 1.2 sql node
+            def feClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+            def feClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER"
+            def ip1 = "162.0.0.1"
+            def ip2 = "162.0.0.2"
+            def ip3 = "162.0.0.3"
+            def edit_log_port = 8050
+            def feNodeMap1 = [cloud_unique_id: "${cloudUniqueId}", ip: 
"${ip1}", edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"]
+            def feNodeMap2 = [cloud_unique_id: "${cloudUniqueId}", ip: 
"${ip2}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"]
+            def feNodeMap3 = [cloud_unique_id: "${cloudUniqueId}", ip: 
"${ip3}", edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"]
+            def feNodeList = [feNodeMap1, feNodeMap2, feNodeMap3]
+            def feClusterMap = [cluster_name: "${feClusterName}", 
cluster_id:"${feClusterId}", type:"SQL", nodes:feNodeList]
+            instance = [instance_id: "${instance_id}", cluster: feClusterMap]
+            jsonOutput = new JsonOutput()
+            def addSqlGroupBody = jsonOutput.toJson(instance) 
+
+            add_cluster_api.call(msHttpPort, addSqlGroupBody) {
+                    respCode, body ->
+                        log.info("http cli result: ${body} 
${respCode}".toString())
+                        def json = parseJson(body)
+                        assertTrue(json.code.equalsIgnoreCase("OK"))
+            }
+            
+            node_fe_other = [cloud_unique_id: "${cloudUniqueId}", ip : 
"${ip3}", edit_log_port: 8050, node_type:"FE_FOLLOWER"]
+            add_nodes = [node_fe_other]
+            otherClusterName = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER_OTHER"
+            otherClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER_OTHER"
+            add_nodes_cluster = [cluster_name: "${otherClusterName}", 
cluster_id: "${otherClusterId}", type:"SQL", nodes: add_nodes]
+            def addNodeToOtherClusterFE = [instance_id: "${instance_id}", 
cluster: add_nodes_cluster]
+            jsonOutput = new JsonOutput()
+            addNodeToOtherFEClusterbody = 
jsonOutput.toJson(addNodeToOtherClusterFE)
+            add_cluster_api.call(msHttpPort, addNodeToOtherFEClusterbody) {
+                respCode, body ->
+                    log.info("add node to other compute group http cli result: 
${body} ${respCode}".toString())
+                    json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("ALREADY_EXISTED"))
+                    assertTrue(json.msg.contains("sql node endpoint has been 
added"))
+            }
+
+            // 2. Test API supports switching from master observer mode to 
multi follower mode
+            def newTestSQLIP1 = "152.0.0.1"
+            def newTestSQLIP2 = "152.0.0.2"
+            def newNodefeTestForUnlimit1Failed = [cloud_unique_id: 
"${cloudUniqueId}", ip : "${newTestSQLIP1}", edit_log_port: 8050, 
node_type:"FE_OBSERVER"]
+            def newNodefeTestForUnlimit2 = [cloud_unique_id: 
"${cloudUniqueId}", ip : "${newTestSQLIP2}", edit_log_port: 8050, 
node_type:"FE_OBSERVER"]
+            def addNodesFailed = [newNodefeTestForUnlimit1Failed, 
newNodefeTestForUnlimit2]
+
+            // two FE_OBSERVER Failed to add cluster
+            def addTwoObNodesClusterFailed = [cluster_name: 
"${otherClusterName}", cluster_id: "${otherClusterId}", type:"SQL", nodes: 
addNodesFailed]
+            def addNodesClusterInstanceFailed = [instance_id: 
"${instance_id}", cluster: addTwoObNodesClusterFailed]
+            jsonOutput = new JsonOutput()
+            def addNodesClusterFailedBody = 
jsonOutput.toJson(addNodesClusterInstanceFailed)
+            add_cluster_api.call(msHttpPort, addNodesClusterFailedBody) {
+                respCode, body ->
+                    log.info("add two observer fe failed test http cli result: 
${body} ${respCode}".toString())
+                    json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("INVALID_ARGUMENT"))
+                    assertTrue(json.msg.contains("cluster is SQL type, but not 
set master and follower node, master count=0 follower count=0 so sql cluster 
can't get a Master node"))
+            }
+
+            def ip4 = "162.0.0.4"
+            def ip5 = "162.0.0.5"
+            def ip6 = "162.0.0.6"
+            def feNode4 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip4}", 
edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"]
+            def feNode5 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip5}", 
edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"]
+            def feNode6 = [cloud_unique_id: "${cloudUniqueId}", ip : "${ip6}", 
edit_log_port: "${edit_log_port}", node_type:"FE_FOLLOWER"]
+            def addNodesClusterFailed = [cluster_name: "${feClusterName}", 
cluster_id: "${feClusterId}", type: "SQL", nodes: [feNode4, feNode5, feNode6]]
+            def dropAllFeNodesFailed = [cluster_name: "${feClusterName}", 
cluster_id: "${feClusterId}", type: "SQL", nodes: [feNodeMap1, feNodeMap2, 
feNodeMap3, feNode5, feNode6]]
+            def addNodesClusterSucc = [cluster_name: "${feClusterName}", 
cluster_id: "${feClusterId}", type: "SQL", nodes: [feNode5, feNode6]]
+            def addNodesFailedBody = [instance_id: "${instance_id}", cluster: 
addNodesClusterFailed]
+            def dropAllFeNodesClusterBody = [instance_id: "${instance_id}", 
cluster: dropAllFeNodesFailed]
+            def addNodesBodySuccBody= [instance_id: "${instance_id}", cluster: 
addNodesClusterSucc]
+            jsonOutput = new JsonOutput()
+            def addSomeFENodesFailed = jsonOutput.toJson(addNodesFailedBody)
+            def addSomeFENodesSucc = jsonOutput.toJson(addNodesBodySuccBody)
+            def dropAllFeNodesFailedJson = 
jsonOutput.toJson(dropAllFeNodesClusterBody)
+
+            add_node_api.call(msHttpPort, addSomeFENodesFailed) {
+                respCode, body ->
+                    json = parseJson(body)
+                    // failed, due to two master node
+                    // if force_change_to_multi_follower_mode == false, check 
type not changed, FE_MASTER
+                    log.info("add some fe failed nodes http cli result: 
${body} ${respCode} ${json}".toString())
+                    assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR"))
+                    assertTrue(json.msg.contains("instance invalid, cant 
modify, plz check"))
+            }
+
+            add_node_api.call(msHttpPort, addSomeFENodesSucc) {
+                respCode, body ->
+                    json = parseJson(body)
+                    log.info("add some fe nodes http cli result: ${body} 
${respCode} ${json}".toString())
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+            }
+
+            // inject point, to change MetaServiceImpl_get_cluster_set_config
+            inject_to_ms_api.call(msHttpPort, 
"resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) {
+                respCode, body ->
+                    log.info("inject resource_manager::set_safe_drop_time 
resp: ${body} ${respCode}".toString()) 
+            }
+
+            enable_ms_inject_api.call(msHttpPort) {
+                respCode, body ->
+                log.info("enable inject resp: ${body} ${respCode}".toString()) 
+            }
+
+            drop_node_api.call(msHttpPort, dropAllFeNodesFailedJson) {
+                respCode, body ->
+                    json = parseJson(body)
+                    log.info("drop all fe nodes failed http cli result: 
${body} ${respCode} ${json}".toString())
+                    assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR"))
+                    assertTrue(json.msg.contains("instance invalid, cant 
modify, plz check")) 
+            }
+
+            get_instance_api.call(msHttpPort, instance_id) {
+                respCode, body ->
+                    log.info("add Master-observer mode get instance resp: 
${body} ${respCode}".toString())
+                    json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+                    def result = json.result
+                    def FECluster = result.clusters.find {
+                        it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+                    }
+                    log.info("find it cluster ${FECluster}")
+                    assertNotNull(FECluster)
+                    def checkFENode = FECluster.nodes.find {
+                        it.ip == ip1
+                    }
+                    def followerFeNode = FECluster.nodes.find {
+                        it.ip == ip6
+                    }
+                    log.info("find it node fe1: ${checkFENode}")
+                    log.info("find it node fe6: ${followerFeNode}")
+                    assertNotNull(checkFENode)
+                    assertNotNull(followerFeNode)
+                    assertEquals("FE_MASTER", checkFENode.node_type)
+                    assertEquals("FE_FOLLOWER", followerFeNode.node_type)
+            }
+        }
+    }
+
+    // 3. fe get cluster, change FE_MASTER to FE_FOLLOWER
+    // Upgrade compatibility for existing master-observers models
+    // when call get_cluster api, FE_MASTER type will be changed to FE_FOLLOWER
+    def optionsForUpgrade = new ClusterOptions()
+    optionsForUpgrade.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    optionsForUpgrade.setFeNum(1)
+    optionsForUpgrade.setBeNum(1)
+    optionsForUpgrade.cloudMode = true
+
+    docker(optionsForUpgrade) {
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        logger.info("ms3 addr={}, port={}, ms endpoint={}", ms.host, 
ms.httpPort, msHttpPort)
+
+        def token = "greedisgood9999"
+        def instance_id = "instance_id_test_in_docker_2"
+        def name = "user_1"
+        def user_id = "10000"
+        def clusterName = "compute_name_1"
+        def clusterId = "compute_id_1"
+        def cloudUniqueId = "1:${instance_id}:xxxxx"
+        // create instance
+        def jsonOutput = new JsonOutput()
+        def s3 = [
+                    ak: "test-ak1",
+                    sk : "test-sk1",
+                    bucket : "test-bucket",
+                    prefix: "test-prefix",
+                    endpoint: "test-endpoint",
+                    region: "test-region",
+                    provider : "BOS",
+                    'external_endpoint': "test-external-endpoint"
+                ]
+        def map = [instance_id: "${instance_id}", name: "${name}", user_id: 
"${user_id}", obj_info: s3]
+        def instance_body = jsonOutput.toJson(map)
+
+        create_instance_api.call(msHttpPort, instance_body) {
+                respCode, body ->
+                    log.info("http cli result: ${body} ${respCode}".toString())
+                    def json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+        def feClusterId = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+        def feClusterName = "RESERVED_CLUSTER_NAME_FOR_SQL_SERVER"
+        def ip1 = "162.0.0.1"
+        def ip2 = "162.0.0.2"
+        def edit_log_port = 8050
+        def feNodeMap1 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip1}", 
edit_log_port: "${edit_log_port}", node_type:"FE_MASTER"]
+        def feNodeMap2 = [cloud_unique_id: "${cloudUniqueId}", ip: "${ip2}", 
edit_log_port: "${edit_log_port}", node_type:"FE_OBSERVER"]
+        def feNodeList = [feNodeMap1, feNodeMap2]
+        def feClusterMap = [cluster_name: "${feClusterName}", 
cluster_id:"${feClusterId}", type:"SQL", nodes:feNodeList]
+        instance = [instance_id: "${instance_id}", cluster: feClusterMap]
+        jsonOutput = new JsonOutput()
+        def addSqlGroupBody = jsonOutput.toJson(instance) 
+
+        add_cluster_api.call(msHttpPort, addSqlGroupBody) {
+                respCode, body ->
+                    log.info("http cli result: ${body} ${respCode}".toString())
+                    def json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("add Master-observer mode get instance resp: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+                def result = json.result
+                def FECluster = result.clusters.find {
+                    it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+                }
+                log.info("find it cluster ${FECluster}")
+                assertNotNull(FECluster)
+                def checkMasterNode = FECluster.nodes.find {
+                    it.ip == ip1
+                }
+                assertNotNull(checkMasterNode) 
+                assertEquals("FE_MASTER", checkMasterNode.node_type)
+        }
+
+
+        enable_ms_inject_api.call(msHttpPort) {
+            respCode, body ->
+               log.info("enable inject resp: ${body} ${respCode}".toString()) 
+        }
+
+        def getFEClusterByName = [cluster_name: "${feClusterName}", 
cluster_id:"${feClusterId}", cloud_unique_id: "${cloudUniqueId}"]
+        jsonOutput = new JsonOutput()
+        def getClusterByNameBody = jsonOutput.toJson(getFEClusterByName)
+
+        get_cluster_api.call(msHttpPort, getClusterByNameBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("get FE cluster http cli result: ${body} ${respCode} 
${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+                def result = json.result
+                def checkFollowerNode = result.nodes.find {
+                    it.ip == ip1
+                }
+                assertNotNull(checkFollowerNode) 
+                assertEquals("FE_MASTER", checkFollowerNode.node_type)
+        }
+
+        // check instance
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("after get cluster get instance resp: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+                def result = json.result
+                def FECluster = result.clusters.find {
+                    it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+                }
+                log.info("find it cluster ${FECluster}")
+                assertNotNull(FECluster)
+                def checkFollowerNode = FECluster.nodes.find {
+                    it.ip == ip1
+                }
+                assertNotNull(checkFollowerNode) 
+                assertEquals("FE_MASTER", checkFollowerNode.node_type)
+        } 
+
+        // 4. Test use HTTP API add fe, drop fe node in protection time 
failed, excced protection time succ
+        // test drop fe observer node
+        def del_nodes = [feNodeMap2]
+        def del_nodes_cluster = [cluster_name: "${feClusterName}", cluster_id: 
"${feClusterId}", type: "SQL", nodes: del_nodes]
+        def del_nodes_body = [instance_id: "${instance_id}", cluster: 
del_nodes_cluster]
+        jsonOutput = new JsonOutput()
+        def delFeObserverNodesBody = jsonOutput.toJson(del_nodes_body)
+
+        drop_node_api.call(msHttpPort, delFeObserverNodesBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("drop fe observer node http cli result: ${body} 
${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("INTERNAL_ERROR"))
+                assertTrue(json.msg.contains("drop fe node not in safe time, 
try later"))
+        }
+
+        // test drop fe cluster, can drop, without protection
+        // fe checker will drop fe node,
+        // non-master node will exit, and master node, whill throw error, 
can't drop itself succ
+        // feClusterMap = [cluster_name: "${feClusterName}", 
cluster_id:"${feClusterId}"]
+        feClusterMap = [cluster_id:"${feClusterId}"]
+        instance = [instance_id: "${instance_id}", cluster: feClusterMap]
+        jsonOutput = new JsonOutput()
+        def dropFeClusterBody = jsonOutput.toJson(instance)
+        drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
+            respCode, body ->
+                log.info("drop fe cluster http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("NOT_FOUND"))
+                assertTrue(json.msg.contains("drop fe cluster not in safe 
time, try later"))
+        }
+
+        get_cluster_api.call(msHttpPort, getClusterByNameBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("get FE cluster after drop observer http cli result: 
${body} ${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+                def result = json.result
+                def checkObserverNotBeenDropedNode = result.nodes.find {
+                    it.ip == ip2
+                }
+                assertNotNull(checkObserverNotBeenDropedNode)
+        }
+
+        // inject point, to change MetaServiceImpl_get_cluster_set_config
+        inject_to_ms_api.call(msHttpPort, 
"resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) {
+            respCode, body ->
+                log.info("inject resource_manager::set_safe_drop_time resp: 
${body} ${respCode}".toString()) 
+        }
+
+        enable_ms_inject_api.call(msHttpPort) {
+            respCode, body ->
+                log.info("enable inject resp: ${body} ${respCode}".toString()) 
+        }
+
+        // after inject, drop fe node, drop fe cluster all succ
+        drop_node_api.call(msHttpPort, delFeObserverNodesBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("after inject drop fe observer nodeshttp cli result: 
${body} ${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        } 
+
+        get_cluster_api.call(msHttpPort, getClusterByNameBody) {
+            respCode, body ->
+                json = parseJson(body)
+                log.info("get FE cluster after drop observer http cli result: 
${body} ${respCode} ${json}".toString())
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+                def result = json.result
+                def checkObserverBeenDropedNode = result.nodes.find {
+                    it.ip == ip2
+                }
+                assertNull(checkObserverBeenDropedNode)
+        }
+
+        drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
+            respCode, body ->
+                log.info("drop fe cluster http cli result: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("after get cluster get instance resp: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+                def result = json.result
+                def FECluster = result.clusters.find {
+                    it.cluster_id == "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"
+                }
+                assertNull(FECluster)
+        } 
+
+        // 5. Test Drop node, unable to find node message HTTP code return 404
+        def compute_ip1 = "182.0.0.1" 
+        def heartbeatPort = 9050
+        def nodeMap = [cloud_unique_id: "${cloudUniqueId}", ip: 
"${compute_ip1}", heartbeat_port: "${heartbeatPort}"]
+        nodeList = [nodeMap]
+        clusterMap = [cluster_name: "${clusterName}", 
cluster_id:"${clusterId}", type:"COMPUTE", nodes:nodeList]
+        instance = [instance_id: "${instance_id}", cluster: clusterMap]
+        def addComputeGroupBody = jsonOutput.toJson(instance)
+        add_cluster_api.call(msHttpPort, addComputeGroupBody) {
+                respCode, body ->
+                    log.info("http cli result: ${body} ${respCode}".toString())
+                    def json = parseJson(body)
+                    assertTrue(json.code.equalsIgnoreCase("OK"))
+        }
+
+        get_instance_api.call(msHttpPort, instance_id) {
+            respCode, body ->
+                log.info("after get cluster get instance resp: ${body} 
${respCode}".toString())
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("OK"))
+        } 
+
+        // 182.0.0.3 not in instance, can't find to drop, return code 404
+        def node1 = [cloud_unique_id: "${cloudUniqueId}", ip : "182.0.0.3", 
heartbeat_port: 9050] 
+        def del_compute_nodes = [node1]
+        def del_compute_nodes_cluster = [cluster_name: "${clusterName}", 
cluster_id: "${clusterId}", type: "COMPUTE", nodes: del_compute_nodes]
+        def del_compute_nodes_body = [instance_id: "${instance_id}", cluster: 
del_compute_nodes_cluster]
+        jsonOutput = new JsonOutput()
+        def delComputeNodesBody = jsonOutput.toJson(del_compute_nodes_body)
+
+        drop_node_api.call(msHttpPort, delComputeNodesBody) {
+            respCode, body ->
+                log.info("drop compute group http cli result: ${body} 
${respCode}".toString())
+                assertEquals(404, respCode)
+                json = parseJson(body)
+                assertTrue(json.code.equalsIgnoreCase("NOT_FOUND"))
+        }
+    }
+
+    // 6. check 127.0.0.1 ms exit
+    def optionsForMs = new ClusterOptions()
+    optionsForMs.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    optionsForMs.setFeNum(1)
+    optionsForMs.setBeNum(1)
+    optionsForMs.setMsNum(2)
+    optionsForMs.cloudMode = true
+
+    docker(optionsForMs) {
+        log.info("in test ms docker env")
+        def mss = cluster.getAllMetaservices()
+        cluster.addRWPermToAllFiles()
+        def ms2 = cluster.getMetaservices().get(1)
+        assertNotNull(ms2)
+        // change ms2 conf, and restart it
+        def confFile = ms2.getConfFilePath()
+        log.info("ms2 conf file: {}", confFile)
+        def writer = new PrintWriter(new FileWriter(confFile, true))  // true 
表示 append 模式
+        writer.println("priority_networks=127.0.0.1/32")
+        writer.flush()
+        writer.close()
+
+        cluster.restartMs(ms2.index)
+        // check ms2 exit, exit need some time
+        sleep(15000)
+        ms2 = cluster.getMetaservices().get(1)
+        def ms2Alive = ms2.alive
+        assertFalse(ms2Alive)
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy 
b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
index 70372f68ab8..61faf6cb016 100644
--- a/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
+++ b/regression-test/suites/cloud_p0/node_mgr/test_sql_mode_node_mgr.groovy
@@ -36,6 +36,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
         ]
         options.cloudMode = true
         options.sqlModeNodeMgr = true
+        options.connectToFollower = true
         options.waitTimeout = 0
         options.feNum = 3
         options.useFollowersMode = true
@@ -59,9 +60,48 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
     clusterOptions[3].beClusterId = false;
     clusterOptions[3].beMetaServiceEndpoint = false;
 
+    def inject_to_ms_api = { msHttpPort, key, value, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri 
"/MetaService/http/v1/injection_point?token=${token}&op=set&name=${key}&behavior=change_args&value=${value}"
+            check check_func
+        }
+    }
+
+    def clear_ms_inject_api = { msHttpPort, key, value, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri "/MetaService/http/v1/injection_point?token=${token}&op=clear"
+            check check_func
+        }
+    }
+
+    def enable_ms_inject_api = { msHttpPort, check_func ->
+        httpTest {
+            op "get"
+            endpoint msHttpPort
+            uri "/MetaService/http/v1/injection_point?token=${token}&op=enable"
+            check check_func
+        }
+    }
+
     for (options in clusterOptions) {
         docker(options) {
             logger.info("docker started");
+            def ms = cluster.getAllMetaservices().get(0)
+            def msHttpPort = ms.host + ":" + ms.httpPort
+            // inject point, to change MetaServiceImpl_get_cluster_set_config
+            inject_to_ms_api.call(msHttpPort, 
"resource_manager::set_safe_drop_time", URLEncoder.encode('[-1]', "UTF-8")) {
+            respCode, body ->
+                log.info("inject resource_manager::set_safe_drop_time resp: 
${body} ${respCode}".toString()) 
+            }
+
+            enable_ms_inject_api.call(msHttpPort) {
+                respCode, body ->
+                log.info("enable inject resp: ${body} ${respCode}".toString()) 
+            }
 
             def checkFrontendsAndBackends = {
                 // Check frontends
@@ -474,6 +514,7 @@ suite('test_sql_mode_node_mgr', 'multi_cluster,docker,p1') {
             logger.info("Successfully decommissioned backend and verified its 
status")
 
             checkClusterStatus(3, 3, 8)
+
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to