This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a943adac1a [feature](cache) Add FileCache for RemoteFile (#11186)
a943adac1a is described below
commit a943adac1a54c09aa5c41e9caf2149740db1d74d
Author: pengxiangyu <[email protected]>
AuthorDate: Thu Aug 4 10:57:32 2022 +0800
[feature](cache) Add FileCache for RemoteFile (#11186)
Add FileCache for RemoteFile; it is enabled through StoragePolicy.
Cold data in remote files will be downloaded to local cache files.
---
be/src/common/config.h | 11 +-
be/src/io/CMakeLists.txt | 3 +
be/src/io/cache/file_cache.h | 51 +++++++
be/src/io/cache/file_cache_manager.cpp | 62 ++++++++
be/src/io/cache/file_cache_manager.h | 53 +++++++
be/src/io/cache/sub_file_cache.cpp | 243 ++++++++++++++++++++++++++++++
be/src/io/cache/sub_file_cache.h | 79 ++++++++++
be/src/io/cache/whole_file_cache.cpp | 146 ++++++++++++++++++
be/src/io/cache/whole_file_cache.h | 72 +++++++++
be/src/olap/rowset/segment_v2/segment.cpp | 16 +-
10 files changed, 733 insertions(+), 3 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2bbee690af..5bafe0899c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -809,7 +809,16 @@ CONF_mBool(enable_function_pushdown, "false");
CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
CONF_Int32(concurrency_per_dir, "2");
-CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
+CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
+// Size in bytes of each local chunk file used by "sub_file_cache". The remote file is split into
+// chunks starting at multiples of this value; each chunk is stored locally as
+// SUB_CACHE_<chunk_offset> under the segment's cache dir.
+// NOTE(review): this config is mutable, but existing chunk files are keyed by the chunk
+// boundaries in effect when they were downloaded; changing it on a running BE presumably leaves
+// old chunks inconsistent with new reads — confirm before changing it at runtime.
+CONF_mInt64(max_sub_cache_file_size, "1073741824"); // 1GB
+// A local cache file (or chunk) that has not been read for longer than this many seconds is
+// eligible for removal by clean_timeout_cache().
+CONF_mInt64(file_cache_alive_time_sec, "604800"); // 1 week
+// file_cache_type selects how segment files on remote storage are cached locally:
+//   "":                 no local cache; segments read the remote file directly.
+//   "sub_file_cache":   download only the chunks (of max_sub_cache_file_size bytes) that are read.
+//   "whole_file_cache": download the entire remote file on the first read.
+CONF_mString(file_cache_type, "");
+CONF_Validator(file_cache_type, [](const std::string config) -> bool {
+ return config == "sub_file_cache" || config == "whole_file_cache" ||
config == "";
+});
CONF_Int32(s3_transfer_executor_pool_size, "2");
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 768a33ccb1..69a473d14a 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -40,6 +40,9 @@ set(IO_FILES
fs/local_file_writer.cpp
fs/s3_file_reader.cpp
fs/s3_file_system.cpp
+ cache/file_cache_manager.cpp
+ cache/sub_file_cache.cpp
+ cache/whole_file_cache.cpp
)
add_library(IO STATIC
diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h
new file mode 100644
index 0000000000..fee37505bd
--- /dev/null
+++ b/be/src/io/cache/file_cache.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/path.h"
+
+namespace doris {
+namespace io {
+
+// A FileReader for a remote file that serves reads from local cache files, downloading the
+// needed data through the wrapped remote reader. Implemented by SubFileCache (chunked download)
+// and WholeFileCache (whole-file download).
+class FileCache : public FileReader {
+public:
+ FileCache() = default;
+ virtual ~FileCache() = default;
+
+ DISALLOW_COPY_AND_ASSIGN(FileCache);
+
+ // Local directory under which the cache files for this remote file are stored.
+ virtual const Path& cache_dir() const = 0;
+
+ // Total size in bytes of the local cache files currently held for this remote file.
+ virtual size_t cache_file_size() const = 0;
+
+ // The reader of the remote file that backs this cache.
+ virtual io::FileReaderSPtr remote_file_reader() const = 0;
+
+ // Removes local cache files that have not been read within the implementation's alive time.
+ virtual Status clean_timeout_cache() = 0;
+
+ // Removes all local cache files for this remote file.
+ virtual Status clean_all_cache() = 0;
+};
+
+// Shared handle to a FileCache.
+using FileCachePtr = std::shared_ptr<FileCache>;
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/file_cache_manager.cpp
b/be/src/io/cache/file_cache_manager.cpp
new file mode 100644
index 0000000000..103a8ee0c0
--- /dev/null
+++ b/be/src/io/cache/file_cache_manager.cpp
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/file_cache_manager.h"
+
+#include "io/cache/sub_file_cache.h"
+#include "io/cache/whole_file_cache.h"
+
+namespace doris {
+namespace io {
+
+// Registers `file_cache` under `cache_path`. If a cache is already registered for that path the
+// existing entry is kept and `file_cache` is not stored (std::map::emplace semantics).
+void FileCacheManager::add_file_cache(const Path& cache_path, FileCachePtr file_cache) {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    // `file_cache` is a by-value sink parameter: move it in instead of bumping the refcount again.
+    _file_cache_map.emplace(cache_path.native(), std::move(file_cache));
+}
+
+// Unregisters whatever cache is stored under `cache_path`; does nothing if none is registered.
+void FileCacheManager::remove_file_cache(const Path& cache_path) {
+    const std::string& key = cache_path.native();
+    std::unique_lock<std::shared_mutex> guard(_cache_map_lock);
+    _file_cache_map.erase(key);
+}
+
+// Asks every registered cache to drop its timed-out local files. A failure in one cache is
+// logged and does not prevent the remaining caches from being cleaned.
+void FileCacheManager::clean_timeout_caches() {
+    std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
+    for (const auto& entry : _file_cache_map) {
+        const FileCachePtr& file_cache = entry.second;
+        if (file_cache == nullptr) {
+            // add_file_cache() does not reject null caches; skip instead of crashing.
+            continue;
+        }
+        Status st = file_cache->clean_timeout_cache();
+        if (!st.ok()) {
+            LOG(WARNING) << "Clean timeout file cache failed: " << entry.first
+                         << ", status: " << st.to_string();
+        }
+    }
+}
+
+// Creates (but does not register) a FileCache over `remote_file_reader` whose local files live
+// under `cache_dir` and expire after `alive_time_sec` seconds without a read.
+// @param file_cache_type "whole_file_cache" or "sub_file_cache".
+// @return the new cache, or nullptr for any other type (including "", meaning "no cache").
+FileCachePtr FileCacheManager::new_file_cache(const Path& cache_dir, int64_t alive_time_sec,
+                                              io::FileReaderSPtr remote_file_reader,
+                                              const std::string& file_cache_type) {
+    // The result is shared, so build it with make_shared (one allocation for object and control
+    // block) instead of creating a unique_ptr and converting it.
+    if (file_cache_type == "whole_file_cache") {
+        return std::make_shared<WholeFileCache>(cache_dir, alive_time_sec,
+                                                std::move(remote_file_reader));
+    }
+    if (file_cache_type == "sub_file_cache") {
+        return std::make_shared<SubFileCache>(cache_dir, alive_time_sec,
+                                              std::move(remote_file_reader));
+    }
+    return nullptr;
+}
+
+// Returns the process-wide manager. The function-local static is constructed on first call
+// (initialization is thread-safe since C++11) and lives until program exit.
+FileCacheManager* FileCacheManager::instance() {
+    static FileCacheManager s_instance;
+    return &s_instance;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/file_cache_manager.h
b/be/src/io/cache/file_cache_manager.h
new file mode 100644
index 0000000000..4a3e31ba92
--- /dev/null
+++ b/be/src/io/cache/file_cache_manager.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/cache/file_cache.h"
+
+namespace doris {
+namespace io {
+
+// Registry of FileCache objects keyed by cache path, plus a factory for new caches.
+// Access to the registry map is guarded by `_cache_map_lock`.
+class FileCacheManager {
+public:
+ FileCacheManager() = default;
+ ~FileCacheManager() = default;
+
+ // Process-wide singleton instance.
+ static FileCacheManager* instance();
+
+ // Registers `file_cache` under `cache_path`; an existing entry for the same path is kept.
+ void add_file_cache(const Path& cache_path, FileCachePtr file_cache);
+
+ // Unregisters the cache stored under `cache_path`, if any.
+ void remove_file_cache(const Path& cache_path);
+
+ // Calls clean_timeout_cache() on every registered cache.
+ void clean_timeout_caches();
+
+ // Creates a new, unregistered cache of the given type ("whole_file_cache" or
+ // "sub_file_cache"); returns nullptr for any other type.
+ FileCachePtr new_file_cache(const Path& cache_dir, int64_t alive_time_sec,
+ io::FileReaderSPtr remote_file_reader,
+ const std::string& file_cache_type);
+
+private:
+ std::shared_mutex _cache_map_lock;
+ // cache_path -> FileCache
+ std::map<std::string, FileCachePtr> _file_cache_map;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/sub_file_cache.cpp
b/be/src/io/cache/sub_file_cache.cpp
new file mode 100644
index 0000000000..519fa29730
--- /dev/null
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/sub_file_cache.h"
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+
+namespace doris {
+namespace io {
+
+using std::vector;
+
+// Local file names are "<prefix>_<chunk offset>" under the cache dir: SUB_CACHE_<offset> holds a
+// chunk's data and SUB_CACHE_DONE_<offset> is created after the data has been written. A chunk
+// whose done file is missing is deleted and downloaded again.
+const static std::string SUB_FILE_CACHE_PREFIX = "SUB_CACHE";
+const static std::string SUB_FILE_DONE_PREFIX = "SUB_CACHE_DONE";
+
+// Creates an empty chunked cache for `remote_file_reader` whose chunk files live in `cache_dir`
+// and expire after `alive_time_sec` seconds without a read. Nothing is downloaded here.
+SubFileCache::SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                           io::FileReaderSPtr remote_file_reader)
+        : _cache_dir(cache_dir),
+          // Nothing is cached yet; without this, cache_file_size() would return an
+          // indeterminate value until the first download or clean.
+          _cache_file_size(0),
+          _alive_time_sec(alive_time_sec),
+          _remote_file_reader(std::move(remote_file_reader)) {}
+
+SubFileCache::~SubFileCache() = default;
+
+// Reads `result.size` bytes of the remote file starting at `offset` into `result`, serving them
+// from local chunk files of config::max_sub_cache_file_size bytes. Chunks that are not cached yet
+// are downloaded first.
+// @param offset     position in the remote file to read from.
+// @param result     destination buffer; all `result.size` bytes are filled on success.
+// @param bytes_read set to the number of bytes copied into `result`.
+// Returns an error if the range extends past the end of the remote file, or if downloading or
+// reading a chunk fails.
+Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
+    *bytes_read = 0;
+    const size_t file_size = _remote_file_reader->size();
+    if (offset > file_size || result.size > file_size - offset) {
+        // A range past EOF would make the last chunk's size computation below underflow.
+        LOG(ERROR) << "Read range out of remote file: " << _remote_file_reader->path().native()
+                   << ", offset: " << offset << ", size: " << result.size
+                   << ", file size: " << file_size;
+        return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+    }
+    if (result.size == 0) {
+        // Nothing to read; do not download the chunk containing `offset` for no reason.
+        return Status::OK();
+    }
+    std::vector<size_t> need_cache_offsets;
+    RETURN_IF_ERROR(_get_need_cache_offsets(offset, result.size, &need_cache_offsets));
+
+    // Fast path: check under the shared lock whether every needed chunk is already cached.
+    bool need_download = false;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
+        for (size_t chunk_begin : need_cache_offsets) {
+            auto iter = _cache_file_readers.find(chunk_begin);
+            if (iter == _cache_file_readers.end() || iter->second == nullptr) {
+                need_download = true;
+                break;
+            }
+        }
+    }
+    if (need_download) {
+        std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+        // Re-check under the exclusive lock: another reader may have downloaded a chunk meanwhile.
+        for (size_t chunk_begin : need_cache_offsets) {
+            auto iter = _cache_file_readers.find(chunk_begin);
+            if (iter != _cache_file_readers.end() && iter->second != nullptr) {
+                continue;
+            }
+            // The last chunk of the file may be shorter than a full chunk.
+            size_t req_size = config::max_sub_cache_file_size;
+            if (chunk_begin + req_size > file_size) {
+                req_size = file_size - chunk_begin;
+            }
+            RETURN_IF_ERROR(_generate_cache_reader(chunk_begin, req_size));
+        }
+        _cache_file_size = _get_cache_file_size();
+    }
+
+    const size_t read_end = offset + result.size;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
+        for (size_t chunk_begin : need_cache_offsets) {
+            auto iter = _cache_file_readers.find(chunk_begin);
+            if (iter == _cache_file_readers.end() || iter->second == nullptr) {
+                LOG(ERROR) << "Local cache file reader can't be found: " << chunk_begin;
+                return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+            }
+            const io::FileReaderSPtr& cache_reader = iter->second;
+            // Clip the chunk [chunk_begin, chunk_end) to the request [offset, read_end): the
+            // first chunk may start before `offset` and the last may end after `read_end`.
+            const size_t chunk_end = chunk_begin + config::max_sub_cache_file_size;
+            const size_t slice_begin = chunk_begin < offset ? offset : chunk_begin;
+            const size_t slice_end = chunk_end < read_end ? chunk_end : read_end;
+            Slice read_slice(result.mutable_data() + (slice_begin - offset),
+                             slice_end - slice_begin);
+            size_t sub_bytes_read = 0;
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    cache_reader->read_at(slice_begin - chunk_begin, read_slice, &sub_bytes_read),
+                    fmt::format("Read local cache file failed: {}",
+                                cache_reader->path().native()));
+            if (sub_bytes_read != read_slice.size) {
+                LOG(ERROR) << "read local cache file failed: " << cache_reader->path().native()
+                           << ", bytes read: " << sub_bytes_read
+                           << " vs req size: " << read_slice.size;
+                return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+            }
+            *bytes_read += sub_bytes_read;
+        }
+    }
+
+    // _last_match_times is modified here, so this needs the exclusive lock; concurrent readers
+    // holding only the shared lock must not write the map.
+    {
+        std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+        const int64_t now = time(nullptr);
+        for (size_t chunk_begin : need_cache_offsets) {
+            // Skip chunks removed by a concurrent clean so their timestamps are not resurrected.
+            if (_cache_file_readers.count(chunk_begin) > 0) {
+                _last_match_times[chunk_begin] = now;
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// Makes the chunk [offset, offset + req_size) of the remote file available as a local file and
+// registers a reader and timestamp for it. A chunk whose done file already exists is reused as
+// is; otherwise any partial chunk file is discarded and the chunk is downloaded again.
+// Callers in this file hold _cache_map_lock exclusively.
+Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
+    Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset);
+    Path cache_done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset);
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
+            fmt::format("Check local cache done file exist failed. {}", cache_done_file.native()));
+    if (!done_file_exist) {
+        // Nothing else on this path creates the cache dir; make sure it exists before writing.
+        bool cache_dir_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist),
+                fmt::format("Check local cache dir exist failed. {}", _cache_dir.native()));
+        if (!cache_dir_exist) {
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->create_directory(_cache_dir),
+                    fmt::format("Create local cache dir failed: {}", _cache_dir.native()));
+        }
+        bool cache_file_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
+                fmt::format("Check local cache file exist failed. {}", cache_file.native()));
+        if (cache_file_exist) {
+            // Leftover of an interrupted download: without a done file its content is untrusted.
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->delete_file(cache_file),
+                    fmt::format("Delete local cache file failed: {}", cache_file.native()));
+        }
+        LOG(INFO) << "Download cache file from remote file: "
+                  << _remote_file_reader->path().native() << " -> " << cache_file.native();
+        // NOTE(review): the whole chunk (up to max_sub_cache_file_size bytes) is buffered in memory.
+        std::unique_ptr<char[]> file_buf(new char[req_size]);
+        Slice file_slice(file_buf.get(), req_size);
+        size_t bytes_read = 0;
+        // Read this chunk's own range of the remote file, not the head of the file.
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                _remote_file_reader->read_at(offset, file_slice, &bytes_read),
+                fmt::format("read remote file failed. {}. offset: {}, size: {}",
+                            _remote_file_reader->path().native(), offset, req_size));
+        if (bytes_read != req_size) {
+            LOG(ERROR) << "read remote file failed: " << _remote_file_reader->path().native()
+                       << ", offset: " << offset << ", bytes read: " << bytes_read
+                       << " vs req size: " << req_size;
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
+        io::FileWriterPtr file_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_file, &file_writer),
+                fmt::format("Create local cache file failed: {}", cache_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->append(file_slice),
+                fmt::format("Write local cache file failed: {}", cache_file.native()));
+        // Close (flush) the data file before creating the done marker, so the marker only ever
+        // exists for a completely written chunk.
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->close(),
+                fmt::format("Close local cache file failed: {}", cache_file.native()));
+        io::FileWriterPtr done_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_done_file, &done_writer),
+                fmt::format("Create local done file failed: {}", cache_done_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                done_writer->close(),
+                fmt::format("Close local done file failed: {}", cache_done_file.native()));
+    }
+    io::FileReaderSPtr cache_reader;
+    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader));
+    // Assign rather than emplace so stale entries for this offset are overwritten.
+    _cache_file_readers[offset] = cache_reader;
+    _last_match_times[offset] = time(nullptr);
+    LOG(INFO) << "Create cache file from remote file successfully: "
+              << _remote_file_reader->path().native() << "(" << offset << ", " << req_size
+              << ") -> " << cache_file.native();
+    return Status::OK();
+}
+
+// Appends to `cache_offsets` the start offset of every chunk (aligned to
+// config::max_sub_cache_file_size) that overlaps [offset, offset + req_size). An empty range
+// yields no chunks. Returns InvalidArgument if the configured chunk size is not positive.
+Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
+                                             std::vector<size_t>* cache_offsets) {
+    if (req_size == 0) {
+        return Status::OK();
+    }
+    // Snapshot the mutable config once so every boundary below uses the same chunk size.
+    const int64_t configured_chunk_size = config::max_sub_cache_file_size;
+    if (configured_chunk_size <= 0) {
+        // Guards the division below; the config is mutable and could be set to 0 at runtime.
+        return Status::InvalidArgument("max_sub_cache_file_size must be positive");
+    }
+    const size_t chunk_size = static_cast<size_t>(configured_chunk_size);
+    const size_t end = offset + req_size;
+    for (size_t begin = offset / chunk_size * chunk_size; begin < end; begin += chunk_size) {
+        cache_offsets->push_back(begin);
+    }
+    return Status::OK();
+}
+
+// Removes every chunk that has not been read for more than _alive_time_sec seconds.
+// Stops at the first chunk that fails to be removed; _cache_file_size is refreshed either way.
+Status SubFileCache::clean_timeout_cache() {
+    std::vector<size_t> timeout_keys;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
+        const int64_t now = time(nullptr);
+        for (const auto& entry : _last_match_times) {
+            if (now - entry.second > _alive_time_sec) {
+                timeout_keys.push_back(entry.first);
+            }
+        }
+    }
+    if (timeout_keys.empty()) {
+        return Status::OK();
+    }
+    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    const int64_t now = time(nullptr);
+    Status st = Status::OK();
+    for (size_t chunk_offset : timeout_keys) {
+        // Re-check under the exclusive lock: the chunk may have been read, or already removed,
+        // after the shared lock was released.
+        auto iter = _last_match_times.find(chunk_offset);
+        if (iter == _last_match_times.end() || now - iter->second <= _alive_time_sec) {
+            continue;
+        }
+        st = _clean_cache_internal(chunk_offset);
+        if (!st.ok()) {
+            break;
+        }
+    }
+    _cache_file_size = _get_cache_file_size();
+    return st;
+}
+
+// Removes every cached chunk of this file. Stops at the first chunk that fails to be removed;
+// _cache_file_size is refreshed either way.
+Status SubFileCache::clean_all_cache() {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    // _clean_cache_internal() erases from _last_match_times, so snapshot the keys first instead
+    // of iterating the map while it is being modified.
+    std::vector<size_t> chunk_offsets;
+    chunk_offsets.reserve(_last_match_times.size());
+    for (const auto& entry : _last_match_times) {
+        chunk_offsets.push_back(entry.first);
+    }
+    Status st = Status::OK();
+    for (size_t chunk_offset : chunk_offsets) {
+        st = _clean_cache_internal(chunk_offset);
+        if (!st.ok()) {
+            break;
+        }
+    }
+    _cache_file_size = _get_cache_file_size();
+    return st;
+}
+
+// Forgets the reader and timestamp of the chunk at `offset`, then deletes its done file and data
+// file from the cache dir (each only if present). Callers hold _cache_map_lock exclusively and
+// recompute _cache_file_size afterwards.
+Status SubFileCache::_clean_cache_internal(size_t offset) {
+    _cache_file_readers.erase(offset);
+    // Drop the timestamp as well; otherwise clean_timeout_cache() keeps selecting this offset.
+    _last_match_times.erase(offset);
+    Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset);
+    Path done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset);
+    // The done marker goes first so an interrupted clean never leaves a marker without data.
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(done_file, &done_file_exist),
+            "Check local done file exist failed.");
+    if (done_file_exist) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->delete_file(done_file),
+                fmt::format("Delete local done file failed: {}", done_file.native()));
+    }
+    bool cache_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
+            "Check local cache file exist failed.");
+    if (cache_file_exist) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->delete_file(cache_file),
+                fmt::format("Delete local cache file failed: {}", cache_file.native()));
+    }
+    LOG(INFO) << "Delete local cache file successfully: " << cache_file.native();
+    return Status::OK();
+}
+
+// Sums the sizes of all chunk files that currently have an open local reader.
+size_t SubFileCache::_get_cache_file_size() {
+    size_t total = 0;
+    for (const auto& entry : _cache_file_readers) {
+        total += entry.second->size();
+    }
+    return total;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
new file mode 100644
index 0000000000..21fb33822f
--- /dev/null
+++ b/be/src/io/cache/sub_file_cache.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "io/cache/file_cache.h"
+#include "io/fs/path.h"
+
+namespace doris {
+namespace io {
+
+// FileCache that splits the remote file into chunks of config::max_sub_cache_file_size bytes and
+// downloads each chunk to its own local file the first time it is read.
+// Chunk bookkeeping (_last_match_times, _cache_file_readers) is guarded by _cache_map_lock.
+class SubFileCache final : public FileCache {
+public:
+    SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                 io::FileReaderSPtr remote_file_reader);
+    ~SubFileCache() override;
+
+    Status close() override { return _remote_file_reader->close(); }
+
+    Status read_at(size_t offset, Slice result, size_t* bytes_read) override;
+
+    const Path& path() const override { return _remote_file_reader->path(); }
+
+    size_t size() const override { return _remote_file_reader->size(); }
+
+    bool closed() const override { return _remote_file_reader->closed(); }
+
+    const Path& cache_dir() const override { return _cache_dir; }
+
+    // NOTE(review): read without _cache_map_lock while writers update it under the lock.
+    size_t cache_file_size() const override { return _cache_file_size; }
+
+    io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; }
+
+    Status clean_timeout_cache() override;
+
+    Status clean_all_cache() override;
+
+private:
+    Status _generate_cache_reader(size_t offset, size_t req_size);
+
+    Status _clean_cache_internal(size_t offset);
+
+    Status _get_need_cache_offsets(size_t offset, size_t req_size,
+                                   std::vector<size_t>* cache_offsets);
+
+    size_t _get_cache_file_size();
+
+private:
+    Path _cache_dir;
+    // Total size of the open chunk files; starts at 0 so cache_file_size() is well-defined
+    // before the first download.
+    size_t _cache_file_size = 0;
+    int64_t _alive_time_sec;
+    io::FileReaderSPtr _remote_file_reader;
+
+    std::shared_mutex _cache_map_lock;
+    // offset_begin -> last_match_time
+    std::map<size_t, int64_t> _last_match_times;
+    // offset_begin -> local file reader
+    std::map<size_t, io::FileReaderSPtr> _cache_file_readers;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/whole_file_cache.cpp
b/be/src/io/cache/whole_file_cache.cpp
new file mode 100644
index 0000000000..012ae3e983
--- /dev/null
+++ b/be/src/io/cache/whole_file_cache.cpp
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/whole_file_cache.h"
+
+#include "io/fs/local_file_system.h"
+
+namespace doris {
+namespace io {
+
+// File names under the cache dir: WHOLE_FILE_CACHE holds the downloaded copy of the remote file,
+// and WHOLE_FILE_CACHE_DONE is created after that copy has been written. A copy without the
+// done file is deleted and downloaded again.
+const static std::string WHOLE_FILE_CACHE_NAME = "WHOLE_FILE_CACHE";
+const static std::string WHOLE_FILE_CACHE_DONE_NAME = "WHOLE_FILE_CACHE_DONE";
+
+// Creates a whole-file cache for `remote_file_reader` whose local copy lives in `cache_dir` and
+// expires after `alive_time_sec` seconds without a read. Nothing is downloaded here.
+WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                               io::FileReaderSPtr remote_file_reader)
+        : _cache_dir(cache_dir),
+          // Nothing is cached yet; without this, cache_file_size() would return an
+          // indeterminate value until the first download or clean.
+          _cache_file_size(0),
+          _alive_time_sec(alive_time_sec),
+          _remote_file_reader(std::move(remote_file_reader)),
+          _last_match_time(time(nullptr)),
+          _cache_file_reader(nullptr) {}
+
+WholeFileCache::~WholeFileCache() = default;
+
+// Reads `result.size` bytes at `offset` from the local copy of the remote file, downloading the
+// whole remote file first if no local copy is open yet. `*bytes_read` receives the byte count;
+// a short read is reported as an error.
+Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
+    std::shared_lock<std::shared_mutex> rlock(_cache_lock);
+    if (_cache_file_reader == nullptr) {
+        // A shared lock cannot be upgraded: release it, let _generate_cache_reader() build the
+        // reader under the exclusive lock, then re-acquire and re-check, because a concurrent
+        // clean may have dropped the reader in between.
+        rlock.unlock();
+        RETURN_IF_ERROR(_generate_cache_reader(offset, result.size));
+        rlock.lock();
+        if (_cache_file_reader == nullptr) {
+            LOG(WARNING) << "Local cache file was cleaned before it could be read: "
+                         << _remote_file_reader->path().native();
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
+    }
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            _cache_file_reader->read_at(offset, result, bytes_read),
+            fmt::format("Read local cache file failed: {}", _cache_file_reader->path().native()));
+    if (*bytes_read != result.size) {
+        LOG(ERROR) << "read cache file failed: " << _cache_file_reader->path().native()
+                   << ", bytes read: " << *bytes_read << " vs required size: " << result.size;
+        return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+    }
+    // NOTE(review): written while only the shared lock is held, so concurrent readers race on
+    // _last_match_time; an std::atomic<int64_t> member would remove the race.
+    _last_match_time = time(nullptr);
+    return Status::OK();
+}
+
+// Makes the whole remote file available as WHOLE_FILE_CACHE under _cache_dir and opens
+// _cache_file_reader on it. If the WHOLE_FILE_CACHE_DONE marker exists the local copy is reused;
+// otherwise any partial copy is discarded and the remote file is downloaded again.
+// `offset` and `req_size` are unused: the whole file is always cached.
+Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_lock);
+    if (_cache_file_reader != nullptr) {
+        // Another thread opened the cache while this one was waiting for the lock.
+        return Status::OK();
+    }
+    Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
+    Path cache_done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
+            "Check local cache done file exist failed.");
+    if (!done_file_exist) {
+        // Nothing else on this path creates the cache dir; make sure it exists before writing.
+        bool cache_dir_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist),
+                fmt::format("Check local cache dir exist failed. {}", _cache_dir.native()));
+        if (!cache_dir_exist) {
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->create_directory(_cache_dir),
+                    fmt::format("Create local cache dir failed: {}", _cache_dir.native()));
+        }
+        bool cache_file_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
+                "Check local cache file exist failed.");
+        if (cache_file_exist) {
+            // Leftover of an interrupted download: without a done file its content is untrusted.
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->delete_file(cache_file),
+                    fmt::format("Delete local cache file failed: {}", cache_file.native()));
+        }
+        LOG(INFO) << "Download cache file from remote file: "
+                  << _remote_file_reader->path().native() << " -> " << cache_file.native();
+        // NOTE(review): the entire remote file is buffered in memory before being written out.
+        const size_t file_size = _remote_file_reader->size();
+        std::unique_ptr<char[]> file_buf(new char[file_size]);
+        Slice file_slice(file_buf.get(), file_size);
+        size_t bytes_read = 0;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                _remote_file_reader->read_at(0, file_slice, &bytes_read),
+                fmt::format("read remote file failed. {}", _remote_file_reader->path().native()));
+        if (bytes_read != file_size) {
+            LOG(ERROR) << "read remote file failed: " << _remote_file_reader->path().native()
+                       << ", bytes read: " << bytes_read << " vs file size: " << file_size;
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
+        io::FileWriterPtr file_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_file, &file_writer),
+                fmt::format("Create local cache file failed: {}", cache_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->append(file_slice),
+                fmt::format("Write local cache file failed: {}", cache_file.native()));
+        // Close (flush) the data file before creating the done marker, so the marker only ever
+        // exists for a completely written copy.
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->close(),
+                fmt::format("Close local cache file failed: {}", cache_file.native()));
+        io::FileWriterPtr done_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_done_file, &done_writer),
+                fmt::format("Create local done file failed: {}", cache_done_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                done_writer->close(),
+                fmt::format("Close local done file failed: {}", cache_done_file.native()));
+    }
+    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader));
+    _cache_file_size = _cache_file_reader->size();
+    _last_match_time = time(nullptr);
+    LOG(INFO) << "Create cache file from remote file successfully: "
+              << _remote_file_reader->path().native() << " -> " << cache_file.native();
+    return Status::OK();
+}
+
+// Removes the local copy if it has not been read for more than _alive_time_sec seconds.
+// Errors from the removal are returned to the caller instead of being dropped.
+Status WholeFileCache::clean_timeout_cache() {
+    if (time(nullptr) - _last_match_time > _alive_time_sec) {
+        return _clean_cache_internal();
+    }
+    return Status::OK();
+}
+
+// Removes the local copy unconditionally; errors are returned to the caller.
+Status WholeFileCache::clean_all_cache() {
+    return _clean_cache_internal();
+}
+
+// Forgets the open local copy (reader and size), then deletes the done marker followed by the
+// data file from _cache_dir, skipping whichever is already absent.
+Status WholeFileCache::_clean_cache_internal() {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_lock);
+    _cache_file_reader.reset();
+    _cache_file_size = 0;
+
+    // Deletes `target` if it exists, logging the matching message on either failure.
+    auto remove_if_present = [](const Path& target, const std::string& check_failed_msg,
+                                const std::string& delete_failed_msg) -> Status {
+        bool present = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(target, &present),
+                                       check_failed_msg);
+        if (!present) {
+            return Status::OK();
+        }
+        RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->delete_file(target),
+                                       delete_failed_msg);
+        return Status::OK();
+    };
+
+    const Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
+    const Path done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
+    RETURN_IF_ERROR(remove_if_present(
+            done_file, "Check local done file exist failed.",
+            fmt::format("Delete local done file failed: {}", done_file.native())));
+    RETURN_IF_ERROR(remove_if_present(
+            cache_file, "Check local cache file exist failed.",
+            fmt::format("Delete local cache file failed: {}", cache_file.native())));
+    LOG(INFO) << "Delete local cache file successfully: " << cache_file.native();
+    return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/whole_file_cache.h
b/be/src/io/cache/whole_file_cache.h
new file mode 100644
index 0000000000..8628b6c09d
--- /dev/null
+++ b/be/src/io/cache/whole_file_cache.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "io/cache/file_cache.h"
+#include "io/fs/path.h"
+
+namespace doris {
+namespace io {
+
+// FileCache that downloads the entire remote file to one local file on the first read and
+// serves all later reads from it. The local reader is guarded by _cache_lock.
+class WholeFileCache final : public FileCache {
+public:
+    WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                   io::FileReaderSPtr remote_file_reader);
+    ~WholeFileCache() override;
+
+    Status close() override { return _remote_file_reader->close(); }
+
+    Status read_at(size_t offset, Slice result, size_t* bytes_read) override;
+
+    const Path& path() const override { return _remote_file_reader->path(); }
+
+    size_t size() const override { return _remote_file_reader->size(); }
+
+    bool closed() const override { return _remote_file_reader->closed(); }
+
+    const Path& cache_dir() const override { return _cache_dir; }
+
+    // NOTE(review): read without _cache_lock while writers update it under the lock.
+    size_t cache_file_size() const override { return _cache_file_size; }
+
+    io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; }
+
+    Status clean_timeout_cache() override;
+
+    Status clean_all_cache() override;
+
+private:
+    Status _generate_cache_reader(size_t offset, size_t req_size);
+
+    Status _clean_cache_internal();
+
+private:
+    Path _cache_dir;
+    // Size of the local copy; starts at 0 so cache_file_size() is well-defined before the
+    // first download.
+    size_t _cache_file_size = 0;
+    int64_t _alive_time_sec;
+    io::FileReaderSPtr _remote_file_reader;
+
+    std::shared_mutex _cache_lock;
+    int64_t _last_match_time;
+    io::FileReaderSPtr _cache_file_reader;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index fea0510172..a9deeed6e5 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -22,7 +22,9 @@
#include <memory>
#include <utility>
-#include "common/logging.h" // LOG
+#include "common/config.h"
+#include "common/logging.h" // LOG
+#include "io/cache/file_cache_manager.h"
#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
#include "olap/rowset/segment_v2/empty_segment_iterator.h"
#include "olap/rowset/segment_v2/page_io.h"
@@ -36,12 +38,22 @@
namespace doris {
namespace segment_v2 {
+using io::FileCacheManager;
+
Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t
segment_id,
TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>*
output) {
std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema));
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(fs->open_file(path, &file_reader));
- segment->_file_reader = std::move(file_reader);
+    if (config::file_cache_type.empty()) {
+        segment->_file_reader = std::move(file_reader);
+    } else {
+        // The segment's cache files live in a directory named after the segment path with its
+        // extension removed. replace_extension() only strips a real extension and cannot
+        // underflow on short paths, unlike chopping a fixed number of characters.
+        io::Path cache_path(path);
+        cache_path.replace_extension();
+        io::FileCachePtr file_cache = FileCacheManager::instance()->new_file_cache(
+                cache_path, config::file_cache_alive_time_sec, file_reader,
+                config::file_cache_type);
+        // NOTE(review): the cache is not registered via FileCacheManager::add_file_cache(), so
+        // FileCacheManager::clean_timeout_caches() never cleans it — confirm this is intended.
+        if (file_cache == nullptr) {
+            // Unsupported cache type: read the remote file directly instead of leaving the
+            // segment without a reader.
+            segment->_file_reader = std::move(file_reader);
+        } else {
+            segment->_file_reader = std::move(file_cache);
+        }
+    }
RETURN_IF_ERROR(segment->_open());
*output = std::move(segment);
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]