This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0644f45df3b [feature](Cloud) Load index data into index cache when writing data (#34046)
0644f45df3b is described below
commit 0644f45df3b5a63a35c67d56dfeb92a6f3fc80d0
Author: AlexYue <[email protected]>
AuthorDate: Thu Apr 25 20:41:45 2024 +0800
[feature](Cloud) Load index data into index cache when writing data (#34046)
---
be/src/io/fs/broker_file_writer.h | 3 ++-
be/src/io/fs/file_writer.h | 3 +++
be/src/io/fs/hdfs_file_writer.cpp | 3 ---
be/src/io/fs/hdfs_file_writer.h | 5 +++-
be/src/io/fs/local_file_writer.h | 4 ++-
be/src/io/fs/s3_file_bufferpool.cpp | 23 +-----------------
be/src/io/fs/s3_file_bufferpool.h | 23 ++----------------
be/src/io/fs/s3_file_writer.cpp | 31 +++++++++---------------
be/src/io/fs/s3_file_writer.h | 12 ++++-----
be/src/io/fs/stream_sink_file_writer.h | 3 +++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 15 ++++++++++++
be/test/olap/tablet_cooldown_test.cpp | 2 ++
12 files changed, 53 insertions(+), 74 deletions(-)
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index 58bba9febd3..d6fce52a05c 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -33,7 +33,7 @@ namespace doris {
class ExecEnv;
namespace io {
-
+struct FileCacheAllocatorBuilder;
class BrokerFileWriter final : public FileWriter {
public:
// Create and open file writer
@@ -50,6 +50,7 @@ public:
const Path& path() const override { return _path; }
size_t bytes_appended() const override { return _cur_offset; }
bool closed() const override { return _closed; }
+ FileCacheAllocatorBuilder* cache_builder() const override { return
nullptr; }
private:
Status _write(const uint8_t* buf, size_t buf_len, size_t* written_bytes);
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 4feab99c09f..5d22dca6055 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -28,6 +28,7 @@
namespace doris::io {
class FileSystem;
+struct FileCacheAllocatorBuilder;
// Only affects remote file writers
struct FileWriterOptions {
@@ -62,6 +63,8 @@ public:
virtual size_t bytes_appended() const = 0;
virtual bool closed() const = 0;
+
+ virtual FileCacheAllocatorBuilder* cache_builder() const = 0;
};
} // namespace doris::io
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index c596c0e290f..9ea66ca4da1 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -159,9 +159,6 @@ void HdfsFileWriter::_write_into_local_file_cache() {
size_t block_size = block->range().size();
size_t append_size = std::min(data_remain_size, block_size);
if (block->state() == FileBlock::State::EMPTY) {
- if (_index_offset != 0 && block->range().right >= _index_offset) {
-
static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
- }
block->get_or_set_downloader();
if (block->is_downloader()) {
Slice s(_batch_buffer.data() + pos, append_size);
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 2ce865ecfed..234835e083f 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -51,6 +51,10 @@ public:
size_t bytes_appended() const override { return _bytes_appended; }
bool closed() const override { return _closed; }
+ FileCacheAllocatorBuilder* cache_builder() const override {
+ return _cache_builder == nullptr ? nullptr : _cache_builder.get();
+ }
+
private:
// Flush buffered data into HDFS client and write local file cache if
enabled
// **Notice**: this would clear the underlying buffer
@@ -83,7 +87,6 @@ private:
std::string _batch_buffer;
};
BatchBuffer _batch_buffer;
- size_t _index_offset = 0;
};
} // namespace io
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index 4cd6712b04e..81ebb0ebd1f 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -25,7 +25,7 @@
#include "util/slice.h"
namespace doris::io {
-
+struct FileCacheAllocatorBuilder;
class LocalFileWriter final : public FileWriter {
public:
LocalFileWriter(Path path, int fd, bool sync_data = true);
@@ -38,6 +38,8 @@ public:
size_t bytes_appended() const override;
bool closed() const override { return _closed; }
+ FileCacheAllocatorBuilder* cache_builder() const override { return
nullptr; }
+
private:
void _abort();
Status _close(bool sync);
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp b/be/src/io/fs/s3_file_bufferpool.cpp
index 9df60dc1fd3..82493fa9514 100644
--- a/be/src/io/fs/s3_file_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -90,24 +90,6 @@ FileBuffer::~FileBuffer() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->s3_file_buffer_tracker());
_inner_data.reset();
}
-/**
- * 0. check if file cache holder allocated
- * 1. update the cache's type to index cache
- */
-void UploadFileBuffer::set_index_offset(size_t offset) {
- _index_offset = offset;
- if (_holder) {
- bool change_to_index_cache = false;
- for (auto iter = _holder->file_blocks.begin(); iter !=
_holder->file_blocks.end(); ++iter) {
- if (iter == _cur_file_block) {
- change_to_index_cache = true;
- }
- if (change_to_index_cache) {
-
static_cast<void>((*iter)->change_cache_type_self(FileCacheType::INDEX));
- }
- }
- }
-}
/**
* 0. when there is memory preserved, directly write data to buf
@@ -222,9 +204,6 @@ void UploadFileBuffer::upload_to_local_file_cache(bool
is_cancelled) {
size_t block_size = block->range().size();
size_t append_size = std::min(data_remain_size, block_size);
if (block->state() == FileBlock::State::EMPTY) {
- if (_index_offset != 0 && block->range().right >= _index_offset) {
-
static_cast<void>(block->change_cache_type_self(FileCacheType::INDEX));
- }
block->get_or_set_downloader();
// Another thread may have started downloading due to a query
// Just skip putting to cache from UploadFileBuffer
@@ -279,7 +258,7 @@ Status
FileBufferBuilder::build(std::shared_ptr<FileBuffer>* buf) {
if (_type == BufferType::UPLOAD) {
RETURN_IF_CATCH_EXCEPTION(*buf = std::make_shared<UploadFileBuffer>(
std::move(_upload_cb),
std::move(state), _offset,
- std::move(_alloc_holder_cb),
_index_offset));
+ std::move(_alloc_holder_cb)));
return Status::OK();
}
if (_type == BufferType::DOWNLOAD) {
diff --git a/be/src/io/fs/s3_file_bufferpool.h b/be/src/io/fs/s3_file_bufferpool.h
index 189f7696967..15d0976df6b 100644
--- a/be/src/io/fs/s3_file_bufferpool.h
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -151,18 +151,10 @@ struct DownloadFileBuffer final : public FileBuffer {
struct UploadFileBuffer final : public FileBuffer {
UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb,
OperationState state,
- size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder,
- size_t index_offset)
+ size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder)
: FileBuffer(BufferType::UPLOAD, alloc_holder, offset, state),
- _upload_to_remote(std::move(upload_cb)),
- _index_offset(index_offset) {}
+ _upload_to_remote(std::move(upload_cb)) {}
~UploadFileBuffer() override = default;
- /**
- * set the index offset
- *
- * @param offset the index offset
- */
- void set_index_offset(size_t offset);
Status append_data(const Slice& s) override;
/**
* read the content from local file cache
@@ -206,7 +198,6 @@ private:
FileBlocksHolderPtr _holder;
decltype(_holder->file_blocks.begin()) _cur_file_block;
size_t _append_offset {0};
- size_t _index_offset {0};
uint32_t _crc_value = 0;
};
@@ -272,15 +263,6 @@ struct FileBufferBuilder {
return *this;
}
/**
- * set the index offset of the file buffer
- *
- * @param cb
- */
- FileBufferBuilder& set_index_offset(size_t index_offset) {
- _index_offset = index_offset;
- return *this;
- }
- /**
* set the callback which write the content into local file cache
*
* @param cb
@@ -309,7 +291,6 @@ struct FileBufferBuilder {
std::function<Status(Slice&)> _download;
std::function<void(Slice, size_t)> _write_to_use_buffer;
size_t _offset;
- size_t _index_offset;
};
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 84487f496ac..69202bd22fe 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -87,16 +87,16 @@
S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
: _path(fmt::format("s3://{}/{}", bucket, key)),
_bucket(std::move(bucket)),
_key(std::move(key)),
- _client(std::move(client)),
- _expiration_time(opts ? opts->file_cache_expiration : 0),
- _is_cold_data(opts ? opts->is_cold_data : true),
- _write_file_cache(opts ? opts->write_file_cache : false) {
+ _client(std::move(client)) {
s3_file_writer_total << 1;
s3_file_being_written << 1;
Aws::Http::SetCompliantRfc3986Encoding(true);
- if (config::enable_file_cache && _write_file_cache) {
- _cache_hash = BlockFileCache::hash(_path.filename().native());
- _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
+ if (config::enable_file_cache && opts != nullptr &&
opts->write_file_cache) {
+ _cache_builder =
std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
+ opts ? opts->is_cold_data : false, opts ?
opts->file_cache_expiration : 0,
+ BlockFileCache::hash(_path.filename().native()),
+ FileCacheFactory::instance()->get_by_path(
+ BlockFileCache::hash(_path.filename().native()))});
}
}
@@ -264,7 +264,6 @@ Status S3FileWriter::appendv(const Slice* data, size_t
data_cnt) {
_upload_one_part(part_num, buf);
})
.set_file_offset(_bytes_appended)
- .set_index_offset(_index_offset)
.set_sync_after_complete_task([this, part_num =
_cur_part_num](Status s) {
bool ret = false;
if (!s.ok()) [[unlikely]] {
@@ -282,22 +281,16 @@ Status S3FileWriter::appendv(const Slice* data, size_t
data_cnt) {
return ret;
})
.set_is_cancelled([this]() { return _failed.load(); });
- if (_write_file_cache) {
+ if (_cache_builder != nullptr) {
// We would load the data into file cache asynchronously
which indicates
// that this instance of S3FileWriter might have been
destructed when we
// try to do writing into file cache, so we make the
lambda capture the variable
// we need by value to extend their lifetime
builder.set_allocate_file_blocks_holder(
- [cache = _cache, k = _cache_hash, offset =
_bytes_appended,
- t = _expiration_time, cold = _is_cold_data]() ->
FileBlocksHolderPtr {
- CacheContext ctx;
- ctx.cache_type =
- t == 0 ? FileCacheType::NORMAL :
FileCacheType::TTL;
- ctx.expiration_time = t;
- ctx.is_cold_data = cold;
- auto holder = cache->get_or_set(k, offset,
-
config::s3_write_buffer_size, ctx);
- return
std::make_unique<FileBlocksHolder>(std::move(holder));
+ [builder = *_cache_builder,
+ offset = _bytes_appended]() ->
FileBlocksHolderPtr {
+ return builder.allocate_cache_holder(offset,
+
config::s3_write_buffer_size);
});
}
RETURN_IF_ERROR(builder.build(&_pending_buf));
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index a2c2ec0422a..8f27b202369 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -58,6 +58,10 @@ public:
size_t bytes_appended() const override { return _bytes_appended; }
bool closed() const override { return _closed; }
+ FileCacheAllocatorBuilder* cache_builder() const override {
+ return _cache_builder == nullptr ? nullptr : _cache_builder.get();
+ }
+
private:
Status _abort();
[[nodiscard]] std::string _dump_completed_part() const;
@@ -73,15 +77,12 @@ private:
std::shared_ptr<Aws::S3::S3Client> _client;
std::string _upload_id;
- size_t _index_offset {0};
// Current Part Num for CompletedPart
int _cur_part_num = 1;
std::mutex _completed_lock;
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>
_completed_parts;
- UInt128Wrapper _cache_hash;
- BlockFileCache* _cache;
// **Attention** call add_count() before submitting buf to async thread
pool
bthread::CountdownEvent _countdown_event {0};
@@ -92,9 +93,8 @@ private:
size_t _bytes_appended = 0;
std::shared_ptr<FileBuffer> _pending_buf;
- uint64_t _expiration_time;
- bool _is_cold_data;
- bool _write_file_cache;
+ std::unique_ptr<FileCacheAllocatorBuilder>
+ _cache_builder; // nullptr if disable write file cache
};
} // namespace io
diff --git a/be/src/io/fs/stream_sink_file_writer.h b/be/src/io/fs/stream_sink_file_writer.h
index 2bd91075ad1..4a0eb955c26 100644
--- a/be/src/io/fs/stream_sink_file_writer.h
+++ b/be/src/io/fs/stream_sink_file_writer.h
@@ -33,6 +33,7 @@ struct RowsetId;
struct SegmentStatistics;
namespace io {
+struct FileCacheAllocatorBuilder;
class StreamSinkFileWriter final : public FileWriter {
public:
StreamSinkFileWriter(std::vector<std::shared_ptr<LoadStreamStub>> streams)
@@ -57,6 +58,8 @@ public:
return dummy;
}
+ FileCacheAllocatorBuilder* cache_builder() const override { return
nullptr; }
+
private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index f6be1917e57..7a83496b7fb 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -34,6 +34,8 @@
#include "common/status.h"
#include "gutil/port.h"
#include "inverted_index_fs_directory.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
@@ -1113,6 +1115,8 @@ Status SegmentWriter::finalize(uint64_t*
segment_file_size, uint64_t* index_size
}
// write data
RETURN_IF_ERROR(finalize_columns_data());
+ // Get the index start before finalize_footer since this function would
write new data.
+ uint64_t index_start = _file_writer->bytes_appended();
// write index
RETURN_IF_ERROR(finalize_columns_index(index_size));
// write footer
@@ -1122,6 +1126,17 @@ Status SegmentWriter::finalize(uint64_t*
segment_file_size, uint64_t* index_size
LOG(INFO) << "segment flush consumes a lot time_ns " <<
timer.elapsed_time()
<< ", segmemt_size " << *segment_file_size;
}
+ // When the cache type is not ttl(expiration time == 0), the data should
be split into normal cache queue
+ // and index cache queue
+ if (auto* cache_builder = _file_writer->cache_builder(); cache_builder !=
nullptr &&
+
cache_builder->_expiration_time == 0 &&
+
config::is_cloud_mode()) {
+ auto size = *index_size + *segment_file_size;
+ auto holder = cache_builder->allocate_cache_holder(index_start, size);
+ for (auto& segment : holder->file_blocks) {
+
static_cast<void>(segment->change_cache_type_self(io::FileCacheType::INDEX));
+ }
+ }
return Status::OK();
}
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index 45186246006..49de1826104 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -113,6 +113,8 @@ public:
const Path& path() const override { return _local_file_writer->path(); }
+ io::FileCacheAllocatorBuilder* cache_builder() const override { return
nullptr; }
+
private:
std::unique_ptr<io::FileWriter> _local_file_writer;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]