This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 872ddba92f565817ef77f45a0bb89a00872e32d2
Author: AlexYue <[email protected]>
AuthorDate: Fri Apr 19 10:46:52 2024 +0800

    [feature](Cloud) Enable write into file cache for hdfs writer (#33796)
---
 be/src/common/config.cpp              |   1 +
 be/src/common/config.h                |   1 +
 be/src/io/cache/file_cache_common.cpp |  11 ++
 be/src/io/cache/file_cache_common.h   |  12 ++
 be/src/io/fs/hdfs_file_system.cpp     |   4 +-
 be/src/io/fs/hdfs_file_writer.cpp     | 199 ++++++++++++++++++++++++++++------
 be/src/io/fs/hdfs_file_writer.h       |  35 +++++-
 be/src/io/hdfs_util.cpp               |  11 ++
 be/src/io/hdfs_util.h                 |  12 ++
 9 files changed, 245 insertions(+), 41 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 326d9cfc555..988cf3b53d6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1028,6 +1028,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880");
 // The timeout config for S3 buffer allocation
 DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
+DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
 
 //disable shrink memory by default
 DEFINE_mBool(enable_shrink_memory, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 191ebcc4f3b..10228fbfbd7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1074,6 +1074,7 @@ DECLARE_mInt64(s3_write_buffer_size);
 DECLARE_mInt32(s3_writer_buffer_allocation_timeout);
 // the max number of cached file handle for block segment
 DECLARE_mInt64(file_cache_max_file_reader_cache_size);
+DECLARE_mInt64(hdfs_write_batch_buffer_size_mb);
 //enable shrink memory
 DECLARE_mBool(enable_shrink_memory);
 // enable cache for high concurrent point query work load
diff --git a/be/src/io/cache/file_cache_common.cpp 
b/be/src/io/cache/file_cache_common.cpp
index 364f38c3a99..3ce647b4a0d 100644
--- a/be/src/io/cache/file_cache_common.cpp
+++ b/be/src/io/cache/file_cache_common.cpp
@@ -21,6 +21,7 @@
 #include "io/cache/file_cache_common.h"
 
 #include "common/config.h"
+#include "io/cache/block_file_cache.h"
 #include "vec/common/hex.h"
 
 namespace doris::io {
@@ -55,4 +56,14 @@ std::string UInt128Wrapper::to_string() const {
     return vectorized::get_hex_uint_lowercase(value_);
 }
 
+FileBlocksHolderPtr FileCacheAllocatorBuilder::allocate_cache_holder(size_t 
offset,
+                                                                     size_t 
size) const {
+    CacheContext ctx;
+    ctx.cache_type = _expiration_time == 0 ? FileCacheType::NORMAL : 
FileCacheType::TTL;
+    ctx.expiration_time = _expiration_time;
+    ctx.is_cold_data = _is_cold_data;
+    auto holder = _cache->get_or_set(_cache_hash, offset, size, ctx);
+    return std::make_unique<FileBlocksHolder>(std::move(holder));
+}
+
 } // namespace doris::io
diff --git a/be/src/io/cache/file_cache_common.h 
b/be/src/io/cache/file_cache_common.h
index 3e293ac221c..21ee4dfc8b0 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -50,6 +50,18 @@ struct UInt128Wrapper {
     bool operator==(const UInt128Wrapper& other) const { return value_ == 
other.value_; }
 };
 
+class BlockFileCache;
+struct FileBlocksHolder;
+using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
+
+struct FileCacheAllocatorBuilder {
+    bool _is_cold_data;
+    uint64_t _expiration_time;
+    UInt128Wrapper _cache_hash;
+    BlockFileCache* _cache; // Only one ref, the lifetime is owned by FileCache
+    FileBlocksHolderPtr allocate_cache_holder(size_t offset, size_t size) 
const;
+};
+
 struct KeyHash {
     std::size_t operator()(const UInt128Wrapper& w) const { return 
UInt128Hash()(w.value_); }
 };
diff --git a/be/src/io/fs/hdfs_file_system.cpp 
b/be/src/io/fs/hdfs_file_system.cpp
index 4ab32920c8c..f86b3ef588a 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -106,9 +106,9 @@ Status HdfsFileSystem::init() {
 }
 
 Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* 
writer,
-                                        const FileWriterOptions*) {
+                                        const FileWriterOptions* opts) {
     _fs_handle->inc_ref();
-    auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name);
+    auto res = io::HdfsFileWriter::create(file, _fs_handle, _fs_name, opts);
     if (res.has_value()) {
         *writer = std::move(res).value();
         return Status::OK();
diff --git a/be/src/io/fs/hdfs_file_writer.cpp 
b/be/src/io/fs/hdfs_file_writer.cpp
index 46f5a626e42..b49ceac6bbf 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -18,6 +18,7 @@
 #include "io/fs/hdfs_file_writer.h"
 
 #include <fcntl.h>
+#include <fmt/core.h>
 
 #include <filesystem>
 #include <ostream>
@@ -26,23 +27,46 @@
 
 #include "common/logging.h"
 #include "common/status.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/file_cache_common.h"
 #include "io/fs/err_utils.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/hdfs_util.h"
 #include "service/backend_options.h"
+#include "util/bvar_helper.h"
 
 namespace doris::io {
 
+bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num");
+bvar::Adder<uint64_t> 
hdfs_bytes_written_total("hdfs_file_writer_bytes_written");
+bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
+bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
+
+static constexpr size_t MB = 1024 * 1024;
+
 HdfsFileWriter::HdfsFileWriter(Path path, HdfsHandler* handler, hdfsFile 
hdfs_file,
                                std::string fs_name, const FileWriterOptions* 
opts)
         : _path(std::move(path)),
           _hdfs_handler(handler),
           _hdfs_file(hdfs_file),
           _fs_name(std::move(fs_name)),
-          _sync_file_data(opts ? opts->sync_file_data : true) {}
+          _sync_file_data(opts ? opts->sync_file_data : true),
+          _batch_buffer(MB * config::hdfs_write_batch_buffer_size_mb) {
+    if (config::enable_file_cache && opts != nullptr && 
opts->write_file_cache) {
+        _cache_builder = 
std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
+                opts ? opts->is_cold_data : false, opts ? 
opts->file_cache_expiration : 0,
+                BlockFileCache::hash(_path.filename().native()),
+                FileCacheFactory::instance()->get_by_path(
+                        BlockFileCache::hash(_path.filename().native()))});
+    }
+    hdfs_file_writer_total << 1;
+    hdfs_file_being_written << 1;
+}
 
 HdfsFileWriter::~HdfsFileWriter() {
     if (_hdfs_file) {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
     }
 
@@ -51,6 +75,7 @@ HdfsFileWriter::~HdfsFileWriter() {
     } else {
         delete _hdfs_handler;
     }
+    hdfs_file_being_written << -1;
 }
 
 Status HdfsFileWriter::close() {
@@ -58,29 +83,150 @@ Status HdfsFileWriter::close() {
         return Status::OK();
     }
     _closed = true;
-
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
+    int ret;
     if (_sync_file_data) {
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
 #ifdef USE_LIBHDFS3
-        int ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+            ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #else
-        int ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #endif
+        }
+
         if (ret != 0) {
             return Status::InternalError("failed to sync hdfs file. fs_name={} 
path={} : {}",
                                          _fs_name, _path.native(), 
hdfs_error());
         }
     }
 
-    // The underlying implementation will invoke `hdfsHFlush` to flush 
buffered data and wait for
-    // the HDFS response, but won't guarantee the synchronization of data to 
HDFS.
-    int ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        // The underlying implementation will invoke `hdfsHFlush` to flush 
buffered data and wait for
+        // the HDFS response, but won't guarantee the synchronization of data 
to HDFS.
+        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+    }
     _hdfs_file = nullptr;
     if (ret != 0) {
         return Status::InternalError(
                 "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: 
{}",
                 BackendOptions::get_localhost(), _fs_name, _path.native(), 
hdfs_error());
     }
