This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 59ebbb351ee [feature](merge-cloud) Enable write into cache when
uploading file to s3 using s3 file writer (#24364)
59ebbb351ee is described below
commit 59ebbb351ee9164062147280a32543368d867126
Author: AlexYue <[email protected]>
AuthorDate: Mon Oct 16 21:31:02 2023 +0800
[feature](merge-cloud) Enable write into cache when uploading file to s3
using s3 file writer (#24364)
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/io/cache/block/block_file_cache.cpp | 8 +-
be/src/io/cache/block/block_file_cache.h | 20 +-
be/src/io/cache/block/block_file_segment.cpp | 30 ++
be/src/io/cache/block/block_file_segment.h | 4 +
be/src/io/cache/block/block_lru_file_cache.cpp | 17 +
be/src/io/cache/block/block_lru_file_cache.h | 3 +
be/src/io/fs/benchmark/fs_benchmark_tool.cpp | 10 +-
be/src/io/fs/s3_file_bufferpool.cpp | 336 +++++++++++++++++
be/src/io/fs/s3_file_bufferpool.h | 356 ++++++++++++++++++
be/src/io/fs/s3_file_write_bufferpool.cpp | 109 ------
be/src/io/fs/s3_file_write_bufferpool.h | 150 --------
be/src/io/fs/s3_file_writer.cpp | 168 +++++++--
be/src/io/fs/s3_file_writer.h | 15 +-
be/src/io/io_common.h | 2 +
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 11 +-
be/src/service/doris_main.cpp | 1 -
be/test/io/fs/remote_file_system_test.cpp | 4 +-
be/test/io/fs/s3_file_writer_test.cpp | 479 +++++++++++++++++++++++++
21 files changed, 1411 insertions(+), 319 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 85344697772..852aba47856 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1105,6 +1105,8 @@ DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
DEFINE_Bool(exit_on_exception, "false");
+// This config controls whether the s3 file writer would flush cache
asynchronously
+DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
DEFINE_String(doris_cgroup_cpu_path, "");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4207b354410..caca2255cb6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1178,6 +1178,8 @@ DECLARE_mBool(exit_on_exception);
// cgroup
DECLARE_String(doris_cgroup_cpu_path);
DECLARE_Bool(enable_cpu_hard_limit);
+// This config controls whether the s3 file writer would flush cache
asynchronously
+DECLARE_Bool(enable_flush_file_cache_async);
// Remove predicate that is always true for a segment.
DECLARE_Bool(ignore_always_true_predicate_for_segment);
diff --git a/be/src/io/cache/block/block_file_cache.cpp
b/be/src/io/cache/block/block_file_cache.cpp
index 2b7b8cb1343..54df255abe5 100644
--- a/be/src/io/cache/block/block_file_cache.cpp
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -20,6 +20,7 @@
#include "io/cache/block/block_file_cache.h"
+#include <fmt/core.h>
#include <glog/logging.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <sys/resource.h>
@@ -57,7 +58,7 @@ IFileCache::Key IFileCache::hash(const std::string& path) {
return Key(key);
}
-std::string IFileCache::cache_type_to_string(CacheType type) {
+std::string_view IFileCache::cache_type_to_string(CacheType type) {
switch (type) {
case CacheType::INDEX:
return "_idx";
@@ -65,6 +66,8 @@ std::string IFileCache::cache_type_to_string(CacheType type) {
return "_disposable";
case CacheType::NORMAL:
return "";
+ case CacheType::TTL:
+ return "_ttl";
}
return "";
}
@@ -83,8 +86,7 @@ CacheType IFileCache::string_to_cache_type(const std::string&
str) {
std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset,
CacheType type) const {
- return get_path_in_local_cache(key) + "/" +
- (std::to_string(offset) + cache_type_to_string(type));
+ return fmt::format("{}/{}{}", get_path_in_local_cache(key), offset,
cache_type_to_string(type));
}
std::string IFileCache::get_path_in_local_cache(const Key& key) const {
diff --git a/be/src/io/cache/block/block_file_cache.h
b/be/src/io/cache/block/block_file_cache.h
index e58e873af03..8c2a34c34f9 100644
--- a/be/src/io/cache/block/block_file_cache.h
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -40,6 +40,7 @@
#include "io/fs/file_reader.h"
#include "io/io_common.h"
#include "util/hash_util.hpp"
+#include "util/lock.h"
#include "vec/common/uint128.h"
namespace doris {
@@ -54,22 +55,28 @@ enum CacheType {
INDEX,
NORMAL,
DISPOSABLE,
+ TTL,
};
struct CacheContext {
- CacheContext(const IOContext* io_ctx) {
- if (io_ctx->is_index_data) {
+ CacheContext(const IOContext* io_context) {
+ if (io_context->is_index_data) {
cache_type = CacheType::INDEX;
- } else if (io_ctx->is_disposable) {
+ } else if (io_context->is_disposable) {
cache_type = CacheType::DISPOSABLE;
+ } else if (io_context->expiration_time != 0) {
+ cache_type = CacheType::TTL;
+ expiration_time = io_context->expiration_time;
} else {
cache_type = CacheType::NORMAL;
}
- query_id = io_ctx->query_id ? *io_ctx->query_id : TUniqueId();
+ query_id = io_context->query_id ? *io_context->query_id : TUniqueId();
}
CacheContext() = default;
TUniqueId query_id;
CacheType cache_type;
+ int64_t expiration_time {0};
+ bool is_cold_data {false};
};
/**
@@ -139,7 +146,10 @@ public:
virtual size_t get_file_segments_num(CacheType type) const = 0;
- static std::string cache_type_to_string(CacheType type);
+ virtual void change_cache_type(const Key& key, size_t offset, CacheType
new_type,
+ std::lock_guard<doris::Mutex>& cache_lock)
= 0;
+
+ static std::string_view cache_type_to_string(CacheType type);
static CacheType string_to_cache_type(const std::string& str);
IFileCache& operator=(const IFileCache&) = delete;
diff --git a/be/src/io/cache/block/block_file_segment.cpp
b/be/src/io/cache/block/block_file_segment.cpp
index 3b179de343d..4e6b09908cd 100644
--- a/be/src/io/cache/block/block_file_segment.cpp
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -196,6 +196,36 @@ Status FileBlock::read_at(Slice buffer, size_t
read_offset) {
return st;
}
+bool FileBlock::change_cache_type(CacheType new_type) {
+ std::unique_lock segment_lock(_mutex);
+ if (new_type == _cache_type) {
+ return true;
+ }
+ if (_download_state == State::DOWNLOADED) {
+ std::error_code ec;
+ std::filesystem::rename(get_path_in_local_cache(),
+ _cache->get_path_in_local_cache(key(),
offset(), new_type), ec);
+ if (ec) {
+ LOG(ERROR) << "change cache type failed due to rename error " <<
ec.message();
+ return false;
+ }
+ }
+ _cache_type = new_type;
+ return true;
+}
+
+Status FileBlock::change_cache_type_self(CacheType new_type) {
+ std::lock_guard cache_lock(_cache->_mutex);
+ std::unique_lock segment_lock(_mutex);
+ Status st = Status::OK();
+ if (_cache_type == CacheType::TTL || new_type == _cache_type) {
+ return st;
+ }
+ _cache_type = new_type;
+ _cache->change_cache_type(_file_key, _segment_range.left, new_type,
cache_lock);
+ return st;
+}
+
Status FileBlock::finalize_write() {
std::lock_guard segment_lock(_mutex);
diff --git a/be/src/io/cache/block/block_file_segment.h
b/be/src/io/cache/block/block_file_segment.h
index b462259931c..24a4e6e174e 100644
--- a/be/src/io/cache/block/block_file_segment.h
+++ b/be/src/io/cache/block/block_file_segment.h
@@ -138,6 +138,10 @@ public:
std::string get_path_in_local_cache() const;
+ bool change_cache_type(CacheType new_type);
+
+ Status change_cache_type_self(CacheType new_type);
+
State state_unlock(std::lock_guard<std::mutex>&) const;
FileBlock& operator=(const FileBlock&) = delete;
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp
b/be/src/io/cache/block/block_lru_file_cache.cpp
index 03a291ba74d..0578c82e34e 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -1005,6 +1005,23 @@ size_t
LRUFileCache::get_file_segments_num_unlocked(CacheType cache_type,
return get_queue(cache_type).get_elements_num(cache_lock);
}
+void LRUFileCache::change_cache_type(const IFileCache::Key& key, size_t
offset, CacheType new_type,
+ std::lock_guard<doris::Mutex>&
cache_lock) {
+ if (auto iter = _files.find(key); iter != _files.end()) {
+ auto& file_blocks = iter->second;
+ if (auto cell_it = file_blocks.find(offset); cell_it !=
file_blocks.end()) {
+ FileBlockCell& cell = cell_it->second;
+ auto& cur_queue = get_queue(cell.cache_type);
+ cell.cache_type = new_type;
+ DCHECK(cell.queue_iterator.has_value());
+ cur_queue.remove(*cell.queue_iterator, cache_lock);
+ auto& new_queue = get_queue(new_type);
+ cell.queue_iterator =
+ new_queue.add(key, offset,
cell.file_block->range().size(), cache_lock);
+ }
+ }
+}
+
LRUFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block, CacheType
cache_type,
std::lock_guard<std::mutex>&
cache_lock)
: file_block(file_block), cache_type(cache_type) {
diff --git a/be/src/io/cache/block/block_lru_file_cache.h
b/be/src/io/cache/block/block_lru_file_cache.h
index 5a15b10ba2d..b75218c09e2 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -158,6 +158,9 @@ private:
void remove(FileBlockSPtr file_block, std::lock_guard<std::mutex>&
cache_lock,
std::lock_guard<std::mutex>& segment_lock) override;
+ void change_cache_type(const Key& key, size_t offset, CacheType new_type,
+ std::lock_guard<doris::Mutex>& cache_lock) override;
+
size_t get_available_cache_size(CacheType cache_type) const;
Status load_cache_info_into_memory(std::lock_guard<std::mutex>&
cache_lock);
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index 2a6ad2db3d4..0ca9edc530a 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -20,7 +20,7 @@
#include <fstream>
#include "io/fs/benchmark/benchmark_factory.hpp"
-#include "io/fs/s3_file_write_bufferpool.h"
+#include "io/fs/s3_file_bufferpool.h"
#include "util/cpu_info.h"
#include "util/threadpool.h"
@@ -114,13 +114,13 @@ int main(int argc, char** argv) {
int num_cores = doris::CpuInfo::num_cores();
// init s3 write buffer pool
- std::unique_ptr<doris::ThreadPool> buffered_reader_prefetch_thread_pool;
-
static_cast<void>(doris::ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+ std::unique_ptr<doris::ThreadPool> s3_file_upload_thread_pool;
+ static_cast<void>(doris::ThreadPoolBuilder("S3FileUploadThreadPool")
.set_min_threads(num_cores)
.set_max_threads(num_cores)
- .build(&buffered_reader_prefetch_thread_pool));
+ .build(&s3_file_upload_thread_pool));
doris::io::S3FileBufferPool* s3_buffer_pool =
doris::io::S3FileBufferPool::GetInstance();
- s3_buffer_pool->init(524288000, 5242880,
buffered_reader_prefetch_thread_pool.get());
+ s3_buffer_pool->init(524288000, 5242880, s3_file_upload_thread_pool.get());
try {
doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation,
std::stoi(FLAGS_threads),
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp
b/be/src/io/fs/s3_file_bufferpool.cpp
new file mode 100644
index 00000000000..b421bfb1f71
--- /dev/null
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * 0. check if the inner memory buffer is empty or not
+ * 1. reclaim the memory buffer if it's not empty
+ */
+void FileBuffer::on_finish() {
+ if (_buffer.empty()) {
+ return;
+ }
+ S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(),
_capacity});
+ _buffer.clear();
+}
+
+/**
+ * take other buffer's memory space and refresh capacity
+ */
+void FileBuffer::swap_buffer(Slice& other) {
+ _buffer = other;
+ _capacity = _buffer.get_size();
+ other.clear();
+}
+
+FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder,
size_t offset,
+ OperationState state, bool reserve)
+ : _alloc_holder(std::move(alloc_holder)),
+ _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+ _offset(offset),
+ _size(0),
+ _state(std::move(state)),
+ _capacity(_buffer.get_size()) {}
+
+/**
+ * 0. check if file cache holder allocated
+ * 1. update the cache's type to index cache
+ */
+void UploadFileBuffer::set_index_offset(size_t offset) {
+ _index_offset = offset;
+ if (_holder) {
+ bool change_to_index_cache = false;
+ for (auto iter = _holder->file_segments.begin(); iter !=
_holder->file_segments.end();
+ ++iter) {
+ if (iter == _cur_file_segment) {
+ change_to_index_cache = true;
+ }
+ if (change_to_index_cache) {
+
static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX));
+ }
+ }
+ }
+}
+
+/**
+ * 0. when there is memory preserved, directly write data to buf
+ * 1. write to file cache otherwise, then we'll wait for free buffer and to
rob it
+ */
+Status UploadFileBuffer::append_data(const Slice& data) {
+ Defer defer {[&] { _size += data.get_size(); }};
+ while (true) {
+ // if buf is not empty, it means there is memory preserved for this buf
+ if (!_buffer.empty()) {
+ std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(),
data.get_size());
+ break;
+ }
+ // if the buf has no memory reserved, then write to disk first
+ if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder
!= nullptr) {
+ _holder = _alloc_holder();
+ bool cache_is_not_enough = false;
+ for (auto& segment : _holder->file_segments) {
+ DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
+ segment->state() == FileBlock::State::EMPTY);
+ if (segment->state() == FileBlock::State::SKIP_CACHE)
[[unlikely]] {
+ cache_is_not_enough = true;
+ break;
+ }
+ if (_index_offset != 0) {
+
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
+ }
+ }
+ // if cache_is_not_enough, cannot use it !
+ _cur_file_segment = _holder->file_segments.begin();
+ _append_offset = (*_cur_file_segment)->range().left;
+ _holder = cache_is_not_enough ? nullptr : std::move(_holder);
+ if (_holder) {
+ (*_cur_file_segment)->get_or_set_downloader();
+ }
+ _is_cache_allocated = true;
+ }
+ if (_holder) [[likely]] {
+ size_t data_remain_size = data.get_size();
+ size_t pos = 0;
+ while (data_remain_size != 0) {
+ auto range = (*_cur_file_segment)->range();
+ size_t segment_remain_size = range.right - _append_offset + 1;
+ size_t append_size = std::min(data_remain_size,
segment_remain_size);
+ Slice append_data(data.get_data() + pos, append_size);
+ // When there is no available free memory buffer, the data
will be written to the cache first
+ // and then uploaded to S3 when there is an available free
memory buffer.
+ // However, if an error occurs during the write process to the
local cache,
+ // continuing to upload the dirty data from the cache to S3
will result in erroneous data(Bad segment).
+ // Considering that local disk write failures are rare, a
simple approach is chosen here,
+ // which is to treat the import as a failure directly when a
local write failure occurs
+ RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
+ if (segment_remain_size == append_size) {
+ RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
+ if (++_cur_file_segment != _holder->file_segments.end()) {
+ (*_cur_file_segment)->get_or_set_downloader();
+ }
+ }
+ data_remain_size -= append_size;
+ _append_offset += append_size;
+ pos += append_size;
+ }
+ break;
+ } else {
+ // wait allocate buffer pool
+ auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+ swap_buffer(tmp);
+ }
+ }
+ return Status::OK();
+}
+
+/**
+ * 0. allocate one memory buffer
+ * 1. read the content from the cache and then write
+ * it into memory buffer
+ */
+void UploadFileBuffer::read_from_cache() {
+ auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+ swap_buffer(tmp);
+
+ DCHECK(_holder != nullptr);
+ DCHECK(_capacity >= _size);
+ size_t pos = 0;
+ for (auto& segment : _holder->file_segments) {
+ if (pos == _size) {
+ break;
+ }
+ if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
+ set_val(std::move(s));
+ return;
+ }
+ size_t segment_size = segment->range().size();
+ Slice s(_buffer.get_data() + pos, segment_size);
+ if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
+ set_val(std::move(st));
+ return;
+ }
+ pos += segment_size;
+ }
+
+ // the real length should be the buf.get_size() in this situation (consider
it's the last part,
+ // size of it could be less than 5MB)
+ _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(),
_size);
+}
+
+/**
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
+ */
+void UploadFileBuffer::submit() {
+ if (!_buffer.empty()) [[likely]] {
+ _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(),
_size);
+ }
+ // If the data is written into file cache
+ if (_holder && _cur_file_segment != _holder->file_segments.end()) {
+ if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok())
[[unlikely]] {
+ set_val(std::move(s));
+ return;
+ }
+ }
+
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
+ [buf = this->shared_from_this(), this]() {
+ // to extend buf's lifetime
+ // (void)buf;
+ on_upload();
+ }));
+}
+
+/**
+ * write the content of the memory buffer to local file cache
+ */
+void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
+ if (!config::enable_file_cache || _alloc_holder == nullptr) {
+ return;
+ }
+ if (_holder) {
+ return;
+ }
+ if (is_cancelled) {
+ return;
+ }
+ // the data is already written to S3 in this situation
+ // so i didn't handle the file cache write error
+ _holder = _alloc_holder();
+ size_t pos = 0;
+ size_t data_remain_size = _size;
+ for (auto& segment : _holder->file_segments) {
+ if (data_remain_size == 0) {
+ break;
+ }
+ size_t segment_size = segment->range().size();
+ size_t append_size = std::min(data_remain_size, segment_size);
+ if (segment->state() == FileBlock::State::EMPTY) {
+ if (_index_offset != 0 && segment->range().right >= _index_offset)
{
+ // segment->change_cache_type_self(CacheType::INDEX);
+ }
+ segment->get_or_set_downloader();
+ // Another thread may have started downloading due to a query
+ // Just skip putting to cache from UploadFileBuffer
+ if (segment->is_downloader()) {
+ Slice s(_buffer.get_data() + pos, append_size);
+ if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
+ LOG_WARNING("append data to cache segmetn failed due to
{}", st);
+ return;
+ }
+ if (auto st = segment->finalize_write(); !st.ok())
[[unlikely]] {
+ LOG_WARNING("finalize write to cache segmetn failed due to
{}", st);
+ return;
+ }
+ }
+ }
+ data_remain_size -= append_size;
+ pos += append_size;
+ }
+}
+
+FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
+ _type = type;
+ return *this;
+}
+FileBufferBuilder& FileBufferBuilder::set_upload_callback(
+ std::function<void(UploadFileBuffer& buf)> cb) {
+ _upload_cb = std::move(cb);
+ return *this;
+}
+// set callback to do task sync for the caller
+FileBufferBuilder&
FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb)
{
+ _sync_after_complete_task = std::move(cb);
+ return *this;
+}
+
+FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
+ std::function<FileBlocksHolderPtr()> cb) {
+ _alloc_holder_cb = std::move(cb);
+ return *this;
+}
+
+std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+ OperationState state(_sync_after_complete_task, _is_cancelled);
+ if (_type == BufferType::UPLOAD) {
+ return std::make_shared<UploadFileBuffer>(std::move(_upload_cb),
std::move(state), _offset,
+ std::move(_alloc_holder_cb),
_index_offset);
+ }
+ // should never come here
+ return nullptr;
+}
+
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t
s3_write_buffer_size,
+ ThreadPool* thread_pool) {
+ // the nums could be one configuration
+ size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+ DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
+ (s3_write_buffer_whole_size > s3_write_buffer_size))
+ << "s3 write buffer size " << s3_write_buffer_size << " whole s3
write buffer size "
+ << s3_write_buffer_whole_size;
+ LOG_INFO("S3 file buffer pool with {} buffers, each with {}", buf_num,
s3_write_buffer_size);
+ _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
+ for (size_t i = 0; i < buf_num; i++) {
+ Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
+ static_cast<size_t>(s3_write_buffer_size)};
+ _free_raw_buffers.emplace_back(s);
+ }
+ _thread_pool = thread_pool;
+}
+
+Slice S3FileBufferPool::allocate(bool reserve) {
+ Slice buf;
+ // if need reserve or no cache then we must ensure return buf with memory
preserved
+ if (reserve || !config::enable_file_cache) {
+ {
+ std::unique_lock<std::mutex> lck {_lock};
+ _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
+ buf = _free_raw_buffers.front();
+ _free_raw_buffers.pop_front();
+ }
+ return buf;
+ }
+ // try to get one memory reserved buffer
+ {
+ std::unique_lock<std::mutex> lck {_lock};
+ if (!_free_raw_buffers.empty()) {
+ buf = _free_raw_buffers.front();
+ _free_raw_buffers.pop_front();
+ }
+ }
+ if (!buf.empty()) {
+ return buf;
+ }
+ // if there is no free buffer and no need to reserve memory, we could
return one empty buffer
+ buf = Slice();
+ // if the buf has no memory reserved, it would try to write the data to
file cache first
+ // or it would try to rob buffer from other S3FileBuffer
+ return buf;
+}
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_bufferpool.h
b/be/src/io/fs/s3_file_bufferpool.h
new file mode 100644
index 00000000000..8dd61aac4de
--- /dev/null
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <fstream>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "io/cache/block/block_file_segment.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+#include "util/threadpool.h"
+
+namespace doris {
+namespace io {
+enum class BufferType { DOWNLOAD, UPLOAD };
+using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
+struct OperationState {
+ OperationState(std::function<bool(Status)> sync_after_complete_task,
+ std::function<bool()> is_cancelled)
+ : _sync_after_complete_task(std::move(sync_after_complete_task)),
+ _is_cancelled(std::move(is_cancelled)) {}
+ /**
+ * set the val of this operation state which indicates it failed or
succeeded
+ *
+ * @param S the execution result
+ */
+ void set_val(Status s = Status::OK()) {
+ // make sure we wouldn't sync twice
+ if (_value_set) [[unlikely]] {
+ return;
+ }
+ if (nullptr != _sync_after_complete_task) {
+ _fail_after_sync = _sync_after_complete_task(s);
+ }
+ _value_set = true;
+ }
+
+ /**
+ * detect whether the execution task is done
+ *
+ * @return is the execution task is done
+ */
+ [[nodiscard]] bool is_cancelled() const {
+ DCHECK(nullptr != _is_cancelled);
+ // If _fail_after_sync is true then it means the sync task already
returns
+ // that the task failed and if the outside file writer might already be
+ // destructed
+ return _fail_after_sync ? true : _is_cancelled();
+ }
+
+ std::function<bool(Status)> _sync_after_complete_task;
+ std::function<bool()> _is_cancelled;
+ bool _value_set = false;
+ bool _fail_after_sync = false;
+};
+
+struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
+ FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t
offset,
+ OperationState state, bool reserve = false);
+ virtual ~FileBuffer() { on_finish(); }
+ /**
+ * submit the corresponding task to async executor
+ */
+ virtual void submit() = 0;
+ /**
+ * append data to the inner memory buffer
+ *
+ * @param S the content to be appended
+ */
+ virtual Status append_data(const Slice& s) = 0;
+ /**
+ * call the reclaim callback when task is done
+ */
+ void on_finish();
+ /**
+ * swap memory buffer
+ *
+ * @param other which has memory buffer allocated
+ */
+ void swap_buffer(Slice& other);
+ /**
+ * set the val of it's operation state
+ *
+ * @param S the execution result
+ */
+ void set_val(Status s) { _state.set_val(s); }
+ /**
+ * get the start offset of this file buffer
+ *
+ * @return start offset of this file buffer
+ */
+ size_t get_file_offset() const { return _offset; }
+ /**
+ * get the size of the buffered data
+ *
+ * @return the size of the buffered data
+ */
+ size_t get_size() const { return _size; }
+ /**
+ * detect whether the execution task is done
+ *
+ * @return is the execution task is done
+ */
+ bool is_cancelled() const { return _state.is_cancelled(); }
+
+ std::function<FileBlocksHolderPtr()> _alloc_holder;
+ Slice _buffer;
+ size_t _offset;
+ size_t _size;
+ OperationState _state;
+ size_t _capacity;
+};
+
+struct UploadFileBuffer final : public FileBuffer {
+ UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb,
OperationState state,
+ size_t offset, std::function<FileBlocksHolderPtr()>
alloc_holder,
+ size_t index_offset)
+ : FileBuffer(alloc_holder, offset, state),
+ _upload_to_remote(std::move(upload_cb)),
+ _index_offset(index_offset) {}
+ ~UploadFileBuffer() override = default;
+ void submit() override;
+ /**
+ * set the index offset
+ *
+ * @param offset the index offset
+ */
+ void set_index_offset(size_t offset);
+ Status append_data(const Slice& s) override;
+ /**
+ * read the content from local file cache
+ * because previously lack of memory buffer
+ */
+ void read_from_cache();
+ /**
+ * write the content inside memory buffer into
+ * local file cache
+ */
+ void upload_to_local_file_cache(bool);
+ /**
+ * do the upload work
+ * 1. read from cache if the data is written to cache first
+ * 2. upload content of buffer to S3
+ * 3. upload content to file cache if necessary
+ * 4. call the finish callback caller specified
+ * 5. reclaim self
+ */
+ void on_upload() {
+ if (_buffer.empty()) {
+ read_from_cache();
+ }
+ _upload_to_remote(*this);
+ if (config::enable_flush_file_cache_async) {
+ // If we call is_cancelled() after _state.set_val() then there
might one situation where
+ // s3 file writer is already destructed
+ bool cancelled = is_cancelled();
+ _state.set_val();
+ // this control flow means the buf and the stream shares one memory
+ // so we can directly use buf here
+ upload_to_local_file_cache(cancelled);
+ } else {
+ upload_to_local_file_cache(is_cancelled());
+ _state.set_val();
+ }
+ on_finish();
+ }
+ /**
+ *
+ * @return the stream representing the inner memory buffer
+ */
+ std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
+
+ /**
+ * Currently only used for small file to set callback
+ */
+ void set_upload_to_remote(std::function<void(UploadFileBuffer&)> cb) {
+ _upload_to_remote = std::move(cb);
+ }
+
+private:
+ std::function<void(UploadFileBuffer&)> _upload_to_remote = nullptr;
+ std::shared_ptr<std::iostream> _stream_ptr; // point to _buffer.get_data()
+
+ bool _is_cache_allocated {false};
+ FileBlocksHolderPtr _holder;
+ decltype(_holder->file_segments.begin()) _cur_file_segment;
+ size_t _append_offset {0};
+ size_t _index_offset {0};
+};
+
+struct FileBufferBuilder {
+ FileBufferBuilder() = default;
+ ~FileBufferBuilder() = default;
+ /**
+ * build one file buffer using previously set properties
+ * @return the file buffer's base shared pointer
+ */
+ std::shared_ptr<FileBuffer> build();
+ /**
+ * set the file buffer type
+ *
+ * @param type enum class for buffer type
+ */
+ FileBufferBuilder& set_type(BufferType type);
+ /**
+ * set the download callback which would download the content on cloud into
file buffer
+ *
+ * @param cb
+ */
+ FileBufferBuilder& set_download_callback(std::function<Status(Slice&)> cb)
{
+ _download = std::move(cb);
+ return *this;
+ }
+ /**
+ * set the upload callback which would upload the content inside buffer
into remote storage
+ *
+ * @param cb
+ */
+ FileBufferBuilder&
set_upload_callback(std::function<void(UploadFileBuffer& buf)> cb);
+ /**
+ * set the callback which would do task sync for the caller
+ *
+ * @param cb
+ */
+ FileBufferBuilder&
set_sync_after_complete_task(std::function<bool(Status)> cb);
+ /**
+ * set the callback which detect whether the task is done
+ *
+ * @param cb
+ */
+ FileBufferBuilder& set_is_cancelled(std::function<bool()> cb) {
+ _is_cancelled = std::move(cb);
+ return *this;
+ }
+ /**
+ * set the callback which allocate file cache segment holder
+ * **Notice**: Because the load file cache workload could be done
+ * asynchronously so you must make sure all the dependencies of this
+ * cb could last until this cb is invoked
+ * @param cb
+ */
+ FileBufferBuilder&
set_allocate_file_segments_holder(std::function<FileBlocksHolderPtr()> cb);
+ /**
+ * set the file offset of the file buffer
+ *
+ * @param cb
+ */
+ FileBufferBuilder& set_file_offset(size_t offset) {
+ _offset = offset;
+ return *this;
+ }
+ /**
+ * set the index offset of the file buffer
+ *
+ * @param cb
+ */
+ FileBufferBuilder& set_index_offset(size_t index_offset) {
+ _index_offset = index_offset;
+ return *this;
+ }
+ /**
+ * set the callback which write the content into local file cache
+ *
+ * @param cb
+ */
+ FileBufferBuilder& set_write_to_local_file_cache(
+ std::function<void(FileBlocksHolderPtr, Slice)> cb) {
+ _write_to_local_file_cache = std::move(cb);
+ return *this;
+ }
+ /**
+ * set the callback which would write the downloaded content into user's
buffer
+ *
+ * @param cb
+ */
+ FileBufferBuilder& set_write_to_use_buffer(std::function<void(Slice,
size_t)> cb) {
+ _write_to_use_buffer = std::move(cb);
+ return *this;
+ }
+
+ BufferType _type;
+ std::function<void(UploadFileBuffer& buf)> _upload_cb = nullptr;
+ std::function<bool(Status)> _sync_after_complete_task = nullptr;
+ std::function<FileBlocksHolderPtr()> _alloc_holder_cb = nullptr;
+ std::function<bool()> _is_cancelled = nullptr;
+ std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache;
+ std::function<Status(Slice&)> _download;
+ std::function<void(Slice, size_t)> _write_to_use_buffer;
+ size_t _offset;
+ size_t _index_offset;
+};
+
+class S3FileBufferPool {
+public:
+ S3FileBufferPool() = default;
+ ~S3FileBufferPool() = default;
+
+ // should be called one and only once
+ // at startup
+ void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+ doris::ThreadPool* thread_pool);
+
+ /**
+ *
+ * @return singleton of the S3FileBufferPool
+ */
+ static S3FileBufferPool* GetInstance() {
+ return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
+ }
+
+ void reclaim(Slice buf) {
+ std::unique_lock<std::mutex> lck {_lock};
+ _free_raw_buffers.emplace_front(buf);
+ _cv.notify_all();
+ }
+
+ /**
+ *
+ * @param reserve must return buffer with memory allocated
+ * @return memory buffer
+ */
+ Slice allocate(bool reserve = false);
+
+ ThreadPool* thread_pool() { return _thread_pool; }
+
+private:
+ std::mutex _lock;
+ std::condition_variable _cv;
+ std::unique_ptr<char[]> _whole_mem_buffer;
+ std::list<Slice> _free_raw_buffers;
+ // not owned
+ ThreadPool* _thread_pool = nullptr;
+};
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp
b/be/src/io/fs/s3_file_write_bufferpool.cpp
deleted file mode 100644
index 30b927a2fb9..00000000000
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "s3_file_write_bufferpool.h"
-
-#include <cstring>
-
-#include "common/config.h"
-#include "common/logging.h"
-#include "io/fs/s3_common.h"
-#include "runtime/exec_env.h"
-#include "util/defer_op.h"
-#include "util/threadpool.h"
-
-namespace doris {
-namespace io {
-void S3FileBuffer::on_finished() {
- if (_buf.empty()) {
- return;
- }
- reset();
- S3FileBufferPool::GetInstance()->reclaim(_buf);
- _buf.clear();
-}
-
-// when there is memory preserved, directly write data to buf
-// TODO:(AlexYue): write to file cache otherwise, then we'll wait for free
buffer
-// and to rob it
-void S3FileBuffer::append_data(const Slice& data) {
- Defer defer {[&] { _size += data.get_size(); }};
- while (true) {
- // if buf is not empty, it means there is memory preserved for this buf
- if (!_buf.empty()) {
- memcpy(_buf.data + _size, data.get_data(), data.get_size());
- break;
- } else {
- // wait allocate buffer pool
- auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
- rob_buffer(tmp);
- }
- }
-}
-
-void S3FileBuffer::submit() {
- if (LIKELY(!_buf.empty())) {
- _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
- }
-
- static_cast<void>(
- _thread_pool->submit_func([buf = this->shared_from_this()]() {
buf->_on_upload(); }));
-}
-
-void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t
s3_write_buffer_size,
- doris::ThreadPool* thread_pool) {
- // the nums could be one configuration
- size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
- DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
- (s3_write_buffer_whole_size > s3_write_buffer_size));
- LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
- _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
- for (size_t i = 0; i < buf_num; i++) {
- Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
- static_cast<size_t>(s3_write_buffer_size)};
- _free_raw_buffers.emplace_back(s);
- }
- _thread_pool = thread_pool;
-}
-
-std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
- std::shared_ptr<S3FileBuffer> buf =
std::make_shared<S3FileBuffer>(_thread_pool);
- // if need reserve then we must ensure return buf with memory preserved
- if (reserve) {
- {
- std::unique_lock<std::mutex> lck {_lock};
- _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
- buf->reserve_buffer(_free_raw_buffers.front());
- _free_raw_buffers.pop_front();
- }
- return buf;
- }
- // try to get one memory reserved buffer
- {
- std::unique_lock<std::mutex> lck {_lock};
- if (!_free_raw_buffers.empty()) {
- buf->reserve_buffer(_free_raw_buffers.front());
- _free_raw_buffers.pop_front();
- }
- }
- // if there is no free buffer and no need to reserve memory, we could
return one empty buffer
- // if the buf has no memory reserved, it would try to write the data to
file cache first
- // or it would try to rob buffer from other S3FileBuffer
- return buf;
-}
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h
b/be/src/io/fs/s3_file_write_bufferpool.h
deleted file mode 100644
index 7e8bf01e19f..00000000000
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <condition_variable>
-#include <cstdint>
-#include <fstream>
-#include <functional>
-#include <list>
-#include <memory>
-#include <mutex>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "io/fs/s3_common.h"
-#include "runtime/exec_env.h"
-#include "util/slice.h"
-
-namespace doris {
-class ThreadPool;
-namespace io {
-
-// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read
buffer
-struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
- using Callback = std::function<void()>;
-
- S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; }
- ~S3FileBuffer() = default;
-
- void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
- _buf = other->_buf;
- // we should clear other's memory buffer in case it woule be reclaimed
twice
- // when calling on_finished
- other->_buf.clear();
- }
-
- void reserve_buffer(Slice s) { _buf = s; }
-
- // append data into the memory buffer inside
- // or into the file cache if the buffer has no memory buffer
- void append_data(const Slice& data);
- // upload to S3 and file cache in async threadpool
- void submit();
- // set the callback to upload to S3 file
- void set_upload_remote_callback(Callback cb) { _upload_to_remote_callback
= std::move(cb); }
- // set callback to do task sync for the caller
- void set_finish_upload(Callback cb) { _on_finish_upload = std::move(cb); }
- // set cancel callback to indicate if the whole task is cancelled or not
- void set_is_cancel(std::function<bool()> cb) { _is_cancelled =
std::move(cb); }
- // set callback to notify all the tasks that the whole procedure could be
cancelled
- // if this buffer's task failed
- void set_on_failed(std::function<void(Status)> cb) { _on_failed =
std::move(cb); }
- // reclaim this buffer when task is done
- void on_finished();
- // set the status of the caller if task failed
- void set_status(Status s) { _status = std::move(s); }
- // get the size of the content already appendded
- size_t get_size() const { return _size; }
- // get the underlying stream containing
- const std::shared_ptr<std::iostream>& get_stream() const { return
_stream_ptr; }
- // get file offset corresponding to the buffer
- size_t get_file_offset() const { return _offset; }
- // set the offset of the buffer
- void set_file_offset(size_t offset) { _offset = offset; }
- // reset this buffer to be reused
- void reset() {
- _upload_to_remote_callback = nullptr;
- _is_cancelled = nullptr;
- _on_failed = nullptr;
- _on_finish_upload = nullptr;
- _offset = 0;
- _size = 0;
- }
-
- Callback _upload_to_remote_callback = nullptr;
- // to control the callback control flow
- // 1. read from cache if the data is written to cache first
- // 2. upload content of buffer to S3
- // 3. upload content to file cache if necessary
- // 4. call the finish callback caller specified
- // 5. reclaim self
- void _on_upload() {
- _upload_to_remote_callback();
- _on_finish_upload();
- on_finished();
- };
- // the caller might be cancelled
- std::function<bool()> _is_cancelled = []() { return false; };
- // set the caller to be failed
- std::function<void(Status)> _on_failed = nullptr;
- // caller of this buf could use this callback to do syncronization
- Callback _on_finish_upload = nullptr;
- Status _status;
- size_t _offset {0};
- size_t _size {0};
- std::shared_ptr<std::iostream> _stream_ptr;
- // only served as one reserved buffer
- Slice _buf;
- size_t _append_offset {0};
- // not owned
- ThreadPool* _thread_pool = nullptr;
-};
-
-class S3FileBufferPool {
-public:
- S3FileBufferPool() = default;
- ~S3FileBufferPool() = default;
-
- // should be called one and only once
- // at startup
- void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
- doris::ThreadPool* thread_pool);
-
- static S3FileBufferPool* GetInstance() {
- return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
- }
-
- void reclaim(Slice buf) {
- std::unique_lock<std::mutex> lck {_lock};
- _free_raw_buffers.emplace_front(buf);
- _cv.notify_all();
- }
-
- std::shared_ptr<S3FileBuffer> allocate(bool reserve = false);
-
-private:
- std::mutex _lock;
- std::condition_variable _cv;
- std::unique_ptr<char[]> _whole_mem_buffer;
- std::list<Slice> _free_raw_buffers;
- // not owned
- ThreadPool* _thread_pool = nullptr;
-};
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index b844e122a80..0c34a9cc5e7 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -45,10 +45,13 @@
#include "common/config.h"
#include "common/status.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_factory.h"
#include "io/fs/file_writer.h"
#include "io/fs/path.h"
+#include "io/fs/s3_file_bufferpool.h"
#include "io/fs/s3_file_system.h"
-#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
@@ -85,11 +88,19 @@ S3FileWriter::S3FileWriter(std::string key,
std::shared_ptr<S3FileSystem> fs,
: FileWriter(fmt::format("s3://{}/{}", fs->s3_conf().bucket, key), fs),
_bucket(fs->s3_conf().bucket),
_key(std::move(key)),
- _client(fs->get_client()) {
+ _client(fs->get_client()),
+ _cache(nullptr),
+ _expiration_time(opts ? opts->file_cache_expiration : 0),
+ _is_cold_data(opts ? opts->is_cold_data : true),
+ _write_file_cache(opts ? opts->write_file_cache : false) {
s3_file_writer_total << 1;
s3_file_being_written << 1;
Aws::Http::SetCompliantRfc3986Encoding(true);
+ if (config::enable_file_cache && _write_file_cache) {
+ _cache_key = IFileCache::hash(_path.filename().native());
+ _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
+ }
}
S3FileWriter::~S3FileWriter() {
@@ -100,11 +111,6 @@ S3FileWriter::~S3FileWriter() {
_bytes_written = 0;
}
s3_bytes_written_total << _bytes_written;
- CHECK(_closed) << ", closed: " << _closed;
- // in case there are task which might run after this object is destroyed
- // for example, if the whole task failed and some task are still pending
- // in threadpool
- _wait_until_finish("dtor");
s3_file_being_written << -1;
}
@@ -112,6 +118,12 @@ Status S3FileWriter::_create_multi_upload_request() {
CreateMultipartUploadRequest create_request;
create_request.WithBucket(_bucket).WithKey(_key);
create_request.SetContentType("application/octet-stream");
+ DBUG_EXECUTE_IF("s3_file_writer::_create_multi_upload_request", {
+ return Status::IOError(
+ "failed to create multipart upload(bucket={}, key={},
upload_id={}): injected "
+ "error",
+ _bucket, _path.native(), _upload_id);
+ });
auto outcome = _client->CreateMultipartUpload(create_request);
s3_bvar::s3_multi_part_upload_total << 1;
@@ -150,14 +162,14 @@ Status S3FileWriter::abort() {
}
// we need to reclaim the memory
if (_pending_buf) {
- _pending_buf->on_finished();
+ _pending_buf->on_finish();
_pending_buf = nullptr;
}
+ LOG(INFO) << "S3FileWriter::abort, path: " << _path.native();
// upload id is empty means there was no create multi upload
if (_upload_id.empty()) {
return Status::OK();
}
- VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
_wait_until_finish("Abort");
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
@@ -190,13 +202,18 @@ Status S3FileWriter::close() {
// it might be one file less than 5MB, we do upload here
if (_pending_buf != nullptr) {
if (_upload_id.empty()) {
- _pending_buf->set_upload_remote_callback(
- [this, buf = _pending_buf]() { _put_object(*buf); });
+ auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+ DCHECK(buf != nullptr);
+ buf->set_upload_to_remote([this](UploadFileBuffer& b) {
_put_object(b); });
}
_countdown_event.add_count();
_pending_buf->submit();
_pending_buf = nullptr;
}
+ DBUG_EXECUTE_IF("s3_file_writer::close", {
+ static_cast<void>(_complete());
+ return Status::InternalError("failed to close s3 file writer");
+ });
RETURN_IF_ERROR(_complete());
return Status::OK();
@@ -205,6 +222,8 @@ Status S3FileWriter::close() {
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
DCHECK(!_closed);
size_t buffer_size = config::s3_write_buffer_size;
+ DBUG_EXECUTE_IF("s3_file_writer::appendv",
+ { return Status::InternalError("failed to append data");
});
for (size_t i = 0; i < data_cnt; i++) {
size_t data_size = data[i].get_size();
for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos +=
data_size_to_append) {
@@ -212,23 +231,49 @@ Status S3FileWriter::appendv(const Slice* data, size_t
data_cnt) {
return _st;
}
if (!_pending_buf) {
- _pending_buf = S3FileBufferPool::GetInstance()->allocate();
- // capture part num by value along with the value of the
shared ptr
- _pending_buf->set_upload_remote_callback(
- [part_num = _cur_part_num, this, cur_buf =
_pending_buf]() {
- _upload_one_part(part_num, *cur_buf);
- });
- _pending_buf->set_file_offset(_bytes_appended);
- // later we might need to wait all prior tasks to be finished
- _pending_buf->set_finish_upload([this]() {
_countdown_event.signal(); });
- _pending_buf->set_is_cancel([this]() { return _failed.load();
});
- _pending_buf->set_on_failed([this, part_num =
_cur_part_num](Status st) {
- VLOG_NOTICE << "failed at key: " << _key << ", load part "
<< part_num
- << ", st " << st.to_string();
- std::unique_lock<std::mutex> _lck {_completed_lock};
- this->_st = std::move(st);
- _failed = true;
- });
+ auto builder = FileBufferBuilder();
+ builder.set_type(BufferType::UPLOAD)
+ .set_upload_callback(
+ [part_num = _cur_part_num,
this](UploadFileBuffer& buf) {
+ _upload_one_part(part_num, buf);
+ })
+ .set_file_offset(_bytes_appended)
+ .set_index_offset(_index_offset)
+ .set_sync_after_complete_task([this, part_num =
_cur_part_num](Status s) {
+ bool ret = false;
+ if (!s.ok()) [[unlikely]] {
+ VLOG_NOTICE << "failed at key: " << _key << ",
load part "
+ << part_num << ", st " << s;
+ std::unique_lock<std::mutex> _lck
{_completed_lock};
+ _failed = true;
+ ret = true;
+ this->_st = std::move(s);
+ }
+ // After the signal, there is a scenario where the
previous invocation of _wait_until_finish
+ // returns to the caller, and subsequently, the S3
file writer is destructed.
+ // This means that accessing _failed afterwards
would result in a heap use after free vulnerability.
+ _countdown_event.signal();
+ return ret;
+ })
+ .set_is_cancelled([this]() { return _failed.load(); });
+ if (_write_file_cache) {
+ // We would load the data into file cache asynchronously
which indicates
+ // that this instance of S3FileWriter might have been
destructed when we
+ // try to do writing into file cache, so we make the
lambda capture the variable
+ // we need by value to extend their lifetime
+ builder.set_allocate_file_segments_holder(
+ [cache = _cache, k = _cache_key, offset =
_bytes_appended,
+ t = _expiration_time, cold = _is_cold_data]() ->
FileBlocksHolderPtr {
+ CacheContext ctx;
+ ctx.cache_type = t == 0 ? CacheType::NORMAL :
CacheType::TTL;
+ ctx.expiration_time = t;
+ ctx.is_cold_data = cold;
+ auto holder = cache->get_or_set(k, offset,
+
config::s3_write_buffer_size, ctx);
+ return
std::make_unique<FileBlocksHolder>(std::move(holder));
+ });
+ }
+ _pending_buf = builder.build();
}
// we need to make sure all parts except the last one to be 5MB or
more
// and shouldn't be larger than buf
@@ -237,7 +282,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t
data_cnt) {
// if the buffer has memory buf inside, the data would be written
into memory first then S3 then file cache
// it would be written to cache then S3 if the buffer doesn't have
memory preserved
- _pending_buf->append_data(Slice {data[i].get_data() + pos,
data_size_to_append});
+ RETURN_IF_ERROR(_pending_buf->append_data(
+ Slice {data[i].get_data() + pos, data_size_to_append}));
// if it's the last part, it could be less than 5MB, or it must
// satisfy that the size is larger than or equal to 5MB
@@ -259,8 +305,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t
data_cnt) {
return Status::OK();
}
-void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
- if (buf._is_cancelled()) {
+void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
+ if (buf.is_cancelled()) {
return;
}
UploadPartRequest upload_request;
@@ -279,13 +325,25 @@ void S3FileWriter::_upload_one_part(int64_t part_num,
S3FileBuffer& buf) {
s3_bvar::s3_multi_part_upload_total << 1;
UploadPartOutcome upload_part_outcome = upload_part_callable.get();
+ DBUG_EXECUTE_IF("s3_file_writer::_upload_one_part", {
+ if (part_num > 1) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ auto s = Status::IOError(
+ "failed to upload part (bucket={}, key={}, part_num={},
up_load_id={}): "
+ "injected error",
+ _bucket, _path.native(), part_num, _upload_id);
+ LOG_WARNING(s.to_string());
+ buf.set_val(s);
+ return;
+ }
+ });
if (!upload_part_outcome.IsSuccess()) {
auto s = Status::IOError(
"failed to upload part (bucket={}, key={}, part_num={},
up_load_id={}): {}",
_bucket, _path.native(), part_num, _upload_id,
upload_part_outcome.GetError().GetMessage());
LOG_WARNING(s.to_string());
- buf._on_failed(s);
+ buf.set_val(s);
return;
}
s3_bytes_written_total << buf.get_size();
@@ -315,16 +373,41 @@ Status S3FileWriter::_complete() {
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
_wait_until_finish("Complete");
+ DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
+ if (_failed || _completed_parts.size() != _cur_part_num) {
+ auto st = Status::IOError("error status {}, complete parts {}, cur
part num {}", _st,
+ _completed_parts.size(), _cur_part_num);
+ LOG(WARNING) << st;
+ _st = st;
+ return st;
+ }
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() <
p2->GetPartNumber(); });
+ DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
+ { _completed_parts.back()->SetPartNumber(10 *
_completed_parts.size()); });
CompletedMultipartUpload completed_upload;
- for (auto& part : _completed_parts) {
- completed_upload.AddParts(*part);
+ for (size_t i = 0; i < _completed_parts.size(); i++) {
+ if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
+ auto st = Status::IOError(
+ "error status {}, part num not continous, expected num {},
actual num {}", _st,
+ i + 1, _completed_parts[i]->GetPartNumber());
+ LOG(WARNING) << st;
+ _st = st;
+ return st;
+ }
+ completed_upload.AddParts(*_completed_parts[i]);
}
complete_request.WithMultipartUpload(completed_upload);
+ DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
+ auto s = Status::IOError(
+ "failed to create complete multi part upload (bucket={},
key={}): injected error",
+ _bucket, _path.native());
+ LOG_WARNING(s.to_string());
+ return s;
+ });
auto compute_outcome = _client->CompleteMultipartUpload(complete_request);
s3_bvar::s3_multi_part_upload_total << 1;
@@ -341,14 +424,17 @@ Status S3FileWriter::_complete() {
Status S3FileWriter::finalize() {
DCHECK(!_closed);
+ DBUG_EXECUTE_IF("s3_file_writer::finalize",
+ { return Status::IOError("failed to finalize due to
injected error"); });
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
if (_upload_id.empty()) {
- _pending_buf->set_upload_remote_callback(
- [this, buf = _pending_buf]() { _put_object(*buf); });
+ auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+ DCHECK(buf != nullptr);
+ buf->set_upload_to_remote([this](UploadFileBuffer& b) {
_put_object(b); });
}
_countdown_event.add_count();
_pending_buf->submit();
@@ -358,7 +444,7 @@ Status S3FileWriter::finalize() {
return _st;
}
-void S3FileWriter::_put_object(S3FileBuffer& buf) {
+void S3FileWriter::_put_object(UploadFileBuffer& buf) {
DCHECK(!_closed) << "closed " << _closed;
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(_bucket).WithKey(_key);
@@ -367,6 +453,12 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
request.SetBody(buf.get_stream());
request.SetContentLength(buf.get_size());
request.SetContentType("application/octet-stream");
+ DBUG_EXECUTE_IF("s3_file_writer::_put_object", {
+ _st = Status::InternalError("failed to put object");
+ buf.set_val(_st);
+ LOG(WARNING) << _st;
+ return;
+ });
auto response = _client->PutObject(request);
s3_bvar::s3_put_total << 1;
if (!response.IsSuccess()) {
@@ -374,7 +466,7 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
response.GetError().GetExceptionName(),
response.GetError().GetMessage(),
static_cast<int>(response.GetError().GetResponseCode()));
- buf._on_failed(_st);
+ buf.set_val(_st);
LOG(WARNING) << _st;
return;
}
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 4fb64b2d00f..b8d53cf6482 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -27,6 +27,7 @@
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/path.h"
+#include "io/fs/s3_file_bufferpool.h"
namespace Aws::S3 {
namespace Model {
@@ -58,8 +59,8 @@ private:
void _wait_until_finish(std::string_view task_name);
Status _complete();
Status _create_multi_upload_request();
- void _put_object(S3FileBuffer& buf);
- void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
+ void _put_object(UploadFileBuffer& buf);
+ void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
std::string _bucket;
std::string _key;
@@ -68,20 +69,26 @@ private:
std::shared_ptr<Aws::S3::S3Client> _client;
std::string _upload_id;
+ size_t _index_offset {0};
// Current Part Num for CompletedPart
int _cur_part_num = 1;
std::mutex _completed_lock;
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>
_completed_parts;
+ IFileCache::Key _cache_key;
+ IFileCache* _cache;
// **Attention** call add_count() before submitting buf to async thread
pool
bthread::CountdownEvent _countdown_event {0};
std::atomic_bool _failed = false;
- Status _st = Status::OK();
+ Status _st;
size_t _bytes_written = 0;
- std::shared_ptr<S3FileBuffer> _pending_buf = nullptr;
+ std::shared_ptr<FileBuffer> _pending_buf;
+ int64_t _expiration_time;
+ bool _is_cold_data;
+ bool _write_file_cache;
};
} // namespace io
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index bef81e6256b..2d7766e714a 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -53,6 +53,8 @@ struct IOContext {
bool is_disposable = false;
bool is_index_data = false;
bool read_file_cache = true;
+ // TODO(lightman): use following member variables to control file cache
+ bool is_persistent = false;
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 9fffd859ba3..0f7a2e64b17 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -162,6 +162,7 @@ public:
ThreadPool* buffered_reader_prefetch_thread_pool() {
return _buffered_reader_prefetch_thread_pool.get();
}
+ ThreadPool* s3_file_upload_thread_pool() { return
_s3_file_upload_thread_pool.get(); }
ThreadPool* send_report_thread_pool() { return
_send_report_thread_pool.get(); }
ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get();
}
@@ -309,6 +310,8 @@ private:
std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// Threadpool used to prefetch remote file for buffered reader
std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
+ // Threadpool used to upload local file to s3
+ std::unique_ptr<ThreadPool> _s3_file_upload_thread_pool;
// A token used to submit download cache task serially
std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
// Pool used by fragment manager to send profile or status to FE
coordinator
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 646c8fca2be..36502585426 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -39,7 +39,7 @@
#include "common/status.h"
#include "io/cache/block/block_file_cache_factory.h"
#include "io/fs/file_meta_cache.h"
-#include "io/fs/s3_file_write_bufferpool.h"
+#include "io/fs/s3_file_bufferpool.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
@@ -173,6 +173,11 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
.set_max_threads(64)
.build(&_buffered_reader_prefetch_thread_pool));
+ static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
+ .set_min_threads(16)
+ .set_max_threads(64)
+ .build(&_s3_file_upload_thread_pool));
+
// min num equal to fragment pool's min num
// max num is useless because it will start as many as requested in the
past
// queue size is useless because the max thread num is very large
@@ -244,7 +249,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
// S3 buffer pool
_s3_buffer_pool = new io::S3FileBufferPool();
_s3_buffer_pool->init(config::s3_write_buffer_whole_size,
config::s3_write_buffer_size,
- this->buffered_reader_prefetch_thread_pool());
+ this->s3_file_upload_thread_pool());
// Storage engine
doris::EngineOptions options;
@@ -548,6 +553,7 @@ void ExecEnv::destroy() {
_stream_load_executor.reset();
SAFE_STOP(_storage_engine);
SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
+ SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
SAFE_SHUTDOWN(_join_node_thread_pool);
SAFE_SHUTDOWN(_send_report_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
@@ -613,6 +619,7 @@ void ExecEnv::destroy() {
_join_node_thread_pool.reset(nullptr);
_send_report_thread_pool.reset(nullptr);
_buffered_reader_prefetch_thread_pool.reset(nullptr);
+ _s3_file_upload_thread_pool.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
SAFE_DELETE(_broker_client_cache);
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 61759147e8c..33fe22b12e7 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -60,7 +60,6 @@
#include "common/signal_handler.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache_factory.h"
-#include "io/fs/s3_file_write_bufferpool.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
diff --git a/be/test/io/fs/remote_file_system_test.cpp
b/be/test/io/fs/remote_file_system_test.cpp
index a24d4e3932b..c5d80d1b65d 100644
--- a/be/test/io/fs/remote_file_system_test.cpp
+++ b/be/test/io/fs/remote_file_system_test.cpp
@@ -410,11 +410,11 @@ TEST_F(RemoteFileSystemTest, TestHdfsFileSystem) {
TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
std::unique_ptr<ThreadPool> _pool;
- ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+ ThreadPoolBuilder("S3FileUploadThreadPool")
.set_min_threads(5)
.set_max_threads(10)
.build(&_pool);
- ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool =
std::move(_pool);
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
S3Conf s3_conf;
S3URI s3_uri(s3_location);
CHECK_STATUS_OK(s3_uri.parse());
diff --git a/be/test/io/fs/s3_file_writer_test.cpp
b/be/test/io/fs/s3_file_writer_test.cpp
new file mode 100644
index 00000000000..a1df6a1ea8b
--- /dev/null
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/CompletedPart.h>
+#include <aws/s3/model/UploadPartRequest.h>
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <cstdlib>
+#include <memory>
+#include <thread>
+
+#include "common/config.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/s3_file_bufferpool.h"
+#include "io/fs/s3_file_system.h"
+#include "io/io_common.h"
+#include "runtime/exec_env.h"
+#include "testutil/http_utils.h"
+#include "util/debug_points.h"
+#include "util/slice.h"
+#include "util/threadpool.h"
+namespace doris {
+
+static std::shared_ptr<io::S3FileSystem> s3_fs {nullptr};
+
+class S3FileWriterTest : public testing::Test {
+public:
+ static void SetUpTestSuite() {
+ S3Conf s3_conf;
+ config::enable_debug_points = true;
+ DebugPoints::instance()->clear();
+ s3_conf.ak = config::test_s3_ak;
+ s3_conf.sk = config::test_s3_sk;
+ s3_conf.endpoint = config::test_s3_endpoint;
+ s3_conf.region = config::test_s3_region;
+ s3_conf.bucket = config::test_s3_bucket;
+ s3_conf.prefix = "s3_file_writer_test";
+ static_cast<void>(
+ io::S3FileSystem::create(std::move(s3_conf),
"s3_file_writer_test", &s3_fs));
+ std::cout << "s3 conf: " << s3_conf.to_string() << std::endl;
+ ASSERT_EQ(Status::OK(), s3_fs->connect());
+
+ std::unique_ptr<doris::ThreadPool> _s3_file_upload_thread_pool;
+ static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
+ .set_min_threads(16)
+ .set_max_threads(64)
+ .build(&_s3_file_upload_thread_pool));
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool =
+ std::move(_s3_file_upload_thread_pool);
+ ExecEnv::GetInstance()->_s3_buffer_pool = new io::S3FileBufferPool();
+ io::S3FileBufferPool::GetInstance()->init(
+ config::s3_write_buffer_whole_size,
config::s3_write_buffer_size,
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool.get());
+ }
+
+ static void TearDownTestSuite() {
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool->shutdown();
+ ExecEnv::GetInstance()->_s3_file_upload_thread_pool = nullptr;
+ delete ExecEnv::GetInstance()->_s3_buffer_pool;
+ ExecEnv::GetInstance()->_s3_buffer_pool = nullptr;
+ }
+
+private:
+};
+
+TEST_F(S3FileWriterTest, multi_part_io_error) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_upload_one_part");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_upload_one_part");
+ }};
+ auto client = s3_fs->get_client();
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error",
&s3_file_writer, &state));
+
+ char buf[buf_size];
+ doris::Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(!s3_file_writer->finalize().ok());
+ // The second part would fail uploading itself to s3
+ // so the result of close should be not ok
+ ASSERT_TRUE(!s3_file_writer->close().ok());
+ bool exits = false;
+ auto s = s3_fs->exists("multi_part_io_error", &exits);
+ LOG(INFO) << "status is " << s;
+ ASSERT_TRUE(!exits);
+ }
+}
+
+TEST_F(S3FileWriterTest, put_object_io_error) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_put_object");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_put_object");
+ }};
+ auto client = s3_fs->get_client();
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("put_object_io_error",
&s3_file_writer, &state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ // Only upload 4MB to trigger put object operation
+ auto file_size = 4 * 1024 * 1024;
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(!s3_file_writer->finalize().ok());
+        // The put-object request might time out yet the upload could still succeed
+ ASSERT_TRUE(!s3_file_writer->close().ok());
+ }
+}
+
+TEST_F(S3FileWriterTest, appendv_random_quit) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::appendv");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::appendv");
+ }};
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("appendv_random_quit",
&s3_file_writer, &state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_TRUE(!s3_file_writer->append(Slice(buf, bytes_read)).ok());
+ bool exits = false;
+ static_cast<void>(s3_fs->exists("appendv_random_quit", &exits));
+ ASSERT_TRUE(!exits);
+ }
+}
+
+TEST_F(S3FileWriterTest, multi_part_open_error) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 5 * 1024 * 1024;
+ POST_HTTP_TO_TEST_SERVER(
+
"/api/debug_point/add/s3_file_writer::_create_multi_upload_request");
+ Defer defer {[&]() {
+ POST_HTTP_TO_TEST_SERVER(
+
"/api/debug_point/remove/s3_file_writer::_create_multi_upload_request");
+ }};
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(),
+ s3_fs->create_file("multi_part_open_error", &s3_file_writer,
&state));
+
+ auto buf = std::make_unique<char[]>(buf_size);
+ Slice slice(buf.get(), buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ // Directly write 5MB would cause one create multi part upload request
+        // and it would be rejected with an error
+ auto st = s3_file_writer->append(Slice(buf.get(), bytes_read));
+ ASSERT_TRUE(!st.ok());
+ bool exits = false;
+ static_cast<void>(s3_fs->exists("multi_part_open_error", &exits));
+ ASSERT_TRUE(!exits);
+ }
+}
+
+TEST_F(S3FileWriterTest, normal) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("normal", &s3_file_writer,
&state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(s3_file_writer->finalize().ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->close());
+ int64_t s3_file_size = 0;
+ ASSERT_EQ(Status::OK(), s3_fs->file_size("normal", &s3_file_size));
+ ASSERT_EQ(s3_file_size, file_size);
+ }
+}
+
+TEST_F(S3FileWriterTest, smallFile) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+ io::FileReaderSPtr local_file_reader;
+
+
ASSERT_TRUE(fs->open_file("./be/test/olap/test_data/all_types_1000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("small", &s3_file_writer,
&state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(s3_file_writer->finalize().ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->close());
+ int64_t s3_file_size = 0;
+ ASSERT_EQ(Status::OK(), s3_fs->file_size("small", &s3_file_size));
+ ASSERT_EQ(s3_file_size, file_size);
+ }
+}
+
+TEST_F(S3FileWriterTest, close_error) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+ io::FileReaderSPtr local_file_reader;
+
+
ASSERT_TRUE(fs->open_file("./be/test/olap/test_data/all_types_1000.txt",
&local_file_reader)
+ .ok());
+
+ POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::close");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::close");
+ }};
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_TRUE(s3_fs->create_file("close_error", &s3_file_writer,
&state).ok());
+ ASSERT_TRUE(!s3_file_writer->close().ok());
+ bool exits = false;
+ static_cast<void>(s3_fs->exists("close_error", &exits));
+ ASSERT_TRUE(!exits);
+ }
+}
+
+TEST_F(S3FileWriterTest, finalize_error) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+ io::FileReaderSPtr local_file_reader;
+
+
ASSERT_TRUE(fs->open_file("./be/test/olap/test_data/all_types_1000.txt",
&local_file_reader)
+ .ok());
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("finalize_error",
&s3_file_writer, &state));
+
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::finalize");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::finalize");
+ }};
+
+ constexpr int buf_size = 8192;
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(!s3_file_writer->finalize().ok());
+ bool exits = false;
+ static_cast<void>(s3_fs->exists("finalize_error", &exits));
+ ASSERT_TRUE(!exits);
+ }
+}
+
+TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_complete:2");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_complete:2");
+ }};
+ auto client = s3_fs->get_client();
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error",
&s3_file_writer, &state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(s3_file_writer->finalize().ok());
+        // The complete multipart upload request would fail
+        // so the result of close should be not ok
+ auto st = s3_file_writer->close();
+ ASSERT_TRUE(!st.ok());
+ std::cout << st << std::endl;
+ }
+}
+
+TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_complete:1");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_complete:1");
+ }};
+ auto client = s3_fs->get_client();
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error",
&s3_file_writer, &state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(s3_file_writer->finalize().ok());
+        // The complete multipart upload request would fail
+        // so the result of close should be not ok
+ auto st = s3_file_writer->close();
+ ASSERT_TRUE(!st.ok());
+ std::cout << st << std::endl;
+ }
+}
+
+TEST_F(S3FileWriterTest, multi_part_complete_error_3) {
+ doris::io::FileWriterOptions state;
+ auto fs = io::global_local_filesystem();
+ {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_complete:3");
+ Defer defer {[&]() {
+
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_complete:3");
+ }};
+ auto client = s3_fs->get_client();
+ io::FileReaderSPtr local_file_reader;
+
+ ASSERT_TRUE(
+ fs->open_file("./be/test/olap/test_data/all_types_100000.txt",
&local_file_reader)
+ .ok());
+
+ constexpr int buf_size = 8192;
+
+ io::FileWriterPtr s3_file_writer;
+ ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error",
&s3_file_writer, &state));
+
+ char buf[buf_size];
+ Slice slice(buf, buf_size);
+ size_t offset = 0;
+ size_t bytes_read = 0;
+ auto file_size = local_file_reader->size();
+ while (offset < file_size) {
+ ASSERT_TRUE(local_file_reader->read_at(offset, slice,
&bytes_read).ok());
+ ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf,
bytes_read)));
+ offset += bytes_read;
+ }
+ ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+ ASSERT_TRUE(s3_file_writer->finalize().ok());
+        // The complete multipart upload request would fail
+        // so the result of close should be not ok
+ auto st = s3_file_writer->close();
+ ASSERT_TRUE(!st.ok());
+ std::cout << st << std::endl;
+ }
+}
+
+} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]