This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1c8b7df710fb217a8a6aea5bf926df6df6f0e14c
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Oct 25 10:07:15 2021 +0800

    [Memory Usage] Implement segment lru cache to save memory of BE (#6829)
---
 be/src/agent/task_worker_pool.cpp                  |  65 ++++++---
 be/src/agent/task_worker_pool.h                    |   3 +
 be/src/common/config.h                             |   8 +-
 be/src/olap/CMakeLists.txt                         |   1 +
 be/src/olap/lru_cache.cpp                          | 119 ++++++++++++----
 be/src/olap/lru_cache.h                            |  42 ++++--
 be/src/olap/olap_server.cpp                        |   4 +-
 be/src/olap/rowset/alpha_rowset.cpp                |  32 +----
 be/src/olap/rowset/alpha_rowset.h                  |   6 +-
 be/src/olap/rowset/alpha_rowset_reader.cpp         |   2 +-
 be/src/olap/rowset/beta_rowset.cpp                 |  14 +-
 be/src/olap/rowset/beta_rowset.h                   |   5 +-
 be/src/olap/rowset/beta_rowset_reader.cpp          |   9 +-
 be/src/olap/rowset/beta_rowset_reader.h            |   5 +
 be/src/olap/rowset/rowset.cpp                      |   4 +-
 be/src/olap/rowset/rowset.h                        |   6 +-
 be/src/olap/segment_loader.cpp                     |  94 +++++++++++++
 be/src/olap/segment_loader.h                       | 152 +++++++++++++++++++++
 be/src/olap/snapshot_manager.cpp                   |   2 +-
 be/src/olap/storage_engine.cpp                     |   6 +-
 be/src/olap/storage_engine.h                       |   2 +-
 be/src/runtime/exec_env_init.cpp                   |   3 +
 be/test/olap/lru_cache_test.cpp                    |  82 +++++++++--
 be/test/olap/rowset/beta_rowset_test.cpp           |   1 +
 be/test/olap/rowset/rowset_converter_test.cpp      |   1 +
 docs/en/administrator-guide/config/be_config.md    |  13 +-
 docs/zh-CN/administrator-guide/config/be_config.md |  15 +-
 .../load/routineload/RoutineLoadTaskScheduler.java |   6 +-
 .../main/java/org/apache/doris/task/PushTask.java  |   2 +-
 29 files changed, 565 insertions(+), 139 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index a4b9463..1fb5bb3 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -51,6 +51,7 @@
 #include "util/doris_metrics.h"
 #include "util/file_utils.h"
 #include "util/monotime.h"
+#include "util/random.h"
 #include "util/scoped_cleanup.h"
 #include "util/stopwatch.hpp"
 #include "util/threadpool.h"
@@ -1098,19 +1099,36 @@ void 
TaskWorkerPool::_check_consistency_worker_thread_callback() {
 }
 
 void TaskWorkerPool::_report_task_worker_thread_callback() {
+    StorageEngine::instance()->register_report_listener(this);
     TReportRequest request;
     request.__set_backend(_backend);
 
-    do {
+    while (_is_work) {
+        _is_doing_work = false;
+        // wait at most report_task_interval_seconds, or being notified
+        _worker_thread_condition_variable.wait_for(
+                MonoDelta::FromSeconds(config::report_task_interval_seconds));
+        if (!_is_work) {
+            break;
+        }
+
+        if (_master_info.network_address.port == 0) {
+            // port == 0 means not received heartbeat yet
+            // go back to waiting (at most one report interval, or until notified) and try again
+            LOG(INFO) << "waiting to receive first heartbeat from frontend 
before doing task report";
+            continue;
+        }
+
         _is_doing_work = true;
+        // See _random_sleep() comment in 
_report_disk_state_worker_thread_callback
+        _random_sleep(5);
         {
             lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
             request.__set_tasks(_s_task_signatures);
         }
         _handle_report(request, ReportType::TASK);
-        _is_doing_work = false;
-    } while (!_stop_background_threads_latch.wait_for(
-            MonoDelta::FromSeconds(config::report_task_interval_seconds)));
+    }
+    StorageEngine::instance()->deregister_report_listener(this);
 }
 
 /// disk state report thread will report disk state at a configurable fix 
interval.
@@ -1122,14 +1140,6 @@ void 
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
 
     while (_is_work) {
         _is_doing_work = false;
-        if (_master_info.network_address.port == 0) {
-            // port == 0 means not received heartbeat yet
-            // sleep a short time and try again
-            LOG(INFO) << "waiting to receive first heartbeat from frontend";
-            sleep(config::sleep_one_second);
-            continue;
-        }
-
         // wait at most report_disk_state_interval_seconds, or being notified
         _worker_thread_condition_variable.wait_for(
                 
MonoDelta::FromSeconds(config::report_disk_state_interval_seconds));
@@ -1137,7 +1147,18 @@ void 
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
             break;
         }
 
+        if (_master_info.network_address.port == 0) {
+            // port == 0 means not received heartbeat yet
+            LOG(INFO) << "waiting to receive first heartbeat from frontend 
before doing disk report";
+            continue;
+        }
+
         _is_doing_work = true;
+        // Sleep a random 1~5 seconds before doing the report,
+        // in order to avoid the problem that the FE receives many report 
requests at the same time
+        // and cannot process them.
+        _random_sleep(5);
+
         std::vector<DataDirInfo> data_dir_infos;
         _env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* 
update */);
 