+    hdfs_file_created_total << 1;
+    return Status::OK();
+}
+
+HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) {
+    _batch_buffer.reserve(capacity);
+}
+
+bool HdfsFileWriter::BatchBuffer::full() const {
+    return size() == capacity();
+}
+
+const char* HdfsFileWriter::BatchBuffer::data() const {
+    return _batch_buffer.data();
+}
+
+size_t HdfsFileWriter::BatchBuffer::capacity() const {
+    return _batch_buffer.capacity();
+}
+
+size_t HdfsFileWriter::BatchBuffer::size() const {
+    return _batch_buffer.size();
+}
 
+void HdfsFileWriter::BatchBuffer::clear() {
+    _batch_buffer.clear();
+}
+
+// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
+void HdfsFileWriter::_write_into_local_file_cache() {
+    auto holder = _cache_builder->allocate_cache_holder(_bytes_appended - 
_batch_buffer.size(),
+                                                        
_batch_buffer.capacity());
+    size_t pos = 0;
+    size_t data_remain_size = _batch_buffer.size();
+    for (auto& block : holder->file_blocks) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t block_size = block->range().size();
+        size_t append_size = std::min(data_remain_size, block_size);
+        if (block->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && block->range().right >= _index_offset) {
+                
static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
+            }
+            block->get_or_set_downloader();
+            if (block->is_downloader()) {
+                Slice s(_batch_buffer.data() + pos, append_size);
+                Status st = block->append(s);
+                if (st.ok()) {
+                    st = block->finalize();
+                }
+                if (!st.ok()) {
+                    LOG_WARNING("failed to append data to file 
cache").error(st);
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
+    while (!content.empty()) {
+        int64_t written_bytes;
+        {
+            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
+            written_bytes =
+                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, 
content.data(), content.size());
+        }
+        if (written_bytes < 0) {
+            return Status::InternalError("write hdfs failed. fs_name: {}, 
path: {}, error: {}",
+                                         _fs_name, _path.native(), 
hdfs_error());
+        }
+        hdfs_bytes_written_total << written_bytes;
+        content.remove_prefix(written_bytes);
+    }
+    return Status::OK();
+}
+
+Status HdfsFileWriter::_flush_buffer() {
+    RETURN_IF_ERROR(append_hdfs_file(_batch_buffer.content()));
+    if (_cache_builder != nullptr) {
+        _write_into_local_file_cache();
+    }
+    _batch_buffer.clear();
+    return Status::OK();
+}
+
+size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) {
+    size_t append_size = std::min(capacity() - size(), content.size());
+    _batch_buffer.append(content.data(), append_size);
+    return append_size;
+}
+
+std::string_view HdfsFileWriter::BatchBuffer::content() const {
+    return _batch_buffer;
+}
+
+Status HdfsFileWriter::_append(std::string_view content) {
+    while (!content.empty()) {
+        if (_batch_buffer.full()) {
+            auto error_msg = fmt::format("invalid batch buffer status, 
capacity {}, size {}",
+                                         _batch_buffer.capacity(), 
_batch_buffer.size());
+            DCHECK(false) << error_msg;
+            return Status::InternalError(error_msg);
+        }
+        size_t append_size = _batch_buffer.append(content);
+        content.remove_prefix(append_size);
+        if (_batch_buffer.full()) {
+            RETURN_IF_ERROR(_flush_buffer());
+        }
+    }
     return Status::OK();
 }
 
@@ -90,19 +236,8 @@ Status HdfsFileWriter::appendv(const Slice* data, size_t 
data_cnt) {
     }
 
     for (size_t i = 0; i < data_cnt; i++) {
-        const Slice& result = data[i];
-        size_t left_bytes = result.size;
-        const char* p = result.data;
-        while (left_bytes > 0) {
-            int64_t written_bytes = hdfsWrite(_hdfs_handler->hdfs_fs, 
_hdfs_file, p, left_bytes);
-            if (written_bytes < 0) {
-                return Status::InternalError("write hdfs failed. fs_name: {}, 
path: {}, error: {}",
-                                             _fs_name, _path.native(), 
hdfs_error());
-            }
-            left_bytes -= written_bytes;
-            p += written_bytes;
-            _bytes_appended += written_bytes;
-        }
+        RETURN_IF_ERROR(_append({data[i].get_data(), data[i].get_size()}));
+        _bytes_appended += data[i].get_size();
     }
     return Status::OK();
 }
