This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8c850d8250a [enhance](cloud) Validate and normalize hdfs conf when
adding new vault (#33255)
8c850d8250a is described below
commit 8c850d8250a09f1f6a8314127abe304511c0bdcc
Author: plat1ko <[email protected]>
AuthorDate: Thu Apr 4 20:17:06 2024 +0800
[enhance](cloud) Validate and normalize hdfs conf when adding new vault
(#33255)
---
cloud/src/meta-service/meta_service_resource.cpp | 72 ++++++++++++++++++++++++
cloud/src/recycler/meta_checker.cpp | 6 +-
cloud/test/meta_service_test.cpp | 37 ++++++++++++
3 files changed, 112 insertions(+), 3 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_resource.cpp
b/cloud/src/meta-service/meta_service_resource.cpp
index 88aefdcf0d0..3465c765595 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -308,6 +308,61 @@ static std::string next_available_vault_id(const
InstanceInfoPB& instance) {
return std::to_string(prev + 1);
}
+namespace detail {
+
+// Removes any trailing `c` in `str`
+void strip_trailing(std::string& str, char c) {
+ size_t end = str.find_last_not_of(c);
+ if (end == std::string::npos) {
+ str = "";
+ } else {
+ str.resize(end + 1);
+ }
+}
+
+// Removes any leading `c` in `str`
+void strip_leading(std::string& str, char c) {
+ size_t start = str.find_first_not_of(c);
+ if (start == std::string::npos) {
+ str = "";
+ } else if (start > 0) {
+ str = str.substr(start);
+ }
+}
+
+// Validate and normalize hdfs prefix. Return true if prefix is valid.
+bool normalize_hdfs_prefix(std::string& prefix) {
+ if (prefix.empty()) {
+ return true;
+ }
+
+ if (prefix.find("://") != std::string::npos) {
+ // Should not contain scheme
+ return false;
+ }
+
+ strip_trailing(prefix, ' ');
+ strip_leading(prefix, ' ');
+ strip_trailing(prefix, '/');
+ return true;
+}
+
+// Validate and normalize hdfs fs_name. Return true if fs_name is valid.
+bool normalize_hdfs_fs_name(std::string& fs_name) {
+ if (fs_name.empty()) {
+ return false;
+ }
+
+ // Should check scheme existence?
+
+ strip_trailing(fs_name, ' ');
+ strip_leading(fs_name, ' ');
+ strip_trailing(fs_name, '/');
+ return !fs_name.empty();
+}
+
+} // namespace detail
+
static int add_hdfs_storage_vault(InstanceInfoPB& instance, Transaction* txn,
StorageVaultPB hdfs_param, MetaServiceCode&
code,
std::string& msg) {
@@ -323,6 +378,23 @@ static int add_hdfs_storage_vault(InstanceInfoPB&
instance, Transaction* txn,
msg = fmt::format("vault_name={} already created", hdfs_param.name());
return -1;
}
+
+ using namespace detail;
+ // Check and normalize hdfs conf
+ auto* prefix = hdfs_param.mutable_hdfs_info()->mutable_prefix();
+ if (!normalize_hdfs_prefix(*prefix)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("invalid prefix: ", *prefix);
+ return -1;
+ }
+
+ auto* fs_name =
hdfs_param.mutable_hdfs_info()->mutable_build_conf()->mutable_fs_name();
+ if (!normalize_hdfs_fs_name(*fs_name)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("invalid fs_name: ", *fs_name);
+ return -1;
+ }
+
std::string key;
std::string vault_id = next_available_vault_id(instance);
storage_vault_key({instance.instance_id(), vault_id}, &key);
diff --git a/cloud/src/recycler/meta_checker.cpp
b/cloud/src/recycler/meta_checker.cpp
index 00a27aa9a29..ed54de88b40 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -171,7 +171,7 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
int num_row = mysql_num_rows(result);
for (int i = 0; i < num_row; ++i) {
MYSQL_ROW row = mysql_fetch_row(result);
- TabletInfo tablet_info = { 0 };
+ TabletInfo tablet_info = {0};
tablet_info.tablet_id = atoll(row[0]);
tablet_info.schema_version = atoll(row[4]);
tablets.push_back(std::move(tablet_info));
@@ -197,7 +197,7 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
tablet_info.partition_id = atoll(row[6]);
tablet_info.index_id = atoll(row[7]);
- PartitionInfo partition_info = { 0 };
+ PartitionInfo partition_info = {0};
partition_info.db_id = atoll(row[4]);
partition_info.table_id = atoll(row[5]);
partition_info.partition_id = atoll(row[6]);
@@ -354,7 +354,7 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
int64_t db_id = elem.second.db_id;
int64_t table_id = elem.second.table_id;
int64_t partition_id = elem.second.partition_id;
- std::string ver_key = version_key({instance_id_, db_id, table_id,
partition_id});
+ std::string ver_key = partition_version_key({instance_id_, db_id,
table_id, partition_id});
std::string ver_val;
err = txn->get(ver_key, &ver_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 71e03afe2a7..93d4ce830f8 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -5080,6 +5080,35 @@ TEST(MetaServiceTest, LegacyUpdateAkSkTest) {
SyncPoint::get_instance()->clear_all_call_backs();
}
+namespace detail {
+bool normalize_hdfs_prefix(std::string& prefix);
+bool normalize_hdfs_fs_name(std::string& fs_name);
+} // namespace detail
+
+TEST(MetaServiceTest, NormalizeHdfsConfTest) {
+ using namespace detail;
+ std::string prefix = "hdfs://127.0.0.1:8020/test";
+ EXPECT_FALSE(normalize_hdfs_prefix(prefix));
+ prefix = "test";
+ EXPECT_TRUE(normalize_hdfs_prefix(prefix));
+ EXPECT_EQ(prefix, "test");
+ prefix = " test ";
+ EXPECT_TRUE(normalize_hdfs_prefix(prefix));
+ EXPECT_EQ(prefix, "test");
+ prefix = " /test// ";
+ EXPECT_TRUE(normalize_hdfs_prefix(prefix));
+ EXPECT_EQ(prefix, "/test");
+ prefix = "/";
+ EXPECT_TRUE(normalize_hdfs_prefix(prefix));
+ EXPECT_EQ(prefix, "");
+
+ std::string fs_name;
+ EXPECT_FALSE(normalize_hdfs_fs_name(prefix));
+ fs_name = " hdfs://127.0.0.1:8020/ ";
+ EXPECT_TRUE(normalize_hdfs_fs_name(fs_name));
+ EXPECT_EQ(fs_name, "hdfs://127.0.0.1:8020");
+}
+
TEST(MetaServiceTest, AddHdfsInfoTest) {
auto meta_service = get_meta_service();
@@ -5148,9 +5177,16 @@ TEST(MetaServiceTest, AddHdfsInfoTest) {
brpc::Controller cntl;
AlterObjStoreInfoResponse res;
+ meta_service->alter_obj_store_info(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
+ // Invalid fs name
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) <<
res.status().msg();
+
req.mutable_hdfs()->mutable_hdfs_info()->mutable_build_conf()->set_fs_name(
+ "hdfs://ip:port");
meta_service->alter_obj_store_info(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
+
InstanceInfoPB instance;
get_test_instance(instance);
ASSERT_EQ(*(instance.resource_ids().begin()), "2");
@@ -5165,6 +5201,7 @@ TEST(MetaServiceTest, AddHdfsInfoTest) {
StorageVaultPB hdfs;
hdfs.set_name("test_alter_add_hdfs_info_1");
HdfsVaultInfo params;
+ params.mutable_build_conf()->set_fs_name("hdfs://ip:port");
hdfs.mutable_hdfs_info()->CopyFrom(params);
req.mutable_hdfs()->CopyFrom(hdfs);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org