@@ -1168,13 +1189,6 @@ void 
TaskWorkerPool::_report_tablet_worker_thread_callback() {
 
     while (_is_work) {
         _is_doing_work = false;
-        if (_master_info.network_address.port == 0) {
-            // port == 0 means not received heartbeat yet
-            // sleep a short time and try again
-            LOG(INFO) << "waiting to receive first heartbeat from frontend";
-            sleep(config::sleep_one_second);
-            continue;
-        }
 
         // wait at most report_tablet_interval_seconds, or being notified
         _worker_thread_condition_variable.wait_for(
@@ -1183,7 +1197,15 @@ void 
TaskWorkerPool::_report_tablet_worker_thread_callback() {
             break;
         }
 
+        if (_master_info.network_address.port == 0) {
+            // port == 0 means not received heartbeat yet
+            LOG(INFO) << "waiting to receive first heartbeat from frontend 
before doing tablet report";
+            continue;
+        }
+
         _is_doing_work = true;
+        // See _random_sleep() comment in 
_report_disk_state_worker_thread_callback
+        _random_sleep(5);
         request.tablets.clear();
         uint64_t report_version = _s_report_version;
         OLAPStatus build_all_report_tablets_info_status =
@@ -1620,4 +1642,9 @@ void TaskWorkerPool::_handle_report(TReportRequest& 
request, ReportType type) {
     }
 }
 
+void TaskWorkerPool::_random_sleep(int second) {
+    Random rnd(UnixMillis());
+    sleep(rnd.Uniform(second) + 1);
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 4662a48..c451944 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -208,6 +208,9 @@ private:
     OLAPStatus _check_migrate_requset(const TStorageMediumMigrateReq& req, 
TabletSharedPtr& tablet,
                                       DataDir** dest_store);
 
+    // random sleep 1~second seconds
+    void _random_sleep(int second);
+
 private:
     std::string _name;
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 04ccb35..37e71d9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -182,7 +182,8 @@ CONF_mInt64(column_dictionary_key_size_threshold, "0");
 // memory_limitation_per_thread_for_schema_change unit GB
 CONF_mInt32(memory_limitation_per_thread_for_schema_change, "2");
 
-CONF_mInt32(file_descriptor_cache_clean_interval, "3600");
+// the clean interval of file descriptor cache and segment cache
+CONF_mInt32(cache_clean_interval, "1800");
 CONF_mInt32(disk_stat_monitor_interval, "5");
 CONF_mInt32(unused_rowset_monitor_interval, "30");
 CONF_String(storage_root_path, "${DORIS_HOME}/storage");
@@ -633,6 +634,11 @@ CONF_mInt32(max_segment_num_per_rowset, "100");
 // The connection timeout when connecting to external table such as odbc table.
 CONF_mInt32(external_table_connect_timeout_sec, "5");
 
+// The capacity of the LRU cache in the segment loader.
+// Although it is called "segment cache", it caches segments at rowset 
granularity.
+// So the value of this config should correspond to the number of rowsets 
on this BE.
+CONF_mInt32(segment_cache_capacity, "1000000");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 7da0983..0302ff8 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -121,4 +121,5 @@ add_library(Olap STATIC
     task/engine_alter_tablet_task.cpp
     olap_snapshot_converter.cpp
     column_vector.cpp
+    segment_loader.cpp
 )
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index c860d42..0a2ce0f 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -175,7 +175,7 @@ void HandleTable::_resize() {
     _length = new_length;
 }
 
-LRUCache::LRUCache() {
+LRUCache::LRUCache(LRUCacheType type) : _type(type) {
     // Make empty circular linked list
     _lru_normal.next = &_lru_normal;
     _lru_normal.prev = &_lru_normal;
@@ -234,7 +234,7 @@ void LRUCache::release(Cache::Handle* handle) {
         MutexLock l(&_mutex);
         last_ref = _unref(e);
         if (last_ref) {
-            _usage -= e->charge;
+            _usage -= e->total_size;
         } else if (e->in_cache && e->refs == 1) {
             // only exists in cache
             if (_usage > _capacity) {
@@ -242,7 +242,7 @@ void LRUCache::release(Cache::Handle* handle) {
                 _table.remove(e);
                 e->in_cache = false;
                 _unref(e);
-                _usage -= e->charge;
+                _usage -= e->total_size;
                 last_ref = true;
             } else {
                 // put it to LRU free list
@@ -261,9 +261,9 @@ void LRUCache::release(Cache::Handle* handle) {
     }
 }
 
-void LRUCache::_evict_from_lru(size_t charge, LRUHandle** to_remove_head) {
+void LRUCache::_evict_from_lru(size_t total_size, LRUHandle** to_remove_head) {
     // 1. evict normal cache entries
-    while (_usage + charge > _capacity && _lru_normal.next != &_lru_normal) {
+    while (_usage + total_size > _capacity && _lru_normal.next != 
&_lru_normal) {
         LRUHandle* old = _lru_normal.next;
         DCHECK(old->priority == CachePriority::NORMAL);
         _evict_one_entry(old);
@@ -271,7 +271,7 @@ void LRUCache::_evict_from_lru(size_t charge, LRUHandle** 
to_remove_head) {
         *to_remove_head = old;
     }
     // 2. evict durable cache entries if need
-    while (_usage + charge > _capacity && _lru_durable.next != &_lru_durable) {
+    while (_usage + total_size > _capacity && _lru_durable.next != 
&_lru_durable) {
         LRUHandle* old = _lru_durable.next;
         DCHECK(old->priority == CachePriority::DURABLE);
         _evict_one_entry(old);
@@ -287,17 +287,19 @@ void LRUCache::_evict_one_entry(LRUHandle* e) {
     _table.remove(e);
     e->in_cache = false;
     _unref(e);
-    _usage -= e->charge;
+    _usage -= e->total_size;
 }
 
 Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* 
value, size_t charge,
                                 void (*deleter)(const CacheKey& key, void* 
value),
                                 CachePriority priority) {
-    LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + 
key.size()));
+    size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
+    LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
     e->value = value;
     e->deleter = deleter;
     e->charge = charge;
     e->key_length = key.size();
+    e->total_size = (_type == LRUCacheType::SIZE ? handle_size + charge : 1);
     e->hash = hash;
     e->refs = 2; // one for the returned handle, one for LRUCache.
     e->next = e->prev = nullptr;
@@ -310,17 +312,17 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, 
uint32_t hash, void* value,
 
         // Free the space following strict LRU policy until enough space
         // is freed or the lru list is empty
-        _evict_from_lru(charge, &to_remove_head);
+        _evict_from_lru(e->total_size, &to_remove_head);
 
         // insert into the cache
         // note that the cache might get larger than its capacity if not enough
         // space was freed
         auto old = _table.insert(e);
-        _usage += charge;
+        _usage += e->total_size;
         if (old != nullptr) {
             old->in_cache = false;
             if (_unref(old)) {
-                _usage -= old->charge;
+                _usage -= old->total_size;
                 // old is on LRU because it's in cache and its reference count
                 // was just 1 (Unref returned 0)
                 _lru_remove(old);
@@ -350,7 +352,7 @@ void LRUCache::erase(const CacheKey& key, uint32_t hash) {
         if (e != nullptr) {
             last_ref = _unref(e);
             if (last_ref) {
-                _usage -= e->charge;
+                _usage -= e->total_size;
                 if (e->in_cache) {
                     // locate in free list
                     _lru_remove(e);
@@ -372,10 +374,10 @@ void LRUCache::_prune_one(LRUHandle* old) {
     _table.remove(old);
     old->in_cache = false;
     _unref(old);
-    _usage -= old->charge;
+    _usage -= old->total_size;
 }
 
-int LRUCache::prune() {
+int64_t LRUCache::prune() {
     LRUHandle* to_remove_head = nullptr;
     {
         MutexLock l(&_mutex);
@@ -392,7 +394,47 @@ int LRUCache::prune() {
             to_remove_head = old;
         }
     }
-    int pruned_count = 0;
+    int64_t pruned_count = 0;
+    while (to_remove_head != nullptr) {
+        ++pruned_count;
+        LRUHandle* next = to_remove_head->next;
+        to_remove_head->free();
+        to_remove_head = next;
+    }
+    return pruned_count;
+}
+
+int64_t LRUCache::prune_if(bool (*pred)(const void* value)) {
+    LRUHandle* to_remove_head = nullptr;
+    {
+        MutexLock l(&_mutex);
+        LRUHandle* p = _lru_normal.next;
+        while (p != &_lru_normal) {
+            LRUHandle* old = _lru_normal.next;
+            if (pred(old->value)) {
+                _prune_one(old);
+                old->next = to_remove_head;
+                to_remove_head = old;
+                p = _lru_normal.next;
+            } else {
+                p = p->next;
+            }
+        }
+
+        p = _lru_durable.next;
+        while (p != &_lru_durable) {
+            LRUHandle* old = _lru_durable.next;
+            if (pred(old->value)) {
+                _prune_one(old);
+                old->next = to_remove_head;
+                to_remove_head = old;
+                p = _lru_durable.next;
+            } else {
+                p = p->next;
+            }
+        }
+    }
+    int64_t pruned_count = 0;
     while (to_remove_head != nullptr) {
         ++pruned_count;
         LRUHandle* next = to_remove_head->next;
@@ -410,12 +452,14 @@ uint32_t ShardedLRUCache::_shard(uint32_t hash) {
     return hash >> (32 - kNumShardBits);
 }
 
-ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t 
total_capacity, std::shared_ptr<MemTracker> parent)
+ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t 
total_capacity,
+                                 LRUCacheType type, 
std::shared_ptr<MemTracker> parent)
         : _name(name), _last_id(1),
         _mem_tracker(MemTracker::CreateTracker(-1, name, parent, true, false, 
MemTrackerLevel::OVERVIEW)) {
     const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
     for (int s = 0; s < kNumShards; s++) {
-        _shards[s].set_capacity(per_shard);
+        _shards[s] = new LRUCache(type);
+        _shards[s]->set_capacity(per_shard);
     }
 
     _entity = DorisMetrics::instance()->metric_registry()->register_entity(
@@ -430,6 +474,9 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, 
size_t total_capacity,
 }
 
 ShardedLRUCache::~ShardedLRUCache() {
+    for (int s = 0; s < kNumShards; s++) {
+        delete _shards[s];
+    }
     _entity->deregister_hook(_name);
     DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
     _mem_tracker->Release(_mem_tracker->consumption());
@@ -439,22 +486,22 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& 
key, void* value, size_t
                                        void (*deleter)(const CacheKey& key, 
void* value),
                                        CachePriority priority) {
     const uint32_t hash = _hash_slice(key);
-    return _shards[_shard(hash)].insert(key, hash, value, charge, deleter, 
priority);
+    return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, 
priority);
 }
 
 Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
     const uint32_t hash = _hash_slice(key);
-    return _shards[_shard(hash)].lookup(key, hash);
+    return _shards[_shard(hash)]->lookup(key, hash);
 }
 
 void ShardedLRUCache::release(Handle* handle) {
     LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
-    _shards[_shard(h->hash)].release(handle);
+    _shards[_shard(h->hash)]->release(handle);
 }
 
 void ShardedLRUCache::erase(const CacheKey& key) {
     const uint32_t hash = _hash_slice(key);
-    _shards[_shard(hash)].erase(key, hash);
+    _shards[_shard(hash)]->erase(key, hash);
 }
 
 void* ShardedLRUCache::value(Handle* handle) {
@@ -470,12 +517,20 @@ uint64_t ShardedLRUCache::new_id() {
     return _last_id.fetch_add(1, std::memory_order_relaxed);
 }
 
-void ShardedLRUCache::prune() {
-    int num_prune = 0;
+int64_t ShardedLRUCache::prune() {
+    int64_t num_prune = 0;
+    for (int s = 0; s < kNumShards; s++) {
+        num_prune += _shards[s]->prune();
+    }
+    return num_prune;
+}
+
+int64_t ShardedLRUCache::prune_if(bool (*pred)(const void* value)) {
+    int64_t num_prune = 0;
     for (int s = 0; s < kNumShards; s++) {
-        num_prune += _shards[s].prune();
+        num_prune += _shards[s]->prune_if(pred);
     }
-    VLOG_DEBUG << "Successfully prune cache, clean " << num_prune << " 
entries.";
+    return num_prune;
 }
 
 void ShardedLRUCache::update_cache_metrics() const {
@@ -484,10 +539,10 @@ void ShardedLRUCache::update_cache_metrics() const {
     size_t total_lookup_count = 0;
     size_t total_hit_count = 0;
     for (int i = 0; i < kNumShards; i++) {
-        total_capacity += _shards[i].get_capacity();
-        total_usage += _shards[i].get_usage();
-        total_lookup_count += _shards[i].get_lookup_count();
-        total_hit_count += _shards[i].get_hit_count();
+        total_capacity += _shards[i]->get_capacity();
+        total_usage += _shards[i]->get_usage();
+        total_lookup_count += _shards[i]->get_lookup_count();
+        total_hit_count += _shards[i]->get_hit_count();
     }
 
     capacity->set_value(total_capacity);
@@ -501,7 +556,11 @@ void ShardedLRUCache::update_cache_metrics() const {
 }
 
 Cache* new_lru_cache(const std::string& name, size_t capacity, 
std::shared_ptr<MemTracker> parent_tracker) {
-    return new ShardedLRUCache(name, capacity, parent_tracker);
+    return new ShardedLRUCache(name, capacity, LRUCacheType::SIZE, 
parent_tracker);
+}
+
+Cache* new_typed_lru_cache(const std::string& name, size_t capacity, 
LRUCacheType type, std::shared_ptr<MemTracker> parent_tracker) {
+    return new ShardedLRUCache(name, capacity, type, parent_tracker);
 }
 
 } // namespace doris
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 99bb7ef..94e262d 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -48,9 +48,18 @@ namespace doris {
 class Cache;
 class CacheKey;
 
-// Create a new cache with a specified name and a fixed size capacity.  This 
implementation
-// of Cache uses a least-recently-used eviction policy.
-extern Cache* new_lru_cache(const std::string& name, size_t capacity, 
std::shared_ptr<MemTracker> parent_tracekr = nullptr);
+enum LRUCacheType {
+    SIZE, // The capacity of cache is based on the size of cache entry.
+    NUMBER  // The capacity of cache is based on the number of cache entry.
+};
+
+// Create a new cache with a specified name and a fixed SIZE capacity.
+// This implementation of Cache uses a least-recently-used eviction policy.
+extern Cache* new_lru_cache(const std::string& name, size_t capacity,
+        std::shared_ptr<MemTracker> parent_tracekr = nullptr);
+
+extern Cache* new_typed_lru_cache(const std::string& name, size_t capacity, 
LRUCacheType type,
+        std::shared_ptr<MemTracker> parent_tracekr = nullptr);
 
 class CacheKey {
 public:
@@ -201,7 +210,13 @@ public:
     // Default implementation of Prune() does nothing.  Subclasses are strongly
     // encouraged to override the default implementation.  A future release of
     // leveldb may change prune() to a pure abstract method.
-    virtual void prune() {}
+    // return num of entries being pruned.
+    virtual int64_t prune() { return 0; }
+
+    // Same as prune(), but the entry will only be pruned if the predicate 
matched.
+    // NOTICE: the predicate should be simple enough, or the prune_if() 
function
+    // may hold lock for a long time to execute predicate.
+    virtual int64_t prune_if(bool (*pred)(const void* value)) { return 0; }
 
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
@@ -218,6 +233,7 @@ typedef struct LRUHandle {
     LRUHandle* prev = nullptr;      // previous entry in lru list
     size_t charge;
     size_t key_length;
+    size_t total_size; // including key length
     bool in_cache; // Whether entry is in the cache.
     uint32_t refs;
     uint32_t hash; // Hash of key(); used for fast sharding and comparisons
@@ -287,7 +303,7 @@ private:
 // A single shard of sharded cache.
 class LRUCache {
 public:
-    LRUCache();
+    LRUCache(LRUCacheType type);
     ~LRUCache();
 
     // Separate from constructor so caller can easily make an array of LRUCache
@@ -300,7 +316,8 @@ public:
     Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
     void release(Cache::Handle* handle);
     void erase(const CacheKey& key, uint32_t hash);
-    int prune();
+    int64_t prune();
+    int64_t prune_if(bool (*pred)(const void* value));
 
     uint64_t get_lookup_count() const { return _lookup_count; }
     uint64_t get_hit_count() const { return _hit_count; }
@@ -311,10 +328,13 @@ private:
     void _lru_remove(LRUHandle* e);
     void _lru_append(LRUHandle* list, LRUHandle* e);
     bool _unref(LRUHandle* e);
-    void _evict_from_lru(size_t charge, LRUHandle** to_remove_head);
+    void _evict_from_lru(size_t total_size, LRUHandle** to_remove_head);
     void _evict_one_entry(LRUHandle* e);
     void _prune_one(LRUHandle* old);
 
+private:
+    LRUCacheType _type;
+
     // Initialized before use.
     size_t _capacity = 0;
 
@@ -340,7 +360,8 @@ static const int kNumShards = 1 << kNumShardBits;
 
 class ShardedLRUCache : public Cache {
 public:
-    explicit ShardedLRUCache(const std::string& name, size_t total_capacity, 
std::shared_ptr<MemTracker> parent);
+    explicit ShardedLRUCache(const std::string& name, size_t total_capacity,
+                             LRUCacheType type, std::shared_ptr<MemTracker> 
parent);
     // TODO(fdy): 析构时清除所有cache元素
     virtual ~ShardedLRUCache();
     virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
@@ -352,7 +373,8 @@ public:
     virtual void* value(Handle* handle);
     Slice value_slice(Handle* handle) override;
     virtual uint64_t new_id();
-    virtual void prune();
+    virtual int64_t prune();
+    virtual int64_t prune_if(bool (*pred)(const void* value));
 
 private:
     void update_cache_metrics() const;
@@ -361,7 +383,7 @@ private:
     static uint32_t _shard(uint32_t hash);
 
     std::string _name;
-    LRUCache _shards[kNumShards];
+    LRUCache* _shards[kNumShards];
     std::atomic<uint64_t> _last_id;
 
     std::shared_ptr<MemTracker> _mem_tracker;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 851e807..a5a18e2 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -127,7 +127,7 @@ void StorageEngine::_fd_cache_clean_callback() {
 #endif
     int32_t interval = 600;
     while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval))) {
-        interval = config::file_descriptor_cache_clean_interval;
+        interval = config::cache_clean_interval;
         if (interval <= 0) {
             OLAP_LOG_WARNING(
                     "config of file descriptor clean interval is illegal: 
[%d], "
@@ -136,7 +136,7 @@ void StorageEngine::_fd_cache_clean_callback() {
             interval = 3600;
         }
 
-        _start_clean_fd_cache();
+        _start_clean_cache();
     }
 }
 
diff --git a/be/src/olap/rowset/alpha_rowset.cpp 
b/be/src/olap/rowset/alpha_rowset.cpp
index ae8d339..c6bec67 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -30,7 +30,7 @@ AlphaRowset::AlphaRowset(const TabletSchema* schema, 
std::string rowset_path,
                          RowsetMetaSharedPtr rowset_meta)
         : Rowset(schema, std::move(rowset_path), std::move(rowset_meta)) {}
 
-OLAPStatus AlphaRowset::do_load(bool use_cache, std::shared_ptr<MemTracker>) {
+OLAPStatus AlphaRowset::do_load(bool use_cache) {
     for (auto& segment_group : _segment_groups) {
         // validate segment group
         if (segment_group->validate() != OLAP_SUCCESS) {
@@ -407,34 +407,4 @@ std::shared_ptr<SegmentGroup> 
AlphaRowset::_segment_group_with_largest_size() {
     return largest_segment_group;
 }
 
-OLAPStatus AlphaRowset::reset_sizeinfo() {
-    RETURN_NOT_OK(load());
-    std::vector<SegmentGroupPB> segment_group_metas;
-    AlphaRowsetMetaSharedPtr alpha_rowset_meta =
-            std::dynamic_pointer_cast<AlphaRowsetMeta>(_rowset_meta);
-    alpha_rowset_meta->get_segment_groups(&segment_group_metas);
-    int32_t segment_group_idx = 0;
-    int64_t data_disk_size = 0;
-    int64_t index_disk_size = 0;
-    int64_t num_rows = 0;
-    for (auto segment_group : _segment_groups) {
-        
segment_group_metas.at(segment_group_idx).set_data_size(segment_group->data_size());
-        
segment_group_metas.at(segment_group_idx).set_index_size(segment_group->index_size());
-        
segment_group_metas.at(segment_group_idx).set_num_rows(segment_group->num_rows());
-        data_disk_size += segment_group->data_size();
-        index_disk_size += segment_group->index_size();
-        num_rows += segment_group->num_rows();
-        ++segment_group_idx;
-    }
-    alpha_rowset_meta->clear_segment_group();
-    alpha_rowset_meta->set_num_rows(num_rows);
-    alpha_rowset_meta->set_data_disk_size(data_disk_size);
-    alpha_rowset_meta->set_index_disk_size(index_disk_size);
-    alpha_rowset_meta->set_total_disk_size(data_disk_size + index_disk_size);
-    for (auto& segment_group_meta : segment_group_metas) {
-        alpha_rowset_meta->add_segment_group(segment_group_meta);
-    }
-    return OLAP_SUCCESS;
-}
-
 } // namespace doris
diff --git a/be/src/olap/rowset/alpha_rowset.h 
b/be/src/olap/rowset/alpha_rowset.h
index 0a2186e..0fc3f71 100644
--- a/be/src/olap/rowset/alpha_rowset.h
+++ b/be/src/olap/rowset/alpha_rowset.h
@@ -67,10 +67,6 @@ public:
 
     bool check_file_exist() override;
 
-    // when convert from old be, should set row num, index size, data size
-    // info by using segment's info
-    OLAPStatus reset_sizeinfo();
-
 protected:
     friend class RowsetFactory;
 
@@ -80,7 +76,7 @@ protected:
     // init segment groups
     OLAPStatus init() override;
 
-    OLAPStatus do_load(bool use_cache, std::shared_ptr<MemTracker>) override;
+    OLAPStatus do_load(bool use_cache) override;
 
     void do_close() override {}
 
diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp 
b/be/src/olap/rowset/alpha_rowset_reader.cpp
index 9a72dae..8534ad7 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.cpp
+++ b/be/src/olap/rowset/alpha_rowset_reader.cpp
@@ -31,7 +31,7 @@ AlphaRowsetReader::AlphaRowsetReader(int 
num_rows_per_row_block, AlphaRowsetShar
                   
std::static_pointer_cast<AlphaRowsetMeta>(_rowset->rowset_meta()).get()),
           _segment_groups(_rowset->_segment_groups),
           _key_range_size(0) {
-    _rowset->aquire();
+    _rowset->acquire();
 }
 
 AlphaRowsetReader::~AlphaRowsetReader() {
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 1b26a37..8e228b4 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -44,9 +44,13 @@ OLAPStatus BetaRowset::init() {
     return OLAP_SUCCESS; // no op
 }
 
-// `use_cache` is ignored because beta rowset doesn't support fd cache now
-OLAPStatus BetaRowset::do_load(bool /*use_cache*/, std::shared_ptr<MemTracker> 
parent) {
-    // Open all segments under the current rowset
+OLAPStatus BetaRowset::do_load(bool /*use_cache*/) {
+    // do nothing.
+    // the segments in this rowset will be loaded by calling load_segments() 
explicitly.
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus 
BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
     for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
         std::string seg_path = segment_file_path(_rowset_path, rowset_id(), 
seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
@@ -56,7 +60,7 @@ OLAPStatus BetaRowset::do_load(bool /*use_cache*/, 
std::shared_ptr<MemTracker> p
                          << " : " << s.to_string();
             return OLAP_ERR_ROWSET_LOAD_FAILED;
         }
-        _segments.push_back(std::move(segment));
+        segments->push_back(std::move(segment));
     }
     return OLAP_SUCCESS;
 }
@@ -108,7 +112,7 @@ OLAPStatus BetaRowset::remove() {
 }
 
 void BetaRowset::do_close() {
-    _segments.clear();
+    // do nothing.
 }
 
 OLAPStatus BetaRowset::link_files_to(const std::string& dir, RowsetId 
new_rowset_id) {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 7300e7f..9db69bf 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -64,6 +64,8 @@ public:
 
     bool check_file_exist() override;
 
+    OLAPStatus load_segments(std::vector<segment_v2::SegmentSharedPtr>* 
segments);
+
 protected:
     BetaRowset(const TabletSchema* schema, std::string rowset_path,
                RowsetMetaSharedPtr rowset_meta);
@@ -71,14 +73,13 @@ protected:
     // init segment groups
     OLAPStatus init() override;
 
-    OLAPStatus do_load(bool use_cache, std::shared_ptr<MemTracker> parent) 
override;
+    OLAPStatus do_load(bool use_cache) override;
 
     void do_close() override;
 
 private:
     friend class RowsetFactory;
     friend class BetaRowsetReader;
-    std::vector<segment_v2::SegmentSharedPtr> _segments;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index f50eb87..1c5e511 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -35,7 +35,7 @@ BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset,
           _rowset(std::move(rowset)),
           _stats(&_owned_stats),
           _parent_tracker(std::move(parent_tracker)) {
-    _rowset->aquire();
+    _rowset->acquire();
 }
 
 OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
@@ -44,7 +44,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
         _parent_tracker = read_context->runtime_state->instance_mem_tracker();
     }
 
-    RETURN_NOT_OK(_rowset->load(true, _parent_tracker));
+    RETURN_NOT_OK(_rowset->load());
     _context = read_context;
     if (_context->stats != nullptr) {
         // schema change/compaction should use owned_stats
@@ -92,9 +92,12 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     }
     read_options.use_page_cache = read_context->use_page_cache;
 
+    // load segments
+    RETURN_NOT_OK(SegmentLoader::instance()->load_segments(_rowset, 
&_segment_cache_handle));
+
     // create iterator for each segment
     std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
-    for (auto& seg_ptr : _rowset->_segments) {
+    for (auto& seg_ptr : _segment_cache_handle.value()->segments) {
         std::unique_ptr<RowwiseIterator> iter;
         auto s = seg_ptr->new_iterator(schema, read_options, _parent_tracker, 
&iter);
         if (!s.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index f073a65..34e7d9c 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -24,6 +24,7 @@
 #include "olap/row_cursor.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_reader.h"
+#include "olap/segment_loader.h"
 
 namespace doris {
 
@@ -69,6 +70,10 @@ private:
     std::unique_ptr<RowBlockV2> _input_block;
     std::unique_ptr<RowBlock> _output_block;
     std::unique_ptr<RowCursor> _row;
+
+    // make sure this handle is initialized and valid before
+    // reading data.
+    SegmentCacheHandle _segment_cache_handle;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index 5e1237d..385657b 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -36,7 +36,7 @@ Rowset::Rowset(const TabletSchema* schema, std::string 
rowset_path, RowsetMetaSh
     }
 }
 
-OLAPStatus Rowset::load(bool use_cache, std::shared_ptr<MemTracker> parent) {
+OLAPStatus Rowset::load(bool use_cache) {
     // if the state is ROWSET_UNLOADING it means close() is called
     // and the rowset is already loaded, and the resource is not closed yet.
     if (_rowset_state_machine.rowset_state() == ROWSET_LOADED) {
@@ -48,7 +48,7 @@ OLAPStatus Rowset::load(bool use_cache, 
std::shared_ptr<MemTracker> parent) {
         // after lock, if rowset state is ROWSET_UNLOADING, it is ok to return
         if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
             // first do load, then change the state
-            RETURN_NOT_OK(do_load(use_cache, parent));
+            RETURN_NOT_OK(do_load(use_cache));
             RETURN_NOT_OK(_rowset_state_machine.on_load());
         }
     }
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 0f99efc..19bdf7b 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -113,7 +113,7 @@ public:
     //
     // May be called multiple times, subsequent calls will no-op.
     // Derived class implements the load logic by overriding the 
`do_load_once()` method.
-    OLAPStatus load(bool use_cache = true, std::shared_ptr<MemTracker> parent 
= nullptr);
+    OLAPStatus load(bool use_cache = true);
 
     // returns OLAP_ERR_ROWSET_CREATE_READER when failed to create reader
     virtual OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) = 
0;
@@ -224,7 +224,7 @@ public:
     }
 
     // this function is called by reader to increase reference of rowset
-    void aquire() { ++_refs_by_reader; }
+    void acquire() { ++_refs_by_reader; }
 
     void release() {
         // if the refs by reader is 0 and the rowset is closed, should release 
the resouce
@@ -260,7 +260,7 @@ protected:
     virtual OLAPStatus init() = 0;
 
     // The actual implementation of load(). Guaranteed by to called exactly 
once.
-    virtual OLAPStatus do_load(bool use_cache, std::shared_ptr<MemTracker> 
parent = nullptr) = 0;
+    virtual OLAPStatus do_load(bool use_cache) = 0;
 
     // release resources in this api
     virtual void do_close() = 0;
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
new file mode 100644
index 0000000..3a50dc9
--- /dev/null
+++ b/be/src/olap/segment_loader.cpp
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/segment_loader.h"
+
+#include "olap/rowset/rowset.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+
+SegmentLoader* SegmentLoader::_s_instance = nullptr;
+
+void SegmentLoader::create_global_instance(size_t capacity) {
+    DCHECK(_s_instance == nullptr);
+    static SegmentLoader instance(capacity);
+    _s_instance = &instance;
+}
+
+SegmentLoader::SegmentLoader(size_t capacity)
+    : _mem_tracker(MemTracker::CreateTracker(capacity, "SegmentLoader", 
nullptr, true, true, MemTrackerLevel::OVERVIEW)) {
+        _cache = std::unique_ptr<Cache>(new_typed_lru_cache("SegmentCache", 
capacity, LRUCacheType::NUMBER, _mem_tracker));
+}
+
+bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, 
SegmentCacheHandle* handle) {
+    auto lru_handle = _cache->lookup(key.encode());
+    if (lru_handle == nullptr) {
+        return false;
+    }
+    *handle = SegmentCacheHandle(_cache.get(), lru_handle);
+    return true;
+}
+
+void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, 
SegmentLoader::CacheValue& value, SegmentCacheHandle* handle) {
+    auto deleter = [](const doris::CacheKey& key, void* value) {
+        SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*) 
value;
+        cache_value->segments.clear();
+        delete cache_value;
+    };
+
+    auto lru_handle = _cache->insert(key.encode(), &value, 
sizeof(SegmentLoader::CacheValue), deleter, CachePriority::NORMAL);
+    *handle = SegmentCacheHandle(_cache.get(), lru_handle);
+}
+
+OLAPStatus SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
+                                       SegmentCacheHandle* cache_handle) {
+    SegmentCacheHandle handle;
+    SegmentLoader::CacheKey cache_key(rowset->rowset_id());
+    if (_lookup(cache_key, &handle)) {
+        *cache_handle = std::move(handle);
+        return OLAP_SUCCESS;
+    }
+
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    RETURN_NOT_OK(rowset->load_segments(&segments));
+
+    // memory of SegmentLoader::CacheValue will be handled by SegmentLoader
+    SegmentLoader::CacheValue* cache_value = new SegmentLoader::CacheValue();
+    cache_value->segments = std::move(segments);
+    _insert(cache_key, *cache_value, &handle);
+    *cache_handle = std::move(handle);
+    
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus SegmentLoader::prune() {
+    bool (*pred)(const void* value) = [](const void* value) -> bool {
+        int64_t curtime = UnixMillis();
+        SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*) 
value;
+        return curtime - cache_value->last_visit_time > 
config::tablet_rowset_stale_sweep_time_sec * 1000;
+    };
+
+    MonotonicStopWatch watch;
+    watch.start();
+    int64_t prune_num = _cache->prune_if(pred);
+    LOG(INFO) << "prune " << prune_num << " entries in segment cache. 
cost(ms): "
+            << watch.elapsed_time() / 1000 / 1000;
+    return OLAP_SUCCESS;
+}
+
+} // namespace doris
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
new file mode 100644
index 0000000..8a2bba4
--- /dev/null
+++ b/be/src/olap/segment_loader.h
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "gutil/macros.h" // for DISALLOW_COPY_AND_ASSIGN
+#include "olap/lru_cache.h"
+#include "olap/olap_common.h" // for rowset id
+#include "olap/rowset/beta_rowset.h"
+#include "runtime/mem_tracker.h"
+#include "util/time.h"
+
+namespace doris {
+
+class SegmentCacheHandle;
+
+// SegmentLoader is used to load the Segment of BetaRowset.
+// An LRUCache is encapsulated inside it, which is used to cache the opened 
segments.
+// The caller should use the following method to load and obtain
+// the segments of a specified rowset:
+//
+//  SegmentCacheHandle cache_handle;
+//  RETURN_NOT_OK(SegmentLoader::instance()->load_segments(_rowset, 
&cache_handle));
+//  for (auto& seg_ptr : cache_handle.value()->segments) {
+//      ... visit segment ...
+//  }
+//
+// Make sure that cache_handle is valid during the segment usage period.
+using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
+class SegmentLoader {
+public:
+
+    // The cache key of segment lru cache
+    struct CacheKey {
+        CacheKey(RowsetId rowset_id_) : rowset_id(rowset_id_) {}
+        RowsetId rowset_id;
+
+        // Encode to a flat binary which can be used as LRUCache's key
+        std::string encode() const {
+            return rowset_id.to_string();
+        }
+    };
+
+    // The cache value of segment lru cache.
+    // Holding all opened segments of a rowset.
+    struct CacheValue {
+        // Save the last visit time of this cache entry.
+        // Use atomic because it may be modified by multi threads.
+        std::atomic<int64_t> last_visit_time = 0;
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+    };
+
+    // Create global instance of this class.
+    // "capacity" is the capacity of lru cache.
+    // TODO: Currently we use the number of rowset as the cache capacity.
+    // That is, the limit of cache is the number of rowset.
+    // This is because currently we cannot accurately estimate the memory 
occupied by a segment.
+    // After the estimation of segment memory usage is provided later, it is 
recommended
+    // to use Memory as the capacity limit of the cache.
+    static void create_global_instance(size_t capacity);
+
+    // Return global instance.
+    // Client should call create_global_instance before.
+    static SegmentLoader* instance() { return _s_instance; }
+
+    SegmentLoader(size_t capacity);
+
+    // Load segments of "rowset" and return the "cache_handle" which contains them.
+    // On a cache miss the segments are opened from the rowset and inserted into _cache.
+    OLAPStatus load_segments(const BetaRowsetSharedPtr& rowset, 
SegmentCacheHandle* cache_handle);
+
+    // Try to prune the segment cache if expired.
+    OLAPStatus prune();
+
+private:
+    SegmentLoader();
+
+    // Lookup the given rowset in the cache.
+    // If the rowset is found, the cache entry will be written into handle.
+    // Return true if entry is found, otherwise return false.
+    bool _lookup(const SegmentLoader::CacheKey& key, SegmentCacheHandle* 
handle);
+
+    // Insert a cache entry by key.
+    // And the cache entry will be returned in handle.
+    // This function is thread-safe.
+    void _insert(const SegmentLoader::CacheKey& key, CacheValue& value, 
SegmentCacheHandle* handle);
+
+private:
+    static SegmentLoader* _s_instance;
+    // A LRU cache to cache all opened segments
+    std::unique_ptr<Cache> _cache = nullptr;
+    std::shared_ptr<MemTracker> _mem_tracker = nullptr;
+};
+
+// A handle for a single rowset from segment lru cache.
+// The handle can ensure that the segment is valid
+// and will not be closed while the holder of the handle is accessing the 
segment.
+// The handle will automatically release the cache entry when it is destroyed.
+// So the caller needs to keep the handle alive for as long as it uses the segments.
+class SegmentCacheHandle {
+public:
+    SegmentCacheHandle() {}
+    SegmentCacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), 
_handle(handle) {}
+
+    ~SegmentCacheHandle() {
+        if (_handle != nullptr) {
+            // last_visit_time is only set on release,
+            // because it is only needed when pruning.
+            value()->last_visit_time = UnixMillis();
+            _cache->release(_handle);
+        }
+    }
+
+    SegmentCacheHandle(SegmentCacheHandle&& other) noexcept {
+        std::swap(_cache, other._cache);
+        std::swap(_handle, other._handle);
+    }
+
+    SegmentCacheHandle& operator=(SegmentCacheHandle&& other) noexcept {
+        std::swap(_cache, other._cache);
+        std::swap(_handle, other._handle);
+        return *this;
+    }
+
+    SegmentLoader::CacheValue* value() { return (SegmentLoader::CacheValue*) 
_cache->value(_handle); }
+
+private:
+    Cache* _cache = nullptr;
+    Cache::Handle* _handle = nullptr;
+
+    // Don't allow copy and assign
+    DISALLOW_COPY_AND_ASSIGN(SegmentCacheHandle);
+};
+
+} // namespace doris
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 812725f..9db0fa7 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -241,7 +241,7 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const 
RowsetMetaPB& rs_meta_pb,
         LOG(WARNING) << "failed to build rowset when rename rowset id";
         return OLAP_ERR_MALLOC_ERROR;
     }
-    RETURN_NOT_OK(new_rowset->load());
+    RETURN_NOT_OK(new_rowset->load(false));
     new_rowset->rowset_meta()->to_rowset_pb(new_rs_meta_pb);
     org_rowset->remove();
     return OLAP_SUCCESS;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 4983c78..8e6aab6 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -51,6 +51,7 @@
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/rowset/unique_rowset_id_generator.h"
 #include "olap/schema_change.h"
+#include "olap/segment_loader.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/utils.h"
@@ -629,10 +630,9 @@ void StorageEngine::clear_transaction_task(const 
TTransactionId transaction_id,
     LOG(INFO) << "finish to clear transaction task. transaction_id=" << 
transaction_id;
 }
 
-void StorageEngine::_start_clean_fd_cache() {
-    VLOG_TRACE << "start clean file descritpor cache";
+void StorageEngine::_start_clean_cache() {
     _file_cache->prune();
-    VLOG_TRACE << "end clean file descritpor cache";
+    SegmentLoader::instance()->prune();
 }
 
 OLAPStatus StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index cfe7e87..55d56a9 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -240,7 +240,7 @@ private:
     // parse the default rowset type config to RowsetTypePB
     void _parse_default_rowset_type();
 
-    void _start_clean_fd_cache();
+    void _start_clean_cache();
 
     // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位,
     // 当检测到有unused标识时,从内存中删除对应表信息,磁盘数据不动。
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 69d70cd..e7947fa 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -26,6 +26,7 @@
 #include "gen_cpp/TExtDataSourceService.h"
 #include "gen_cpp/TPaloBrokerService.h"
 #include "olap/page_cache.h"
+#include "olap/segment_loader.h"
 #include "olap/storage_engine.h"
 #include "plugin/plugin_mgr.h"
 #include "runtime/broker_mgr.h"
@@ -229,6 +230,8 @@ Status ExecEnv::_init_mem_tracker() {
     LOG(INFO) << "Storage page cache memory limit: " << 
PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
             << ", origin config value: " << config::storage_page_cache_limit;
 
+    SegmentLoader::create_global_instance(config::segment_cache_capacity);
+
     // 4. init other managers
     RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker));
     RETURN_IF_ERROR(_tmp_file_mgr->init());
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index b2b62ac..1456365 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -68,7 +68,10 @@ public:
         _s_current->_deleted_values.push_back(DecodeValue(v));
     }
 
-    static const int kCacheSize = 1000;
+    // There are 16 shards in ShardedLRUCache, and the LRUHandle size is
+    // about 100B, so the cache size should be big enough to run the UT.
+    static const int kCacheSize = 1000 * 16;
     std::vector<int> _deleted_keys;
     std::vector<int> _deleted_values;
     Cache* _cache;
@@ -92,7 +95,7 @@ public:
     void Insert(int key, int value, int charge) {
         std::string result;
         _cache->release(_cache->insert(EncodeKey(&result, key), 
EncodeValue(value), charge,
-                                       &CacheTest::Deleter));
+                    &CacheTest::Deleter));
     }
 
     void InsertDurable(int key, int value, int charge) {
@@ -158,6 +161,9 @@ TEST_F(CacheTest, EntriesArePinned) {
     Insert(100, 101, 1);
     std::string result1;
     Cache::Handle* h1 = _cache->lookup(EncodeKey(&result1, 100));
+    if (h1 == nullptr) {
+        std::cout << "h1 is null" << std::endl;
+    }
     ASSERT_EQ(101, DecodeValue(_cache->value(h1)));
 
     Insert(100, 102, 1);
@@ -224,36 +230,88 @@ static void insert_LRUCache(LRUCache& cache, const 
CacheKey& key, int value,
 }
 
 TEST_F(CacheTest, Usage) {
-    LRUCache cache;
-    cache.set_capacity(1000);
+    LRUCache cache(LRUCacheType::SIZE);
+    cache.set_capacity(1050);
 
+    // The lru usage of an entry is charge + handle_size + key size,
+    // where handle_size = 96 - 1 = 95 and the key size is 3.
     CacheKey key1("100");
     insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
-    ASSERT_EQ(100, cache.get_usage());
+    ASSERT_EQ(198, cache.get_usage()); // 100 + 95 + 3
 
     CacheKey key2("200");
     insert_LRUCache(cache, key2, 200, CachePriority::DURABLE);
-    ASSERT_EQ(300, cache.get_usage());
+    ASSERT_EQ(496, cache.get_usage()); // 198 + 200 + 95 + 3
 
     CacheKey key3("300");
     insert_LRUCache(cache, key3, 300, CachePriority::NORMAL);
-    ASSERT_EQ(600, cache.get_usage());
+    ASSERT_EQ(894, cache.get_usage()); // 496 + 300 + 95 + 3
 
     CacheKey key4("400");
     insert_LRUCache(cache, key4, 400, CachePriority::NORMAL);
-    ASSERT_EQ(1000, cache.get_usage());
+    ASSERT_EQ(796, cache.get_usage()); // 894 + 400 + 95 + 3 - (300 + 100 + 
(95 + 3) * 2)
 
     CacheKey key5("500");
     insert_LRUCache(cache, key5, 500, CachePriority::NORMAL);
-    ASSERT_EQ(700, cache.get_usage());
+    ASSERT_EQ(896, cache.get_usage()); // 796 + 500 + 95 + 3 - (400 + 95 +3)
 
     CacheKey key6("600");
     insert_LRUCache(cache, key6, 600, CachePriority::NORMAL);
-    ASSERT_EQ(800, cache.get_usage());
+    ASSERT_EQ(996, cache.get_usage()); // 896 + 600 + 95 +3 - (500 + 95 + 3)
 
     CacheKey key7("950");
     insert_LRUCache(cache, key7, 950, CachePriority::DURABLE);
-    ASSERT_EQ(950, cache.get_usage());
+    ASSERT_EQ(1048, cache.get_usage()); // 996 + 950 + 95 +3 - (200 + 600 + 
(95 + 3) * 2)
+}
+
+TEST_F(CacheTest, Prune) {
+    LRUCache cache(LRUCacheType::NUMBER);
+    cache.set_capacity(5);
+
+    // With LRUCacheType::NUMBER each entry adds 1 to the usage,
+    // regardless of its charge, so the usage is the number of entries.
+    CacheKey key1("100");
+    insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
+    ASSERT_EQ(1, cache.get_usage());
+
+    CacheKey key2("200");
+    insert_LRUCache(cache, key2, 200, CachePriority::DURABLE);
+    ASSERT_EQ(2, cache.get_usage());
+
+    CacheKey key3("300");
+    insert_LRUCache(cache, key3, 300, CachePriority::NORMAL);
+    ASSERT_EQ(3, cache.get_usage());
+
+    CacheKey key4("400");
+    insert_LRUCache(cache, key4, 400, CachePriority::NORMAL);
+    ASSERT_EQ(4, cache.get_usage());
+
+    CacheKey key5("500");
+    insert_LRUCache(cache, key5, 500, CachePriority::NORMAL);
+    ASSERT_EQ(5, cache.get_usage());
+
+    CacheKey key6("600");
+    insert_LRUCache(cache, key6, 600, CachePriority::NORMAL);
+    ASSERT_EQ(5, cache.get_usage());
+
+    CacheKey key7("700");
+    insert_LRUCache(cache, key7, 700, CachePriority::DURABLE);
+    ASSERT_EQ(5, cache.get_usage());
+
+    auto pred = [](const void* value) -> bool {
+        return false;
+    };
+    cache.prune_if(pred);
+    ASSERT_EQ(5, cache.get_usage());
+
+    auto pred2 = [](const void* value) -> bool {
+        return true;
+    };
+    cache.prune_if(pred2);
+    ASSERT_EQ(0, cache.get_usage());
+
+    cache.prune();
+    ASSERT_EQ(0, cache.get_usage());
 }
 
 TEST_F(CacheTest, HeavyEntries) {
@@ -319,6 +377,7 @@ TEST(CacheHandleTest, HandleTableTest) {
         h->value = nullptr;
         h->deleter = nullptr;
         h->charge = 1;
+        h->total_size = sizeof(LRUHandle) - 1 + key->size() + 1;
         h->key_length = key->size();
         h->hash = 1; // make them in a same hash table linked-list
         h->refs = 0;
@@ -355,6 +414,7 @@ TEST(CacheHandleTest, HandleTableTest) {
         h->value = nullptr;
         h->deleter = nullptr;
         h->charge = 1;
+        h->total_size = sizeof(LRUHandle) - 1 + key->size() + 1;
         h->key_length = key->size();
         h->hash = 1; // make them in a same hash table linked-list
         h->refs = 0;
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp 
b/be/test/olap/rowset/beta_rowset_test.cpp
index 6191e6f..54973c7 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -354,6 +354,7 @@ TEST_F(BetaRowsetTest, BasicFunctionTest) {
 
 int main(int argc, char** argv) {
     doris::StoragePageCache::create_global_cache(1 << 30, 0.1);
+    doris::SegmentLoader::create_global_instance(1000);
     ::testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();
 }
diff --git a/be/test/olap/rowset/rowset_converter_test.cpp 
b/be/test/olap/rowset/rowset_converter_test.cpp
index fece436..bb3f193 100644
--- a/be/test/olap/rowset/rowset_converter_test.cpp
+++ b/be/test/olap/rowset/rowset_converter_test.cpp
@@ -298,6 +298,7 @@ TEST_F(RowsetConverterTest, TestConvertBetaRowsetToAlpha) {
 
 int main(int argc, char** argv) {
     doris::StoragePageCache::create_global_cache(1 << 30, 0.1);
+    doris::SegmentLoader::create_global_instance(1000);
     ::testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();
 }
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index 359be42..4672101 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -582,11 +582,12 @@ Default:32768
 
 File handle cache capacity, 32768 file handles are cached by default.
 
-### `file_descriptor_cache_clean_interval`
+### `cache_clean_interval`
 
-Default:3600(s)
+Default:1800(s)
 
 File handle cache cleaning interval, used to clean up file handles that have 
not been used for a long time.
+This is also the cleaning interval of the Segment Cache.
 
 ### `flush_thread_num_per_store`
 
@@ -1447,3 +1448,11 @@ Increasing this value can reduce the number of calls to 
read remote data, but it
 * Type: int32
 * Description: The timeout when establishing connection with external table 
such as ODBC table.
 * Default value: 5 seconds
+
+### `segment_cache_capacity`
+
+* Type: int32
+* Description: The maximum number of Segments cached by Segment Cache.
+* Default value: 1000000
+
+The default value is currently only an empirical value, and may need to be 
modified according to actual scenarios. Increasing this value can cache more 
segments and avoid some IO. Decreasing this value will reduce memory usage.
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index 31b9bb0..fc4d893 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -580,11 +580,12 @@ ETL线程池的大小
 
 文件句柄缓存的容量,默认缓存32768个文件句柄
 
-### `file_descriptor_cache_clean_interval`
+### `cache_clean_interval`
 
-默认值:3600 (s)
+默认值:1800 (s)
 
-文件句柄缓存清理的间隔,用于清理长期不用的文件句柄
+文件句柄缓存清理的间隔,用于清理长期不用的文件句柄。
+同时也是Segment Cache的清理间隔时间。
 
 ### `flush_thread_num_per_store`
 
@@ -1468,3 +1469,11 @@ webserver默认工作线程数
 * 类型: int32
 * 描述: 和外部表建立连接的超时时间。
 * 默认值: 5秒
+
+### `segment_cache_capacity`
+
+* 类型: int32
+* 描述: Segment Cache 缓存的 Segment 最大数量
+* 默认值: 1000000
+
+默认值目前只是一个经验值,可能需要根据实际场景修改。增大该值可以缓存更多的segment从而避免一些IO。减少该值则会降低内存使用。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index b83fa13..f2f03a6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -151,7 +151,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(JobState.PAUSED,
                     new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR, 
"failed to allocate task: " + e.getMessage()), false);
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, 
routineLoadTaskInfo.getId()).add("error_msg",
-                    "allocate task encounter exception: " + 
e.getMessage()).build());
+                    "allocate task encounter exception: " + 
e.getMessage()).build(), e);
             throw e;
         }
 
@@ -170,9 +170,9 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
             routineLoadTaskInfo.setBeId(-1);
             
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(JobState.PAUSED,
                     new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
-                            "failed to allocate task for txn: " + 
e.getMessage()), false);
+                            "failed to begin txn: " + e.getMessage()), false);
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, 
routineLoadTaskInfo.getId()).add("error_msg",
-                    "begin task txn encounter exception: " + 
e.getMessage()).build());
+                    "begin task txn encounter exception: " + 
e.getMessage()).build(), e);
             throw e;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index e78c515..cc2cb6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -195,7 +195,7 @@ public class PushTask extends AgentTask {
     public void countDownLatch(long backendId, long tabletId) {
         if (this.latch != null) {
             if (latch.markedCountDown(backendId, tabletId)) {
-                LOG.info("pushTask current latch count: {}. backend: {}, 
tablet:{}",
+                LOG.debug("pushTask current latch count: {}. backend: {}, 
tablet:{}",
                          latch.getCount(), backendId, tabletId);
             }
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to