@@ -112,6 +247,9 @@ Status HdfsFileWriter::finalize() {
     if (_closed) [[unlikely]] {
         return Status::InternalError("finalize closed file: {}", 
_path.native());
     }
+    if (_batch_buffer.size() != 0) {
+        RETURN_IF_ERROR(_flush_buffer());
+    }
 
     // Flush buffered data to HDFS without waiting for HDFS response
     int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
@@ -127,23 +265,12 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path 
full_path, HdfsHandler* handle
                                              const std::string& fs_name,
                                              const FileWriterOptions* opts) {
     auto path = convert_path(full_path, fs_name);
-    std::string hdfs_dir = path.parent_path().string();
-    int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
-    if (exists != 0) {
-        // FIXME(plat1ko): Directly return error here?
-        VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir;
-        int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str());
-        if (ret != 0) {
-            // TODO(plat1ko): Normalized error handling
-            std::stringstream ss;
-            ss << "create dir failed. "
-               << " fs_name: " << fs_name << " path: " << hdfs_dir << ", err: 
" << hdfs_error();
-            LOG(WARNING) << ss.str();
-            return ResultError(Status::InternalError(ss.str()));
-        }
-    }
     // open file
-    auto* hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 
0, 0, 0);
+    hdfsFile hdfs_file = nullptr;
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency);
+        hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 
0, 0);
+    }
     if (hdfs_file == nullptr) {
         std::stringstream ss;
         ss << "open file failed. "
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 7ad041c94b7..e6aa623cada 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -21,11 +21,14 @@
 #include "io/fs/file_writer.h"
 #include "io/fs/hdfs.h"
 #include "io/fs/path.h"
-#include "util/slice.h"
 
-namespace doris::io {
+namespace doris {
+struct Slice;
+namespace io {
 
 class HdfsHandler;
+class BlockFileCache;
+struct FileCacheAllocatorBuilder;
 
 class HdfsFileWriter final : public FileWriter {
 public:
@@ -48,6 +51,13 @@ public:
     bool closed() const override { return _closed; }
 
 private:
+    // Flush buffered data into HDFS client and write local file cache if 
enabled
+    // **Notice**: this would clear the underlying buffer
+    Status _flush_buffer();
+    Status append_hdfs_file(std::string_view content);
+    void _write_into_local_file_cache();
+    Status _append(std::string_view content);
+
     Path _path;
     HdfsHandler* _hdfs_handler = nullptr;
     hdfsFile _hdfs_file = nullptr;
@@ -55,6 +65,25 @@ private:
     size_t _bytes_appended = 0;
     bool _closed = false;
     bool _sync_file_data;
+    std::unique_ptr<FileCacheAllocatorBuilder>
+            _cache_builder; // nullptr if disable write file cache
+    class BatchBuffer {
+    public:
+        BatchBuffer(size_t capacity);
+        size_t append(std::string_view content);
+        bool full() const;
+        const char* data() const;
+        size_t capacity() const;
+        size_t size() const;
+        void clear();
+        std::string_view content() const;
+
+    private:
+        std::string _batch_buffer;
+    };
+    BatchBuffer _batch_buffer;
+    size_t _index_offset;
 };
 
-} // namespace doris::io
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index da390d3922c..0ae5d2f371b 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -17,6 +17,7 @@
 
 #include "io/hdfs_util.h"
 
+#include <bvar/latency_recorder.h>
 #include <gen_cpp/cloud.pb.h>
 
 #include <ostream>
@@ -67,6 +68,16 @@ uint64 hdfs_hash_code(const THdfsParams& hdfs_params) {
 
 } // namespace
 
+namespace hdfs_bvar {
+bvar::LatencyRecorder hdfs_read_latency("hdfs_read");
+bvar::LatencyRecorder hdfs_write_latency("hdfs_write");
+bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
+bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
+bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
+bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
+bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
+}; // namespace hdfs_bvar
+
 void HdfsHandlerCache::_clean_invalid() {
     std::vector<uint64> removed_handle;
     for (auto& item : _cache) {
diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h
index f1b236887d5..f450063c7dc 100644
--- a/be/src/io/hdfs_util.h
+++ b/be/src/io/hdfs_util.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <bvar/bvar.h>
+
 #include <atomic>
 #include <cstdint>
 #include <memory>
@@ -37,6 +39,16 @@ class THdfsParams;
 
 namespace io {
 
+namespace hdfs_bvar {
+extern bvar::LatencyRecorder hdfs_read_latency;
+extern bvar::LatencyRecorder hdfs_write_latency;
+extern bvar::LatencyRecorder hdfs_create_dir_latency;
+extern bvar::LatencyRecorder hdfs_open_latency;
+extern bvar::LatencyRecorder hdfs_close_latency;
+extern bvar::LatencyRecorder hdfs_flush_latency;
+extern bvar::LatencyRecorder hdfs_hsync_latency;
+}; // namespace hdfs_bvar
+
 class HdfsHandler {
 public:
     HdfsHandler(hdfsFS fs, bool cached)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to