This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 872ddba92f565817ef77f45a0bb89a00872e32d2
Author: AlexYue <[email protected]>
AuthorDate: Fri Apr 19 10:46:52 2024 +0800

    [feature](Cloud) Enable write into file cache for hdfs writer (#33796)
---
 be/src/common/config.cpp              |   1 +
 be/src/common/config.h                |   1 +
 be/src/io/cache/file_cache_common.cpp |  11 ++
 be/src/io/cache/file_cache_common.h   |  12 ++
 be/src/io/fs/hdfs_file_system.cpp     |   4 +-
 be/src/io/fs/hdfs_file_writer.cpp     | 199 ++++++++++++++++++++++++++++------
 be/src/io/fs/hdfs_file_writer.h       |  35 +++++-
 be/src/io/hdfs_util.cpp               |  11 ++
 be/src/io/hdfs_util.h                 |  12 ++
 9 files changed, 245 insertions(+), 41 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 326d9cfc555..988cf3b53d6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1028,6 +1028,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880");
 // The timeout config for S3 buffer allocation
 DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
+DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
 
 //disable shrink memory by default
 DEFINE_mBool(enable_shrink_memory, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 191ebcc4f3b..10228fbfbd7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1074,6 +1074,7 @@ DECLARE_mInt64(s3_write_buffer_size);
 DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
 // the max number of cached file handle for block segemnt
 DECLARE_mInt64(file_cache_max_file_reader_cache_size);
+DECLARE_mInt64(hdfs_write_batch_buffer_size_mb);
 //enable shrink memory
 DECLARE_mBool(enable_shrink_memory);
 // enable cache for high concurrent point query work load
diff --git a/be/src/io/cache/file_cache_common.cpp b/be/src/io/cache/file_cache_common.cpp
index 364f38c3a99..3ce647b4a0d 100644
--- a/be/src/io/cache/file_cache_common.cpp
+++ b/be/src/io/cache/file_cache_common.cpp
@@ -21,6 +21,7 @@
 #include "io/cache/file_cache_common.h"
 
 #include "common/config.h"
+#include "io/cache/block_file_cache.h"
 #include "vec/common/hex.h"
 
 namespace doris::io {
@@ -55,4 +56,14 @@ std::string UInt128Wrapper::to_string() const {
     return vectorized::get_hex_uint_lowercase(value_);
 }
 
+FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t offset,
+                                                                     size_t size) const {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, size, ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
 } // namespace doris::io
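The new FileCacheAllocatorBuilder::allocate_cache_holder() above derives the
cache block type from the expiration time: a zero expiration yields NORMAL
blocks managed by the ordinary LRU queues, while a non-zero expiration marks
the blocks TTL so they are kept until they expire. A minimal standalone sketch
of that selection rule (the enum and context struct here are simplified
stand-ins for the Doris file-cache types, not the real definitions):

    #include <cstdint>

    // Simplified stand-ins for the Doris file cache types.
    enum class FileCacheType { NORMAL, TTL, INDEX };

    struct CacheContext {
        FileCacheType cache_type = FileCacheType::NORMAL;
        uint64_t expiration_time = 0;
        bool is_cold_data = false;
    };

    // Same rule as allocate_cache_holder(): no expiration -> NORMAL
    // (plain LRU residency), otherwise -> TTL (kept until expiry).
    CacheContext make_cache_context(uint64_t expiration_time, bool is_cold_data) {
        CacheContext ctx;
        ctx.cache_type =
                expiration_time == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
        ctx.expiration_time = expiration_time;
        ctx.is_cold_data = is_cold_data;
        return ctx;
    }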
diff --git a/be/src/io/cache/file_cache_common.h b/be/src/io/cache/file_cache_common.h
index 3e293ac221c..21ee4dfc8b0 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -50,6 +50,18 @@ struct UInt128Wrapper {
     bool operator==(const UInt128Wrapper& other) const { return value_ == other.value_; }
 };
 
+class BlockFileCache;
+struct FileBlocksHolder;
+using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
+
+struct FileCacheAllocatorBuilder {
+    bool _is_cold_data;
+    uint64_t _expiration_time;
+    UInt128Wrapper _cache_hash;
+    BlockFileCache* _cache; // Only one ref, the lifetime is owned by FileCache
+    FileBlocksHolderPtr allocate_cache_holder(size_t offset, size_t size) const;
+};
+
 struct KeyHash {
     std::size_t operator()(const UInt128Wrapper& w) const { return UInt128Hash()(w.value_); }
 };
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 4ab32920c8c..f86b3ef588a 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -106,9 +106,9 @@ Status HdfsFileSystem::init() {
 }
 
 Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
-                                        const FileWriterOptions*) {
+                                        const FileWriterOptions* opts) {
     _fs_handle->inc_ref();
-    auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name);
+    auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts);
     if (res.has_value()) {
         *writer = std::move(res).value();
         return Status::OK();
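With create_file_impl() now forwarding the writer options, a caller can opt an
HDFS file into the local write-through cache. A hypothetical call site,
assuming the usual FileSystem::create_file() entry point; the option fields
are exactly the ones HdfsFileWriter reads in this patch:

    #include "io/fs/file_system.h"
    #include "io/fs/file_writer.h"

    // Hypothetical helper: create an HDFS writer whose output is also
    // written into the local file cache as it is uploaded.
    doris::Status create_cached_writer(const doris::io::FileSystemSPtr& fs,
                                       doris::io::FileWriterPtr* writer) {
        doris::io::FileWriterOptions opts;
        opts.write_file_cache = true;   // populate the local cache on write
        opts.is_cold_data = false;      // keep the blocks in the hot queue
        opts.file_cache_expiration = 0; // 0 -> NORMAL blocks, non-zero -> TTL
        return fs->create_file("/user/doris/demo.dat", writer, &opts);
    }

Note that the writer also honors config::enable_file_cache: if the BE has the
file cache disabled, _cache_builder stays null and the write path degenerates
to plain batched hdfsWrite() calls.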
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 46f5a626e42..b49ceac6bbf 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -18,6 +18,7 @@
 #include "io/fs/hdfs_file_writer.h"
 
 #include <fcntl.h>
+#include <fmt/core.h>
 
 #include <filesystem>
 #include <ostream>
@@ -26,23 +27,46 @@
 #include "common/logging.h"
 #include "common/status.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/file_cache_common.h"
 #include "io/fs/err_utils.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/hdfs_util.h"
 #include "service/backend_options.h"
+#include "util/bvar_helper.h"
 
 namespace doris::io {
 
+bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num");
+bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written");
+bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
+bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written");
+
+static constexpr size_t MB = 1024 * 1024;
+
 HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile hdfs_file,
                                std::string fs_name, const FileWriterOptions* opts)
         : _path(std::move(path)),
           _hdfs_handler(handler),
           _hdfs_file(hdfs_file),
           _fs_name(std::move(fs_name)),
-          _sync_file_data(opts ? opts->sync_file_data : true) {}
+          _sync_file_data(opts ? opts->sync_file_data : true),
+          _batch_buffer(MB * config::hdfs_write_batch_buffer_size_mb) {
+    if (config::enable_file_cache && opts != nullptr && opts->write_file_cache) {
+        _cache_builder = std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
+                opts ? opts->is_cold_data : false, opts ? opts->file_cache_expiration : 0,
+                BlockFileCache::hash(_path.filename().native()),
+                FileCacheFactory::instance()->get_by_path(
+                        BlockFileCache::hash(_path.filename().native()))});
+    }
+    hdfs_file_writer_total << 1;
+    hdfs_file_being_written << 1;
+}
 
 HdfsFileWriter::~HdfsFileWriter() {
     if (_hdfs_file) {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
     }
@@ -51,6 +75,7 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
@@ -58,29 +83,150 @@ Status HdfsFileWriter::close() {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
+    int ret;
     if (_sync_file_data) {
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
 #ifdef USE_LIBHDFS3
-        int ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+            ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #else
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #endif
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} path={} : {}",
                                          _fs_name, _path.native(), hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
+        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::BatchBuffer::full() const {
+    return size() == capacity();
+}
+
+const char* HdfsFileWriter::BatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::BatchBuffer::capacity() const {
+    return _batch_buffer.capacity();
+}
+
+size_t HdfsFileWriter::BatchBuffer::size() const {
+    return _batch_buffer.size();
+}
+
+void HdfsFileWriter::BatchBuffer::clear() {
+    _batch_buffer.clear();
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto holder = _cache_builder->allocate_cache_holder(_bytes_appended - _batch_buffer.size(),
+                                                        _batch_buffer.capacity());
+    size_t pos = 0;
+    size_t data_remain_size = _batch_buffer.size();
+    for (auto& block : holder->file_blocks) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t append_size = std::min(data_remain_size, block_size);
+        if (block->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && block->range().right >= _index_offset) {
+                static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
+            }
+            block->get_or_set_downloader();
+            if (block->is_downloader()) {
+                Slice s(_batch_buffer.data() + pos, append_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG_WARNING("failed to append data to file cache").error(st);
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
+    while (!content.empty()) {
+        int64_t written_bytes;
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
+            written_bytes =
+                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), content.size());
+        }
+        if (written_bytes < 0) {
+            return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}",
                                          _fs_name, _path.native(), hdfs_error());
+        }
+        hdfs_bytes_written_total << written_bytes;
+        content.remove_prefix(written_bytes);
+    }
+    return Status::OK();
+}
+
+Status HdfsFileWriter::_flush_buffer() {
+    RETURN_IF_ERROR(append_hdfs_file(_batch_buffer.content()));
+    if (_cache_builder != nullptr) {
+        _write_into_local_file_cache();
+    }
+    _batch_buffer.clear();
+    return Status::OK();
+}
+
+size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) {
+    size_t append_size = std::min(capacity() - size(), content.size());
+    _batch_buffer.append(content.data(), append_size);
+    return append_size;
+}
+
+std::string_view HdfsFileWriter::BatchBuffer::content() const {
+    return _batch_buffer;
+}
+
+Status HdfsFileWriter::_append(std::string_view content) {
+    while (!content.empty()) {
+        if (_batch_buffer.full()) {
+            auto error_msg = fmt::format("invalid batch buffer status, capacity {}, size {}",
+                                         _batch_buffer.capacity(), _batch_buffer.size());
+            DCHECK(false) << error_msg;
+            return Status::InternalError(error_msg);
+        }
+        size_t append_size = _batch_buffer.append(content);
+        content.remove_prefix(append_size);
+        if (_batch_buffer.full()) {
+            RETURN_IF_ERROR(_flush_buffer());
+        }
+    }
     return Status::OK();
 }
 
@@ -90,19 +236,8 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
     }
 
     for (size_t i = 0; i < data_cnt; i++) {
-        const Slice& result = data[i];
-        size_t left_bytes = result.size;
-        const char* p = result.data;
-        while (left_bytes > 0) {
-            int64_t written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, p, left_bytes);
-            if (written_bytes < 0) {
-                return Status::InternalError("write hdfs failed. fs_name: {}, path: {}, error: {}",
-                                             _fs_name, _path.native(), hdfs_error());
-            }
-            left_bytes -= written_bytes;
-            p += written_bytes;
-            _bytes_appended += written_bytes;
-        }
+        RETURN_IF_ERROR(_append({data[i].get_data(), data[i].get_size()}));
+        _bytes_appended += data[i].get_size();
     }
     return Status::OK();
 }
@@ -112,6 +247,9 @@ Status HdfsFileWriter::finalize() {
     if (_closed) [[unlikely]] {
         return Status::InternalError("finalize closed file: {}", _path.native());
     }
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
 
     // Flush buffered data to HDFS without waiting for HDFS response
     int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
@@ -127,23 +265,12 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, HdfsHandler* handler,
                                              const std::string& fs_name,
                                              const FileWriterOptions* opts) {
     auto path = convert_path(full_path, fs_name);
-    std::string hdfs_dir = path.parent_path().string();
-    int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
-    if (exists != 0) {
-        // FIXME(plat1ko): Directly return error here?
-        VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir;
-        int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str());
-        if (ret != 0) {
-            // TODO(plat1ko): Normalized error handling
-            std::stringstream ss;
-            ss << "create dir failed. "
-               << " fs_name: " << fs_name << " path: " << hdfs_dir << ", err: " << hdfs_error();
-            LOG(WARNING) << ss.str();
-            return ResultError(Status::InternalError(ss.str()));
-        }
-    }
     // open file
-    auto* hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0);
+    hdfsFile hdfs_file = nullptr;
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency);
+        hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0);
+    }
     if (hdfs_file == nullptr) {
         std::stringstream ss;
         ss << "open file failed. "
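The heart of the new write path is a fill-and-flush loop: appendv() copies
incoming slices into a fixed-capacity staging buffer, and whenever the buffer
fills, _flush_buffer() pushes one large batch to HDFS and mirrors it into the
local cache, so hdfsWrite() sees a few large writes (hdfs_write_batch_buffer_size_mb,
4 MB by default) instead of many small ones. A simplified standalone model of
BatchBuffer plus the _append() loop, with a callback standing in for
_flush_buffer():

    #include <algorithm>
    #include <functional>
    #include <string>
    #include <string_view>

    // Simplified model of HdfsFileWriter::BatchBuffer: a fixed-capacity
    // staging area for outgoing bytes.
    class BatchBuffer {
    public:
        explicit BatchBuffer(size_t capacity) : _capacity(capacity) {
            _buf.reserve(capacity);
        }
        // Copy at most the remaining free space; return how much was taken.
        size_t append(std::string_view content) {
            size_t take = std::min(_capacity - _buf.size(), content.size());
            _buf.append(content.data(), take);
            return take;
        }
        bool full() const { return _buf.size() == _capacity; }
        std::string_view content() const { return _buf; }
        void clear() { _buf.clear(); }

    private:
        std::string _buf;
        size_t _capacity;
    };

    // Mirrors HdfsFileWriter::_append(): stage bytes, flush on every full
    // buffer; the tail (a partly filled buffer) is flushed by close().
    void buffered_append(BatchBuffer& buffer, std::string_view content,
                         const std::function<void(std::string_view)>& flush) {
        while (!content.empty()) {
            content.remove_prefix(buffer.append(content));
            if (buffer.full()) {
                flush(buffer.content()); // _flush_buffer(): hdfsWrite + cache
                buffer.clear();
            }
        }
    }

One deliberate difference from the real class: the sketch stores the capacity
explicitly, because std::string::reserve() may reserve more than requested,
while the patch compares size() against capacity() directly.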
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 7ad041c94b7..e6aa623cada 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -21,11 +21,14 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/hdfs.h"
 #include "io/fs/path.h"
-#include "util/slice.h"
 
-namespace doris::io {
+namespace doris {
+struct Slice;
 
+namespace io {
 class HdfsHandler;
+class BlockFileCache;
+struct FileCacheAllocatorBuilder;
 
 class HdfsFileWriter final : public FileWriter {
 public:
@@ -48,6 +51,13 @@ public:
     bool closed() const override { return _closed; }
 
 private:
+    // Flush buffered data into HDFS client and write local file cache if enabled
+    // **Notice**: this would clear the underlying buffer
+    Status _flush_buffer();
+    Status append_hdfs_file(std::string_view content);
+    void _write_into_local_file_cache();
+    Status _append(std::string_view content);
+
     Path _path;
     HdfsHandler* _hdfs_handler = nullptr;
     hdfsFile _hdfs_file = nullptr;
@@ -55,6 +65,25 @@ private:
     size_t _bytes_appended = 0;
     bool _closed = false;
     bool _sync_file_data;
+    std::unique_ptr<FileCacheAllocatorBuilder>
+            _cache_builder; // nullptr if disable write file cache
+    class BatchBuffer {
+    public:
+        BatchBuffer(size_t capacity);
+        size_t append(std::string_view content);
+        bool full() const;
+        const char* data() const;
+        size_t capacity() const;
+        size_t size() const;
+        void clear();
+        std::string_view content() const;
+
+    private:
+        std::string _batch_buffer;
+    };
+    BatchBuffer _batch_buffer;
+    size_t _index_offset;
 };
 
-} // namespace doris::io
+} // namespace io
+} // namespace doris
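The header keeps its dependencies light: instead of including util/slice.h and
the cache headers, it forward-declares Slice and FileCacheAllocatorBuilder and
holds the latter through a std::unique_ptr. A minimal sketch of that pattern
(names are illustrative); the one rule to remember is that the destructor must
be defined in a translation unit where the pointee type is complete:

    // writer_fwd.h -- unique_ptr to a forward-declared type keeps the
    // full definition out of the header and speeds up rebuilds.
    #pragma once
    #include <memory>

    struct FileCacheAllocatorBuilder; // defined in file_cache_common.h

    class Writer {
    public:
        Writer();
        ~Writer(); // must be defined (even as = default) in the .cpp,
                   // where FileCacheAllocatorBuilder is a complete type
    private:
        std::unique_ptr<FileCacheAllocatorBuilder> _cache_builder;
    };

hdfs_file_writer.cpp satisfies this by including io/cache/file_cache_common.h
before the destructor definition.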
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index da390d3922c..0ae5d2f371b 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -17,6 +17,7 @@
 
 #include "io/hdfs_util.h"
 
+#include <bvar/latency_recorder.h>
 #include <gen_cpp/cloud.pb.h>
 
 #include <ostream>
@@ -67,6 +68,16 @@ uint64 hdfs_hash_code(const THdfsParams& hdfs_params) {
 
 } // namespace
 
+namespace hdfs_bvar {
+bvar::LatencyRecorder hdfs_read_latency("hdfs_read");
+bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
+bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
+bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
+bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
+bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
+bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
+}; // namespace hdfs_bvar
+
 void HdfsHandlerCache::_clean_invalid() {
     std::vector<uint64> removed_handle;
     for (auto& item : _cache) {
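Every hdfs client call in the writer is now wrapped in a scoped timer feeding
one of these recorders, so per-operation latency quantiles show up under the
bvar endpoint. A hedged sketch of what such a guard does; the real macro is
SCOPED_BVAR_LATENCY from util/bvar_helper.h, and this stand-in only
illustrates the mechanics:

    #include <bvar/latency_recorder.h>
    #include <chrono>

    // Stand-in for SCOPED_BVAR_LATENCY: time the enclosing scope and feed
    // the elapsed microseconds to a bvar::LatencyRecorder on destruction.
    class ScopedLatency {
    public:
        explicit ScopedLatency(bvar::LatencyRecorder& recorder)
                : _recorder(recorder), _start(std::chrono::steady_clock::now()) {}
        ~ScopedLatency() {
            auto us = std::chrono::duration_cast<std::chrono::microseconds>(
                              std::chrono::steady_clock::now() - _start)
                              .count();
            _recorder << us;
        }

    private:
        bvar::LatencyRecorder& _recorder;
        std::chrono::steady_clock::time_point _start;
    };

    bvar::LatencyRecorder g_demo_latency("demo_hdfs_op");

    void timed_op() {
        ScopedLatency guard(g_demo_latency); // recorded when the scope exits
        // ... the timed call, e.g. hdfsWrite()/hdfsOpenFile() ...
    }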
" diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index 7ad041c94b7..e6aa623cada 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -21,11 +21,14 @@ #include "io/fs/file_writer.h" #include "io/fs/hdfs.h" #include "io/fs/path.h" -#include "util/slice.h" -namespace doris::io { +namespace doris { +struct Slice; +namespace io { class HdfsHandler; +class BlockFileCache; +struct FileCacheAllocatorBuilder; class HdfsFileWriter final : public FileWriter { public: @@ -48,6 +51,13 @@ public: bool closed() const override { return _closed; } private: + // Flush buffered data into HDFS client and write local file cache if enabled + // **Notice**: this would clear the underlying buffer + Status _flush_buffer(); + Status append_hdfs_file(std::string_view content); + void _write_into_local_file_cache(); + Status _append(std::string_view content); + Path _path; HdfsHandler* _hdfs_handler = nullptr; hdfsFile _hdfs_file = nullptr; @@ -55,6 +65,25 @@ private: size_t _bytes_appended = 0; bool _closed = false; bool _sync_file_data; + std::unique_ptr<FileCacheAllocatorBuilder> + _cache_builder; // nullptr if disable write file cache + class BatchBuffer { + public: + BatchBuffer(size_t capacity); + size_t append(std::string_view content); + bool full() const; + const char* data() const; + size_t capacity() const; + size_t size() const; + void clear(); + std::string_view content() const; + + private: + std::string _batch_buffer; + }; + BatchBuffer _batch_buffer; + size_t _index_offset; }; -} // namespace doris::io +} // namespace io +} // namespace doris diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp index da390d3922c..0ae5d2f371b 100644 --- a/be/src/io/hdfs_util.cpp +++ b/be/src/io/hdfs_util.cpp @@ -17,6 +17,7 @@ #include "io/hdfs_util.h" +#include <bvar/latency_recorder.h> #include <gen_cpp/cloud.pb.h> #include <ostream> @@ -67,6 +68,16 @@ uint64 hdfs_hash_code(const THdfsParams& hdfs_params) { } // namespace +namespace hdfs_bvar { +bvar::LatencyRecorder hdfs_read_latency("hdfs_read"); +bvar::LatencyRecorder hdfs_write_latency("hdfs_write"); +bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir"); +bvar::LatencyRecorder hdfs_open_latency("hdfs_open"); +bvar::LatencyRecorder hdfs_close_latency("hdfs_close"); +bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush"); +bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync"); +}; // namespace hdfs_bvar + void HdfsHandlerCache::_clean_invalid() { std::vector<uint64> removed_handle; for (auto& item : _cache) { diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index f1b236887d5..f450063c7dc 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -17,6 +17,8 @@ #pragma once +#include <bvar/bvar.h> + #include <atomic> #include <cstdint> #include <memory> @@ -37,6 +39,16 @@ class THdfsParams; namespace io { +namespace hdfs_bvar { +extern bvar::LatencyRecorder hdfs_read_latency; +extern bvar::LatencyRecorder hdfs_write_latency; +extern bvar::LatencyRecorder hdfs_create_dir_latency; +extern bvar::LatencyRecorder hdfs_open_latency; +extern bvar::LatencyRecorder hdfs_close_latency; +extern bvar::LatencyRecorder hdfs_flush_latency; +extern bvar::LatencyRecorder hdfs_hsync_latency; +}; // namespace hdfs_bvar + class HdfsHandler { public: HdfsHandler(hdfsFS fs, bool cached) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
