This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ae0915721b9 [fix](cloud) Check instance_id valid when use cloud_unique_id degrade format #43253 (#43832)
ae0915721b9 is described below
commit ae0915721b9987897d97c6b6c56faa5363d427fb
Author: deardeng <[email protected]>
AuthorDate: Wed Nov 13 13:48:42 2024 +0800
[fix](cloud) Check instance_id valid when use cloud_unique_id degrade format #43253 (#43832)
cherry pick from #43253
---
cloud/src/common/config.h | 2 +
cloud/src/meta-service/meta_service.cpp | 56 ++++++++++++++----------
cloud/src/meta-service/meta_service_resource.cpp | 18 ++++++--
cloud/src/resource-manager/resource_manager.cpp | 34 +++++++++++++-
cloud/src/resource-manager/resource_manager.h | 19 ++++++++
cloud/test/fdb_injection_test.cpp | 1 +
cloud/test/meta_service_test.cpp | 16 +++++++
7 files changed, 117 insertions(+), 29 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index bd255b013e3..9fe98c16510 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -220,4 +220,6 @@ CONF_Int32(txn_lazy_commit_num_threads, "8");
CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000");
// max TabletIndexPB num for batch get
CONF_Int32(max_tablet_index_num_per_batch, "1000");
+
+CONF_Bool(enable_check_instance_id, "true");
} // namespace doris::cloud::config
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index a59869196e3..69740dbf49a 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -88,46 +88,54 @@ std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
std::vector<NodeInfo> nodes;
std::string err = rc_mgr->get_node(cloud_unique_id, &nodes);
{ TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); }
+ std::string instance_id;
if (!err.empty()) {
// cache can't find cloud_unique_id, so degraded by parse cloud_unique_id
// cloud_unique_id encode: ${version}:${instance_id}:${unique_id}
// check it split by ':' c
- auto vec = split(cloud_unique_id, ':');
- std::stringstream ss;
- for (int i = 0; i < vec.size(); ++i) {
- ss << "idx " << i << "= [" << vec[i] << "] ";
- }
- LOG(INFO) << "degraded to get instance_id, cloud_unique_id: " << cloud_unique_id
- << "after split: " << ss.str();
- if (vec.size() != 3) {
- LOG(WARNING) << "cloud unique id is not degraded format, failed to check instance "
- "info, cloud_unique_id="
- << cloud_unique_id << " , err=" << err;
+ auto [valid, id] = ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
+ if (!valid) {
+ LOG(WARNING) << "use degraded format cloud_unique_id, but cloud_unique_id not degrade "
+ "format, cloud_unique_id="
+ << cloud_unique_id;
return "";
}
- // version: vec[0], instance_id: vec[1], unique_id: vec[2]
- switch (std::atoi(vec[0].c_str())) {
- case 1:
- // just return instance id;
- return vec[1];
- default:
- LOG(WARNING) << "cloud unique id degraded state, but version not eq configure, "
+
+ // check instance_id valid by get fdb
+ if (config::enable_check_instance_id && !rc_mgr->is_instance_id_registered(id)) {
+ LOG(WARNING) << "use degraded format cloud_unique_id, but check instance failed, "
"cloud_unique_id="
- << cloud_unique_id << ", err=" << err;
+ << cloud_unique_id;
return "";
}
+ return id;
}
- std::string instance_id;
- for (auto& i : nodes) {
- if (!instance_id.empty() && instance_id != i.instance_id) {
+ for (auto& node : nodes) {
+ if (!instance_id.empty() && instance_id != node.instance_id) {
LOG(WARNING) << "cloud_unique_id is one-to-many instance_id, "
<< " cloud_unique_id=" << cloud_unique_id
<< " current_instance_id=" << instance_id
- << " later_instance_id=" << i.instance_id;
+ << " later_instance_id=" << node.instance_id;
+ }
+ instance_id = node.instance_id; // The last wins
+ // check cache unique_id
+ std::string cloud_unique_id_in_cache = node.node_info.cloud_unique_id();
+ auto [valid, id] =
+ ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id_in_cache);
+ if (!valid) {
+ continue;
+ }
+
+ if (id != node.instance_id || id != instance_id) {
+ LOG(WARNING) << "in cache, node=" << node.node_info.DebugString()
+ << ", cloud_unique_id=" << cloud_unique_id
+ << " current_instance_id=" << instance_id
+ << ", later_instance_id=" << node.instance_id;
+ continue;
}
- instance_id = i.instance_id; // The last wins
}
+
return instance_id;
}
diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp
index 5d4a4d69227..92020005c3a 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -1943,6 +1943,16 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
instance_id = request->has_instance_id() ? request->instance_id() : "";
if (!cloud_unique_id.empty() && instance_id.empty()) {
+ auto [is_degraded_format, id] =
+ ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
+ if (config::enable_check_instance_id && is_degraded_format &&
+ !resource_mgr_->is_instance_id_registered(id)) {
+ msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
+ cloud_unique_id;
+ LOG(WARNING) << msg;
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ return;
+ }
instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
@@ -1992,7 +2002,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::ADD_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
- LOG(INFO) << msg;
+ LOG(WARNING) << msg;
break;
}
std::vector<NodeInfo> to_add;
@@ -2016,7 +2026,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::DROP_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
- LOG(INFO) << msg;
+ LOG(WARNING) << msg;
break;
}
std::vector<NodeInfo> to_add;
@@ -2039,7 +2049,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::DECOMMISSION_NODE: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
- LOG(INFO) << msg;
+ LOG(WARNING) << msg;
break;
}
@@ -2101,7 +2111,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
case AlterClusterRequest::NOTIFY_DECOMMISSIONED: {
resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
if (msg != "") {
- LOG(INFO) << msg;
+ LOG(WARNING) << msg;
break;
}
diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp
index 43f0a7368d8..9c37d781765 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -23,6 +23,7 @@
#include <sstream>
#include "common/logging.h"
+#include "common/string_util.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
@@ -159,6 +160,16 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
int master_num = 0;
int follower_num = 0;
for (auto& n : cluster.nodes()) {
+ // check here cloud_unique_id
+ std::string cloud_unique_id = n.cloud_unique_id();
+ auto [is_degrade_format, instance_id] = get_instance_id_by_cloud_unique_id(cloud_unique_id);
+ if (config::enable_check_instance_id && is_degrade_format &&
+ !is_instance_id_registered(instance_id)) {
+ ss << "node=" << n.DebugString()
+ << " cloud_unique_id use degrade format, but check instance failed";
+ *err = ss.str();
+ return false;
+ }
if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() &&
n.has_node_type() &&
(n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
@@ -199,6 +210,27 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
return no_err;
}
+std::pair<bool, std::string> ResourceManager::get_instance_id_by_cloud_unique_id(
+ const std::string& cloud_unique_id) {
+ auto v = split(cloud_unique_id, ':');
+ if (v.size() != 3) return {false, ""};
+ // degraded format check it
+ int version = std::atoi(v[0].c_str());
+ if (version != 1) return {false, ""};
+ return {true, v[1]};
+}
+
+bool ResourceManager::is_instance_id_registered(const std::string& instance_id) {
+ // check kv
+ auto [c0, m0] = get_instance(nullptr, instance_id, nullptr);
+ { TEST_SYNC_POINT_CALLBACK("is_instance_id_registered", &c0); }
+ if (c0 != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to check instance instance_id=" << instance_id
+ << ", code=" << format_as(c0) << ", info=" + m0;
+ }
+ return c0 == TxnErrorCode::TXN_OK;
+}
+
std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::string& instance_id,
const ClusterInfo& cluster) {
std::string msg;
@@ -624,7 +656,7 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p
return ec;
}
- if (!inst_pb->ParseFromString(val)) {
+ if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
msg = "failed to parse InstanceInfoPB";
return ec;
diff --git a/cloud/src/resource-manager/resource_manager.h b/cloud/src/resource-manager/resource_manager.h
index 5000764dee8..9e6f4548d24 100644
--- a/cloud/src/resource-manager/resource_manager.h
+++ b/cloud/src/resource-manager/resource_manager.h
@@ -114,6 +114,25 @@ public:
bool check_cluster_params_valid(const ClusterPB& cluster, std::string* err,
bool check_master_num);
+ /**
+ * Check cloud_unique_id is degraded format, and get instance_id from cloud_unique_id
+ * degraded format : "${version}:${instance_id}:${unique_id}"
+ * @param degraded cloud_unique_id
+ *
+ * @return a <is_degraded_format, instance_id> pair, if is_degraded_format == true , instance_id, if is_degraded_format == false, instance_id=""
+ */
+ static std::pair<bool, std::string> get_instance_id_by_cloud_unique_id(
+ const std::string& cloud_unique_id);
+
+ /**
+ * check instance_id is a valid instance, check by get fdb kv
+ *
+ * @param instance_id
+ *
+ * @return true, instance_id in fdb kv
+ */
+ bool is_instance_id_registered(const std::string& instance_id);
+
/**
* Refreshes the cache of given instance. This process removes the instance in cache
* and then replaces it with persisted instance state read from underlying KV storage.
diff --git a/cloud/test/fdb_injection_test.cpp b/cloud/test/fdb_injection_test.cpp
index 125ae2b6b04..08ba3e50e52 100644
--- a/cloud/test/fdb_injection_test.cpp
+++ b/cloud/test/fdb_injection_test.cpp
@@ -70,6 +70,7 @@ int main(int argc, char** argv) {
cloud::config::txn_store_retry_base_intervals_ms = 1;
cloud::config::fdb_cluster_file_path = "fdb.cluster";
cloud::config::write_schema_kv = true;
+ cloud::config::enable_check_instance_id = false;
auto sp = SyncPoint::get_instance();
sp->enable_processing();
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index e6b9e9dddee..c67b49aac3f 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -56,6 +56,7 @@ int main(int argc, char** argv) {
config::enable_txn_store_retry = true;
config::txn_store_retry_base_intervals_ms = 1;
config::txn_store_retry_times = 20;
+ config::enable_check_instance_id = false;
if (!doris::cloud::init_glog("meta_service_test")) {
std::cerr << "failed to init glog" << std::endl;
@@ -264,6 +265,21 @@ TEST(MetaServiceTest, GetInstanceIdTest) {
"12345678901:ALBJLH4Q:m-n3qdpyal27rh8iprxx");
ASSERT_EQ(instance_id, "");
+ config::enable_check_instance_id = true;
+ auto ms = get_meta_service(false);
+ instance_id =
+ get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
+ ASSERT_EQ(instance_id, "");
+
+ sp->set_call_back("is_instance_id_registered", [&](auto&& args) {
+ TxnErrorCode* c0 = try_any_cast<TxnErrorCode*>(args[0]);
+ *c0 = TxnErrorCode::TXN_OK;
+ });
+ instance_id =
+ get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
+ ASSERT_EQ(instance_id, "ALBJLH4Q-check-invalid");
+ config::enable_check_instance_id = false;
+
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]