This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0644f45df3b [feature](Cloud) Load index data into index cache when writing data (#34046)
0644f45df3b is described below

commit 0644f45df3b5a63a35c67d56dfeb92a6f3fc80d0
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 20:41:45 2024 +0800

    [feature](Cloud) Load index data into index cache when writing data (#34046)
---
 be/src/io/fs/broker_file_writer.h                |  3 ++-
 be/src/io/fs/file_writer.h                       |  3 +++
 be/src/io/fs/hdfs_file_writer.cpp                |  3 ---
 be/src/io/fs/hdfs_file_writer.h                  |  5 +++-
 be/src/io/fs/local_file_writer.h                 |  4 ++-
 be/src/io/fs/s3_file_bufferpool.cpp              | 23 +-----------------
 be/src/io/fs/s3_file_bufferpool.h                | 23 ++----------------
 be/src/io/fs/s3_file_writer.cpp                  | 31 +++++++++---------------
 be/src/io/fs/s3_file_writer.h                    | 12 ++++-----
 be/src/io/fs/stream_sink_file_writer.h           |  3 +++
 be/src/olap/rowset/segment_v2/segment_writer.cpp | 15 ++++++++++++
 be/test/olap/tablet_cooldown_test.cpp            |  2 ++
 12 files changed, 53 insertions(+), 74 deletions(-)

diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index 58bba9febd3..d6fce52a05c 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -33,7 +33,7 @@ namespace doris {
 class ExecEnv;
 
 namespace io {
-
+struct FileCacheAllocatorBuilder;
 class BrokerFileWriter final : public FileWriter {
 public:
     // Create and open file writer
@@ -50,6 +50,7 @@ public:
     const Path& path() const override { return _path; }
     size_t bytes_appended() const override { return _cur_offset; }
     bool closed() const override { return _closed; }
+    FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
 
 private:
     Status _write(const uint8_t* buf, size_t buf_len, size_t* written_bytes);
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 4feab99c09f..5d22dca6055 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -28,6 +28,7 @@
 
 namespace doris::io {
 class FileSystem;
+struct FileCacheAllocatorBuilder;
 
 // Only affects remote file writers
 struct FileWriterOptions {
@@ -62,6 +63,8 @@ public:
     virtual size_t bytes_appended() const = 0;
 
     virtual bool closed() const = 0;
+
+    virtual FileCacheAllocatorBuilder* cache_builder() const = 0;
 };
 
 } // namespace doris::io
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index c596c0e290f..9ea66ca4da1 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -159,9 +159,6 @@ void HdfsFileWriter::_write_into_local_file_cache() {
         size_t block_size = block->range().size();
         size_t append_size = std::min(data_remain_size, block_size);
         if (block->state() == FileBlock::State::EMPTY) {
-            if (_index_offset != 0 && block->range().right >= _index_offset) {
-                static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
-            }
             block->get_or_set_downloader();
             if (block->is_downloader()) {
                 Slice s(_batch_buffer.data() + pos, append_size);
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 2ce865ecfed..234835e083f 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -51,6 +51,10 @@ public:
     size_t bytes_appended() const override { return _bytes_appended; }
     bool closed() const override { return _closed; }
 
+    FileCacheAllocatorBuilder* cache_builder() const override {
+        return _cache_builder == nullptr ? nullptr : _cache_builder.get();
+    }
+
 private:
     // Flush buffered data into HDFS client and write local file cache if enabled
     // **Notice**: this would clear the underlying buffer
@@ -83,7 +87,6 @@ private:
         std::string _batch_buffer;
     };
     BatchBuffer _batch_buffer;
-    size_t _index_offset = 0;
 };
 
 } // namespace io
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index 4cd6712b04e..81ebb0ebd1f 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -25,7 +25,7 @@
 #include "util/slice.h"
 
 namespace doris::io {
-
+struct FileCacheAllocatorBuilder;
 class LocalFileWriter final : public FileWriter {
 public:
     LocalFileWriter(Path path, int fd, bool sync_data = true);
@@ -38,6 +38,8 @@ public:
     size_t bytes_appended() const override;
     bool closed() const override { return _closed; }
 
+    FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
+
 private:
     void _abort();
     Status _close(bool sync);
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp
index 9df60dc1fd3..82493fa9514 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -90,24 +90,6 @@ FileBuffer::~FileBuffer() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
     _inner_data.reset();
 }
-/**
- * 0. check if file cache holder allocated
- * 1. update the cache's type to index cache
- */
-void UploadFileBuffer::set_index_offset(size_t offset) {
-    _index_offset = offset;
-    if (_holder) {
-        bool change_to_index_cache = false;
-        for (auto iter = _holder->file_blocks.begin(); iter != _holder->file_blocks.end(); ++iter) {
-            if (iter == _cur_file_block) {
-                change_to_index_cache = true;
-            }
-            if (change_to_index_cache) {
-                static_cast<void>((*iter)->change_cache_type_self(FileCacheType::INDEX));
-            }
-        }
-    }
-}
 
 /**
  * 0. when there is memory preserved, directly write data to buf
@@ -222,9 +204,6 @@ void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
         size_t block_size = block->range().size();
         size_t append_size = std::min(data_remain_size, block_size);
         if (block->state() == FileBlock::State::EMPTY) {
-            if (_index_offset != 0 && block->range().right >= _index_offset) {
-                static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
-            }
             block->get_or_set_downloader();
             // Another thread may have started downloading due to a query
             // Just skip putting to cache from UploadFileBuffer
@@ -279,7 +258,7 @@ Status FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
     if (_type == BufferType::UPLOAD) {
         RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
                                           std::move(_upload_cb), std::move(state), _offset,
-                                          std::move(_alloc_holder_cb), _index_offset));
+                                          std::move(_alloc_holder_cb)));
         return Status::OK();
     }
     if (_type == BufferType::DOWNLOAD) {
diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h
index 189f7696967..15d0976df6b 100644
--- a/be/src/io/fs/s3_file_bufferpool.h
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -151,18 +151,10 @@ struct DownloadFileBuffer final : public FileBuffer {
 
 struct UploadFileBuffer final : public FileBuffer {
     UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, OperationState state,
-                     size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder,
-                     size_t index_offset)
+                     size_t offset, std::function<FileBlocksHolderPtr()> alloc_holder)
             : FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
-              _upload_to_remote(std::move(upload_cb)),
-              _index_offset(index_offset) {}
+              _upload_to_remote(std::move(upload_cb)) {}
     ~UploadFileBuffer() override = default;
-    /**
-    * set the index offset
-    *
-    * @param offset the index offset
-    */
-    void set_index_offset(size_t offset);
     Status append_data(const Slice& s) override;
     /**
     * read the content from local file cache
@@ -206,7 +198,6 @@ private:
     FileBlocksHolderPtr _holder;
     decltype(_holder->file_blocks.begin()) _cur_file_block;
     size_t _append_offset {0};
-    size_t _index_offset {0};
     uint32_t _crc_value = 0;
 };
 
@@ -272,15 +263,6 @@ struct FileBufferBuilder {
         return *this;
     }
     /**
-    * set the index offset of the file buffer
-    *
-    * @param cb 
-    */
-    FileBufferBuilder& set_index_offset(size_t index_offset) {
-        _index_offset = index_offset;
-        return *this;
-    }
-    /**
     * set the callback which write the content into local file cache
     *
     * @param cb 
@@ -309,7 +291,6 @@ struct FileBufferBuilder {
     std::function<Status(Slice&)> _download;
     std::function<void(Slice, size_t)> _write_to_use_buffer;
     size_t _offset;
-    size_t _index_offset;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 84487f496ac..69202bd22fe 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -87,16 +87,16 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
         : _path(fmt::format("s3://{}/{}", bucket, key)),
           _bucket(std::move(bucket)),
           _key(std::move(key)),
-          _client(std::move(client)),
-          _expiration_time(opts ? opts->file_cache_expiration : 0),
-          _is_cold_data(opts ? opts->is_cold_data : true),
-          _write_file_cache(opts ? opts->write_file_cache : false) {
+          _client(std::move(client)) {
     s3_file_writer_total << 1;
     s3_file_being_written << 1;
     Aws::Http::SetCompliantRfc3986Encoding(true);
-    if (config::enable_file_cache && _write_file_cache) {
-        _cache_hash = BlockFileCache::hash(_path.filename().native());
-        _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
+    if (config::enable_file_cache && opts != nullptr && opts->write_file_cache) {
+        _cache_builder = std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
+                opts ? opts->is_cold_data : false, opts ? opts->file_cache_expiration : 0,
+                BlockFileCache::hash(_path.filename().native()),
+                FileCacheFactory::instance()->get_by_path(
+                        BlockFileCache::hash(_path.filename().native()))});
     }
 }
 
@@ -264,7 +264,6 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                                     _upload_one_part(part_num, buf);
                                 })
                         .set_file_offset(_bytes_appended)
-                        .set_index_offset(_index_offset)
                         .set_sync_after_complete_task([this, part_num = _cur_part_num](Status s) {
                             bool ret = false;
                             if (!s.ok()) [[unlikely]] {
@@ -282,22 +281,16 @@ Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
                             return ret;
                         })
                         .set_is_cancelled([this]() { return _failed.load(); });
-                if (_write_file_cache) {
+                if (_cache_builder != nullptr) {
                     // We would load the data into file cache asynchronously which indicates
                     // that this instance of S3FileWriter might have been destructed when we
                     // try to do writing into file cache, so we make the lambda capture the variable
                     // we need by value to extend their lifetime
                     builder.set_allocate_file_blocks_holder(
-                            [cache = _cache, k = _cache_hash, offset = _bytes_appended,
-                             t = _expiration_time, cold = _is_cold_data]() -> FileBlocksHolderPtr {
-                                CacheContext ctx;
-                                ctx.cache_type =
-                                        t == 0 ? FileCacheType::NORMAL : FileCacheType::TTL;
-                                ctx.expiration_time = t;
-                                ctx.is_cold_data = cold;
-                                auto holder = cache->get_or_set(k, offset,
-                                                                config::s3_write_buffer_size, ctx);
-                                return std::make_unique<FileBlocksHolder>(std::move(holder));
+                            [builder = *_cache_builder,
+                             offset = _bytes_appended]() -> FileBlocksHolderPtr {
+                                return builder.allocate_cache_holder(offset,
+                                                                     config::s3_write_buffer_size);
                             });
                 }
                 RETURN_IF_ERROR(builder.build(&_pending_buf));
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index a2c2ec0422a..8f27b202369 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -58,6 +58,10 @@ public:
     size_t bytes_appended() const override { return _bytes_appended; }
     bool closed() const override { return _closed; }
 
+    FileCacheAllocatorBuilder* cache_builder() const override {
+        return _cache_builder == nullptr ? nullptr : _cache_builder.get();
+    }
+
 private:
     Status _abort();
     [[nodiscard]] std::string _dump_completed_part() const;
@@ -73,15 +77,12 @@ private:
 
     std::shared_ptr<Aws::S3::S3Client> _client;
     std::string _upload_id;
-    size_t _index_offset {0};
 
     // Current Part Num for CompletedPart
     int _cur_part_num = 1;
     std::mutex _completed_lock;
     std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> _completed_parts;
 
-    UInt128Wrapper _cache_hash;
-    BlockFileCache* _cache;
     // **Attention** call add_count() before submitting buf to async thread pool
     bthread::CountdownEvent _countdown_event {0};
 
@@ -92,9 +93,8 @@ private:
     size_t _bytes_appended = 0;
 
     std::shared_ptr<FileBuffer> _pending_buf;
-    uint64_t _expiration_time;
-    bool _is_cold_data;
-    bool _write_file_cache;
+    std::unique_ptr<FileCacheAllocatorBuilder>
+            _cache_builder; // nullptr if disable write file cache
 };
 
 } // namespace io
diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h
index 2bd91075ad1..4a0eb955c26 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -33,6 +33,7 @@ struct RowsetId;
 struct SegmentStatistics;
 
 namespace io {
+struct FileCacheAllocatorBuilder;
 class StreamSinkFileWriter final : public FileWriter {
 public:
     StreamSinkFileWriter(std::vector<std::shared_ptr<LoadStreamStub>> streams)
@@ -57,6 +58,8 @@ public:
         return dummy;
     }
 
+    FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
+
 private:
     std::vector<std::shared_ptr<LoadStreamStub>> _streams;
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index f6be1917e57..7a83496b7fb 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -34,6 +34,8 @@
 #include "common/status.h"
 #include "gutil/port.h"
 #include "inverted_index_fs_directory.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
@@ -1113,6 +1115,8 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size
     }
     // write data
     RETURN_IF_ERROR(finalize_columns_data());
+    // Get the index start before finalize_footer since this function would write new data.
+    uint64_t index_start = _file_writer->bytes_appended();
     // write index
     RETURN_IF_ERROR(finalize_columns_index(index_size));
     // write footer
@@ -1122,6 +1126,17 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size
         LOG(INFO) << "segment flush consumes a lot time_ns " << timer.elapsed_time()
                   << ", segmemt_size " << *segment_file_size;
     }
+    // When the cache type is not ttl(expiration time == 0), the data should be split into normal cache queue
+    // and index cache queue
+    if (auto* cache_builder = _file_writer->cache_builder(); cache_builder != nullptr &&
+                                                             cache_builder->_expiration_time == 0 &&
+                                                             config::is_cloud_mode()) {
+        auto size = *index_size + *segment_file_size;
+        auto holder = cache_builder->allocate_cache_holder(index_start, size);
+        for (auto& segment : holder->file_blocks) {
+            static_cast<void>(segment->change_cache_type_self(io::FileCacheType::INDEX));
+        }
+    }
     return Status::OK();
 }
 
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index 45186246006..49de1826104 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -113,6 +113,8 @@ public:
 
     const Path& path() const override { return _local_file_writer->path(); }
 
+    io::FileCacheAllocatorBuilder* cache_builder() const override { return nullptr; }
+
 private:
     std::unique_ptr<io::FileWriter> _local_file_writer;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to