This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a943adac1a [feature](cache) Add FileCache for RemoteFile (#11186)
a943adac1a is described below

commit a943adac1a54c09aa5c41e9caf2149740db1d74d
Author: pengxiangyu <[email protected]>
AuthorDate: Thu Aug 4 10:57:32 2022 +0800

    [feature](cache) Add FileCache for RemoteFile (#11186)
    
    Add FileCache for RemoteFile; it will be enabled through StoragePolicy.
    Cold data in remote files will be downloaded to local cache files.
---
 be/src/common/config.h                    |  11 +-
 be/src/io/CMakeLists.txt                  |   3 +
 be/src/io/cache/file_cache.h              |  51 +++++++
 be/src/io/cache/file_cache_manager.cpp    |  62 ++++++++
 be/src/io/cache/file_cache_manager.h      |  53 +++++++
 be/src/io/cache/sub_file_cache.cpp        | 243 ++++++++++++++++++++++++++++++
 be/src/io/cache/sub_file_cache.h          |  79 ++++++++++
 be/src/io/cache/whole_file_cache.cpp      | 146 ++++++++++++++++++
 be/src/io/cache/whole_file_cache.h        |  72 +++++++++
 be/src/olap/rowset/segment_v2/segment.cpp |  16 +-
 10 files changed, 733 insertions(+), 3 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2bbee690af..5bafe0899c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -809,7 +809,16 @@ CONF_mBool(enable_function_pushdown, "false");
 CONF_Int32(cooldown_thread_num, "5");
 CONF_mInt64(generate_cooldown_task_interval_sec, "20");
 CONF_Int32(concurrency_per_dir, "2");
-CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
+CONF_mInt64(cooldown_lag_time_sec, "10800");        // 3h
+// Size in bytes of each chunk that "sub_file_cache" downloads into its own local file.
+CONF_mInt64(max_sub_cache_file_size, "1073741824"); // 1GB
+// Local cache data that has not been read for longer than this many seconds is removed
+// by the cache's timeout cleanup.
+CONF_mInt64(file_cache_alive_time_sec, "604800");   // 1 week
+// file_cache_type is used to set the type of file cache for remote files.
+// "": no cache, "sub_file_cache": split sub files from remote file.
+// "whole_file_cache": the whole file.
+CONF_mString(file_cache_type, "");
+CONF_Validator(file_cache_type, [](const std::string config) -> bool {
+    return config == "sub_file_cache" || config == "whole_file_cache" || config == "";
+});
 
 CONF_Int32(s3_transfer_executor_pool_size, "2");
 
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 768a33ccb1..69a473d14a 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -40,6 +40,9 @@ set(IO_FILES
     fs/local_file_writer.cpp
     fs/s3_file_reader.cpp
     fs/s3_file_system.cpp
+    cache/file_cache_manager.cpp
+    cache/sub_file_cache.cpp
+    cache/whole_file_cache.cpp
 )
 
 add_library(IO STATIC
diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h
new file mode 100644
index 0000000000..fee37505bd
--- /dev/null
+++ b/be/src/io/cache/file_cache.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/path.h"
+
+namespace doris::io {
+
+// FileReader over a remote file whose reads are served from local cache files.
+// The remote file stays reachable through remote_file_reader(); subclasses decide how
+// its data is laid out under cache_dir() and when it is evicted.
+class FileCache : public FileReader {
+public:
+    FileCache() = default;
+    virtual ~FileCache() = default;
+
+    DISALLOW_COPY_AND_ASSIGN(FileCache);
+
+    // Local directory holding this cache's files.
+    virtual const Path& cache_dir() const = 0;
+
+    // Number of bytes currently held in local cache files.
+    virtual size_t cache_file_size() const = 0;
+
+    // Reader of the underlying remote file.
+    virtual io::FileReaderSPtr remote_file_reader() const = 0;
+
+    // Evicts cached data that has not been read recently enough.
+    virtual Status clean_timeout_cache() = 0;
+
+    // Evicts all cached data.
+    virtual Status clean_all_cache() = 0;
+};
+
+// Shared handle to a FileCache.
+using FileCachePtr = std::shared_ptr<FileCache>;
+
+} // namespace doris::io
diff --git a/be/src/io/cache/file_cache_manager.cpp 
b/be/src/io/cache/file_cache_manager.cpp
new file mode 100644
index 0000000000..103a8ee0c0
--- /dev/null
+++ b/be/src/io/cache/file_cache_manager.cpp
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/file_cache_manager.h"
+
+#include "io/cache/sub_file_cache.h"
+#include "io/cache/whole_file_cache.h"
+
+namespace doris {
+namespace io {
+
+void FileCacheManager::add_file_cache(const Path& cache_path, FileCachePtr file_cache) {
+    // emplace() keeps the existing entry when cache_path is already registered.
+    std::unique_lock<std::shared_mutex> write_guard(_cache_map_lock);
+    _file_cache_map.emplace(cache_path.native(), file_cache);
+}
+
+void FileCacheManager::remove_file_cache(const Path& cache_path) {
+    // Erasing a path that was never registered is a no-op.
+    const std::string key = cache_path.native();
+    std::unique_lock<std::shared_mutex> write_guard(_cache_map_lock);
+    _file_cache_map.erase(key);
+}
+
+// Runs clean_timeout_cache() on every registered cache. Failures are logged per cache
+// and do not stop the remaining caches from being cleaned.
+void FileCacheManager::clean_timeout_caches() {
+    // Snapshot the registered caches so that the disk I/O done by each cleanup does not
+    // run while _cache_map_lock is held (it would block add/remove_file_cache()).
+    std::vector<FileCachePtr> caches;
+    {
+        std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
+        caches.reserve(_file_cache_map.size());
+        for (const auto& entry : _file_cache_map) {
+            if (entry.second != nullptr) {
+                caches.push_back(entry.second);
+            }
+        }
+    }
+    for (const auto& file_cache : caches) {
+        Status st = file_cache->clean_timeout_cache();
+        if (!st.ok()) {
+            LOG(WARNING) << "Clean timeout file cache failed: "
+                         << file_cache->cache_dir().native() << ", " << st.to_string();
+        }
+    }
+}
+
+// Creates a cache of the requested type over remote_file_reader, rooted at cache_dir.
+// The cache is not registered with this manager. Returns nullptr for any type other than
+// "whole_file_cache" or "sub_file_cache", so callers must be ready to fall back to the
+// remote reader.
+FileCachePtr FileCacheManager::new_file_cache(const Path& cache_dir, int64_t alive_time_sec,
+                                              io::FileReaderSPtr remote_file_reader,
+                                              const std::string& file_cache_type) {
+    // make_shared allocates the object and its control block together, unlike
+    // converting a unique_ptr into the returned shared_ptr.
+    if (file_cache_type == "whole_file_cache") {
+        return std::make_shared<WholeFileCache>(cache_dir, alive_time_sec,
+                                                std::move(remote_file_reader));
+    }
+    if (file_cache_type == "sub_file_cache") {
+        return std::make_shared<SubFileCache>(cache_dir, alive_time_sec,
+                                              std::move(remote_file_reader));
+    }
+    return nullptr;
+}
+
+// Process-wide manager, created on first use (function-local statics are initialized
+// thread-safely since C++11).
+FileCacheManager* FileCacheManager::instance() {
+    static FileCacheManager s_instance;
+    return &s_instance;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/file_cache_manager.h 
b/be/src/io/cache/file_cache_manager.h
new file mode 100644
index 0000000000..4a3e31ba92
--- /dev/null
+++ b/be/src/io/cache/file_cache_manager.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <shared_mutex>
+#include <string>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/cache/file_cache.h"
+
+namespace doris {
+namespace io {
+
+// Process-wide registry of FileCache objects keyed by cache path.
+// _file_cache_map is guarded by _cache_map_lock.
+class FileCacheManager {
+public:
+    FileCacheManager() = default;
+    ~FileCacheManager() = default;
+
+    static FileCacheManager* instance();
+
+    // Registers file_cache under cache_path; an already registered path keeps its cache.
+    void add_file_cache(const Path& cache_path, FileCachePtr file_cache);
+
+    // Unregisters the cache stored under cache_path, if any.
+    void remove_file_cache(const Path& cache_path);
+
+    // Calls clean_timeout_cache() on every registered cache.
+    void clean_timeout_caches();
+
+    // Creates a cache of file_cache_type ("whole_file_cache" or "sub_file_cache") over
+    // remote_file_reader; returns nullptr for any other type. The new cache is not
+    // registered; use add_file_cache() for that.
+    FileCachePtr new_file_cache(const Path& cache_dir, int64_t alive_time_sec,
+                                io::FileReaderSPtr remote_file_reader,
+                                const std::string& file_cache_type);
+
+private:
+    std::shared_mutex _cache_map_lock;
+    // cache_path -> FileCache
+    std::map<std::string, FileCachePtr> _file_cache_map;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/sub_file_cache.cpp 
b/be/src/io/cache/sub_file_cache.cpp
new file mode 100644
index 0000000000..519fa29730
--- /dev/null
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/sub_file_cache.h"
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+
+namespace doris {
+namespace io {
+
+using std::vector;
+
+// File-name prefixes of a chunk's cache data file and of its completion marker; both
+// are suffixed with "_<chunk offset>".
+static const std::string SUB_FILE_CACHE_PREFIX = "SUB_CACHE";
+static const std::string SUB_FILE_DONE_PREFIX = "SUB_CACHE_DONE";
+
+// No chunk is cached yet: _cache_file_size starts at 0 so that cache_file_size() is
+// well-defined before the first download.
+SubFileCache::SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                           io::FileReaderSPtr remote_file_reader)
+        : _cache_dir(cache_dir),
+          _cache_file_size(0),
+          _alive_time_sec(alive_time_sec),
+          _remote_file_reader(std::move(remote_file_reader)) {}
+
+// Members release their own resources; local cache files are kept on disk.
+SubFileCache::~SubFileCache() = default;
+
+// Reads result.size bytes of the remote file starting at `offset`, serving them from
+// local chunk files. The remote file is split into chunks of
+// config::max_sub_cache_file_size bytes; every chunk overlapping the request is
+// downloaded on first use. Bytes past the end of the remote file are not read, so
+// *bytes_read can be smaller than result.size.
+Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
+    *bytes_read = 0;
+    std::vector<size_t> need_cache_offsets;
+    RETURN_IF_ERROR(_get_need_cache_offsets(offset, result.size, &need_cache_offsets));
+    const size_t file_size = _remote_file_reader->size();
+    const size_t chunk_size = config::max_sub_cache_file_size;
+    // Clamp the request to the remote file; nothing past its end can be cached or read.
+    size_t request_end = offset + result.size;
+    if (request_end > file_size) {
+        request_end = file_size;
+    }
+
+    // Phase 1: under the shared lock, check whether any needed chunk is missing.
+    // find() is used instead of operator[], which inserts and so races under a shared lock.
+    bool need_download = false;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
+        for (size_t chunk_offset : need_cache_offsets) {
+            if (chunk_offset >= file_size) {
+                continue;
+            }
+            auto iter = _cache_file_readers.find(chunk_offset);
+            if (iter == _cache_file_readers.end() || iter->second == nullptr) {
+                need_download = true;
+                break;
+            }
+        }
+    }
+    // Phase 2: download missing chunks under the exclusive lock, re-checking each one
+    // because another thread may have downloaded it in the meantime.
+    if (need_download) {
+        std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+        for (size_t chunk_offset : need_cache_offsets) {
+            if (chunk_offset >= file_size) {
+                continue;
+            }
+            auto iter = _cache_file_readers.find(chunk_offset);
+            if (iter != _cache_file_readers.end() && iter->second != nullptr) {
+                continue;
+            }
+            // The last chunk of the file may be shorter than chunk_size.
+            size_t req_size = chunk_size;
+            if (req_size > file_size - chunk_offset) {
+                req_size = file_size - chunk_offset;
+            }
+            RETURN_IF_ERROR(_generate_cache_reader(chunk_offset, req_size));
+        }
+        _cache_file_size = _get_cache_file_size();
+    }
+    // Phase 3: copy the overlap of the request with each chunk out of its local file.
+    {
+        std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
+        for (size_t chunk_offset : need_cache_offsets) {
+            // The first chunk may start before `offset`; the last one may end after the
+            // request or after the end of the file.
+            const size_t read_begin = chunk_offset > offset ? chunk_offset : offset;
+            size_t read_end = chunk_offset + chunk_size;
+            if (read_end > request_end) {
+                read_end = request_end;
+            }
+            if (read_begin >= read_end) {
+                continue;
+            }
+            auto iter = _cache_file_readers.find(chunk_offset);
+            if (iter == _cache_file_readers.end() || iter->second == nullptr) {
+                LOG(ERROR) << "Local cache file reader can't be found: "
+                           << _remote_file_reader->path().native() << ", " << chunk_offset;
+                return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+            }
+            const io::FileReaderSPtr& cache_reader = iter->second;
+            Slice read_slice(result.mutable_data() + (read_begin - offset),
+                             read_end - read_begin);
+            size_t sub_bytes_read = 0;
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    cache_reader->read_at(read_begin - chunk_offset, read_slice,
+                                          &sub_bytes_read),
+                    fmt::format("Read local cache file failed: {}",
+                                cache_reader->path().native()));
+            if (sub_bytes_read != read_slice.size) {
+                LOG(ERROR) << "read local cache file failed: " << cache_reader->path().native()
+                           << ", bytes read: " << sub_bytes_read
+                           << " vs req size: " << read_slice.size;
+                return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+            }
+            *bytes_read += sub_bytes_read;
+        }
+    }
+    // Phase 4: refresh last-access times. Writing map values needs the exclusive lock;
+    // doing it under the shared lock races with other readers and with cleanup.
+    const int64_t now = time(nullptr);
+    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    for (size_t chunk_offset : need_cache_offsets) {
+        auto iter = _last_match_times.find(chunk_offset);
+        if (iter != _last_match_times.end()) {
+            iter->second = now;
+        }
+    }
+    return Status::OK();
+}
+
+// Makes the chunk [offset, offset + req_size) of the remote file available locally and
+// opens a reader on it. A chunk whose done marker exists is reused; otherwise any leftover
+// cache file is deleted and the chunk is downloaded again. The marker is created only
+// after the cache file has been written and closed.
+// Must be called with _cache_map_lock held exclusively (it mutates both maps).
+Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
+    Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset);
+    Path cache_done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset);
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
+            fmt::format("Check local cache done file exist failed. {}", cache_done_file.native()));
+    if (!done_file_exist) {
+        bool cache_file_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
+                fmt::format("Check local cache file exist failed. {}", cache_file.native()));
+        if (cache_file_exist) {
+            // No done marker: the file is left over from an interrupted download.
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->delete_file(cache_file),
+                    fmt::format("Delete unfinished local cache file failed. {}",
+                                cache_file.native()));
+        }
+        LOG(INFO) << "Download cache file from remote file: "
+                  << _remote_file_reader->path().native() << " -> " << cache_file.native();
+        std::unique_ptr<char[]> file_buf(new char[req_size]);
+        Slice file_slice(file_buf.get(), req_size);
+        size_t bytes_read = 0;
+        // Read this chunk's own range of the remote file, not the start of the file.
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                _remote_file_reader->read_at(offset, file_slice, &bytes_read),
+                fmt::format("read remote file failed. {}. offset: {}, size: {}",
+                            _remote_file_reader->path().native(), offset, req_size));
+        if (bytes_read != req_size) {
+            LOG(ERROR) << "read remote file failed: " << _remote_file_reader->path().native()
+                       << ", offset: " << offset << ", bytes read: " << bytes_read
+                       << " vs req size: " << req_size;
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
+        io::FileWriterPtr file_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_file, &file_writer),
+                fmt::format("Create local cache file failed: {}", cache_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->append(file_slice),
+                fmt::format("Write local cache file failed: {}", cache_file.native()));
+        // Close the data file before publishing the done marker. Otherwise the marker could
+        // exist while the data is not yet fully written.
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->close(),
+                fmt::format("Close local cache file failed: {}", cache_file.native()));
+        io::FileWriterPtr done_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_done_file, &done_writer),
+                fmt::format("Create local done file failed: {}", cache_done_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                done_writer->close(),
+                fmt::format("Close local done file failed: {}", cache_done_file.native()));
+    }
+    io::FileReaderSPtr cache_reader;
+    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader));
+    // Assign rather than emplace, so a stale entry for this offset is replaced.
+    _cache_file_readers[offset] = cache_reader;
+    _last_match_times[offset] = time(nullptr);
+    LOG(INFO) << "Create cache file from remote file successfully: "
+              << _remote_file_reader->path().native() << "(" << offset << ", " << req_size
+              << ") -> " << cache_file.native();
+    return Status::OK();
+}
+
+// Appends to *cache_offsets the start offset (a multiple of
+// config::max_sub_cache_file_size) of every chunk overlapping [offset, offset + req_size).
+// The range is first clamped to the size of the remote file, so an empty range, or one
+// that lies entirely past the end of the file, yields no chunks.
+Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
+                                             std::vector<size_t>* cache_offsets) {
+    // Read the mutable config once and reject non-positive values (division by zero).
+    const int64_t configured_size = config::max_sub_cache_file_size;
+    if (configured_size <= 0) {
+        return Status::InvalidArgument("max_sub_cache_file_size must be positive");
+    }
+    const size_t chunk_size = static_cast<size_t>(configured_size);
+    size_t end = offset + req_size;
+    if (end > _remote_file_reader->size()) {
+        end = _remote_file_reader->size();
+    }
+    if (offset >= end) {
+        return Status::OK();
+    }
+    for (size_t begin = offset / chunk_size * chunk_size; begin < end; begin += chunk_size) {
+        cache_offsets->push_back(begin);
+    }
+    return Status::OK();
+}
+
+// Removes every chunk that has not been read for more than _alive_time_sec seconds and
+// drops its last-access entry so it is not reconsidered on the next run.
+Status SubFileCache::clean_timeout_cache() {
+    std::vector<size_t> timeout_keys;
+    {
+        const int64_t now = time(nullptr);
+        std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
+        for (const auto& [chunk_offset, last_match_time] : _last_match_times) {
+            if (now - last_match_time > _alive_time_sec) {
+                timeout_keys.push_back(chunk_offset);
+            }
+        }
+    }
+    if (timeout_keys.empty()) {
+        return Status::OK();
+    }
+    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    const int64_t now = time(nullptr);
+    for (size_t chunk_offset : timeout_keys) {
+        // Re-check under the exclusive lock: a read may have refreshed this chunk after
+        // the scan above released the shared lock.
+        auto iter = _last_match_times.find(chunk_offset);
+        if (iter == _last_match_times.end() || now - iter->second <= _alive_time_sec) {
+            continue;
+        }
+        Status st = _clean_cache_internal(chunk_offset);
+        if (!st.ok()) {
+            // Keep the reported size consistent with what is still cached.
+            _cache_file_size = _get_cache_file_size();
+            return st;
+        }
+        _last_match_times.erase(iter);
+    }
+    _cache_file_size = _get_cache_file_size();
+    return Status::OK();
+}
+
+// Removes every cached chunk and its last-access entry.
+Status SubFileCache::clean_all_cache() {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    // Copy the keys first so that erasing entries does not invalidate the iteration.
+    std::vector<size_t> chunk_offsets;
+    chunk_offsets.reserve(_last_match_times.size());
+    for (const auto& entry : _last_match_times) {
+        chunk_offsets.push_back(entry.first);
+    }
+    for (size_t chunk_offset : chunk_offsets) {
+        Status st = _clean_cache_internal(chunk_offset);
+        if (!st.ok()) {
+            // Keep the reported size consistent with what is still cached.
+            _cache_file_size = _get_cache_file_size();
+            return st;
+        }
+        _last_match_times.erase(chunk_offset);
+    }
+    _cache_file_size = _get_cache_file_size();
+    return Status::OK();
+}
+
+// Drops the reader of the chunk starting at `offset` and deletes its done marker and
+// cache file if present. The marker goes first: an interrupted cleanup then leaves at
+// most an unmarked file, which _generate_cache_reader() discards and re-downloads.
+// Callers must hold _cache_map_lock exclusively. They are responsible for recomputing
+// _cache_file_size and for the chunk's _last_match_times entry.
+Status SubFileCache::_clean_cache_internal(size_t offset) {
+    // Other chunks may still be cached, so _cache_file_size must not be zeroed here.
+    _cache_file_readers.erase(offset);
+    Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset);
+    Path done_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_DONE_PREFIX, offset);
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(done_file, &done_file_exist),
+            fmt::format("Check local done file exist failed: {}", done_file.native()));
+    if (done_file_exist) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->delete_file(done_file),
+                fmt::format("Delete local done file failed: {}", done_file.native()));
+    }
+    bool cache_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
+            fmt::format("Check local cache file exist failed: {}", cache_file.native()));
+    if (cache_file_exist) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->delete_file(cache_file),
+                fmt::format("Delete local cache file failed: {}", cache_file.native()));
+    }
+    LOG(INFO) << "Delete local cache file successfully: " << cache_file.native();
+    return Status::OK();
+}
+
+// Sums the sizes of the open local chunk files. Expects the caller to hold
+// _cache_map_lock.
+size_t SubFileCache::_get_cache_file_size() {
+    size_t cache_file_size = 0;
+    for (const auto& entry : _cache_file_readers) {
+        // Skip empty slots instead of dereferencing a null reader.
+        if (entry.second != nullptr) {
+            cache_file_size += entry.second->size();
+        }
+    }
+    return cache_file_size;
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
new file mode 100644
index 0000000000..21fb33822f
--- /dev/null
+++ b/be/src/io/cache/sub_file_cache.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <shared_mutex>
+#include <vector>
+
+#include "common/status.h"
+#include "io/cache/file_cache.h"
+#include "io/fs/path.h"
+
+namespace doris {
+namespace io {
+
+// FileCache that splits the remote file into chunks of config::max_sub_cache_file_size
+// bytes and keeps each chunk in its own local file under cache_dir(). A chunk is
+// downloaded the first time it is read; chunks not read for alive_time_sec seconds are
+// removed by clean_timeout_cache().
+class SubFileCache final : public FileCache {
+public:
+    SubFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                 io::FileReaderSPtr remote_file_reader);
+    ~SubFileCache() override;
+
+    // close(), path(), size() and closed() delegate to the remote reader.
+    Status close() override { return _remote_file_reader->close(); }
+
+    Status read_at(size_t offset, Slice result, size_t* bytes_read) override;
+
+    const Path& path() const override { return _remote_file_reader->path(); }
+
+    size_t size() const override { return _remote_file_reader->size(); }
+
+    bool closed() const override { return _remote_file_reader->closed(); }
+
+    const Path& cache_dir() const override { return _cache_dir; }
+
+    size_t cache_file_size() const override { return _cache_file_size; }
+
+    io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; }
+
+    Status clean_timeout_cache() override;
+
+    Status clean_all_cache() override;
+
+private:
+    // Downloads (if needed) and opens the chunk starting at `offset`.
+    Status _generate_cache_reader(size_t offset, size_t req_size);
+
+    // Drops the reader and deletes the local files of the chunk starting at `offset`.
+    Status _clean_cache_internal(size_t offset);
+
+    // Computes the start offsets of the chunks overlapping [offset, offset + req_size).
+    Status _get_need_cache_offsets(size_t offset, size_t req_size,
+                                   std::vector<size_t>* cache_offsets);
+
+    // Sums the sizes of the currently open chunk files.
+    size_t _get_cache_file_size();
+
+private:
+    Path _cache_dir;
+    // Initialized so that cache_file_size() is well-defined before the first download.
+    size_t _cache_file_size = 0;
+    int64_t _alive_time_sec;
+    io::FileReaderSPtr _remote_file_reader;
+
+    // Guards _last_match_times and _cache_file_readers.
+    std::shared_mutex _cache_map_lock;
+    // offset_begin -> last_match_time
+    std::map<size_t, int64_t> _last_match_times;
+    // offset_begin -> local file reader
+    std::map<size_t, io::FileReaderSPtr> _cache_file_readers;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/whole_file_cache.cpp 
b/be/src/io/cache/whole_file_cache.cpp
new file mode 100644
index 0000000000..012ae3e983
--- /dev/null
+++ b/be/src/io/cache/whole_file_cache.cpp
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/whole_file_cache.h"
+
+#include "io/fs/local_file_system.h"
+
+namespace doris {
+namespace io {
+
+// Names of the local cache file and of the marker that is created once it is complete.
+static const std::string WHOLE_FILE_CACHE_NAME = "WHOLE_FILE_CACHE";
+static const std::string WHOLE_FILE_CACHE_DONE_NAME = "WHOLE_FILE_CACHE_DONE";
+
+// Nothing is cached yet: the local reader is created lazily by the first read_at(), and
+// _cache_file_size starts at 0 so that cache_file_size() is well-defined before then.
+WholeFileCache::WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                               io::FileReaderSPtr remote_file_reader)
+        : _cache_dir(cache_dir),
+          _cache_file_size(0),
+          _alive_time_sec(alive_time_sec),
+          _remote_file_reader(std::move(remote_file_reader)),
+          _last_match_time(time(nullptr)),
+          _cache_file_reader(nullptr) {}
+
+// Members release their own resources; the local cache file is kept on disk.
+WholeFileCache::~WholeFileCache() = default;
+
+// Reads exactly result.size bytes at `offset` from the local copy of the remote file,
+// downloading the whole file on first use. A short read is reported as an error.
+Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
+    std::shared_lock<std::shared_mutex> rlock(_cache_lock);
+    // Test the reader under the lock: a concurrent clean may reset it at any time, even
+    // right after it was generated. _generate_cache_reader() takes the exclusive lock
+    // itself, so the shared lock is released around that call.
+    while (_cache_file_reader == nullptr) {
+        rlock.unlock();
+        RETURN_IF_ERROR(_generate_cache_reader(offset, result.size));
+        rlock.lock();
+    }
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            _cache_file_reader->read_at(offset, result, bytes_read),
+            fmt::format("Read local cache file failed: {}", _cache_file_reader->path().native()));
+    if (*bytes_read != result.size) {
+        LOG(ERROR) << "read cache file failed: " << _cache_file_reader->path().native()
+                   << ", bytes read: " << *bytes_read << " vs required size: " << result.size;
+        return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+    }
+    _last_match_time = time(nullptr);
+    return Status::OK();
+}
+
+// Makes sure the whole remote file is cached locally and opens _cache_file_reader on it.
+// `offset` and `req_size` are unused because the whole file is always cached.
+// An existing cache file is reused only if its done marker exists. Otherwise any leftover
+// file is deleted and the remote file is downloaded again; the marker is created only
+// after the cache file has been written and closed.
+// NOTE(review): the download buffers the entire remote file in memory.
+Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_lock);
+    if (_cache_file_reader != nullptr) {
+        // Another thread generated the reader while this one waited for the lock.
+        return Status::OK();
+    }
+    Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
+    Path cache_done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
+    bool done_file_exist = false;
+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            io::global_local_filesystem()->exists(cache_done_file, &done_file_exist),
+            fmt::format("Check local cache done file exist failed: {}", cache_done_file.native()));
+    if (!done_file_exist) {
+        bool cache_file_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(cache_file, &cache_file_exist),
+                fmt::format("Check local cache file exist failed: {}", cache_file.native()));
+        if (cache_file_exist) {
+            // No done marker: the file is left over from an interrupted download.
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    io::global_local_filesystem()->delete_file(cache_file),
+                    fmt::format("Delete unfinished local cache file failed: {}",
+                                cache_file.native()));
+        }
+        LOG(INFO) << "Download cache file from remote file: "
+                  << _remote_file_reader->path().native() << " -> " << cache_file.native();
+        const size_t file_size = _remote_file_reader->size();
+        std::unique_ptr<char[]> file_buf(new char[file_size]);
+        Slice file_slice(file_buf.get(), file_size);
+        size_t bytes_read = 0;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                _remote_file_reader->read_at(0, file_slice, &bytes_read),
+                fmt::format("read remote file failed. {}", _remote_file_reader->path().native()));
+        if (bytes_read != file_size) {
+            LOG(ERROR) << "read remote file failed: " << _remote_file_reader->path().native()
+                       << ", bytes read: " << bytes_read << " vs file size: " << file_size;
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
+        io::FileWriterPtr file_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_file, &file_writer),
+                fmt::format("Create local cache file failed: {}", cache_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->append(file_slice),
+                fmt::format("Write local cache file failed: {}", cache_file.native()));
+        // Close the data file before publishing the done marker. Otherwise the marker could
+        // exist while the data is not yet fully written.
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                file_writer->close(),
+                fmt::format("Close local cache file failed: {}", cache_file.native()));
+        io::FileWriterPtr done_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_done_file, &done_writer),
+                fmt::format("Create local done file failed: {}", cache_done_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                done_writer->close(),
+                fmt::format("Close local done file failed: {}", cache_done_file.native()));
+    }
+    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader));
+    _cache_file_size = _cache_file_reader->size();
+    _last_match_time = time(nullptr);
+    LOG(INFO) << "Create cache file from remote file successfully: "
+              << _remote_file_reader->path().native() << " -> " << cache_file.native();
+    return Status::OK();
+}
+
+// Removes the local copy once it has not been read for more than _alive_time_sec
+// seconds. Cleanup failures are propagated instead of being reported as success.
+Status WholeFileCache::clean_timeout_cache() {
+    if (time(nullptr) - _last_match_time <= _alive_time_sec) {
+        return Status::OK();
+    }
+    return _clean_cache_internal();
+}
+
+// Removes the local copy unconditionally, propagating any cleanup failure.
+Status WholeFileCache::clean_all_cache() {
+    return _clean_cache_internal();
+}
+
+// Under the exclusive lock: drops the local reader, resets the cached size, then deletes
+// the done marker followed by the cache file (each only if it exists).
+Status WholeFileCache::_clean_cache_internal() {
+    std::lock_guard<std::shared_mutex> wrlock(_cache_lock);
+    _cache_file_reader.reset();
+    _cache_file_size = 0;
+    const Path done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
+    const Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
+    // Deletes `file` when it exists; the messages prefix the warning logged when the
+    // existence check or the deletion fails.
+    auto remove_if_exists = [](const Path& file, const std::string& check_failed_msg,
+                               const std::string& delete_failed_msg) -> Status {
+        bool present = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(file, &present),
+                                       check_failed_msg);
+        if (present) {
+            RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->delete_file(file),
+                                           delete_failed_msg);
+        }
+        return Status::OK();
+    };
+    RETURN_IF_ERROR(remove_if_exists(
+            done_file, "Check local done file exist failed.",
+            fmt::format("Delete local done file failed: {}", done_file.native())));
+    RETURN_IF_ERROR(remove_if_exists(
+            cache_file, "Check local cache file exist failed.",
+            fmt::format("Delete local cache file failed: {}", cache_file.native())));
+    LOG(INFO) << "Delete local cache file successfully: " << cache_file.native();
+    return Status::OK();
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/cache/whole_file_cache.h 
b/be/src/io/cache/whole_file_cache.h
new file mode 100644
index 0000000000..8628b6c09d
--- /dev/null
+++ b/be/src/io/cache/whole_file_cache.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "io/cache/file_cache.h"
+#include "io/fs/path.h"
+
+namespace doris {
+namespace io {
+
+// FileCache that downloads the entire remote file into a single local file under
+// cache_dir() on first read and serves all reads from it. The local copy is removed by
+// clean_timeout_cache() once it has not been read for alive_time_sec seconds.
+class WholeFileCache final : public FileCache {
+public:
+    WholeFileCache(const Path& cache_dir, int64_t alive_time_sec,
+                   io::FileReaderSPtr remote_file_reader);
+    ~WholeFileCache() override;
+
+    // close(), path(), size() and closed() delegate to the remote reader.
+    Status close() override { return _remote_file_reader->close(); }
+
+    Status read_at(size_t offset, Slice result, size_t* bytes_read) override;
+
+    const Path& path() const override { return _remote_file_reader->path(); }
+
+    size_t size() const override { return _remote_file_reader->size(); }
+
+    bool closed() const override { return _remote_file_reader->closed(); }
+
+    const Path& cache_dir() const override { return _cache_dir; }
+
+    size_t cache_file_size() const override { return _cache_file_size; }
+
+    io::FileReaderSPtr remote_file_reader() const override { return _remote_file_reader; }
+
+    Status clean_timeout_cache() override;
+
+    Status clean_all_cache() override;
+
+private:
+    // Downloads (if needed) and opens the local copy of the remote file.
+    Status _generate_cache_reader(size_t offset, size_t req_size);
+
+    // Drops the local reader and deletes the local cache and done files.
+    Status _clean_cache_internal();
+
+private:
+    Path _cache_dir;
+    // Initialized so that cache_file_size() is well-defined before the first download.
+    size_t _cache_file_size = 0;
+    int64_t _alive_time_sec;
+    io::FileReaderSPtr _remote_file_reader;
+
+    // Guards _cache_file_reader.
+    std::shared_mutex _cache_lock;
+    // Unix time of the last successful read. Atomic because read_at() writes it while
+    // holding only the shared lock, and clean_timeout_cache() reads it without the lock.
+    std::atomic<int64_t> _last_match_time;
+    io::FileReaderSPtr _cache_file_reader;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index fea0510172..a9deeed6e5 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -22,7 +22,9 @@
 #include <memory>
 #include <utility>
 
-#include "common/logging.h"                       // LOG
+#include "common/config.h"
+#include "common/logging.h" // LOG
+#include "io/cache/file_cache_manager.h"
 #include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
 #include "olap/rowset/segment_v2/empty_segment_iterator.h"
 #include "olap/rowset/segment_v2/page_io.h"
@@ -36,12 +38,22 @@
 namespace doris {
 namespace segment_v2 {
 
+using io::FileCacheManager;
+
 Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t segment_id,
                      TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* output) {
     std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema));
     io::FileReaderSPtr file_reader;
     RETURN_IF_ERROR(fs->open_file(path, &file_reader));
-    segment->_file_reader = std::move(file_reader);
+    // With a file cache type configured, serve the segment through a local cache whose
+    // directory is the segment path minus its last 4 characters (presumably the ".dat"
+    // extension - TODO confirm). Paths that short are read without a cache.
+    io::FileReaderSPtr cache_reader;
+    if (!config::file_cache_type.empty() && path.size() > 4) {
+        std::string cache_path = path.substr(0, path.size() - 4);
+        cache_reader = FileCacheManager::instance()->new_file_cache(
+                cache_path, config::file_cache_alive_time_sec, file_reader,
+                config::file_cache_type);
+    }
+    // new_file_cache() returns nullptr for an unknown cache type; fall back to the plain
+    // reader instead of leaving the segment without one.
+    if (cache_reader != nullptr) {
+        segment->_file_reader = std::move(cache_reader);
+    } else {
+        segment->_file_reader = std::move(file_reader);
+    }
     RETURN_IF_ERROR(segment->_open());
     *output = std::move(segment);
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to