This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit b3e4c75410bbd28c39a430331049f61f4e5bd6ba
Author: slothever <[email protected]>
AuthorDate: Sat Aug 26 00:16:00 2023 +0800

[fix](multi-catalog)fix hive table with cosn location issue (#23409)

Sometimes the partitions of a hive table may be on different storage, e.g. some are on HDFS and others are on object storage (cos, etc.).

This PR mainly changes:
1. Fix the bug of accessing files via cosn.
2. Add a new field `fs_name` in TFileRangeDesc.

This is because, when accessing a file, the BE gets an hdfs client from the hdfs client cache, and different files in one query request may have different fs names, e.g. some are `hdfs://` and some are `cosn://`. So we need to specify the fs name for each file, otherwise it may return an error like:

`reason: IllegalArgumentException: Wrong FS: cosn://doris-build-1308700295/xxxx, expected: hdfs://172.xxxx:4007`
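For illustration, the per-file fs name carried in `fs_name` is just the scheme plus authority of the file's URI, which is how the FE side fills the new TFileRangeDesc field. A minimal Java sketch of that derivation (hypothetical class/method names and made-up example paths, not code from this PR):

    import java.net.URI;

    public class FsNameExample {
        // Derive the per-file fs name (scheme + authority) from a split path.
        static String fsNameOf(String path) {
            URI uri = URI.create(path);
            return uri.getScheme() + "://" + uri.getAuthority();
        }

        public static void main(String[] args) {
            // hdfs partition -> "hdfs://nameservice1"
            System.out.println(fsNameOf("hdfs://nameservice1/warehouse/t/p=1/f.orc"));
            // cosn partition -> "cosn://doris-build-1308700295"
            System.out.println(fsNameOf("cosn://doris-build-1308700295/warehouse/t/p=2/f.orc"));
        }
    }

With the fs name carried per range, the BE can look up or create the matching filesystem client for each file instead of assuming a single namenode for the whole query.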
---
be/src/io/file_factory.cpp | 4 +-
be/src/io/fs/benchmark/hdfs_benchmark.hpp | 6 +--
be/src/io/fs/fs_utils.h | 4 ++
be/src/io/fs/hdfs_file_reader.cpp | 5 +-
be/src/io/fs/hdfs_file_system.cpp | 62 +++++++++++++---------
be/src/io/fs/hdfs_file_system.h | 2 +-
be/src/io/fs/hdfs_file_writer.cpp | 12 ++---
be/src/io/hdfs_builder.cpp | 11 ++--
be/src/io/hdfs_builder.h | 13 ++---
be/src/runtime/snapshot_loader.cpp | 2 +-
be/src/util/hdfs_util.cpp | 4 ++
be/src/util/hdfs_util.h | 5 ++
be/src/vec/exec/format/csv/csv_reader.cpp | 3 ++
be/src/vec/exec/format/json/new_json_reader.cpp | 4 ++
be/src/vec/exec/format/orc/vorc_reader.cpp | 3 ++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 3 ++
be/src/vec/exec/format/table/iceberg_reader.cpp | 2 +
.../format/table/transactional_hive_reader.cpp | 2 +
be/src/vec/runtime/vfile_result_writer.cpp | 3 +-
fe/be-java-extensions/preload-extensions/pom.xml | 6 +++
.../doris/planner/external/FileQueryScanNode.java | 14 ++---
.../doris/planner/external/HiveScanNode.java | 2 +-
.../planner/external/iceberg/IcebergScanNode.java | 2 +-
.../planner/external/paimon/PaimonScanNode.java | 2 +-
gensrc/thrift/PlanNodes.thrift | 3 ++
.../hive/test_mixed_par_locations.groovy | 9 ++--
26 files changed, 118 insertions(+), 70 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index d46d2c5b4c..42d55d5fc0 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -97,7 +97,7 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env, case TFileType::FILE_HDFS: { THdfsParams hdfs_params = parse_properties(properties); std::shared_ptr<io::HdfsFileSystem> fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(path, &file_writer)); break; } @@ -181,7 +181,7 @@ Status FileFactory::create_hdfs_reader(const THdfsParams& hdfs_params, std::shared_ptr<io::FileSystem>* hdfs_file_system, io::FileReaderSPtr* reader, RuntimeProfile* profile) { std::shared_ptr<io::HdfsFileSystem> fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", profile, &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, fd.fs_name, profile, &fs)); RETURN_IF_ERROR(fs->open_file(fd, reader_options, reader)); *hdfs_file_system = std::move(fs); return Status::OK(); diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp index 2a6f97d5e8..8a5b6f240f 100644 --- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp +++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp @@ -96,7 +96,7 @@ public:
std::shared_ptr<io::HdfsFileSystem> fs; io::FileWriterPtr writer; THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, nullptr, &fs)); RETURN_IF_ERROR(fs->create_file(file_path, &writer)); return write(state, writer.get()); } @@ -117,7 +117,7 @@ public: auto new_file_path = file_path + "_new"; THdfsParams hdfs_params = parse_properties(_conf_map); std::shared_ptr<io::HdfsFileSystem> fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, nullptr, &fs)); auto start = std::chrono::high_resolution_clock::now(); RETURN_IF_ERROR(fs->rename(file_path, new_file_path)); @@ -144,7 +144,7 @@ public: std::shared_ptr<io::HdfsFileSystem> fs; THdfsParams hdfs_params = parse_properties(_conf_map); - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, nullptr, &fs)); auto start = std::chrono::high_resolution_clock::now(); bool res = false; diff --git a/be/src/io/fs/fs_utils.h b/be/src/io/fs/fs_utils.h index 64ad2c6ea8..2befc58a3e 100644 --- a/be/src/io/fs/fs_utils.h +++ b/be/src/io/fs/fs_utils.h @@ -46,6 +46,10 @@ struct FileDescription { // modification time of this file. // 0 means unset. int64_t mtime = 0; + // for hdfs, eg: hdfs://nameservices1/ + // because for a hive table, differenet partitions may have different + // locations(or fs), so different files may have different fs. + std::string fs_name; }; } // namespace io diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 0f999dfc98..6c4f456e37 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -31,6 +31,7 @@ // #include "io/fs/hdfs_file_system.h" #include "service/backend_options.h" #include "util/doris_metrics.h" +#include "util/hdfs_util.h" namespace doris { namespace io { @@ -45,7 +46,7 @@ HdfsFileReader::HdfsFileReader(Path path, const std::string& name_node, DorisMetrics::instance()->hdfs_file_open_reading->increment(1); DorisMetrics::instance()->hdfs_file_reader_total->increment(1); - if (_profile != nullptr) { + if (_profile != nullptr && is_hdfs(_name_node)) { #ifdef USE_HADOOP_HDFS const char* hdfs_profile_name = "HdfsIO"; ADD_TIMER(_profile, hdfs_profile_name); @@ -76,7 +77,7 @@ Status HdfsFileReader::close() { bool expected = false; if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { DorisMetrics::instance()->hdfs_file_open_reading->increment(-1); - if (_profile != nullptr) { + if (_profile != nullptr && is_hdfs(_name_node)) { #ifdef USE_HADOOP_HDFS struct hdfsReadStatistics* hdfs_statistics = nullptr; auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 4473ea2a3a..9cca7fdad0 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -69,7 +69,8 @@ public: const HdfsFileSystemCache& operator=(const HdfsFileSystemCache&) = delete; // This function is thread-safe - Status get_connection(const THdfsParams& hdfs_params, HdfsFileSystemHandle** fs_handle); + Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name, + HdfsFileSystemHandle** fs_handle); private: std::mutex _lock; @@ -77,8 +78,9 @@ private:
HdfsFileSystemCache() = default; - uint64 _hdfs_hash_code(const THdfsParams& hdfs_params); - Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* is_kerberos); + uint64 _hdfs_hash_code(const THdfsParams& hdfs_params, const std::string& fs_name); + Status _create_fs(const THdfsParams& hdfs_params, const std::string& fs_name, hdfsFS* fs, + bool* is_kerberos); void _clean_invalid(); void _clean_oldest(); }; @@ -117,7 +119,7 @@ Status HdfsFileHandleCache::get_file(const std::shared_ptr<HdfsFileSystem>& fs, return Status::OK(); } -Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& path, +Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& fs_name, RuntimeProfile* profile, std::shared_ptr<HdfsFileSystem>* fs) { #ifdef USE_HADOOP_HDFS if (!config::enable_java_support) { @@ -126,17 +128,21 @@ Status HdfsFileSystem::create(const THdfsParams& hdfs_params, const std::string& "true."); } #endif - (*fs).reset(new HdfsFileSystem(hdfs_params, path, profile)); + (*fs).reset(new HdfsFileSystem(hdfs_params, fs_name, profile)); return (*fs)->connect(); } -HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path, +HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& fs_name, RuntimeProfile* profile) - : RemoteFileSystem(path, "", FileSystemType::HDFS), + : RemoteFileSystem("", "", FileSystemType::HDFS), _hdfs_params(hdfs_params), _fs_handle(nullptr), _profile(profile) { - _namenode = _hdfs_params.fs_name; + if (_hdfs_params.__isset.fs_name) { + _fs_name = _hdfs_params.fs_name; + } else { + _fs_name = fs_name; + } } HdfsFileSystem::~HdfsFileSystem() { @@ -150,7 +156,8 @@ HdfsFileSystem::~HdfsFileSystem() { } Status HdfsFileSystem::connect_impl() { - RETURN_IF_ERROR(HdfsFileSystemCache::instance()->get_connection(_hdfs_params, &_fs_handle)); + RETURN_IF_ERROR( + HdfsFileSystemCache::instance()->get_connection(_hdfs_params, _fs_name, &_fs_handle)); if (!_fs_handle) { return Status::IOError("failed to init Hdfs handle with, please check hdfs params."); } @@ -165,20 +172,20 @@ Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) Status HdfsFileSystem::open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) { CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(abs_path, _namenode); + Path real_path = convert_path(abs_path, _fs_name); FileHandleCache::Accessor accessor; RETURN_IF_ERROR(HdfsFileHandleCache::instance()->get_file( std::static_pointer_cast<HdfsFileSystem>(shared_from_this()), real_path, fd.mtime, fd.file_size, &accessor)); - *reader = std::make_shared<HdfsFileReader>(abs_path, _namenode, std::move(accessor), _profile); + *reader = std::make_shared<HdfsFileReader>(abs_path, _fs_name, std::move(accessor), _profile); return Status::OK(); } Status HdfsFileSystem::create_directory_impl(const Path& dir, bool failed_if_exists) { CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(dir, _namenode); + Path real_path = convert_path(dir, _fs_name); int res = hdfsCreateDirectory(_fs_handle->hdfs_fs, real_path.string().c_str()); if (res == -1) { return Status::IOError("failed to create directory {}: {}", dir.native(), hdfs_error()); @@ -208,7 +215,7 @@ Status HdfsFileSystem::delete_internal(const Path& path, int is_recursive) { return Status::OK(); } CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(path, _namenode); + Path real_path = convert_path(path, _fs_name); int res = 
hdfsDelete(_fs_handle->hdfs_fs, real_path.string().c_str(), is_recursive); if (res == -1) { return Status::IOError("failed to delete directory {}: {}", path.native(), hdfs_error()); @@ -218,7 +225,7 @@ Status HdfsFileSystem::delete_internal(const Path& path, int is_recursive) { Status HdfsFileSystem::exists_impl(const Path& path, bool* res) const { CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(path, _namenode); + Path real_path = convert_path(path, _fs_name); int is_exists = hdfsExists(_fs_handle->hdfs_fs, real_path.string().c_str()); #ifdef USE_HADOOP_HDFS // when calling hdfsExists() and return non-zero code, @@ -236,7 +243,7 @@ Status HdfsFileSystem::exists_impl(const Path& path, bool* res) const { Status HdfsFileSystem::file_size_impl(const Path& path, int64_t* file_size) const { CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(path, _namenode); + Path real_path = convert_path(path, _fs_name); hdfsFileInfo* file_info = hdfsGetPathInfo(_fs_handle->hdfs_fs, real_path.string().c_str()); if (file_info == nullptr) { return Status::IOError("failed to get file size of {}: {}", path.native(), hdfs_error()); @@ -254,7 +261,7 @@ Status HdfsFileSystem::list_impl(const Path& path, bool only_file, std::vector<F } CHECK_HDFS_HANDLE(_fs_handle); - Path real_path = convert_path(path, _namenode); + Path real_path = convert_path(path, _fs_name); int numEntries = 0; hdfsFileInfo* hdfs_file_info = hdfsListDirectory(_fs_handle->hdfs_fs, real_path.c_str(), &numEntries); @@ -278,8 +285,8 @@ Status HdfsFileSystem::list_impl(const Path& path, bool only_file, std::vector<F } Status HdfsFileSystem::rename_impl(const Path& orig_name, const Path& new_name) { - Path normal_orig_name = convert_path(orig_name, _namenode); - Path normal_new_name = convert_path(new_name, _namenode); + Path normal_orig_name = convert_path(orig_name, _fs_name); + Path normal_new_name = convert_path(new_name, _fs_name); int ret = hdfsRename(_fs_handle->hdfs_fs, normal_orig_name.c_str(), normal_new_name.c_str()); if (ret == 0) { LOG(INFO) << "finished to rename file. 
orig: " << normal_orig_name @@ -424,15 +431,14 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() { // ************* HdfsFileSystemCache ****************** int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64; -Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, - bool* is_kerberos) { +Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, const std::string& fs_name, + hdfsFS* fs, bool* is_kerberos) { HDFSCommonBuilder builder; - RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder)); + RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder)); *is_kerberos = builder.is_need_kinit(); hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); if (hdfs_fs == nullptr) { - return Status::IOError("faield to connect to hdfs {}: {}", hdfs_params.fs_name, - hdfs_error()); + return Status::IOError("faield to connect to hdfs {}: {}", fs_name, hdfs_error()); } *fs = hdfs_fs; return Status::OK(); @@ -463,8 +469,9 @@ void HdfsFileSystemCache::_clean_oldest() { } Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, + const std::string& fs_name, HdfsFileSystemHandle** fs_handle) { - uint64 hash_code = _hdfs_hash_code(hdfs_params); + uint64 hash_code = _hdfs_hash_code(hdfs_params, fs_name); { std::lock_guard<std::mutex> l(_lock); auto it = _cache.find(hash_code); @@ -484,7 +491,7 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, // create a new one and try to put it into cache hdfsFS hdfs_fs = nullptr; bool is_kerberos = false; - RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos)); + RETURN_IF_ERROR(_create_fs(hdfs_params, fs_name, &hdfs_fs, &is_kerberos)); if (_cache.size() >= MAX_CACHE_HANDLE) { _clean_invalid(); _clean_oldest(); @@ -502,10 +509,13 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, return Status::OK(); } -uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params) { +uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params, + const std::string& fs_name) { uint64 hash_code = 0; if (hdfs_params.__isset.fs_name) { hash_code += Fingerprint(hdfs_params.fs_name); + } else { + hash_code += Fingerprint(fs_name); } if (hdfs_params.__isset.user) { hash_code += Fingerprint(hdfs_params.user); diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index f365891aa9..229d236195 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -153,7 +153,7 @@ private: HdfsFileSystem(const THdfsParams& hdfs_params, const std::string& path, RuntimeProfile* profile); const THdfsParams& _hdfs_params; - std::string _namenode; + std::string _fs_name; // do not use std::shared_ptr or std::unique_ptr // _fs_handle is managed by HdfsFileSystemCache HdfsFileSystemHandle* _fs_handle; diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 5eaaa8a404..936defb2f6 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -58,7 +58,7 @@ Status HdfsFileWriter::close() { std::stringstream ss; ss << "failed to flush hdfs file. 
" << "(BE: " << BackendOptions::get_localhost() << ")" - << "namenode:" << _hdfs_fs->_namenode << " path:" << _path << ", err: " << hdfs_error(); + << "namenode:" << _hdfs_fs->_fs_name << " path:" << _path << ", err: " << hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } @@ -88,7 +88,7 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) { hdfsWrite(_hdfs_fs->_fs_handle->hdfs_fs, _hdfs_file, p, left_bytes); if (written_bytes < 0) { return Status::InternalError("write hdfs failed. namenode: {}, path: {}, error: {}", - _hdfs_fs->_namenode, _path.native(), hdfs_error()); + _hdfs_fs->_fs_name, _path.native(), hdfs_error()); } left_bytes -= written_bytes; p += written_bytes; @@ -109,7 +109,7 @@ Status HdfsFileWriter::finalize() { } Status HdfsFileWriter::_open() { - _path = convert_path(_path, _hdfs_fs->_namenode); + _path = convert_path(_path, _hdfs_fs->_fs_name); std::string hdfs_dir = _path.parent_path().string(); int exists = hdfsExists(_hdfs_fs->_fs_handle->hdfs_fs, hdfs_dir.c_str()); if (exists != 0) { @@ -119,7 +119,7 @@ Status HdfsFileWriter::_open() { std::stringstream ss; ss << "create dir failed. " << "(BE: " << BackendOptions::get_localhost() << ")" - << " namenode: " << _hdfs_fs->_namenode << " path: " << hdfs_dir + << " namenode: " << _hdfs_fs->_fs_name << " path: " << hdfs_dir << ", err: " << hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); @@ -131,11 +131,11 @@ Status HdfsFileWriter::_open() { std::stringstream ss; ss << "open file failed. " << "(BE: " << BackendOptions::get_localhost() << ")" - << " namenode:" << _hdfs_fs->_namenode << " path:" << _path << ", err: " << hdfs_error(); + << " namenode:" << _hdfs_fs->_fs_name << " path:" << _path << ", err: " << hdfs_error(); LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } - VLOG_NOTICE << "open file. namenode:" << _hdfs_fs->_namenode << ", path:" << _path; + VLOG_NOTICE << "open file. 
namenode:" << _hdfs_fs->_fs_name << ", path:" << _path; return Status::OK(); } diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp index b420c84e13..754eb61680 100644 --- a/be/src/io/hdfs_builder.cpp +++ b/be/src/io/hdfs_builder.cpp @@ -109,9 +109,10 @@ THdfsParams parse_properties(const std::map<std::string, std::string>& propertie return hdfsParams; } -Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder) { +Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_name, + HDFSCommonBuilder* builder) { RETURN_IF_ERROR(builder->init_hdfs_builder()); - hdfsBuilderSetNameNode(builder->get(), hdfsParams.fs_name.c_str()); + hdfsBuilderSetNameNode(builder->get(), fs_name.c_str()); // set kerberos conf if (hdfsParams.__isset.hdfs_kerberos_principal) { builder->need_kinit = true; @@ -165,10 +166,10 @@ Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* build return Status::OK(); } -Status createHDFSBuilder(const std::map<std::string, std::string>& properties, - HDFSCommonBuilder* builder) { +Status create_hdfs_builder(const std::map<std::string, std::string>& properties, + HDFSCommonBuilder* builder) { THdfsParams hdfsParams = parse_properties(properties); - return createHDFSBuilder(hdfsParams, builder); + return create_hdfs_builder(hdfsParams, hdfsParams.fs_name, builder); } } // namespace doris diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h index d3f7c35017..159544fc58 100644 --- a/be/src/io/hdfs_builder.h +++ b/be/src/io/hdfs_builder.h @@ -36,9 +36,10 @@ const std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab"; const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_"; class HDFSCommonBuilder { - friend Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder); - friend Status createHDFSBuilder(const std::map<std::string, std::string>& properties, - HDFSCommonBuilder* builder); + friend Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_name, + HDFSCommonBuilder* builder); + friend Status create_hdfs_builder(const std::map<std::string, std::string>& properties, + HDFSCommonBuilder* builder); public: HDFSCommonBuilder() {} @@ -67,8 +68,8 @@ private: THdfsParams parse_properties(const std::map<std::string, std::string>& properties); -Status createHDFSBuilder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder); -Status createHDFSBuilder(const std::map<std::string, std::string>& properties, - HDFSCommonBuilder* builder); +Status create_hdfs_builder(const THdfsParams& hdfsParams, HDFSCommonBuilder* builder); +Status create_hdfs_builder(const std::map<std::string, std::string>& properties, + HDFSCommonBuilder* builder); } // namespace doris diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 3ff8229bc3..d52afb0b85 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -86,7 +86,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l } else if (TStorageBackendType::type::HDFS == type) { THdfsParams hdfs_params = parse_properties(_prop); std::shared_ptr<io::HdfsFileSystem> fs; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &fs)); + RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, nullptr, &fs)); _remote_fs = std::move(fs); } else if (TStorageBackendType::type::BROKER == type) { std::shared_ptr<io::BrokerFileSystem> fs; diff --git a/be/src/util/hdfs_util.cpp 
b/be/src/util/hdfs_util.cpp index 643053292e..f82b51cb27 100644 --- a/be/src/util/hdfs_util.cpp +++ b/be/src/util/hdfs_util.cpp @@ -50,5 +50,9 @@ Path convert_path(const Path& path, const std::string& namenode) { return real_path; } +bool is_hdfs(const std::string& path_or_fs) { + return path_or_fs.rfind("hdfs://") == 0; +} + } // namespace io } // namespace doris diff --git a/be/src/util/hdfs_util.h b/be/src/util/hdfs_util.h index 39dc8bc416..0e10cc578b 100644 --- a/be/src/util/hdfs_util.h +++ b/be/src/util/hdfs_util.h @@ -43,5 +43,10 @@ private: // path like hdfs://ip:port/path can't be used by libhdfs3. Path convert_path(const Path& path, const std::string& namenode); +std::string get_fs_name(const std::string& path); + +// return true if path_or_fs contains "hdfs://" +bool is_hdfs(const std::string& path_or_fs); + } // namespace io } // namespace doris diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 10b5ced968..6def26def0 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -233,6 +233,9 @@ void CsvReader::_init_file_description() { _file_description.path = _range.path; _file_description.start_offset = _range.start_offset; _file_description.file_size = _range.__isset.file_size ? _range.file_size : 0; + if (_range.__isset.fs_name) { + _file_description.fs_name = _range.fs_name; + } } Status CsvReader::init_reader(bool is_load) { diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index b02c30807d..5dbd8fa8bc 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -154,6 +154,10 @@ void NewJsonReader::_init_file_description() { _file_description.path = _range.path; _file_description.start_offset = _range.start_offset; _file_description.file_size = _range.__isset.file_size ? _range.file_size : 0; + + if (_range.__isset.fs_name) { + _file_description.fs_name = _range.fs_name; + } } Status NewJsonReader::init_reader( diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 6878cecfad..fe29180585 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -905,6 +905,9 @@ void OrcReader::_init_file_description() { _file_description.path = _scan_range.path; _file_description.start_offset = _scan_range.start_offset; _file_description.file_size = _scan_range.__isset.file_size ? _scan_range.file_size : 0; + if (_scan_range.__isset.fs_name) { + _file_description.fs_name = _scan_range.fs_name; + } } TypeDescriptor OrcReader::_convert_to_doris_type(const orc::Type* orc_type) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index f8dab141b7..e627a96e0d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -291,6 +291,9 @@ void ParquetReader::_init_file_description() { _file_description.path = _scan_range.path; _file_description.start_offset = _scan_range.start_offset; _file_description.file_size = _scan_range.__isset.file_size ? 
_scan_range.file_size : 0; + if (_scan_range.__isset.fs_name) { + _file_description.fs_name = _scan_range.fs_name; + } } Status ParquetReader::init_reader( diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 1b0f05cc95..95994723ae 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -261,6 +261,8 @@ Status IcebergTableReader::_position_delete( DeleteFile* delete_file_cache = _kv_cache->get< DeleteFile>(_delet_file_cache_key(delete_file.path), [&]() -> DeleteFile* { TFileRangeDesc delete_range; + // must use __set() method to make sure __isset is true + delete_range.__set_fs_name(_range.fs_name); delete_range.path = delete_file.path; delete_range.start_offset = 0; delete_range.size = -1; diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 79341f40aa..dbac1386d0 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -129,6 +129,8 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) { auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name); TFileRangeDesc delete_range; + // must use __set() method to make sure __isset is true + delete_range.__set_fs_name(_range.fs_name); delete_range.path = delete_file; delete_range.start_offset = 0; delete_range.size = -1; diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp index 0f5d5c416d..9d5fc4e158 100644 --- a/be/src/vec/runtime/vfile_result_writer.cpp +++ b/be/src/vec/runtime/vfile_result_writer.cpp @@ -599,7 +599,8 @@ Status VFileResultWriter::_delete_dir() { case TStorageBackendType::HDFS: { THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties); std::shared_ptr<io::HdfsFileSystem> hdfs_fs = nullptr; - RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", nullptr, &hdfs_fs)); + RETURN_IF_ERROR( + io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, nullptr, &hdfs_fs)); file_system = hdfs_fs; break; } diff --git a/fe/be-java-extensions/preload-extensions/pom.xml b/fe/be-java-extensions/preload-extensions/pom.xml index db8663212a..830ee1ea8f 100644 --- a/fe/be-java-extensions/preload-extensions/pom.xml +++ b/fe/be-java-extensions/preload-extensions/pom.xml @@ -214,6 +214,12 @@ under the License. 
<groupId>org.apache.doris</groupId> <artifactId>hive-catalog-shade</artifactId> </dependency> + <!-- For BE CosN Access --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-cos</artifactId> + <version>3.3.5</version> + </dependency> </dependencies> <build> diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index f5021c2608..40dba10c98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -73,6 +73,7 @@ import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -358,9 +359,8 @@ public abstract class FileQueryScanNode extends FileScanNode { Map<String, String> locationProperties) throws UserException { if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { if (!params.isSetHdfsParams()) { - String fsName = getFsName(fileSplit); THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); - tHdfsParams.setFsName(fsName); + // tHdfsParams.setFsName(getFsName(fileSplit)); params.setHdfsParams(tHdfsParams); } @@ -413,14 +413,10 @@ public abstract class FileQueryScanNode extends FileScanNode { rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); rangeDesc.setFileType(locationType); + rangeDesc.setPath(fileSplit.getPath().toString()); if (locationType == TFileType.FILE_HDFS) { - rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (locationType == TFileType.FILE_S3 - || locationType == TFileType.FILE_BROKER - || locationType == TFileType.FILE_LOCAL - || locationType == TFileType.FILE_NET) { - // need full path - rangeDesc.setPath(fileSplit.getPath().toString()); + URI fileUri = fileSplit.getPath().toUri(); + rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority()); } rangeDesc.setModificationTime(fileSplit.getModificationTime()); return rangeDesc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index d3e80ba4d9..1209be1cb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -336,7 +336,7 @@ public class HiveScanNode extends FileQueryScanNode { @Override protected Map<String, String> getLocationProperties() throws UserException { - return hmsTable.getCatalogProperties(); + return hmsTable.getHadoopProperties(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 9dcd1716be..4837ba5545 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -362,7 +362,7 @@ public class IcebergScanNode extends FileQueryScanNode { @Override public Map<String, String> getLocationProperties() throws UserException { - return source.getCatalog().getProperties(); + return source.getCatalog().getCatalogProperty().getHadoopProperties(); } @Override diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java index be3b04fb33..a68d5edcb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java @@ -174,7 +174,7 @@ public class PaimonScanNode extends FileQueryScanNode { @Override public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException { - return source.getCatalog().getProperties(); + return source.getCatalog().getCatalogProperty().getHadoopProperties(); } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 6be2a508f6..bb670c7063 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -403,6 +403,9 @@ struct TFileRangeDesc { 9: optional i64 modification_time 10: optional Types.TFileType file_type; 11: optional TFileCompressType compress_type; + // for hive table, different files may have different fs, + // so fs_name should be with TFileRangeDesc + 12: optional string fs_name } // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it. diff --git a/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy b/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy index ec092f99e7..ca8278bfdc 100644 --- a/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy +++ b/regression-test/suites/external_table_p2/hive/test_mixed_par_locations.groovy @@ -29,7 +29,7 @@ suite("test_mixed_par_locations", "p2") { String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") String extAk = context.config.otherConfigs.get("extAk"); String extSk = context.config.otherConfigs.get("extSk"); - String ext3Endpoint = context.config.otherConfigs.get("ext3Endpoint"); + String extS3Endpoint = context.config.otherConfigs.get("extS3Endpoint"); String extS3Region = context.config.otherConfigs.get("extS3Region"); String catalog_name = "test_mixed_par_locations" @@ -38,10 +38,9 @@ suite("test_mixed_par_locations", "p2") { create catalog if not exists ${catalog_name} properties ( 'type'='hms', 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}', - 'AWS_ACCESS_KEY' = "${extAk}", - 'AWS_SECRET_KEY' = "${extSk}", - 'AWS_ENDPOINT' = "${ext3Endpoint}", - 'AWS_REGION' = "${extS3Region}" + 'cos.access_key' = '${extAk}', + 'cos.secret_key' = '${extSk}', + 'cos.endpoint' = '${extS3Endpoint}' ); """ logger.info("catalog " + catalog_name + " created") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
