This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 09bcedb116 [feature](merge-cloud) Remove deprecated old cache (#23881)
09bcedb116 is described below
commit 09bcedb116639e64e905dcd45cf536b1064ff4bf
Author: plat1ko <[email protected]>
AuthorDate: Wed Sep 6 08:07:05 2023 +0800
[feature](merge-cloud) Remove deprecated old cache (#23881)
* Remove deprecated old cache
---
be/src/common/config.cpp | 10 +-
be/src/common/config.h | 5 -
.../io/cache/block/cached_remote_file_reader.cpp | 38 +--
be/src/io/cache/block/cached_remote_file_reader.h | 7 +-
be/src/io/cache/dummy_file_cache.cpp | 104 -------
be/src/io/cache/dummy_file_cache.h | 102 ------
be/src/io/cache/file_cache.cpp | 210 -------------
be/src/io/cache/file_cache.h | 106 -------
be/src/io/cache/file_cache_manager.cpp | 256 ---------------
be/src/io/cache/file_cache_manager.h | 84 -----
be/src/io/cache/sub_file_cache.cpp | 342 ---------------------
be/src/io/cache/sub_file_cache.h | 117 -------
be/src/io/cache/whole_file_cache.cpp | 184 -----------
be/src/io/cache/whole_file_cache.h | 92 ------
be/src/io/file_factory.cpp | 16 +-
be/src/io/file_factory.h | 4 +-
be/src/io/fs/file_reader.h | 24 ++
be/src/io/fs/file_reader_options.cpp | 39 ---
be/src/io/fs/file_reader_options.h | 88 ------
be/src/io/fs/file_reader_writer_fwd.h | 2 +
be/src/io/fs/file_system.cpp | 11 +
be/src/io/fs/file_system.h | 13 +-
be/src/io/fs/file_writer.h | 11 +-
be/src/io/fs/local_file_system.cpp | 1 -
be/src/io/fs/local_file_system.h | 1 -
be/src/io/fs/remote_file_system.cpp | 26 +-
be/src/io/fs/remote_file_system.h | 1 -
be/src/olap/compaction.cpp | 2 +-
be/src/olap/olap_server.cpp | 26 --
be/src/olap/rowset/beta_rowset.cpp | 60 +---
be/src/olap/rowset/beta_rowset.h | 11 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 14 +-
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 1 -
be/src/olap/rowset/rowset.cpp | 5 +-
be/src/olap/rowset/rowset.h | 14 +-
be/src/olap/rowset/segment_v2/segment.cpp | 16 -
be/src/olap/rowset/segment_v2/segment.h | 4 -
be/src/olap/storage_engine.cpp | 3 +-
be/src/olap/storage_engine.h | 2 -
be/src/olap/tablet.cpp | 3 +-
be/src/service/backend_service.cpp | 5 +-
be/src/service/point_query_executor.cpp | 2 +-
be/src/service/point_query_executor.h | 2 +-
be/src/vec/core/block_spill_reader.cpp | 4 +-
be/test/io/cache/remote_file_cache_test.cpp | 192 ------------
be/test/olap/delete_bitmap_calculator_test.cpp | 5 +-
be/test/olap/rowset/rowset_tree_test.cpp | 2 +-
be/test/olap/tablet_cooldown_test.cpp | 45 +--
be/test/olap/tablet_meta_test.cpp | 2 +-
be/test/runtime/load_stream_test.cpp | 3 +-
be/test/testutil/mock_rowset.h | 12 +-
51 files changed, 131 insertions(+), 2198 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0bac22880d..2719046a03 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -882,20 +882,14 @@ DEFINE_mInt32(remove_unused_remote_files_interval_sec,
"21600"); // 6h
DEFINE_mInt32(confirm_unused_remote_files_interval_sec, "60");
DEFINE_Int32(cold_data_compaction_thread_num, "2");
DEFINE_mInt32(cold_data_compaction_interval_sec, "1800");
-DEFINE_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h
DEFINE_Int32(concurrency_per_dir, "2");
-DEFINE_mInt64(cooldown_lag_time_sec, "10800"); // 3h
-DEFINE_mInt64(max_sub_cache_file_size, "104857600"); // 100MB
-DEFINE_mInt64(file_cache_alive_time_sec, "604800"); // 1 week
// file_cache_type is used to set the type of file cache for remote files.
-// "": no cache, "sub_file_cache": split sub files from remote file.
-// "whole_file_cache": the whole file.
+// Valid values: "" (no cache) or "file_block_cache" (the default).
+// "sub_file_cache" and "whole_file_cache" were removed along with the old cache
+// and are now rejected by the validator below.
DEFINE_mString(file_cache_type, "file_block_cache");
-DEFINE_Validator(file_cache_type, [](const std::string config) -> bool {
-    return config == "sub_file_cache" || config == "whole_file_cache" || config == "" ||
-           config == "file_block_cache";
+DEFINE_Validator(file_cache_type, [](std::string_view config) -> bool {
+    return config.empty() || config == "file_block_cache";
});
-DEFINE_mInt64(file_cache_max_size_per_disk, "0"); // zero for no limit
DEFINE_Int32(s3_transfer_executor_pool_size, "2");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d83e012719..95c4d219e0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -933,16 +933,11 @@ DECLARE_mInt32(remove_unused_remote_files_interval_sec);
// 6h
DECLARE_mInt32(confirm_unused_remote_files_interval_sec);
DECLARE_Int32(cold_data_compaction_thread_num);
DECLARE_mInt32(cold_data_compaction_interval_sec);
-DECLARE_mInt64(generate_cache_cleaner_task_interval_sec); // 12 h
DECLARE_Int32(concurrency_per_dir);
-DECLARE_mInt64(cooldown_lag_time_sec); // 3h
-DECLARE_mInt64(max_sub_cache_file_size); // 100MB
-DECLARE_mInt64(file_cache_alive_time_sec); // 1 week
// file_cache_type is used to set the type of file cache for remote files.
-// "": no cache, "sub_file_cache": split sub files from remote file.
-// "whole_file_cache": the whole file.
+// Valid values: "" (no cache) or "file_block_cache" (the default).
DECLARE_mString(file_cache_type);
-DECLARE_mInt64(file_cache_max_size_per_disk); // zero for no limit
DECLARE_Int32(s3_transfer_executor_pool_size);
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index 4ee5b7970d..e2a629fd4c 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -42,27 +42,27 @@ namespace doris {
namespace io {
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
-                                               const std::string& cache_path,
-                                               const long modification_time)
+                                               const FileReaderOptions* opts)
        : _remote_file_reader(std::move(remote_file_reader)) {
-    // Use path and modification time to build cache key
-    std::string unique_path = fmt::format("{}:{}", cache_path, modification_time);
-    _cache_key = IFileCache::hash(unique_path);
-    _cache = FileCacheFactory::instance().get_by_path(_cache_key);
-}
-
-CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
-                                               const std::string& cache_base_path,
-                                               const std::string& cache_path,
-                                               const long modification_time)
-        : _remote_file_reader(std::move(remote_file_reader)) {
-    std::string unique_path = fmt::format("{}:{}", cache_path, modification_time);
-    _cache_key = IFileCache::hash(unique_path);
-    _cache = FileCacheFactory::instance().get_by_path(cache_base_path);
-    if (_cache == nullptr) {
-        LOG(WARNING) << "Can't get cache from base path: " << cache_base_path
-                     << ", using random instead.";
+    // `remote_file_reader` was moved into `_remote_file_reader` by the member
+    // initializer above and is now empty, so the diagnostic must use the member.
+    DCHECK(opts) << _remote_file_reader->path().native();
+    _is_doris_table = opts->is_doris_table;
+    if (_is_doris_table) {
+        // Doris table files: the cache key is derived from the file name only.
+        _cache_key = IFileCache::hash(path().filename().native());
        _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+    } else {
+        // Use path and modification time to build cache key
+        std::string unique_path = fmt::format("{}:{}", path().native(), opts->modification_time);
+        _cache_key = IFileCache::hash(unique_path);
+        if (!opts->cache_base_path.empty()) {
+            // from query session variable: file_cache_base_path
+            _cache = FileCacheFactory::instance().get_by_path(opts->cache_base_path);
+            if (_cache == nullptr) {
+                LOG(WARNING) << "Can't get cache from base path: " << opts->cache_base_path
+                             << ", using random instead.";
+                _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+            }
+        } else {
+            // No explicit base path: pick the cache by the hashed key, as the
+            // removed constructors did. Must not be an unconditional assignment,
+            // or it would discard the base-path selection above.
+            _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+        }
    }
}
diff --git a/be/src/io/cache/block/cached_remote_file_reader.h
b/be/src/io/cache/block/cached_remote_file_reader.h
index 5e66f9970a..51e9e562a2 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.h
+++ b/be/src/io/cache/block/cached_remote_file_reader.h
@@ -39,11 +39,7 @@ struct FileCacheStatistics;
class CachedRemoteFileReader final : public FileReader {
public:
- CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
std::string& cache_path,
- const long modification_time);
-
- CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
std::string& cache_base_path,
- const std::string& cache_path, const long
modification_time);
+ CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const
FileReaderOptions* opts);
~CachedRemoteFileReader() override;
@@ -69,6 +65,7 @@ private:
FileReaderSPtr _remote_file_reader;
IFileCache::Key _cache_key;
CloudFileCachePtr _cache;
+ bool _is_doris_table;
struct ReadStatistics {
bool hit_cache = true;
diff --git a/be/src/io/cache/dummy_file_cache.cpp
b/be/src/io/cache/dummy_file_cache.cpp
deleted file mode 100644
index 14dda6898a..0000000000
--- a/be/src/io/cache/dummy_file_cache.cpp
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/cache/dummy_file_cache.h"
-
-#include <time.h>
-
-#include <memory>
-#include <string>
-
-#include "io/fs/local_file_system.h"
-
-namespace doris {
-namespace io {
-
-DummyFileCache::DummyFileCache(const Path& cache_dir, int64_t alive_time_sec)
- : _cache_dir(cache_dir), _alive_time_sec(alive_time_sec) {}
-
-DummyFileCache::~DummyFileCache() = default;
-
-void DummyFileCache::_add_file_cache(const Path& data_file) {
- Path cache_file = _cache_dir / data_file;
- int64_t file_size = -1;
- time_t m_time = 0;
- if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok()
&&
- io::global_local_filesystem()->mtime(cache_file, &m_time).ok()) {
- _gc_lru_queue.push({cache_file, m_time});
- _cache_file_size += file_size;
- } else {
- _unfinished_files.push_back(cache_file);
- }
-}
-
-void DummyFileCache::_load() {
- std::vector<Path> cache_names;
- if (!_get_dir_files_and_remove_unfinished(_cache_dir, cache_names).ok()) {
- return;
- }
-
- for (const auto& file : cache_names) {
- _add_file_cache(file);
- }
-}
-
-Status DummyFileCache::load_and_clean() {
- _load();
- RETURN_IF_ERROR(_clean_unfinished_files(_unfinished_files));
- return _check_and_delete_empty_dir(_cache_dir);
-}
-
-Status DummyFileCache::clean_timeout_cache() {
- while (!_gc_lru_queue.empty() &&
- time(nullptr) - _gc_lru_queue.top().last_match_time >
_alive_time_sec) {
- size_t cleaned_size = 0;
- RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file,
&cleaned_size));
- _cache_file_size -= cleaned_size;
- _gc_lru_queue.pop();
- }
- return Status::OK();
-}
-
-Status DummyFileCache::clean_all_cache() {
- while (!_gc_lru_queue.empty()) {
- RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file,
nullptr));
- _gc_lru_queue.pop();
- }
- _cache_file_size = 0;
- return _check_and_delete_empty_dir(_cache_dir);
-}
-
-Status DummyFileCache::clean_one_cache(size_t* cleaned_size) {
- if (!_gc_lru_queue.empty()) {
- const auto& cache = _gc_lru_queue.top();
- RETURN_IF_ERROR(_clean_cache_internal(cache.file, cleaned_size));
- _cache_file_size -= *cleaned_size;
- _gc_lru_queue.pop();
- }
- if (_gc_lru_queue.empty()) {
- RETURN_IF_ERROR(_check_and_delete_empty_dir(_cache_dir));
- }
- return Status::OK();
-}
-
-Status DummyFileCache::_clean_cache_internal(const Path& cache_file_path,
size_t* cleaned_size) {
- Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX;
- return _remove_cache_and_done(cache_file_path, done_file_path,
cleaned_size);
-}
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/dummy_file_cache.h
b/be/src/io/cache/dummy_file_cache.h
deleted file mode 100644
index 46d21d99a2..0000000000
--- a/be/src/io/cache/dummy_file_cache.h
+++ /dev/null
@@ -1,102 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <filesystem>
-#include <queue>
-#include <vector>
-
-#include "common/status.h"
-#include "io/cache/file_cache.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
-
-namespace doris {
-namespace io {
-class IOContext;
-
-// Only used for GC
-class DummyFileCache final : public FileCache {
-public:
- DummyFileCache(const Path& cache_dir, int64_t alive_time_sec);
-
- ~DummyFileCache() override;
-
- Status close() override { return Status::OK(); }
-
- const Path& path() const override { return _cache_dir; }
-
- size_t size() const override { return 0; }
-
- bool closed() const override { return true; }
-
- const Path& cache_dir() const override { return _cache_dir; }
-
- io::FileReaderSPtr remote_file_reader() const override { return nullptr; }
-
- Status clean_timeout_cache() override;
-
- Status clean_all_cache() override;
-
- Status clean_one_cache(size_t* cleaned_size) override;
-
- Status load_and_clean();
-
- bool is_dummy_file_cache() override { return true; }
-
- int64_t get_oldest_match_time() const override {
- return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time;
- }
-
- bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
-
- FileSystemSPtr fs() const override { return nullptr; }
-
-protected:
- Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
- const IOContext* io_ctx) override {
- return Status::NotSupported("dummy file cache only used for GC");
- }
-
-private:
- void _add_file_cache(const Path& data_file);
- void _load();
- Status _clean_cache_internal(const Path&, size_t*);
-
-private:
- struct DummyFileInfo {
- Path file;
- int64_t last_match_time;
- };
- using DummyGcQueue = std::priority_queue<DummyFileInfo,
std::vector<DummyFileInfo>,
-
SubFileLRUComparator<DummyFileInfo>>;
- DummyGcQueue _gc_lru_queue;
-
- Path _cache_dir;
- int64_t _alive_time_sec;
-
- std::vector<Path> _unfinished_files;
-};
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/file_cache.cpp b/be/src/io/cache/file_cache.cpp
deleted file mode 100644
index 38ea672c82..0000000000
--- a/be/src/io/cache/file_cache.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/cache/file_cache.h"
-
-#include <fmt/format.h>
-#include <glog/logging.h>
-#include <string.h>
-
-#include <algorithm>
-#include <filesystem>
-#include <list>
-#include <ostream>
-#include <set>
-#include <utility>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "gutil/strings/util.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/local_file_system.h"
-#include "runtime/exec_env.h"
-#include "util/string_util.h"
-
-namespace doris {
-using namespace ErrorCode;
-namespace io {
-
-Status FileCache::download_cache_to_local(const Path& cache_file, const Path&
cache_done_file,
- io::FileReaderSPtr
remote_file_reader, size_t req_size,
- size_t offset) {
- LOG(INFO) << "Download cache file from remote file: " <<
remote_file_reader->path().native()
- << " -> " << cache_file.native() << ". offset: " << offset
- << ", request size: " << req_size;
- io::FileWriterPtr file_writer;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->create_file(cache_file,
&file_writer),
- fmt::format("Create local cache file failed: {}",
cache_file.native()));
- auto func = [cache_file, cache_done_file, remote_file_reader, req_size,
- offset](io::FileWriter* file_writer) {
- char* file_buf = ExecEnv::GetInstance()->get_download_cache_buf(
-
ExecEnv::GetInstance()->get_serial_download_cache_thread_token());
- size_t count_bytes_read = 0;
- size_t need_req_size = config::download_cache_buffer_size;
- while (count_bytes_read < req_size) {
- memset(file_buf, 0, need_req_size);
- if (req_size - count_bytes_read <
config::download_cache_buffer_size) {
- need_req_size = req_size - count_bytes_read;
- }
- Slice file_slice(file_buf, need_req_size);
- size_t bytes_read = 0;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- remote_file_reader->read_at(offset + count_bytes_read,
file_slice, &bytes_read),
- fmt::format("read remote file failed. {}. offset: {},
request size: {}",
- remote_file_reader->path().native(), offset +
count_bytes_read,
- need_req_size));
- if (bytes_read != need_req_size) {
- return Status::Error<OS_ERROR>(
- "read remote file failed: {}, bytes read: {} vs need
read size: {}",
- remote_file_reader->path().native(), bytes_read,
need_req_size);
- }
- count_bytes_read += bytes_read;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- file_writer->append(file_slice),
- fmt::format("Write local cache file failed: {}",
cache_file.native()));
- }
- return Status::OK();
- };
- auto st = func(file_writer.get());
- if (!st.ok()) {
- WARN_IF_ERROR(file_writer->close(),
- fmt::format("Close local cache file failed: {}",
cache_file.native()));
- return st;
- }
- RETURN_NOT_OK_STATUS_WITH_WARN(
- file_writer->close(),
- fmt::format("Close local cache file failed: {}",
cache_file.native()));
- io::FileWriterPtr done_file_writer;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->create_file(cache_done_file,
&done_file_writer),
- fmt::format("Create local done file failed: {}",
cache_done_file.native()));
- RETURN_NOT_OK_STATUS_WITH_WARN(
- done_file_writer->close(),
- fmt::format("Close local done file failed: {}",
cache_done_file.native()));
- return Status::OK();
-}
-
-Status FileCache::_remove_file(const Path& file, size_t* cleaned_size) {
- bool cache_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(file,
&cache_file_exist),
- "Check local cache file exist failed.");
- int64_t file_size = -1;
- if (cache_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->file_size(file, &file_size),
- fmt::format("get local cache file size failed: {}",
file.native()));
-
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->delete_file(file),
- fmt::format("Delete local cache file failed: {}",
file.native()));
- LOG(INFO) << "Delete local cache file successfully: " << file.native()
- << ", file size: " << file_size;
- }
- if (cleaned_size) {
- *cleaned_size = file_size;
- }
- return Status::OK();
-}
-
-Status FileCache::_remove_cache_and_done(const Path& cache_file, const Path&
cache_done_file,
- size_t* cleaned_size) {
- RETURN_IF_ERROR(_remove_file(cache_done_file, nullptr));
- RETURN_IF_ERROR(_remove_file(cache_file, cleaned_size));
- return Status::OK();
-}
-
-Status FileCache::_get_dir_files_and_remove_unfinished(const Path& cache_dir,
- std::vector<Path>&
cache_names) {
- bool cache_dir_exist = true;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_dir, &cache_dir_exist),
- fmt::format("Check local cache dir exist failed. {}",
cache_dir.native()));
- if (!cache_dir_exist) {
- return Status::OK();
- }
-
- // list all files
- std::vector<FileInfo> cache_files;
- bool exists = true;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->list(cache_dir, true, &cache_files,
&exists),
- fmt::format("List dir failed: {}", cache_dir.native()))
-
- // separate DATA file and DONE file
- std::set<Path> cache_names_temp;
- std::list<Path> done_names_temp;
- for (auto& cache_file : cache_files) {
- if (ends_with(cache_file.file_name, CACHE_DONE_FILE_SUFFIX)) {
- done_names_temp.push_back(cache_file.file_name);
- } else {
- cache_names_temp.insert(cache_file.file_name);
- }
- }
-
- // match DONE file with DATA file
- for (auto done_file : done_names_temp) {
- Path cache_filename = StringReplace(done_file.native(),
CACHE_DONE_FILE_SUFFIX, "", true);
- if (auto cache_iter = cache_names_temp.find(cache_filename);
- cache_iter != cache_names_temp.end()) {
- cache_names_temp.erase(cache_iter);
- cache_names.push_back(std::move(cache_filename));
- } else {
- // not data file, but with DONE file
- RETURN_IF_ERROR(_remove_file(done_file, nullptr));
- }
- }
- // data file without DONE file
- for (auto& file : cache_names_temp) {
- RETURN_IF_ERROR(_remove_file(file, nullptr));
- }
- return Status::OK();
-}
-
-Status FileCache::_clean_unfinished_files(const std::vector<Path>&
unfinished_files) {
- // remove cache file without done file
- for (auto file : unfinished_files) {
- RETURN_IF_ERROR(_remove_file(file, nullptr));
- }
- return Status::OK();
-}
-
-Status FileCache::_check_and_delete_empty_dir(const Path& cache_dir) {
- bool cache_dir_exist = true;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_dir, &cache_dir_exist),
- fmt::format("Check local cache dir exist failed. {}",
cache_dir.native()));
- if (!cache_dir_exist) {
- return Status::OK();
- }
-
- std::vector<FileInfo> cache_files;
- bool exists = true;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->list(cache_dir, true, &cache_files,
&exists),
- fmt::format("List dir failed: {}", cache_dir.native()));
- if (cache_files.empty()) {
-
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->delete_directory(cache_dir),
- fmt::format("Delete dir failed: {}",
cache_dir.native()));
- LOG(INFO) << "Delete empty dir: " << cache_dir.native();
- }
- return Status::OK();
-}
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h
deleted file mode 100644
index 2b2156ea46..0000000000
--- a/be/src/io/cache/file_cache.h
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <butil/macros.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "common/status.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
-
-namespace doris {
-namespace io {
-class IOContext;
-
-const std::string CACHE_DONE_FILE_SUFFIX = "_DONE";
-
-class FileCache : public FileReader {
-public:
- FileCache() : _cache_file_size(0) {}
- ~FileCache() override = default;
-
- DISALLOW_COPY_AND_ASSIGN(FileCache);
-
- virtual const Path& cache_dir() const = 0;
-
- size_t cache_file_size() const { return _cache_file_size; }
-
- virtual io::FileReaderSPtr remote_file_reader() const = 0;
-
- virtual Status clean_timeout_cache() = 0;
-
- virtual Status clean_all_cache() = 0;
-
- virtual Status clean_one_cache(size_t* cleaned_size) = 0;
-
- virtual bool is_gc_finish() const = 0;
-
- virtual bool is_dummy_file_cache() { return false; }
-
- Status download_cache_to_local(const Path& cache_file, const Path&
cache_done_file,
- io::FileReaderSPtr remote_file_reader,
size_t req_size,
- size_t offset = 0);
-
- virtual int64_t get_oldest_match_time() const = 0;
-
-protected:
- Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
- const IOContext* io_ctx) override {
- return Status::NotSupported("dummy file cache only used for GC");
- }
-
- Status _remove_file(const Path& file, size_t* cleaned_size);
-
- Status _remove_cache_and_done(const Path& cache_file, const Path&
cache_done_file,
- size_t* cleaned_size);
-
- Status _get_dir_files_and_remove_unfinished(const Path& cache_dir,
- std::vector<Path>&
cache_names);
-
- Status _clean_unfinished_files(const std::vector<Path>& unfinished_files);
-
- Status _check_and_delete_empty_dir(const Path& cache_dir);
-
- template <typename T>
- struct SubFileLRUComparator {
- bool operator()(const T& lhs, const T& rhs) const {
- return lhs.last_match_time > rhs.last_match_time;
- }
- };
-
- size_t _cache_file_size;
-};
-
-using FileCachePtr = std::shared_ptr<FileCache>;
-
-struct FileCacheLRUComparator {
- bool operator()(const FileCachePtr& lhs, const FileCachePtr& rhs) const {
- return lhs->get_oldest_match_time() > rhs->get_oldest_match_time();
- }
-};
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/file_cache_manager.cpp
b/be/src/io/cache/file_cache_manager.cpp
deleted file mode 100644
index f581b7d651..0000000000
--- a/be/src/io/cache/file_cache_manager.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/cache/file_cache_manager.h"
-
-#include <fmt/format.h>
-#include <glog/logging.h>
-#include <stddef.h>
-
-#include <memory>
-#include <mutex>
-#include <ostream>
-#include <utility>
-
-#include "common/config.h"
-#include "io/cache/dummy_file_cache.h"
-#include "io/cache/sub_file_cache.h"
-#include "io/cache/whole_file_cache.h"
-#include "io/fs/file_reader_options.h"
-#include "io/fs/file_system.h"
-#include "io/fs/local_file_system.h"
-#include "olap/data_dir.h"
-#include "olap/rowset/beta_rowset.h"
-#include "olap/storage_engine.h"
-#include "olap/tablet.h"
-#include "olap/tablet_manager.h"
-
-namespace doris {
-namespace io {
-
-void GCContextPerDisk::init(const std::string& path, int64_t max_size) {
- _disk_path = path;
- _conf_max_size = max_size;
- _used_size = 0;
-}
-
-bool GCContextPerDisk::try_add_file_cache(FileCachePtr cache, int64_t
file_size) {
- if (cache->cache_dir().string().substr(0, _disk_path.size()) ==
_disk_path) {
- _lru_queue.push(cache);
- _used_size += file_size;
- return true;
- }
- return false;
-}
-
-FileCachePtr GCContextPerDisk::top() {
- if (!_lru_queue.empty() && _used_size > _conf_max_size) {
- return _lru_queue.top();
- }
- return nullptr;
-}
-
-void GCContextPerDisk::pop() {
- if (!_lru_queue.empty()) {
- _lru_queue.pop();
- }
-}
-
-Status GCContextPerDisk::gc_top() {
- if (!_lru_queue.empty() && _used_size > _conf_max_size) {
- auto file_cache = _lru_queue.top();
- size_t cleaned_size = 0;
- RETURN_IF_ERROR(file_cache->clean_one_cache(&cleaned_size));
- _used_size -= cleaned_size;
- _lru_queue.pop();
- if (!file_cache->is_gc_finish()) {
- _lru_queue.push(file_cache);
- }
- }
- return Status::OK();
-}
-
-void FileCacheManager::add_file_cache(const std::string& cache_path,
FileCachePtr file_cache) {
- std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
- _file_cache_map.emplace(cache_path, file_cache);
-}
-
-void FileCacheManager::remove_file_cache(const std::string& cache_path) {
- bool cache_path_exist = false;
- {
- std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
- if (_file_cache_map.find(cache_path) == _file_cache_map.end()) {
- bool cache_dir_exist = false;
- if (global_local_filesystem()->exists(cache_path,
&cache_dir_exist).ok() &&
- cache_dir_exist) {
- Status st =
global_local_filesystem()->delete_directory(cache_path);
- if (!st.ok()) {
- LOG(WARNING) << st.to_string();
- }
- }
- } else {
- cache_path_exist = true;
- _file_cache_map.find(cache_path)->second->clean_all_cache();
- }
- }
- if (cache_path_exist) {
- std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
- _file_cache_map.erase(cache_path);
- }
-}
-
-void
FileCacheManager::_add_file_cache_for_gc_by_disk(std::vector<GCContextPerDisk>&
contexts,
- FileCachePtr file_cache)
{
- // sort file cache by last match time
- if (config::file_cache_max_size_per_disk > 0) {
- auto file_size = file_cache->cache_file_size();
- if (file_size <= 0) {
- return;
- }
- for (size_t i = 0; i < contexts.size(); ++i) {
- if (contexts[i].try_add_file_cache(file_cache, file_size)) {
- break;
- }
- }
- }
-}
-
-void FileCacheManager::_gc_unused_file_caches(std::list<FileCachePtr>& result)
{
- std::vector<TabletSharedPtr> tablets =
- StorageEngine::instance()->tablet_manager()->get_all_tablet();
- bool exists = true;
- for (const auto& tablet : tablets) {
- std::vector<FileInfo> seg_files;
- if (io::global_local_filesystem()
- ->list(tablet->tablet_path(), true, &seg_files, &exists)
- .ok()) {
- for (auto& seg_file : seg_files) {
- std::string seg_filename = seg_file.file_name;
- // check if it is a dir name
- if (!BetaRowset::is_segment_cache_dir(seg_filename)) {
- continue;
- }
- // skip file cache already in memory
- std::stringstream ss;
- ss << tablet->tablet_path() << "/" << seg_filename;
- std::string cache_path = ss.str();
-
- std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
- if (_file_cache_map.find(cache_path) != _file_cache_map.end())
{
- continue;
- }
- auto file_cache = std::make_shared<DummyFileCache>(
- cache_path, config::file_cache_alive_time_sec);
- // load cache meta from disk and clean unfinished cache files
- file_cache->load_and_clean();
- // policy1: GC file cache by timeout
- file_cache->clean_timeout_cache();
-
- result.push_back(file_cache);
- }
- }
- }
-}
-
-void FileCacheManager::gc_file_caches() {
- int64_t gc_conf_size = config::file_cache_max_size_per_disk;
- std::vector<GCContextPerDisk> contexts;
- // init for GC by disk size
- if (gc_conf_size > 0) {
- std::vector<DataDir*> data_dirs =
doris::StorageEngine::instance()->get_stores();
- contexts.resize(data_dirs.size());
- for (size_t i = 0; i < contexts.size(); ++i) {
- contexts[i].init(data_dirs[i]->path(), gc_conf_size);
- }
- }
-
- // process unused file caches
- std::list<FileCachePtr> dummy_file_list;
- _gc_unused_file_caches(dummy_file_list);
-
- {
- std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
- for (auto item : dummy_file_list) {
- // check again after _cache_map_lock hold
- if (_file_cache_map.find(item->cache_dir().native()) !=
_file_cache_map.end()) {
- continue;
- }
- // sort file cache by last match time
- _add_file_cache_for_gc_by_disk(contexts, item);
- }
-
- // process file caches in memory
- for (std::map<std::string, FileCachePtr>::const_iterator iter =
_file_cache_map.cbegin();
- iter != _file_cache_map.cend(); ++iter) {
- if (iter->second == nullptr) {
- continue;
- }
- // policy1: GC file cache by timeout
- iter->second->clean_timeout_cache();
- // sort file cache by last match time
- _add_file_cache_for_gc_by_disk(contexts, iter->second);
- }
- }
-
- // policy2: GC file cache by disk size
- if (gc_conf_size > 0) {
- for (size_t i = 0; i < contexts.size(); ++i) {
- auto& context = contexts[i];
- FileCachePtr file_cache;
- while ((file_cache = context.top()) != nullptr) {
- {
- std::shared_lock<std::shared_mutex>
rdlock(_cache_map_lock);
- // for dummy file cache, check already used or not again
- if (file_cache->is_dummy_file_cache() &&
- _file_cache_map.find(file_cache->cache_dir().native())
!=
- _file_cache_map.end()) {
- context.pop();
- continue;
- }
- }
- WARN_IF_ERROR(context.gc_top(),
- fmt::format("gc {} error",
file_cache->cache_dir().native()));
- }
- }
- }
-}
-
-FileCachePtr FileCacheManager::new_file_cache(const std::string& cache_dir,
int64_t alive_time_sec,
- io::FileReaderSPtr
remote_file_reader,
- io::FileCachePolicy cache_type) {
- switch (cache_type) {
- case io::FileCachePolicy::WHOLE_FILE_CACHE:
- return std::make_unique<WholeFileCache>(cache_dir, alive_time_sec,
remote_file_reader);
- case io::FileCachePolicy::SUB_FILE_CACHE:
- return std::make_unique<SubFileCache>(cache_dir, alive_time_sec,
remote_file_reader);
- default:
- return nullptr;
- }
-}
-
-bool FileCacheManager::exist(const std::string& cache_path) {
- std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
- return _file_cache_map.find(cache_path) != _file_cache_map.end();
-}
-
-FileCacheManager* FileCacheManager::instance() {
- static FileCacheManager cache_manager;
- return &cache_manager;
-}
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/file_cache_manager.h
b/be/src/io/cache/file_cache_manager.h
deleted file mode 100644
index 7ac5e652d4..0000000000
--- a/be/src/io/cache/file_cache_manager.h
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <list>
-#include <map>
-#include <queue>
-#include <shared_mutex>
-#include <string>
-#include <vector>
-
-#include "common/status.h"
-#include "io/cache/file_cache.h"
-#include "io/fs/file_reader_writer_fwd.h"
-
-namespace doris {
-namespace io {
-enum class FileCachePolicy : uint8_t;
-
-class GCContextPerDisk {
-public:
- GCContextPerDisk() : _conf_max_size(0), _used_size(0) {}
- void init(const std::string& path, int64_t max_size);
- bool try_add_file_cache(FileCachePtr cache, int64_t file_size);
- FileCachePtr top();
- Status gc_top();
- void pop();
-
-private:
- std::string _disk_path;
- int64_t _conf_max_size;
- int64_t _used_size;
- std::priority_queue<FileCachePtr, std::vector<FileCachePtr>,
FileCacheLRUComparator> _lru_queue;
-};
-
-class FileCacheManager {
-public:
- FileCacheManager() = default;
- ~FileCacheManager() = default;
-
- static FileCacheManager* instance();
-
- void add_file_cache(const std::string& cache_path, FileCachePtr
file_cache);
-
- void remove_file_cache(const std::string& cache_path);
-
- void gc_file_caches();
-
- FileCachePtr new_file_cache(const std::string& cache_dir, int64_t
alive_time_sec,
- io::FileReaderSPtr remote_file_reader,
- io::FileCachePolicy cache_type);
-
- bool exist(const std::string& cache_path);
-
-private:
- void _gc_unused_file_caches(std::list<FileCachePtr>& result);
- void _add_file_cache_for_gc_by_disk(std::vector<GCContextPerDisk>&
contexts,
- FileCachePtr file_cache);
-
-private:
- std::shared_mutex _cache_map_lock;
- // cache_path -> FileCache
- std::map<std::string, FileCachePtr> _file_cache_map;
-};
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/sub_file_cache.cpp
b/be/src/io/cache/sub_file_cache.cpp
deleted file mode 100644
index 1d64c64d4d..0000000000
--- a/be/src/io/cache/sub_file_cache.cpp
+++ /dev/null
@@ -1,342 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/cache/sub_file_cache.h"
-
-#include <fmt/format.h>
-#include <glog/logging.h>
-#include <time.h>
-
-#include <algorithm>
-#include <cstdlib>
-#include <filesystem>
-#include <future>
-#include <mutex>
-#include <ostream>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "io/fs/local_file_system.h"
-#include "io/io_common.h"
-#include "runtime/exec_env.h"
-#include "util/string_util.h"
-#include "util/threadpool.h"
-
-namespace doris {
-using namespace ErrorCode;
-namespace io {
-
-using std::vector;
-
-const static std::string SUB_FILE_CACHE_PREFIX = "SUB_CACHE";
-
-SubFileCache::SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
- io::FileReaderSPtr remote_file_reader)
- : _cache_dir(cache_dir),
- _alive_time_sec(alive_time_sec),
- _remote_file_reader(remote_file_reader) {}
-
-SubFileCache::~SubFileCache() = default;
-
-Status SubFileCache::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* io_ctx) {
- RETURN_IF_ERROR(_init());
- if (io_ctx != nullptr && io_ctx->reader_type != ReaderType::READER_QUERY) {
- return _remote_file_reader->read_at(offset, result, bytes_read,
io_ctx);
- }
- std::vector<size_t> need_cache_offsets;
- RETURN_IF_ERROR(_get_need_cache_offsets(offset, result.size,
&need_cache_offsets));
- bool need_download = false;
- {
- std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
- for (vector<size_t>::const_iterator iter = need_cache_offsets.cbegin();
- iter != need_cache_offsets.cend(); ++iter) {
- if (_cache_file_readers.find(*iter) == _cache_file_readers.end() ||
- _cache_file_readers[*iter] == nullptr) {
- need_download = true;
- break;
- }
- }
- }
- if (need_download) {
- std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
- bool cache_dir_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(_cache_dir,
&cache_dir_exist),
- fmt::format("Check local cache dir exist failed. {}",
_cache_dir.native()));
- if (!cache_dir_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
-
io::global_local_filesystem()->create_directory(_cache_dir),
- fmt::format("Create local cache dir failed. {}",
_cache_dir.native()));
- }
- for (vector<size_t>::const_iterator iter = need_cache_offsets.cbegin();
- iter != need_cache_offsets.cend(); ++iter) {
- if (_cache_file_readers.find(*iter) == _cache_file_readers.end() ||
- _cache_file_readers[*iter] == nullptr) {
- size_t offset_begin = *iter;
- size_t req_size = config::max_sub_cache_file_size;
- if (offset_begin + req_size > _remote_file_reader->size()) {
- req_size = _remote_file_reader->size() - offset_begin;
- }
- RETURN_IF_ERROR(_generate_cache_reader(offset_begin,
req_size));
- }
- }
- }
- {
- std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
- *bytes_read = 0;
- for (vector<size_t>::const_iterator iter = need_cache_offsets.cbegin();
- iter != need_cache_offsets.cend(); ++iter) {
- size_t offset_begin = *iter;
- if (_cache_file_readers.find(*iter) == _cache_file_readers.end()) {
- return Status::Error<OS_ERROR>("Local cache file reader can't
be found: {}",
- offset_begin);
- }
- if (offset_begin < offset) {
- offset_begin = offset;
- }
- size_t req_size = *iter + config::max_sub_cache_file_size -
offset_begin;
- if (offset + result.size < *iter +
config::max_sub_cache_file_size) {
- req_size = offset + result.size - offset_begin;
- }
- Slice read_slice(result.mutable_data() + offset_begin - offset,
req_size);
- size_t sub_bytes_read = -1;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- _cache_file_readers[*iter]->read_at(offset_begin - *iter,
read_slice,
- &sub_bytes_read),
- fmt::format("Read local cache file failed: {}",
- _cache_file_readers[*iter]->path().native()));
- if (sub_bytes_read != read_slice.size) {
- return Status::Error<OS_ERROR>(
- "read local cache file failed: {} , bytes read: {} vs
req size: {}",
- _cache_file_readers[*iter]->path().native(),
sub_bytes_read, req_size);
- }
- *bytes_read += sub_bytes_read;
- _last_match_times[*iter] = time(nullptr);
- }
- }
- return Status::OK();
-}
-
-std::pair<Path, Path> SubFileCache::_cache_path(size_t offset) {
- return {_cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset),
- _cache_dir /
- fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset,
CACHE_DONE_FILE_SUFFIX)};
-}
-
-Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
- auto [cache_file, cache_done_file] = _cache_path(offset);
- bool done_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_done_file,
&done_file_exist),
- fmt::format("Check local cache done file exist failed. {}",
cache_done_file.native()));
-
- std::promise<Status> download_st;
- std::future<Status> future = download_st.get_future();
- if (!done_file_exist) {
- ThreadPoolToken* thread_token =
-
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
- if (thread_token != nullptr) {
- auto st = thread_token->submit_func([this, &download_st,
- cache_done_file =
cache_done_file,
- cache_file = cache_file,
offset, req_size] {
- auto func = [this, cache_done_file, cache_file, offset,
req_size] {
- bool done_file_exist = false;
- // Judge again whether cache_done_file exists, it is
possible that the cache
- // is downloaded while waiting in the thread pool
-
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
- cache_done_file,
&done_file_exist),
- "Check local cache done
file exist failed.");
- bool cache_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
- fmt::format("Check local cache file exist failed.
{}",
- cache_file.native()));
- if (done_file_exist && cache_file_exist) {
- return Status::OK();
- } else if (!done_file_exist && cache_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
-
io::global_local_filesystem()->delete_file(cache_file),
- fmt::format("Check local cache file exist
failed. {}",
- cache_file.native()));
- }
- RETURN_NOT_OK_STATUS_WITH_WARN(
- download_cache_to_local(cache_file,
cache_done_file,
- _remote_file_reader,
req_size, offset),
- "Download cache from remote to local failed.");
- return Status::OK();
- };
- download_st.set_value(func());
- });
- if (!st.ok()) {
- LOG(FATAL) << "Failed to submit download cache task to thread
pool! " << st;
- }
- } else {
- return Status::InternalError("Failed to get download cache thread
token");
- }
- auto st = future.get();
- if (!st.ok()) {
- return st;
- }
- }
- io::FileReaderSPtr cache_reader;
- RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file,
&cache_reader));
- _cache_file_readers.emplace(offset, cache_reader);
- _last_match_times.emplace(offset, time(nullptr));
- LOG(INFO) << "Create cache file from remote file successfully: "
- << _remote_file_reader->path().native() << "(" << offset << ", "
<< req_size
- << ") -> " << cache_file.native();
- return Status::OK();
-}
-
-Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
- std::vector<size_t>*
cache_offsets) {
- size_t first_offset_begin =
- offset / config::max_sub_cache_file_size *
config::max_sub_cache_file_size;
- for (size_t begin = first_offset_begin; begin < offset + req_size;
- begin += config::max_sub_cache_file_size) {
- cache_offsets->push_back(begin);
- }
- return Status::OK();
-}
-
-Status SubFileCache::clean_timeout_cache() {
- RETURN_IF_ERROR(_init());
- SubGcQueue gc_queue;
- _gc_lru_queue.swap(gc_queue);
- std::vector<size_t> timeout_keys;
- {
- std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
- for (std::map<size_t, int64_t>::const_iterator iter =
_last_match_times.cbegin();
- iter != _last_match_times.cend(); ++iter) {
- if (time(nullptr) - iter->second > _alive_time_sec) {
- timeout_keys.emplace_back(iter->first);
- } else {
- _gc_lru_queue.push({iter->first, iter->second});
- }
- }
- }
-
- std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
- if (timeout_keys.size() > 0) {
- for (std::vector<size_t>::const_iterator iter = timeout_keys.cbegin();
- iter != timeout_keys.cend(); ++iter) {
- size_t cleaned_size = 0;
- RETURN_IF_ERROR(_clean_cache_internal(*iter, &cleaned_size));
- _cache_file_size -= cleaned_size;
- }
- }
- return _check_and_delete_empty_dir(_cache_dir);
-}
-
-Status SubFileCache::clean_all_cache() {
- std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
- for (std::map<size_t, int64_t>::const_iterator iter =
_last_match_times.cbegin();
- iter != _last_match_times.cend(); ++iter) {
- RETURN_IF_ERROR(_clean_cache_internal(iter->first, nullptr));
- }
- _cache_file_size = 0;
- return _check_and_delete_empty_dir(_cache_dir);
-}
-
-Status SubFileCache::clean_one_cache(size_t* cleaned_size) {
- if (!_gc_lru_queue.empty()) {
- const auto& cache = _gc_lru_queue.top();
- {
- std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
- if (auto it = _last_match_times.find(cache.offset);
- it != _last_match_times.end() && it->second ==
cache.last_match_time) {
- RETURN_IF_ERROR(_clean_cache_internal(cache.offset,
cleaned_size));
- _cache_file_size -= *cleaned_size;
- _gc_lru_queue.pop();
- }
- }
- decltype(_last_match_times.begin()) it;
- while (!_gc_lru_queue.empty() &&
- (it = _last_match_times.find(_gc_lru_queue.top().offset)) !=
- _last_match_times.end() &&
- it->second != _gc_lru_queue.top().last_match_time) {
- _gc_lru_queue.pop();
- }
- }
- if (_gc_lru_queue.empty()) {
- std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
- RETURN_IF_ERROR(_check_and_delete_empty_dir(_cache_dir));
- }
- return Status::OK();
-}
-
-Status SubFileCache::_clean_cache_internal(size_t offset, size_t*
cleaned_size) {
- if (_cache_file_readers.find(offset) != _cache_file_readers.end()) {
- _cache_file_readers.erase(offset);
- }
- if (_last_match_times.find(offset) != _last_match_times.end()) {
- _last_match_times.erase(offset);
- }
- auto [cache_file, done_file] = _cache_path(offset);
- return _remove_cache_and_done(cache_file, done_file, cleaned_size);
-}
-
-Status SubFileCache::_init() {
- if (_is_inited) {
- return Status::OK();
- }
- std::vector<Path> cache_names;
-
- std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
- size_t cache_file_size = 0;
- RETURN_IF_ERROR(_get_dir_files_and_remove_unfinished(_cache_dir,
cache_names));
- std::map<int64_t, int64_t> expect_file_size_map;
- RETURN_IF_ERROR(_get_all_sub_file_size(&expect_file_size_map));
- for (const auto& file : cache_names) {
- auto str_vec = split(file.native(), "_");
- size_t offset = std::strtoul(str_vec[str_vec.size() - 1].c_str(),
nullptr, 10);
-
- int64_t file_size = -1;
- auto path = _cache_dir / file;
- RETURN_IF_ERROR(io::global_local_filesystem()->file_size(path,
&file_size));
- if (expect_file_size_map.find(offset) == expect_file_size_map.end() ||
- expect_file_size_map[offset] != file_size) {
- LOG(INFO) << "Delete invalid cache file: " << path.native() << ",
offset: " << offset
- << ", size: " << file_size;
- _clean_cache_internal(offset, nullptr);
- continue;
- }
- _last_match_times[offset] = time(nullptr);
- cache_file_size += file_size;
- }
- _cache_file_size = cache_file_size;
- _is_inited = true;
- return Status::OK();
-}
-
-Status SubFileCache::_get_all_sub_file_size(std::map<int64_t, int64_t>*
expect_file_size_map) {
- std::vector<size_t> cache_offsets;
- RETURN_IF_ERROR(_get_need_cache_offsets(0, _remote_file_reader->size(),
&cache_offsets));
- for (int i = 0; i < cache_offsets.size() - 1; ++i) {
- expect_file_size_map->emplace(cache_offsets[i],
config::max_sub_cache_file_size);
- }
- expect_file_size_map->emplace(cache_offsets[cache_offsets.size() - 1],
- _remote_file_reader->size() %
config::max_sub_cache_file_size);
- return Status::OK();
-}
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
deleted file mode 100644
index 96dc873a88..0000000000
--- a/be/src/io/cache/sub_file_cache.h
+++ /dev/null
@@ -1,117 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <algorithm>
-#include <map>
-#include <memory>
-#include <queue>
-#include <shared_mutex>
-#include <utility>
-#include <vector>
-
-#include "common/status.h"
-#include "io/cache/file_cache.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
-
-namespace doris {
-namespace io {
-class IOContext;
-
-class SubFileCache final : public FileCache {
-public:
- SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
- io::FileReaderSPtr remote_file_reader);
- ~SubFileCache() override;
-
- Status close() override { return _remote_file_reader->close(); }
-
- const Path& path() const override { return _remote_file_reader->path(); }
-
- size_t size() const override { return _remote_file_reader->size(); }
-
- bool closed() const override { return _remote_file_reader->closed(); }
-
- const Path& cache_dir() const override { return _cache_dir; }
-
- io::FileReaderSPtr remote_file_reader() const override { return
_remote_file_reader; }
-
- Status clean_timeout_cache() override;
-
- Status clean_all_cache() override;
-
- Status clean_one_cache(size_t* cleaned_size) override;
-
- int64_t get_oldest_match_time() const override {
- return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time;
- }
-
- bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
-
- FileSystemSPtr fs() const override { return _remote_file_reader->fs(); }
-
-protected:
- Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
- const IOContext* io_ctx) override;
-
-private:
- Status _generate_cache_reader(size_t offset, size_t req_size);
-
- Status _clean_cache_internal(size_t offset, size_t* cleaned_size);
-
- Status _get_need_cache_offsets(size_t offset, size_t req_size,
- std::vector<size_t>* cache_offsets);
-
- std::pair<Path, Path> _cache_path(size_t offset);
-
- Status _init();
-
- Status _get_all_sub_file_size(std::map<int64_t, int64_t>*
expect_file_size_map);
-
-private:
- struct SubFileInfo {
- size_t offset;
- int64_t last_match_time;
- };
- using SubGcQueue = std::priority_queue<SubFileInfo,
std::vector<SubFileInfo>,
- SubFileLRUComparator<SubFileInfo>>;
- // used by gc thread, and currently has no lock protection
- SubGcQueue _gc_lru_queue;
-
- Path _cache_dir;
- int64_t _alive_time_sec;
- io::FileReaderSPtr _remote_file_reader;
-
- std::shared_mutex _cache_map_lock;
- // offset_begin -> last_match_time
- std::map<size_t, int64_t> _last_match_times;
- // offset_begin -> local file reader
- std::map<size_t, io::FileReaderSPtr> _cache_file_readers;
-
- bool _is_inited = false;
-};
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/whole_file_cache.cpp
b/be/src/io/cache/whole_file_cache.cpp
deleted file mode 100644
index c24d64f2a9..0000000000
--- a/be/src/io/cache/whole_file_cache.cpp
+++ /dev/null
@@ -1,184 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/cache/whole_file_cache.h"
-
-#include <fmt/format.h>
-#include <glog/logging.h>
-
-#include <filesystem>
-#include <future>
-#include <mutex>
-#include <ostream>
-#include <string>
-
-#include "io/fs/local_file_system.h"
-#include "io/io_common.h"
-#include "runtime/exec_env.h"
-#include "util/threadpool.h"
-
-namespace doris {
-using namespace ErrorCode;
-namespace io {
-
-const static std::string WHOLE_FILE_CACHE_NAME = "WHOLE_FILE_CACHE";
-
-WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
- io::FileReaderSPtr remote_file_reader)
- : _cache_dir(cache_dir),
- _alive_time_sec(alive_time_sec),
- _remote_file_reader(remote_file_reader),
- _cache_file_reader(nullptr) {}
-
-WholeFileCache::~WholeFileCache() = default;
-
-Status WholeFileCache::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* io_ctx) {
- if (io_ctx != nullptr && io_ctx->reader_type != ReaderType::READER_QUERY) {
- return _remote_file_reader->read_at(offset, result, bytes_read,
io_ctx);
- }
- if (_cache_file_reader == nullptr) {
- RETURN_IF_ERROR(_generate_cache_reader(offset, result.size));
- }
- std::shared_lock<std::shared_mutex> rlock(_cache_lock);
- RETURN_NOT_OK_STATUS_WITH_WARN(
- _cache_file_reader->read_at(offset, result, bytes_read, io_ctx),
- fmt::format("Read local cache file failed: {}",
_cache_file_reader->path().native()));
- if (*bytes_read != result.size) {
- return Status::Error<OS_ERROR>(
- "read cache file failed: {}, bytes read: {} vs required size:
{}",
- _cache_file_reader->path().native(), *bytes_read, result.size);
- }
- update_last_match_time();
- return Status::OK();
-}
-
-Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
- std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
- Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
- Path cache_done_file =
- _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME,
CACHE_DONE_FILE_SUFFIX);
- bool done_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_done_file,
&done_file_exist),
- "Check local cache done file exist failed.");
-
- std::promise<Status> download_st;
- std::future<Status> future = download_st.get_future();
- if (!done_file_exist) {
- ThreadPoolToken* thread_token =
-
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
- if (thread_token != nullptr) {
- auto st = thread_token->submit_func([this, &download_st,
cache_done_file, cache_file] {
- auto func = [this, cache_done_file, cache_file] {
- bool done_file_exist = false;
- bool cache_dir_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(_cache_dir,
&cache_dir_exist),
- fmt::format("Check local cache dir exist failed.
{}",
- _cache_dir.native()));
- if (!cache_dir_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
-
io::global_local_filesystem()->create_directory(_cache_dir),
- fmt::format("Create local cache dir failed.
{}",
- _cache_dir.native()));
- } else {
- // Judge again whether cache_done_file exists, it is
possible that the cache
- // is downloaded while waiting in the thread pool
-
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
-
cache_done_file, &done_file_exist),
- "Check local cache done
file exist failed.");
- }
- bool cache_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
- "Check local cache file exist failed.");
- if (done_file_exist && cache_file_exist) {
- return Status::OK();
- } else if (!done_file_exist && cache_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
-
io::global_local_filesystem()->delete_file(cache_file),
- fmt::format("Check local cache file exist
failed. {}",
- cache_file.native()));
- }
- size_t req_size = _remote_file_reader->size();
- RETURN_NOT_OK_STATUS_WITH_WARN(
- download_cache_to_local(cache_file,
cache_done_file,
- _remote_file_reader,
req_size),
- "Download cache from remote to local failed.");
- return Status::OK();
- };
- download_st.set_value(func());
- });
- if (!st.ok()) {
- LOG(FATAL) << "Failed to submit download cache task to thread
pool! " << st;
- return st;
- }
- } else {
- return Status::InternalError("Failed to get download cache thread
token");
- }
- auto st = future.get();
- if (!st.ok()) {
- return st;
- }
- }
- RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file,
&_cache_file_reader));
- _cache_file_size = _cache_file_reader->size();
- LOG(INFO) << "Create cache file from remote file successfully: "
- << _remote_file_reader->path().native() << " -> " <<
cache_file.native();
- return Status::OK();
-}
-
-Status WholeFileCache::clean_timeout_cache() {
- std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
- _gc_match_time = _last_match_time;
- if (time(nullptr) - _last_match_time > _alive_time_sec) {
- _clean_cache_internal(nullptr);
- }
- return Status::OK();
-}
-
-Status WholeFileCache::clean_all_cache() {
- std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
- return _clean_cache_internal(nullptr);
-}
-
-Status WholeFileCache::clean_one_cache(size_t* cleaned_size) {
- std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
- if (_gc_match_time == _last_match_time) {
- return _clean_cache_internal(cleaned_size);
- }
- return Status::OK();
-}
-
-Status WholeFileCache::_clean_cache_internal(size_t* cleaned_size) {
- _cache_file_reader.reset();
- _cache_file_size = 0;
- Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
- Path done_file =
- _cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME,
CACHE_DONE_FILE_SUFFIX);
- RETURN_IF_ERROR(_remove_cache_and_done(cache_file, done_file,
cleaned_size));
- return _check_and_delete_empty_dir(_cache_dir);
-}
-
-bool WholeFileCache::is_gc_finish() const {
- std::shared_lock<std::shared_mutex> rlock(_cache_lock);
- return _cache_file_reader == nullptr;
-}
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/cache/whole_file_cache.h
b/be/src/io/cache/whole_file_cache.h
deleted file mode 100644
index 866e6e142e..0000000000
--- a/be/src/io/cache/whole_file_cache.h
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-#include <time.h>
-
-#include <memory>
-#include <shared_mutex>
-
-#include "common/status.h"
-#include "io/cache/file_cache.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "io/fs/file_system.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
-
-namespace doris {
-namespace io {
-class IOContext;
-
-class WholeFileCache final : public FileCache {
-public:
- WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
- io::FileReaderSPtr remote_file_reader);
- ~WholeFileCache() override;
-
- Status close() override { return _remote_file_reader->close(); }
-
- const Path& path() const override { return _remote_file_reader->path(); }
-
- size_t size() const override { return _remote_file_reader->size(); }
-
- bool closed() const override { return _remote_file_reader->closed(); }
-
- const Path& cache_dir() const override { return _cache_dir; }
-
- io::FileReaderSPtr remote_file_reader() const override { return
_remote_file_reader; }
-
- Status clean_timeout_cache() override;
-
- Status clean_all_cache() override;
-
- Status clean_one_cache(size_t* cleaned_size) override;
-
- int64_t get_oldest_match_time() const override { return _gc_match_time; }
-
- bool is_gc_finish() const override;
-
- FileSystemSPtr fs() const override { return _remote_file_reader->fs(); }
-
-protected:
- Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
- const IOContext* io_ctx) override;
-
-private:
- Status _generate_cache_reader(size_t offset, size_t req_size);
-
- Status _clean_cache_internal(size_t* cleaned_size);
-
- void update_last_match_time() { _last_match_time = time(nullptr); }
-
-private:
- Path _cache_dir;
- int64_t _alive_time_sec;
- io::FileReaderSPtr _remote_file_reader;
-
- int64_t _gc_match_time {0};
- int64_t _last_match_time {0};
-
- mutable std::shared_mutex _cache_lock;
- io::FileReaderSPtr _cache_file_reader;
-};
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 9c49f0dd8b..02650c9419 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -27,7 +27,6 @@
#include "common/config.h"
#include "common/status.h"
#include "io/fs/broker_file_system.h"
-#include "io/fs/file_reader_options.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/multi_table_pipe.h"
@@ -47,27 +46,22 @@ namespace io {
class FileWriter;
} // namespace io
-static io::FileBlockCachePathPolicy BLOCK_CACHE_POLICY;
-static std::string RANDOM_CACHE_BASE_PATH = "random";
+constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random";
io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state) {
- io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE;
+ io::FileReaderOptions opts;
if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
state->query_options().enable_file_cache) {
- cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+ opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
}
- io::FileReaderOptions reader_options(cache_policy, BLOCK_CACHE_POLICY);
if (state != nullptr &&
state->query_options().__isset.file_cache_base_path &&
state->query_options().file_cache_base_path != RANDOM_CACHE_BASE_PATH)
{
-
reader_options.specify_cache_path(state->query_options().file_cache_base_path);
+ opts.cache_base_path = state->query_options().file_cache_base_path;
}
- return reader_options;
+ return opts;
}
-io::FileReaderOptions FileFactory::NO_CACHE_READER_OPTIONS =
- FileFactory::get_reader_options(nullptr);
-
Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>&
broker_addresses,
const std::map<std::string,
std::string>& properties,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index a311b8d58b..ef3d8b9fc3 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -29,8 +29,7 @@
#include "common/factory_creator.h"
#include "common/status.h"
-#include "io/fs/file_reader_options.h"
-#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_reader.h"
#include "io/fs/fs_utils.h"
namespace doris {
@@ -47,7 +46,6 @@ class FileFactory {
public:
static io::FileReaderOptions get_reader_options(RuntimeState* state);
- static io::FileReaderOptions NO_CACHE_READER_OPTIONS;
/// Create FileWriter
static Status create_file_writer(TFileType::type type, ExecEnv* env,
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index d4f7c5d530..7b7d09a5f3 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -33,6 +33,28 @@ namespace io {
class FileSystem;
class IOContext;
+enum class FileCachePolicy : uint8_t {
+ NO_CACHE,
+ FILE_BLOCK_CACHE,
+};
+
+inline FileCachePolicy cache_type_from_string(std::string_view type) {
+ if (type == "file_block_cache") {
+ return FileCachePolicy::FILE_BLOCK_CACHE;
+ } else {
+ return FileCachePolicy::NO_CACHE;
+ }
+}
+
+// Only affects remote file readers
+struct FileReaderOptions {
+ FileCachePolicy cache_type {FileCachePolicy::NO_CACHE};
+ bool is_doris_table = false;
+ std::string cache_base_path;
+ // Use modification time to determine whether the file is changed
+ int64_t modification_time = 0;
+};
+
class FileReader {
public:
FileReader() = default;
@@ -60,5 +82,7 @@ protected:
const IOContext* io_ctx) = 0;
};
+using FileReaderSPtr = std::shared_ptr<FileReader>;
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/file_reader_options.cpp
b/be/src/io/fs/file_reader_options.cpp
deleted file mode 100644
index f388f51322..0000000000
--- a/be/src/io/fs/file_reader_options.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/fs/file_reader_options.h"
-
-namespace doris {
-namespace io {
-
-FileReaderOptions FileReaderOptions::DEFAULT =
- FileReaderOptions(FileCachePolicy::NO_CACHE, NoCachePathPolicy());
-
-FileCachePolicy cache_type_from_string(const std::string& type) {
- if (type == "sub_file_cache") {
- return FileCachePolicy::SUB_FILE_CACHE;
- } else if (type == "whole_file_cache") {
- return FileCachePolicy::WHOLE_FILE_CACHE;
- } else if (type == "file_block_cache") {
- return FileCachePolicy::FILE_BLOCK_CACHE;
- } else {
- return FileCachePolicy::NO_CACHE;
- }
-}
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/fs/file_reader_options.h
b/be/src/io/fs/file_reader_options.h
deleted file mode 100644
index 7477816f8f..0000000000
--- a/be/src/io/fs/file_reader_options.h
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <string>
-
-namespace doris {
-namespace io {
-
-enum class FileCachePolicy : uint8_t {
- NO_CACHE,
- SUB_FILE_CACHE,
- WHOLE_FILE_CACHE,
- FILE_BLOCK_CACHE,
-};
-
-FileCachePolicy cache_type_from_string(const std::string& type);
-
-// CachePathPolicy it to define which cache path should be used
-// for the local cache of the given file(path).
-// The dervied class should implement get_cache_path() method
-class CachePathPolicy {
-public:
- virtual ~CachePathPolicy() = default;
- // path: the path of file which will be cached
- // return value: the cache path of the given file.
- virtual std::string get_cache_path(const std::string& path) const = 0;
-};
-
-class NoCachePathPolicy : public CachePathPolicy {
-public:
- std::string get_cache_path(const std::string& path) const override {
return ""; }
-};
-
-class SegmentCachePathPolicy : public CachePathPolicy {
-public:
- void set_cache_path(const std::string& cache_path) { _cache_path =
cache_path; }
-
- std::string get_cache_path(const std::string& path) const override {
return _cache_path; }
-
-private:
- std::string _cache_path;
-};
-
-class FileBlockCachePathPolicy : public CachePathPolicy {
-public:
- std::string get_cache_path(const std::string& path) const override {
return path; }
-};
-
-class FileReaderOptions {
-public:
- FileReaderOptions(FileCachePolicy cache_type_, const CachePathPolicy&
path_policy_)
- : cache_type(cache_type_), path_policy(path_policy_) {}
-
- FileCachePolicy cache_type;
- const CachePathPolicy& path_policy;
- bool has_cache_base_path = false;
- std::string cache_base_path;
- // Use modification time to determine whether the file is changed
- int64_t modification_time = 0;
-
- void specify_cache_path(const std::string& base_path) {
- has_cache_base_path = true;
- cache_base_path = base_path;
- }
-
- static FileReaderOptions DEFAULT;
-};
-
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/fs/file_reader_writer_fwd.h
b/be/src/io/fs/file_reader_writer_fwd.h
index e63c5395de..e4c90460e8 100644
--- a/be/src/io/fs/file_reader_writer_fwd.h
+++ b/be/src/io/fs/file_reader_writer_fwd.h
@@ -31,5 +31,7 @@ class FileWriter;
using FileReaderSPtr = std::shared_ptr<FileReader>;
using FileWriterPtr = std::unique_ptr<FileWriter>;
+struct FileReaderOptions;
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp
index 989a68884a..fb051fb005 100644
--- a/be/src/io/fs/file_system.cpp
+++ b/be/src/io/fs/file_system.cpp
@@ -17,6 +17,7 @@
#include "io/fs/file_system.h"
+#include "io/fs/file_reader.h"
#include "util/async_io.h" // IWYU pragma: keep
namespace doris {
@@ -27,6 +28,16 @@ Status FileSystem::create_file(const Path& file,
FileWriterPtr* writer) {
FILESYSTEM_M(create_file_impl(path, writer));
}
+Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader) {
+ FileDescription fd;
+ fd.path = file.native();
+ return open_file(fd, FileReaderOptions {}, reader);
+}
+
+Status FileSystem::open_file(const FileDescription& fd, FileReaderSPtr*
reader) {
+ return open_file(fd, FileReaderOptions {}, reader);
+}
+
Status FileSystem::open_file(const FileDescription& fd, const
FileReaderOptions& reader_options,
FileReaderSPtr* reader) {
auto path = absolute_path(fd.path);
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index 04e2615fb0..748de46eb2 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -30,14 +30,12 @@
#include <vector>
#include "common/status.h"
-#include "io/fs/file_reader_options.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/fs_utils.h"
#include "io/fs/path.h"
namespace doris {
namespace io {
-class FileSystem;
#ifndef FILESYSTEM_M
#define FILESYSTEM_M(stmt) \
@@ -75,14 +73,9 @@ public:
// The following are public interface.
// And derived classes should implement all xxx_impl methods.
Status create_file(const Path& file, FileWriterPtr* writer);
- Status open_file(const Path& file, FileReaderSPtr* reader) {
- FileDescription fd;
- fd.path = file.native();
- return open_file(fd, FileReaderOptions::DEFAULT, reader);
- }
- Status open_file(const FileDescription& fd, FileReaderSPtr* reader) {
- return open_file(fd, FileReaderOptions::DEFAULT, reader);
- }
+ // FIXME(plat1ko): Use `Status open_file(const Path&, FileReaderSPtr*,
const FileReaderOptions*)`
+ Status open_file(const Path& file, FileReaderSPtr* reader);
+ Status open_file(const FileDescription& fd, FileReaderSPtr* reader);
Status open_file(const FileDescription& fd, const FileReaderOptions&
reader_options,
FileReaderSPtr* reader);
Status create_directory(const Path& dir, bool failed_if_exists = false);
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 012f0a7430..49a28a1a9b 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -21,17 +21,16 @@
#include "common/status.h"
#include "gutil/macros.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "util/slice.h"
namespace doris {
namespace io {
+class FileSystem;
class FileWriter {
public:
- FileWriter(Path&& path, FileSystemSPtr fs) : _path(std::move(path)),
_fs(fs) {}
+ FileWriter(Path&& path, std::shared_ptr<FileSystem> fs) :
_path(std::move(path)), _fs(fs) {}
FileWriter() = default;
virtual ~FileWriter() = default;
@@ -57,17 +56,19 @@ public:
size_t bytes_appended() const { return _bytes_appended; }
- FileSystemSPtr fs() const { return _fs; }
+ std::shared_ptr<FileSystem> fs() const { return _fs; }
bool is_closed() { return _closed; }
protected:
Path _path;
size_t _bytes_appended = 0;
- FileSystemSPtr _fs;
+ std::shared_ptr<FileSystem> _fs;
bool _closed = false;
bool _opened = false;
};
+using FileWriterPtr = std::unique_ptr<FileWriter>;
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/local_file_system.cpp
b/be/src/io/fs/local_file_system.cpp
index 48c1981202..2a3dff4071 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -44,7 +44,6 @@
namespace doris {
namespace io {
-class FileReaderOptions;
std::shared_ptr<LocalFileSystem> LocalFileSystem::create(Path path,
std::string id) {
return std::shared_ptr<LocalFileSystem>(new
LocalFileSystem(std::move(path), std::move(id)));
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 1f8d35c096..eeee0e2488 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -32,7 +32,6 @@
namespace doris {
namespace io {
-class FileReaderOptions;
class LocalFileSystem final : public FileSystem {
public:
diff --git a/be/src/io/fs/remote_file_system.cpp
b/be/src/io/fs/remote_file_system.cpp
index c169777653..660a532440 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -22,12 +22,8 @@
#include <algorithm>
#include "common/config.h"
-#include "gutil/strings/stringpiece.h"
#include "io/cache/block/cached_remote_file_reader.h"
-#include "io/cache/file_cache.h"
-#include "io/cache/file_cache_manager.h"
#include "io/fs/file_reader.h"
-#include "io/fs/file_reader_options.h"
#include "util/async_io.h" // IWYU pragma: keep
namespace doris {
@@ -82,28 +78,8 @@ Status RemoteFileSystem::open_file_impl(const
FileDescription& fd, const Path& a
*reader = raw_reader;
break;
}
- case io::FileCachePolicy::SUB_FILE_CACHE:
- case io::FileCachePolicy::WHOLE_FILE_CACHE: {
- std::string cache_path =
reader_options.path_policy.get_cache_path(abs_path.native());
- io::FileCachePtr cache_reader =
FileCacheManager::instance()->new_file_cache(
- cache_path, config::file_cache_alive_time_sec, raw_reader,
- reader_options.cache_type);
- FileCacheManager::instance()->add_file_cache(cache_path, cache_reader);
- *reader = cache_reader;
- break;
- }
case io::FileCachePolicy::FILE_BLOCK_CACHE: {
- StringPiece str(raw_reader->path().native());
- std::string cache_path =
reader_options.path_policy.get_cache_path(abs_path.native());
- if (reader_options.has_cache_base_path) {
- // from query session variable: file_cache_base_path
- *reader = std::make_shared<CachedRemoteFileReader>(
- std::move(raw_reader), reader_options.cache_base_path,
cache_path,
- reader_options.modification_time);
- } else {
- *reader =
std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path,
- fd.mtime);
- }
+ *reader =
std::make_shared<CachedRemoteFileReader>(std::move(raw_reader),
&reader_options);
break;
}
default: {
diff --git a/be/src/io/fs/remote_file_system.h
b/be/src/io/fs/remote_file_system.h
index 559890d5ee..b27a4bd523 100644
--- a/be/src/io/fs/remote_file_system.h
+++ b/be/src/io/fs/remote_file_system.h
@@ -31,7 +31,6 @@
namespace doris {
namespace io {
-class FileReaderOptions;
class RemoteFileSystem : public FileSystem {
public:
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index b0940c45f1..08feb6b048 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -435,7 +435,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// create index_writer to compaction indexes
auto& fs = _output_rowset->rowset_meta()->fs();
- auto tablet_path = _output_rowset->tablet_path();
+ auto& tablet_path = _tablet->tablet_path();
DCHECK(dest_index_files.size() > 0);
// we choose the first destination segment name as the temporary index
writer path
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 6ac80b18da..e9219d84ac 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -46,7 +46,6 @@
#include "gen_cpp/Types_constants.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/ref_counted.h"
-#include "io/cache/file_cache_manager.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "io/fs/path.h"
#include "olap/cold_data_compaction.h"
@@ -84,7 +83,6 @@ using std::string;
namespace doris {
-using io::FileCacheManager;
using io::Path;
// number of running SCHEMA-CHANGE threads
@@ -227,12 +225,6 @@ Status StorageEngine::start_bg_threads() {
&_cold_data_compaction_producer_thread));
LOG(INFO) << "cold data compaction producer thread started";
- RETURN_IF_ERROR(Thread::create(
- "StorageEngine", "cache_file_cleaner_tasks_producer_thread",
- [this]() { this->_cache_file_cleaner_tasks_producer_callback(); },
- &_cache_file_cleaner_tasks_producer_thread));
- LOG(INFO) << "cache file cleaner tasks producer thread started";
-
// add tablet publish version thread pool
ThreadPoolBuilder("TabletPublishTxnThreadPool")
.set_min_threads(config::tablet_publish_txn_max_thread)
@@ -1206,24 +1198,6 @@ void
StorageEngine::_cold_data_compaction_producer_callback() {
}
}
-void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
- while (true) {
- int64_t interval = config::generate_cache_cleaner_task_interval_sec;
- if (interval <= 0) {
- interval = 10;
- }
- bool stop =
_stop_background_threads_latch.wait_for(std::chrono::seconds(interval));
- if (stop) {
- break;
- }
- if (config::generate_cache_cleaner_task_interval_sec <= 0) {
- continue;
- }
- LOG(INFO) << "Begin to Clean cache files";
- FileCacheManager::instance()->gc_file_caches();
- }
-}
-
void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t
tablet_id,
int64_t publish_version, int64_t
transaction_id,
bool is_recovery) {
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index e9a2e65bdc..0b9e46bc05 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -29,8 +29,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
-#include "io/cache/file_cache_manager.h"
-#include "io/fs/file_reader_options.h"
+#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
@@ -47,34 +46,13 @@
namespace doris {
using namespace ErrorCode;
-using io::FileCacheManager;
-
std::string BetaRowset::segment_file_path(int segment_id) {
-#ifdef BE_TEST
- if (!config::file_cache_type.empty()) {
- return segment_file_path(_tablet_path, rowset_id(), segment_id);
- }
-#endif
return segment_file_path(_rowset_dir, rowset_id(), segment_id);
}
-std::string BetaRowset::segment_cache_path(int segment_id) {
- //
{root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}
- return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(),
segment_id);
-}
-
-// just check that the format is xxx_segmentid and segmentid is numeric
-bool BetaRowset::is_segment_cache_dir(const std::string& cache_dir) {
- auto segment_id_pos = cache_dir.find_last_of('_') + 1;
- if (segment_id_pos >= cache_dir.size() || segment_id_pos == 0) {
- return false;
- }
- return std::all_of(cache_dir.cbegin() + segment_id_pos, cache_dir.cend(),
::isdigit);
-}
-
std::string BetaRowset::segment_file_path(const std::string& rowset_dir, const
RowsetId& rowset_id,
int segment_id) {
- // {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}.dat
+ // {rowset_dir}/{rowset_id}_{seg_num}.dat
return fmt::format("{}/{}_{}.dat", rowset_dir, rowset_id.to_string(),
segment_id);
}
@@ -99,13 +77,7 @@ std::string
BetaRowset::local_segment_path_segcompacted(const std::string& table
BetaRowset::BetaRowset(const TabletSchemaSPtr& schema, const std::string&
tablet_path,
const RowsetMetaSharedPtr& rowset_meta)
- : Rowset(schema, tablet_path, rowset_meta) {
- if (_rowset_meta->is_local()) {
- _rowset_dir = tablet_path;
- } else {
- _rowset_dir = remote_tablet_path(_rowset_meta->tablet_id());
- }
-}
+ : Rowset(schema, rowset_meta), _rowset_dir(tablet_path) {}
BetaRowset::~BetaRowset() = default;
@@ -155,14 +127,14 @@ Status BetaRowset::load_segment(int64_t seg_id,
segment_v2::SegmentSharedPtr* se
}
DCHECK(seg_id >= 0);
auto seg_path = segment_file_path(seg_id);
- io::SegmentCachePathPolicy cache_policy;
- cache_policy.set_cache_path(segment_cache_path(seg_id));
- auto type = config::enable_file_cache ? config::file_cache_type : "";
- io::FileReaderOptions reader_options(io::cache_type_from_string(type),
cache_policy);
+ io::FileReaderOptions reader_options {
+ .cache_type = config::enable_file_cache ?
io::FileCachePolicy::FILE_BLOCK_CACHE
+ :
io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = true};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema, reader_options,
segment);
if (!s.ok()) {
- LOG(WARNING) << "failed to open segment. " << seg_path << " under
rowset " << unique_id()
+ LOG(WARNING) << "failed to open segment. " << seg_path << " under
rowset " << rowset_id()
<< " : " << s.to_string();
return s;
}
@@ -177,7 +149,7 @@ Status BetaRowset::create_reader(RowsetReaderSharedPtr*
result) {
Status BetaRowset::remove() {
// TODO should we close and remove all segment reader first?
- VLOG_NOTICE << "begin to remove files in rowset " << unique_id()
+ VLOG_NOTICE << "begin to remove files in rowset " << rowset_id()
<< ", version:" << start_version() << "-" << end_version()
<< ", tabletid:" << _rowset_meta->tablet_id();
// If the rowset was removed, it need to remove the fds in segment cache
directly
@@ -211,14 +183,10 @@ Status BetaRowset::remove() {
}
}
}
- if (fs->type() != io::FileSystemType::LOCAL) {
- auto cache_path = segment_cache_path(i);
- FileCacheManager::instance()->remove_file_cache(cache_path);
- }
}
if (!success) {
return Status::Error<ROWSET_DELETE_FILE_FAILED>("failed to remove
files in rowset {}",
- unique_id());
+
rowset_id().to_string());
}
return Status::OK();
}
@@ -397,10 +365,10 @@ bool BetaRowset::check_current_rowset_segment() {
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto seg_path = segment_file_path(seg_id);
std::shared_ptr<segment_v2::Segment> segment;
- io::SegmentCachePathPolicy cache_policy;
- cache_policy.set_cache_path(segment_cache_path(seg_id));
- auto type = config::enable_file_cache ? config::file_cache_type : "";
- io::FileReaderOptions reader_options(io::cache_type_from_string(type),
cache_policy);
+ io::FileReaderOptions reader_options {
+ .cache_type = config::enable_file_cache ?
io::FileCachePolicy::FILE_BLOCK_CACHE
+ :
io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = true};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
_schema,
reader_options, &segment);
if (!s.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index fb7369b57d..372431cb4b 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -51,20 +51,19 @@ public:
std::string segment_file_path(int segment_id);
- std::string segment_cache_path(int segment_id);
-
- static bool is_segment_cache_dir(const std::string& cache_dir);
-
static std::string segment_file_path(const std::string& rowset_dir, const
RowsetId& rowset_id,
int segment_id);
+ // Return the absolute path of local segcompacted segment file
static std::string local_segment_path_segcompacted(const std::string&
tablet_path,
const RowsetId&
rowset_id, int64_t begin,
int64_t end);
+ // Return the relative path of remote segment file
static std::string remote_segment_path(int64_t tablet_id, const RowsetId&
rowset_id,
int segment_id);
+ // Return the relative path of remote segment file
static std::string remote_segment_path(int64_t tablet_id, const
std::string& rowset_id,
int segment_id);
@@ -114,6 +113,10 @@ protected:
private:
friend class RowsetFactory;
friend class BetaRowsetReader;
+
+ // Remote format: {remote_fs_root}/data/{tablet_id}
+ // Local format:
{local_storage_root}/data/{shard_id}/{tablet_id}/{schema_hash}
+ std::string _rowset_dir;
};
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index b404138c05..ae525eed35 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -33,7 +33,6 @@
#include "common/logging.h"
#include "gutil/strings/substitute.h"
#include "io/fs/file_reader.h"
-#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "olap/olap_define.h"
@@ -84,7 +83,7 @@ BetaRowsetWriter::~BetaRowsetWriter() {
if (!_already_built) { // abnormal exit, remove all files generated
_segment_creator.close(); // ensure all files are closed
auto fs = _rowset_meta->fs();
- if (!fs) {
+ if (fs->type() != io::FileSystemType::LOCAL) { // Remote fs will
delete them asynchronously
return;
}
for (int i = _segment_start_id; i <
_segment_creator.next_segment_id(); ++i) {
@@ -164,15 +163,17 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t
segment_id) {
Status
BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr&
segment,
int32_t segment_id) {
+ DCHECK(_rowset_meta->is_local());
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>(
"BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs
get failed");
}
auto path = BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, segment_id);
- auto type = config::enable_file_cache ? config::file_cache_type : "";
- io::FileReaderOptions reader_options(io::cache_type_from_string(type),
- io::SegmentCachePathPolicy());
+ io::FileReaderOptions reader_options {
+ .cache_type = config::enable_file_cache ?
io::FileCachePolicy::FILE_BLOCK_CACHE
+ :
io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = true};
auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(),
_context.tablet_schema,
reader_options, &segment);
if (!s.ok()) {
@@ -282,7 +283,6 @@ Status
BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
return Status::OK();
}
- int ret;
auto src_seg_path =
BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id, seg_id);
auto dst_seg_path = BetaRowset::segment_file_path(_context.rowset_dir,
_context.rowset_id,
@@ -298,7 +298,7 @@ Status
BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) {
_segid_statistics_map.emplace(_num_segcompacted, org);
_clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
}
- ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
+ int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
if (ret) {
return Status::Error<ROWSET_RENAME_FILE_FAILED>(
"failed to rename {} to {}. ret:{}, errno:{}", src_seg_path,
dst_seg_path, ret,
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index a4bc32e32b..3b601eb09b 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -34,7 +34,6 @@
#include "common/logging.h"
#include "gutil/integral_types.h"
#include "gutil/strings/substitute.h"
-#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/stream_sink_file_writer.h"
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index 3cf6d92a8f..b3e6b1ca9e 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -25,9 +25,8 @@
namespace doris {
-Rowset::Rowset(const TabletSchemaSPtr& schema, const std::string& tablet_path,
- const RowsetMetaSharedPtr& rowset_meta)
- : _tablet_path(tablet_path), _rowset_meta(rowset_meta),
_refs_by_reader(0) {
+Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr&
rowset_meta)
+ : _rowset_meta(rowset_meta), _refs_by_reader(0) {
_is_pending = !_rowset_meta->has_version();
if (_is_pending) {
_is_cumulative = false;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index cac574f2da..541bbe3cb0 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -222,11 +222,6 @@ public:
virtual bool check_file_exist() = 0;
- // return an unique identifier string for this rowset
- std::string unique_id() const {
- return fmt::format("{}/{}", _tablet_path, rowset_id().to_string());
- }
-
bool need_delete_file() const { return _need_delete_file; }
void set_need_delete_file() { _need_delete_file = true; }
@@ -235,10 +230,6 @@ public:
return rowset_meta()->version().contains(version);
}
- const std::string& tablet_path() const { return _tablet_path; }
-
- virtual std::string rowset_dir() { return _rowset_dir; }
-
static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr&
right) {
return left->end_version() < right->end_version();
}
@@ -315,8 +306,7 @@ protected:
DISALLOW_COPY_AND_ASSIGN(Rowset);
// this is non-public because all clients should use RowsetFactory to
obtain pointer to initialized Rowset
- Rowset(const TabletSchemaSPtr& schema, const std::string& tablet_path,
- const RowsetMetaSharedPtr& rowset_meta);
+ Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr&
rowset_meta);
// this is non-public because all clients should use RowsetFactory to
obtain pointer to initialized Rowset
virtual Status init() = 0;
@@ -331,8 +321,6 @@ protected:
TabletSchemaSPtr _schema;
- std::string _tablet_path;
- std::string _rowset_dir;
RowsetMetaSharedPtr _rowset_meta;
// init in constructor
bool _is_pending; // rowset is pending iff it's not in visible state
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index a04c74ce1a..21ceea7a93 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -63,16 +63,10 @@
#include "vec/olap/vgeneric_iterators.h"
namespace doris {
-namespace io {
-class FileCacheManager;
-class FileReaderOptions;
-} // namespace io
namespace segment_v2 {
class InvertedIndexIterator;
-using io::FileCacheManager;
-
Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t
segment_id,
RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
@@ -80,17 +74,7 @@ Status Segment::open(io::FileSystemSPtr fs, const
std::string& path, uint32_t se
io::FileReaderSPtr file_reader;
io::FileDescription fd;
fd.path = path;
-#ifndef BE_TEST
RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader));
-#else
- // be ut use local file reader instead of remote file reader while use
remote cache
- if (!config::file_cache_type.empty()) {
- RETURN_IF_ERROR(io::global_local_filesystem()->open_file(fd,
reader_options, &file_reader));
- } else {
- RETURN_IF_ERROR(fs->open_file(fd, reader_options, &file_reader));
- }
-#endif
-
std::shared_ptr<Segment> segment(new Segment(segment_id, rowset_id,
tablet_schema));
segment->_file_reader = std::move(file_reader);
RETURN_IF_ERROR(segment->_open());
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index 8507d105e6..65f3245e6b 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -48,10 +48,6 @@ class StorageReadOptions;
class MemTracker;
class PrimaryKeyIndexReader;
class RowwiseIterator;
-
-namespace io {
-class FileReaderOptions;
-} // namespace io
struct RowLocation;
namespace segment_v2 {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index b4e093fa2d..07d980fd02 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1013,8 +1013,7 @@ void StorageEngine::add_unused_rowset(RowsetSharedPtr
rowset) {
}
VLOG_NOTICE << "add unused rowset, rowset id:" << rowset->rowset_id()
- << ", version:" << rowset->version().first << "-" <<
rowset->version().second
- << ", unique id:" << rowset->unique_id();
+ << ", version:" << rowset->version();
auto rowset_id = rowset->rowset_id().to_string();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 116ed61bf5..f738da02f4 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -325,8 +325,6 @@ private:
void _remove_unused_remote_files_callback();
void _cold_data_compaction_producer_callback();
- void _cache_file_cleaner_tasks_producer_callback();
-
Status _handle_seg_compaction(SegcompactionWorker* worker,
SegCompactionCandidatesSharedPtr segments);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 68423945e1..3daf025bd6 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2106,7 +2106,8 @@ Status Tablet::_cooldown_data() {
new_rowset_meta->set_creation_time(time(nullptr));
UniqueId cooldown_meta_id = UniqueId::gen_uid();
RowsetSharedPtr new_rowset;
- RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
+ RowsetFactory::create_rowset(_schema, remote_tablet_path(tablet_id()),
new_rowset_meta,
+ &new_rowset);
{
std::unique_lock meta_wlock(_meta_lock);
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index f2b1279265..ae8e3fa97e 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -596,9 +596,8 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
estimate_timeout = config::download_low_speed_time;
}
- std::string local_segment_path =
- fmt::format("{}/{}_{}.dat", local_tablet->tablet_path(),
- rowset_meta->rowset_id().to_string(),
segment_index);
+ auto local_segment_path = BetaRowset::segment_file_path(
+ local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index);
LOG(INFO) << fmt::format("download segment file from {} to {}",
get_segment_file_url,
local_segment_path);
auto get_segment_file_cb = [&get_segment_file_url,
&local_segment_path, segment_file_size,
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index c7d1b52362..df762226a2 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -294,7 +294,7 @@ Status PointQueryExecutor::_lookup_row_key() {
_row_read_ctxs[i]._row_location = location;
// acquire and wrap this rowset
(*rowset_ptr)->acquire();
- VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id();
+ VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->rowset_id();
_row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr,
decltype(&release_rowset)>(
rowset_ptr.release(), &release_rowset);
}
diff --git a/be/src/service/point_query_executor.h
b/be/src/service/point_query_executor.h
index a49bfb442f..180af54fdd 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -282,7 +282,7 @@ private:
static void release_rowset(RowsetSharedPtr* r) {
if (r && *r) {
- VLOG_DEBUG << "release rowset " << (*r)->unique_id();
+ VLOG_DEBUG << "release rowset " << (*r)->rowset_id();
(*r)->release();
}
delete r;
diff --git a/be/src/vec/core/block_spill_reader.cpp
b/be/src/vec/core/block_spill_reader.cpp
index d0cebd3043..eaa6fe4f73 100644
--- a/be/src/vec/core/block_spill_reader.cpp
+++ b/be/src/vec/core/block_spill_reader.cpp
@@ -52,9 +52,9 @@ Status BlockSpillReader::open() {
io::FileDescription file_description;
file_description.path = file_path_;
- io::FileReaderOptions reader_options = io::FileReaderOptions::DEFAULT;
RETURN_IF_ERROR(FileFactory::create_file_reader(system_properties,
file_description,
- reader_options,
&file_system, &file_reader_));
+ io::FileReaderOptions {},
&file_system,
+ &file_reader_));
size_t file_size = file_reader_->size();
diff --git a/be/test/io/cache/remote_file_cache_test.cpp
b/be/test/io/cache/remote_file_cache_test.cpp
deleted file mode 100644
index 18fcb6de14..0000000000
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <fmt/format.h>
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <functional>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "gen_cpp/olap_file.pb.h"
-#include "gtest/gtest_pred_impl.h"
-#include "io/fs/file_reader_options.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/local_file_system.h"
-#include "io/fs/s3_file_system.h"
-#include "olap/data_dir.h"
-#include "olap/olap_common.h"
-#include "olap/options.h"
-#include "olap/row_cursor.h"
-#include "olap/row_cursor_cell.h"
-#include "olap/rowset/beta_rowset.h"
-#include "olap/rowset/rowset_meta.h"
-#include "olap/rowset/segment_v2/segment.h"
-#include "olap/rowset/segment_v2/segment_writer.h"
-#include "olap/storage_engine.h"
-#include "olap/tablet_schema.h"
-#include "olap/tablet_schema_helper.h"
-#include "runtime/exec_env.h"
-#include "util/s3_util.h"
-#include "util/slice.h"
-
-namespace doris {
-
-using ValueGenerator = std::function<void(size_t rid, int cid, int block_id,
RowCursorCell& cell)>;
-// 0, 1, 2, 3
-// 10, 11, 12, 13
-// 20, 21, 22, 23
-static void DefaultIntGenerator(size_t rid, int cid, int block_id,
RowCursorCell& cell) {
- cell.set_not_null();
- *(int*)cell.mutable_cell_ptr() = rid * 10 + cid;
-}
-
-static StorageEngine* k_engine = nullptr;
-static std::string kSegmentDir = "./ut_dir/remote_file_cache_test";
-static int64_t tablet_id = 0;
-static RowsetId rowset_id;
-static std::string resource_id = "10000";
-
-class RemoteFileCacheTest : public ::testing::Test {
-protected:
- static void SetUpTestSuite() {
-
EXPECT_TRUE(io::global_local_filesystem()->delete_and_create_directory(kSegmentDir).ok());
-
-
doris::ExecEnv::GetInstance()->init_download_cache_required_components();
-
- doris::EngineOptions options;
- k_engine = new StorageEngine(options);
- StorageEngine::_s_instance = k_engine;
- }
-
- static void TearDownTestSuite() {
-
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(kSegmentDir).ok());
- if (k_engine != nullptr) {
- k_engine->stop();
- delete k_engine;
- k_engine = nullptr;
- }
- config::file_cache_type = "";
- }
-
- TabletSchemaSPtr create_schema(const std::vector<TabletColumn>& columns,
- KeysType keys_type = DUP_KEYS, int
num_custom_key_columns = -1) {
- TabletSchemaSPtr res = std::make_shared<TabletSchema>();
-
- for (auto& col : columns) {
- res->append_column(col);
- }
- res->_num_short_key_columns =
- num_custom_key_columns != -1 ? num_custom_key_columns :
res->num_key_columns();
- res->_keys_type = keys_type;
- return res;
- }
-
- void build_segment(SegmentWriterOptions opts, TabletSchemaSPtr
build_schema,
- TabletSchemaSPtr query_schema, size_t nrows, const
ValueGenerator& generator,
- std::shared_ptr<Segment>* res) {
- std::string filename = fmt::format("{}_0.dat", rowset_id.to_string());
- std::string path = fmt::format("{}/{}", kSegmentDir, filename);
- auto fs = io::global_local_filesystem();
-
- io::FileWriterPtr file_writer;
- Status st = fs->create_file(path, &file_writer);
- EXPECT_TRUE(st.ok());
- DataDir data_dir(kSegmentDir);
- data_dir.init();
- SegmentWriter writer(file_writer.get(), 0, build_schema, nullptr,
&data_dir, INT32_MAX,
- opts, nullptr);
- st = writer.init();
- EXPECT_TRUE(st.ok());
-
- RowCursor row;
- auto olap_st = row.init(build_schema);
- EXPECT_EQ(Status::OK(), olap_st);
-
- for (size_t rid = 0; rid < nrows; ++rid) {
- for (int cid = 0; cid < build_schema->num_columns(); ++cid) {
- int row_block_id = rid / opts.num_rows_per_block;
- RowCursorCell cell = row.cell(cid);
- generator(rid, cid, row_block_id, cell);
- }
- EXPECT_TRUE(writer.append_row(row).ok());
- }
-
- uint64_t file_size, index_size;
- st = writer.finalize(&file_size, &index_size);
- EXPECT_TRUE(st.ok());
- EXPECT_TRUE(file_writer->close().ok());
-
- EXPECT_NE("", writer.min_encoded_key().to_string());
- EXPECT_NE("", writer.max_encoded_key().to_string());
-
- io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE,
- io::SegmentCachePathPolicy());
- st = segment_v2::Segment::open(fs, path, 0, {}, query_schema,
reader_options, res);
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(nrows, (*res)->num_rows());
- }
-
- void test_remote_file_cache(std::string file_cache_type, int
max_sub_cache_file_size) {
- TabletSchemaSPtr tablet_schema = create_schema(
- {create_int_key(1), create_int_key(2), create_int_value(3),
create_int_value(4)});
- SegmentWriterOptions opts;
- opts.num_rows_per_block = 10;
-
- std::shared_ptr<segment_v2::Segment> segment;
- build_segment(opts, tablet_schema, tablet_schema, 4096,
DefaultIntGenerator, &segment);
-
- config::file_cache_type = file_cache_type;
- config::max_sub_cache_file_size = max_sub_cache_file_size;
- RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
- BetaRowset rowset(tablet_schema, kSegmentDir, rowset_meta);
-
- // just use to create s3 filesystem, otherwise won't use cache
- S3Conf s3_conf;
- std::shared_ptr<io::S3FileSystem> fs;
- Status st = io::S3FileSystem::create(std::move(s3_conf), resource_id,
&fs);
- // io::S3FileSystem::create will call connect, which will fail because
s3_conf is empty.
- // but it does affect the following unit test
- ASSERT_FALSE(st.ok()) << st;
- rowset.rowset_meta()->set_num_segments(1);
- rowset.rowset_meta()->set_fs(fs);
- rowset.rowset_meta()->set_tablet_id(tablet_id);
- rowset.rowset_meta()->set_rowset_id(rowset_id);
-
- std::vector<segment_v2::SegmentSharedPtr> segments;
- st = rowset.load_segments(&segments);
- ASSERT_TRUE(st.ok()) << st;
- }
-};
-
-TEST_F(RemoteFileCacheTest, wholefilecache) {
- test_remote_file_cache("whole_file_cache", 0);
-}
-
-TEST_F(RemoteFileCacheTest, subfilecache) {
- test_remote_file_cache("sub_file_cache", 1024);
-}
-
-} // namespace doris
diff --git a/be/test/olap/delete_bitmap_calculator_test.cpp
b/be/test/olap/delete_bitmap_calculator_test.cpp
index e182af0f9d..6e842edc36 100644
--- a/be/test/olap/delete_bitmap_calculator_test.cpp
+++ b/be/test/olap/delete_bitmap_calculator_test.cpp
@@ -29,6 +29,7 @@
#include <vector>
#include "gtest/gtest_pred_impl.h"
+#include "io/fs/file_reader.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/primary_key_index.h"
@@ -132,10 +133,8 @@ public:
EXPECT_NE("", writer.min_encoded_key().to_string());
EXPECT_NE("", writer.max_encoded_key().to_string());
- io::FileReaderOptions reader_options(io::FileCachePolicy::NO_CACHE,
- io::SegmentCachePathPolicy());
st = segment_v2::Segment::open(fs, path, segment_id, rowset_id,
query_schema,
- reader_options, res);
+ io::FileReaderOptions {}, res);
EXPECT_TRUE(st.ok());
EXPECT_EQ(nrows, (*res)->num_rows());
}
diff --git a/be/test/olap/rowset/rowset_tree_test.cpp
b/be/test/olap/rowset/rowset_tree_test.cpp
index 0e86eec6b9..0864be576a 100644
--- a/be/test/olap/rowset/rowset_tree_test.cpp
+++ b/be/test/olap/rowset/rowset_tree_test.cpp
@@ -94,7 +94,7 @@ public:
RowsetMetaSharedPtr meta_ptr = make_shared<RowsetMeta>();
meta_ptr->init_from_pb(rs_meta_pb);
RowsetSharedPtr res_ptr;
- MockRowset::create_rowset(schema_, rowset_path_, meta_ptr, &res_ptr,
is_mem_rowset);
+ MockRowset::create_rowset(schema_, meta_ptr, &res_ptr, is_mem_rowset);
return res_ptr;
}
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index ff24826763..027807d2db 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -38,8 +38,7 @@
#include "exec/tablet_info.h"
#include "gen_cpp/internal_service.pb.h"
#include "gtest/gtest_pred_impl.h"
-#include "io/fs/file_reader_options.h"
-#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
@@ -77,16 +76,11 @@ static const std::string kTestDir =
"ut_dir/tablet_cooldown_test";
static constexpr int64_t kResourceId = 10000;
static constexpr int64_t kStoragePolicyId = 10002;
static constexpr int64_t kTabletId = 10005;
-static constexpr int64_t kTabletId2 = 10006;
static constexpr int64_t kReplicaId = 10009;
static constexpr int32_t kSchemaHash = 270068377;
-static constexpr int64_t kReplicaId2 = 10010;
-static constexpr int32_t kSchemaHash2 = 270068381;
static constexpr int32_t kTxnId = 20003;
static constexpr int32_t kPartitionId = 30003;
-static constexpr int32_t kTxnId2 = 40003;
-static constexpr int32_t kPartitionId2 = 50003;
using io::Path;
@@ -178,7 +172,7 @@ protected:
}
Status upload_impl(const Path& local_path, const Path& dest_path) override
{
- return _local_fs->link_file(local_path.string(),
get_remote_path(dest_path));
+ return _local_fs->link_file(local_path, get_remote_path(dest_path));
}
Status batch_upload_impl(const std::vector<Path>& local_paths,
@@ -210,7 +204,7 @@ protected:
io::FileReaderSPtr* reader) override {
io::FileDescription tmp_fd;
tmp_fd.path = get_remote_path(abs_path);
- return _local_fs->open_file(tmp_fd, io::FileReaderOptions::DEFAULT,
reader);
+ return _local_fs->open_file(tmp_fd, io::FileReaderOptions {}, reader);
}
Status connect_impl() override { return Status::OK(); }
@@ -250,10 +244,7 @@ public:
->delete_and_create_directory(config::storage_root_path)
.ok());
EXPECT_TRUE(io::global_local_filesystem()
-
->create_directory(get_remote_path(fmt::format("data/{}", kTabletId)))
- .ok());
- EXPECT_TRUE(io::global_local_filesystem()
-
->create_directory(get_remote_path(fmt::format("data/{}", kTabletId2)))
+
->create_directory(get_remote_path(remote_tablet_path(kTabletId)))
.ok());
std::vector<StorePath> paths {{config::storage_root_path, -1}};
@@ -435,7 +426,6 @@ TEST_F(TabletCooldownTest, normal) {
TabletSharedPtr tablet1;
TabletSharedPtr tablet2;
createTablet(&tablet1, kReplicaId, kSchemaHash, kTabletId, kTxnId,
kPartitionId);
- createTablet(&tablet2, kReplicaId2, kSchemaHash2, kTabletId2, kTxnId2,
kPartitionId2);
// test cooldown
tablet1->set_storage_policy_id(kStoragePolicyId);
Status st = tablet1->cooldown(); // rowset [0-1]
@@ -446,7 +436,6 @@ TEST_F(TabletCooldownTest, normal) {
ASSERT_EQ(Status::OK(), st);
st = tablet1->cooldown(); // rowset [2-2]
ASSERT_EQ(Status::OK(), st);
- sleep(30);
auto rs = tablet1->get_rowset_by_version({2, 2});
ASSERT_FALSE(rs->is_local());
@@ -456,32 +445,6 @@ TEST_F(TabletCooldownTest, normal) {
st = std::static_pointer_cast<BetaRowset>(rs)->load_segments(&segments);
ASSERT_EQ(Status::OK(), st);
ASSERT_EQ(segments.size(), 1);
-
- st = io::global_local_filesystem()->link_file(
- get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId,
kReplicaId, 1)),
- get_remote_path(fmt::format("data/{}/{}.{}.meta", kTabletId2,
kReplicaId, 2)));
- ASSERT_EQ(Status::OK(), st);
- // follow cooldown
- tablet2->set_storage_policy_id(kStoragePolicyId);
- tablet2->update_cooldown_conf(1, 111111111);
- st = tablet2->cooldown(); // rowset [0-1]
- ASSERT_NE(Status::OK(), st);
- tablet2->update_cooldown_conf(1, kReplicaId);
- st = tablet2->cooldown(); // rowset [0-1]
- ASSERT_NE(Status::OK(), st);
- tablet2->update_cooldown_conf(2, kReplicaId);
- st = tablet2->cooldown(); // rowset [0-1]
- ASSERT_EQ(Status::OK(), st);
- sleep(30);
- auto rs2 = tablet2->get_rowset_by_version({2, 2});
- ASSERT_FALSE(rs2->is_local());
-
- // test read tablet2
- ASSERT_EQ(Status::OK(), st);
- std::vector<segment_v2::SegmentSharedPtr> segments2;
- st = std::static_pointer_cast<BetaRowset>(rs2)->load_segments(&segments2);
- ASSERT_EQ(Status::OK(), st);
- ASSERT_EQ(segments2.size(), 1);
}
} // namespace doris
diff --git a/be/test/olap/tablet_meta_test.cpp
b/be/test/olap/tablet_meta_test.cpp
index 993fbf2d49..7d57765ca5 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -67,7 +67,7 @@ TEST(TabletMetaTest, TestReviseMeta) {
meta_ptr->init_from_pb(rs_meta_pb);
RowsetSharedPtr rowset_ptr;
TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
- MockRowset::create_rowset(schema, "", meta_ptr, &rowset_ptr, false);
+ MockRowset::create_rowset(schema, meta_ptr, &rowset_ptr, false);
src_rowsets.push_back(rowset_ptr);
tablet_meta.add_rs_meta(rowset_ptr->rowset_meta());
}
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index 0911337f03..dae8b93e19 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -628,8 +628,7 @@ public:
if (tablet.tablet_id != tablet_id || rowset == nullptr) {
continue;
}
- auto path =
- BetaRowset::segment_file_path(rowset->rowset_dir(),
rowset->rowset_id(), segid);
+ auto path =
static_cast<BetaRowset*>(rowset.get())->segment_file_path(segid);
LOG(INFO) << "read data from " << path;
std::ifstream inputFile(path, std::ios::binary);
inputFile.seekg(0, std::ios::end);
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
index 93b76d9c67..50065ebe6b 100644
--- a/be/test/testutil/mock_rowset.h
+++ b/be/test/testutil/mock_rowset.h
@@ -58,18 +58,16 @@ class MockRowset : public Rowset {
return Rowset::get_segments_key_bounds(segments_key_bounds);
}
- static Status create_rowset(TabletSchemaSPtr schema, const std::string&
rowset_path,
- RowsetMetaSharedPtr rowset_meta,
RowsetSharedPtr* rowset,
- bool is_mem_rowset = false) {
- rowset->reset(new MockRowset(schema, rowset_path, rowset_meta));
+ static Status create_rowset(TabletSchemaSPtr schema, RowsetMetaSharedPtr
rowset_meta,
+ RowsetSharedPtr* rowset, bool is_mem_rowset =
false) {
+ rowset->reset(new MockRowset(schema, rowset_meta));
((MockRowset*)rowset->get())->is_mem_rowset_ = is_mem_rowset;
return Status::OK();
}
protected:
- MockRowset(TabletSchemaSPtr schema, const std::string& rowset_path,
- RowsetMetaSharedPtr rowset_meta)
- : Rowset(schema, rowset_path, rowset_meta) {}
+ MockRowset(TabletSchemaSPtr schema, RowsetMetaSharedPtr rowset_meta)
+ : Rowset(schema, rowset_meta) {}
Status init() override { return Status::NotSupported("MockRowset not
support this method."); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]