This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c84463fe99a [feature](cloud) Support path prefix for HDFS storage vault. (#32681)
c84463fe99a is described below
commit c84463fe99aa8faf2165816b67e906d06d31f5d1
Author: Shuo Wang <[email protected]>
AuthorDate: Wed Mar 27 14:22:53 2024 +0800
[feature](cloud) Support path prefix for HDFS storage vault. (#32681)
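The old `fs.prefix` vault property is renamed to `path_prefix`; the FE stores it in
Cloud.HdfsVaultInfo and the BE passes it through to HdfsFileSystem::create when the
vault's file system is created. A rough usage sketch (only the property keys come from
this patch; the exact CREATE STORAGE VAULT wording and the vault name are illustrative
assumptions):

    CREATE STORAGE VAULT IF NOT EXISTS hdfs_vault  -- hypothetical vault name
    PROPERTIES (
        "type" = "hdfs",
        "fs.defaultFS" = "hdfs://10.220.147.151:8020",
        "path_prefix" = "/path/to/data",           -- new key, replaces "fs.prefix"
        "hadoop.username" = "root"
    );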
---
be/src/cloud/cloud_meta_mgr.cpp | 16 ++------
be/src/cloud/cloud_meta_mgr.h | 5 ++-
be/src/cloud/cloud_storage_engine.cpp | 17 +++++----
be/src/io/hdfs_util.cpp | 26 +++++++++++++
be/src/io/hdfs_util.h | 6 +++
be/src/olap/storage_policy.cpp | 2 +-
be/src/runtime/tablets_channel.cpp | 1 +
.../org/apache/doris/catalog/HdfsStorageVault.java | 43 +++++++++++-----------
.../org/apache/doris/catalog/StorageVault.java | 9 +----
9 files changed, 72 insertions(+), 53 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index bfd15a3f24b..c636d92ed82 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -801,8 +801,7 @@ Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
    return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
}
-Status CloudMetaMgr::get_storage_vault_info(
-        std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos) {
+Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
GetObjStoreInfoRequest req;
GetObjStoreInfoResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
@@ -826,18 +825,9 @@ Status CloudMetaMgr::get_storage_vault_info(
});
}
for (const auto& vault : resp.storage_vault()) {
- THdfsParams params;
- params.fs_name = vault.hdfs_info().build_conf().fs_name();
- params.user = vault.hdfs_info().build_conf().user();
-        params.hdfs_kerberos_keytab = vault.hdfs_info().build_conf().hdfs_kerberos_keytab();
-        params.hdfs_kerberos_principal = vault.hdfs_info().build_conf().hdfs_kerberos_principal();
- for (const auto& confs : vault.hdfs_info().build_conf().hdfs_confs()) {
- THdfsConf conf;
- conf.key = confs.key();
- conf.value = confs.value();
- params.hdfs_conf.emplace_back(std::move(conf));
+ if (vault.has_hdfs_info()) {
+ vault_infos->emplace_back(vault.id(), vault.hdfs_info());
}
- vault_infos->emplace_back(vault.id(), std::move(params));
}
return Status::OK();
}
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index ede347af81c..909df671c3a 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -43,6 +43,8 @@ class TabletJobInfoPB;
class TabletStatsPB;
class TabletIndexPB;
+using StorageVaultInfos = std::vector<std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>>>;
+
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency);
class CloudMetaMgr {
@@ -70,8 +72,7 @@ public:
Status precommit_txn(const StreamLoadContext& ctx);
- Status get_storage_vault_info(
-            std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos);
+ Status get_storage_vault_info(StorageVaultInfos* vault_infos);
    Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index efff5a26adb..fd44adbc959 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -18,6 +18,7 @@
#include "cloud/cloud_storage_engine.h"
#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/cloud.pb.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
@@ -34,6 +35,7 @@
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
+#include "io/hdfs_util.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/memtable_flush_executor.h"
#include "olap/storage_policy.h"
@@ -82,9 +84,11 @@ struct VaultCreateFSVisitor {
}
// TODO(ByteYue): Make sure enable_java_support is on
- Status operator()(const THdfsParams& hdfs_params) const {
- auto fs = DORIS_TRY(
-                io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr));
+ Status operator()(const cloud::HdfsVaultInfo& vault) const {
+ auto hdfs_params = io::to_hdfs_params(vault);
+ auto fs =
+                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
+                                                     vault.has_prefix() ? vault.prefix() : ""));
put_storage_resource(id, {std::move(fs), 0});
LOG_INFO("successfully create hdfs vault, vault id {}", id);
return Status::OK();
@@ -107,7 +111,7 @@ struct RefreshFSVaultVisitor {
return st;
}
- Status operator()(const THdfsParams& hdfs_params) const {
+ Status operator()(const cloud::HdfsVaultInfo& vault_info) const {
        // TODO(ByteYue): Implement the hdfs fs refresh logic
return Status::OK();
}
@@ -117,7 +121,7 @@ struct RefreshFSVaultVisitor {
};
Status CloudStorageEngine::open() {
-    std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
+ cloud::StorageVaultInfos vault_infos;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (st.ok()) {
@@ -133,7 +137,6 @@ Status CloudStorageEngine::open() {
for (auto& [id, vault_info] : vault_infos) {
RETURN_IF_ERROR(std::visit(VaultCreateFSVisitor {id}, vault_info));
}
-
set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
// TODO(plat1ko): DeleteBitmapTxnManager
@@ -239,7 +242,7 @@ Status CloudStorageEngine::start_bg_threads() {
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::refresh_s3_info_interval_s))) {
-        std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
+ cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index d6b491c8db9..21a9140ad4a 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -17,6 +17,8 @@
#include "io/hdfs_util.h"
+#include <gen_cpp/cloud.pb.h>
+
#include <ostream>
#include "common/logging.h"
@@ -140,4 +142,28 @@ bool is_hdfs(const std::string& path_or_fs) {
return path_or_fs.rfind("hdfs://") == 0;
}
+THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) {
+ THdfsParams params;
+ auto build_conf = vault.build_conf();
+ params.__set_fs_name(build_conf.fs_name());
+ if (build_conf.has_user()) {
+ params.__set_user(build_conf.user());
+ }
+    if (build_conf.has_hdfs_kerberos_principal()) {
+        params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal());
+    }
+    if (build_conf.has_hdfs_kerberos_keytab()) {
+        params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab());
+    }
+ std::vector<THdfsConf> tconfs;
+ for (const auto& confs : vault.build_conf().hdfs_confs()) {
+ THdfsConf conf;
+ conf.__set_key(confs.key());
+ conf.__set_value(confs.value());
+ tconfs.emplace_back(conf);
+ }
+ params.__set_hdfs_conf(tconfs);
+ return params;
+}
+
} // namespace doris::io
diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h
index ccb4c9458ad..f1b236887d5 100644
--- a/be/src/io/hdfs_util.h
+++ b/be/src/io/hdfs_util.h
@@ -27,6 +27,10 @@
#include "io/fs/hdfs.h"
#include "io/fs/path.h"
+namespace cloud {
+class HdfsVaultInfo;
+}
+
namespace doris {
class HDFSCommonBuilder;
class THdfsParams;
@@ -125,5 +129,7 @@ std::string get_fs_name(const std::string& path);
// return true if path_or_fs contains "hdfs://"
bool is_hdfs(const std::string& path_or_fs);
+THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault);
+
} // namespace io
} // namespace doris
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index 4192610e2f0..6c08f7c49f8 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -43,7 +43,7 @@ Status get_remote_file_system(int64_t storage_policy_id,
auto resource = get_storage_resource(storage_policy->resource_id);
*fs = resource.fs;
if (*fs == nullptr) {
-        return Status::NotFound<false>("could not find resource, resouce_id={}",
+        return Status::NotFound<false>("could not find resource, resource_id={}",
storage_policy->resource_id);
}
return Status::OK();
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 38eb11e814b..7007f097e69 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -466,6 +466,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req
.table_schema_param = _schema,
.is_high_priority = _is_high_priority,
.write_file_cache = request.write_file_cache(),
+ .storage_vault_id = request.storage_vault_id(),
};
auto delta_writer = create_delta_writer(wrequest);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
index f1a50102eb6..6332acd04b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
@@ -21,12 +21,14 @@ import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
+import java.util.Set;
/**
* HDFS resource
@@ -37,21 +39,31 @@ import java.util.Map;
* (
* "type" = "hdfs",
* "fs.defaultFS" = "hdfs://10.220.147.151:8020",
- * "fs.prefix" = "",
+ * "path_prefix" = "/path/to/data",
* "hadoop.username" = "root"
* );
*/
public class HdfsStorageVault extends StorageVault {
    private static final Logger LOG = LogManager.getLogger(HdfsStorageVault.class);
+
+ public static final String VAULT_TYPE = "type";
public static final String HADOOP_FS_PREFIX = "dfs.";
public static String HADOOP_FS_NAME = "fs.defaultFS";
- public static String HADOOP_PREFIX = "fs.prefix";
+ public static String VAULT_PATH_PREFIX = "path_prefix";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
public static final String HDFS_PREFIX = "hdfs:";
public static final String HDFS_FILE_PREFIX = "hdfs://";
+ /**
+     * Property keys used by Doris that should not be put into HDFS client configs,
+ * such as `type`, `path_prefix`, etc.
+ */
+    private static final Set<String> nonHdfsConfPropertyKeys = ImmutableSet.of(VAULT_TYPE, VAULT_PATH_PREFIX)
+ .stream().map(String::toLowerCase)
+ .collect(ImmutableSet.toImmutableSet());
+
@SerializedName(value = "properties")
private Map<String, String> properties;
@@ -67,26 +79,11 @@ public class HdfsStorageVault extends StorageVault {
}
}
- @Override
-    protected void setProperties(Map<String, String> properties) throws DdlException {
-        // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read.
-        // We should disable short circuit read if they are not both set because it will cause performance down.
- if (!(enableShortCircuitRead(properties))) {
- properties.put(HADOOP_SHORT_CIRCUIT, "false");
- }
- this.properties = properties;
- }
-
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}
-    public static boolean enableShortCircuitRead(Map<String, String> properties) {
-        return "true".equalsIgnoreCase(properties.getOrDefault(HADOOP_SHORT_CIRCUIT, "false"))
-                && properties.containsKey(HADOOP_SOCKET_PATH);
- }
-
    public static Cloud.HdfsVaultInfo generateHdfsParam(Map<String, String> properties) {
        Cloud.HdfsVaultInfo.Builder hdfsVaultInfoBuilder = Cloud.HdfsVaultInfo.newBuilder();
@@ -94,7 +91,7 @@ public class HdfsStorageVault extends StorageVault {
for (Map.Entry<String, String> property : properties.entrySet()) {
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
hdfsConfBuilder.setFsName(property.getValue());
- } else if (property.getKey().equalsIgnoreCase(HADOOP_PREFIX)) {
+ } else if (property.getKey().equalsIgnoreCase(VAULT_PATH_PREFIX)) {
hdfsVaultInfoBuilder.setPrefix(property.getValue());
            } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) {
hdfsConfBuilder.setUser(property.getValue());
@@ -103,10 +100,12 @@ public class HdfsStorageVault extends StorageVault {
            } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)) {
hdfsConfBuilder.setHdfsKerberosKeytab(property.getValue());
} else {
-                Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
- conf.setKey(property.getKey());
- conf.setValue(property.getValue());
- hdfsConfBuilder.addHdfsConfs(conf.build());
+                if (!nonHdfsConfPropertyKeys.contains(property.getKey().toLowerCase())) {
+                    Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
+ conf.setKey(property.getKey());
+ conf.setValue(property.getValue());
+ hdfsConfBuilder.addHdfsConfs(conf.build());
+ }
}
}
        return hdfsVaultInfoBuilder.setBuildConf(hdfsConfBuilder.build()).build();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
index 98f35ac1695..f6e0525d17b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
@@ -84,9 +84,7 @@ public abstract class StorageVault {
}
    public static StorageVault fromStmt(CreateStorageVaultStmt stmt) throws DdlException {
- StorageVault storageVault = getStorageVaultInstance(stmt);
- storageVault.setProperties(stmt.getProperties());
- return storageVault;
+ return getStorageVaultInstance(stmt);
}
public boolean ifNotExists() {
@@ -153,10 +151,5 @@ public abstract class StorageVault {
}
}
- /**
- * Set and check the properties in child resources
- */
-    protected abstract void setProperties(Map<String, String> properties) throws DdlException;
-
public abstract Map<String, String> getCopiedProperties();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]