This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b9185830 [feature](cooldown) Implement cold data compaction (#16681)
f1b9185830 is described below

commit f1b91858309a66ea37d3d87190f36646cd081ef3
Author: plat1ko <[email protected]>
AuthorDate: Tue Feb 14 15:21:54 2023 +0800

    [feature](cooldown) Implement cold data compaction (#16681)
---
 be/src/common/config.h                             |   2 +
 be/src/olap/CMakeLists.txt                         |   1 +
 be/src/olap/base_compaction.cpp                    |   9 +-
 be/src/olap/base_compaction.h                      |   2 +-
 be/src/olap/cold_data_compaction.cpp               |  78 ++++++++++
 ...ulative_compaction.h => cold_data_compaction.h} |  24 +--
 be/src/olap/compaction.cpp                         |  93 +++++++----
 be/src/olap/compaction.h                           |   4 +-
 be/src/olap/cumulative_compaction.cpp              |   2 +-
 be/src/olap/cumulative_compaction.h                |   2 +-
 be/src/olap/cumulative_compaction_policy.cpp       |  34 ++--
 be/src/olap/cumulative_compaction_policy.h         |   9 +-
 be/src/olap/olap_common.h                          |   1 +
 be/src/olap/olap_server.cpp                        | 112 +++++++++++++
 be/src/olap/reader.cpp                             |   4 +-
 be/src/olap/storage_engine.h                       |   3 +
 be/src/olap/tablet.cpp                             | 173 ++++++++++++++-------
 be/src/olap/tablet.h                               |  11 ++
 be/src/olap/tablet_meta.h                          |   4 +-
 be/src/vec/olap/vertical_merge_iterator.cpp        |   4 +
 20 files changed, 432 insertions(+), 140 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 176b3d665a..abf2bf3574 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -811,6 +811,8 @@ CONF_Int32(cooldown_thread_num, "5");
 CONF_mInt64(generate_cooldown_task_interval_sec, "20");
 CONF_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h
 CONF_mInt32(confirm_unused_remote_files_interval_sec, "60");
+CONF_Int32(cold_data_compaction_thread_num, "2");
+CONF_mInt32(cold_data_compaction_interval_sec, "1800");
 CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h
 CONF_Int32(concurrency_per_dir, "2");
 CONF_mInt64(cooldown_lag_time_sec, "10800");       // 3h
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 3ae9d90ba2..c37bf8938f 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -28,6 +28,7 @@ add_library(Olap STATIC
     base_tablet.cpp
     bloom_filter.hpp
     block_column_predicate.cpp
+    cold_data_compaction.cpp
     compaction.cpp
     compaction_permit_limiter.cpp   
     cumulative_compaction.cpp
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 36fdb193de..d67f8a2d72 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -23,7 +23,7 @@
 namespace doris {
 using namespace ErrorCode;
 
-BaseCompaction::BaseCompaction(TabletSharedPtr tablet)
+BaseCompaction::BaseCompaction(const TabletSharedPtr& tablet)
         : Compaction(tablet, "BaseCompaction:" + 
std::to_string(tablet->tablet_id())) {}
 
 BaseCompaction::~BaseCompaction() {}
@@ -90,9 +90,14 @@ Status BaseCompaction::execute_compact_impl() {
 void BaseCompaction::_filter_input_rowset() {
     // if dup_key and no delete predicate
     // we skip big files too save resources
-    if (_tablet->keys_type() != KeysType::DUP_KEYS || 
_tablet->delete_predicates().size() != 0) {
+    if (_tablet->keys_type() != KeysType::DUP_KEYS) {
         return;
     }
+    for (auto& rs : _input_rowsets) {
+        if (rs->rowset_meta()->has_delete_predicate()) {
+            return;
+        }
+    }
     int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 
1024 * 1024;
     // first find a proper rowset for start
     auto rs_iter = _input_rowsets.begin();
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index 96a4a362f4..da53a89d95 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -28,7 +28,7 @@ namespace doris {
 
 class BaseCompaction : public Compaction {
 public:
-    BaseCompaction(TabletSharedPtr tablet);
+    BaseCompaction(const TabletSharedPtr& tablet);
     ~BaseCompaction() override;
 
     Status prepare_compact() override;
diff --git a/be/src/olap/cold_data_compaction.cpp 
b/be/src/olap/cold_data_compaction.cpp
new file mode 100644
index 0000000000..4b06ee7616
--- /dev/null
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/cold_data_compaction.h"
+
+#include "common/compiler_util.h"
+#include "olap/compaction.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+ColdDataCompaction::ColdDataCompaction(const TabletSharedPtr& tablet)
+        : Compaction(tablet, "ColdDataCompaction:" + 
std::to_string(tablet->tablet_id())) {}
+
+ColdDataCompaction::~ColdDataCompaction() = default;
+
+Status ColdDataCompaction::prepare_compact() {
+    if (UNLIKELY(!_tablet->init_succeeded())) {
+        return Status::Error<INVALID_ARGUMENT>();
+    }
+    return pick_rowsets_to_compact();
+}
+
+Status ColdDataCompaction::execute_compact_impl() {
+#ifndef __APPLE__
+    if (config::enable_base_compaction_idle_sched) {
+        Thread::set_idle_sched();
+    }
+#endif
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    int64_t permits = get_compaction_permits();
+    RETURN_IF_ERROR(do_compaction(permits));
+    _state = CompactionState::SUCCESS;
+    return Status::OK();
+}
+
+Status ColdDataCompaction::pick_rowsets_to_compact() {
+    _tablet->traverse_rowsets([this](const auto& rs) {
+        if (!rs->is_local()) {
+            _input_rowsets.push_back(rs);
+        }
+    });
+    std::sort(_input_rowsets.begin(), _input_rowsets.end(), 
Rowset::comparator);
+    return check_version_continuity(_input_rowsets);
+}
+
+Status ColdDataCompaction::modify_rowsets() {
+    {
+        std::lock_guard wlock(_tablet->get_header_lock());
+        // Merged cooldowned rowsets MUST NOT be managed by version graph, 
they will be reclaimed by `remove_unused_remote_files`.
+        _tablet->delete_rowsets(_input_rowsets, false);
+        _tablet->add_rowsets({_output_rowset});
+        // TODO(plat1ko): process primary key
+        _tablet->tablet_meta()->set_cooldown_meta_id(UniqueId::gen_uid());
+    }
+    {
+        std::shared_lock rlock(_tablet->get_header_lock());
+        _tablet->save_meta();
+    }
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/olap/cumulative_compaction.h 
b/be/src/olap/cold_data_compaction.h
similarity index 68%
copy from be/src/olap/cumulative_compaction.h
copy to be/src/olap/cold_data_compaction.h
index f228e91975..75b4d064ca 100644
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cold_data_compaction.h
@@ -17,34 +17,24 @@
 
 #pragma once
 
-#include <string>
-
 #include "olap/compaction.h"
-#include "olap/cumulative_compaction_policy.h"
 
 namespace doris {
 
-class CumulativeCompaction : public Compaction {
+class ColdDataCompaction final : public Compaction {
 public:
-    CumulativeCompaction(TabletSharedPtr tablet);
-    ~CumulativeCompaction() override;
+    ColdDataCompaction(const TabletSharedPtr& tablet);
+    ~ColdDataCompaction() override;
 
     Status prepare_compact() override;
     Status execute_compact_impl() override;
 
-    std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }
-
-protected:
-    Status pick_rowsets_to_compact() override;
-
-    std::string compaction_name() const override { return "cumulative 
compaction"; }
-
-    ReaderType compaction_type() const override { return 
ReaderType::READER_CUMULATIVE_COMPACTION; }
-
 private:
-    Version _last_delete_version {-1, -1};
+    std::string compaction_name() const override { return "cold data 
compaction"; }
+    ReaderType compaction_type() const override { return 
ReaderType::READER_COLD_DATA_COMPACTION; }
 
-    DISALLOW_COPY_AND_ASSIGN(CumulativeCompaction);
+    Status pick_rowsets_to_compact() override;
+    Status modify_rowsets() override;
 };
 
 } // namespace doris
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f968f219fe..b7fedd2203 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -33,7 +33,7 @@ using std::vector;
 namespace doris {
 using namespace ErrorCode;
 
-Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
+Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label)
         : _tablet(tablet),
           _input_rowsets_size(0),
           _input_row_num(0),
@@ -197,6 +197,10 @@ bool Compaction::handle_ordered_data_compaction() {
     if (!config::enable_ordered_data_compaction) {
         return false;
     }
+    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+        // The remote file system does not support to link files.
+        return false;
+    }
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
         return false;
@@ -204,7 +208,7 @@ bool Compaction::handle_ordered_data_compaction() {
     // check delete version: if compaction type is base compaction and
     // has a delete version, use original compaction
     if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
-        for (auto rowset : _input_rowsets) {
+        for (auto& rowset : _input_rowsets) {
             if (rowset->rowset_meta()->has_delete_predicate()) {
                 return false;
             }
@@ -247,7 +251,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         int64_t now = UnixMillis();
         if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
             _tablet->set_last_cumu_compaction_success_time(now);
-        } else {
+        } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
             _tablet->set_last_base_compaction_success_time(now);
         }
         auto cumu_policy = _tablet->cumulative_compaction_policy();
@@ -273,41 +277,52 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     // 2. write merged rows to output rowset
     // The test results show that merger is low-memory-footprint, there is no 
need to tracker its mem pool
     Merger::Statistics stats;
-    Status res;
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
         stats.rowid_conversion = &_rowid_conversion;
     }
 
-    if (use_vectorized_compaction) {
-        if (vertical_compaction) {
-            res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
-                                                 _input_rs_readers, 
_output_rs_writer.get(),
-                                                 get_avg_segment_rows(), 
&stats);
+    auto build_output_rowset = [&]() {
+        Status res;
+        if (use_vectorized_compaction) {
+            if (vertical_compaction) {
+                res = Merger::vertical_merge_rowsets(_tablet, 
compaction_type(), _cur_tablet_schema,
+                                                     _input_rs_readers, 
_output_rs_writer.get(),
+                                                     get_avg_segment_rows(), 
&stats);
+            } else {
+                res = Merger::vmerge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
+                                             _input_rs_readers, 
_output_rs_writer.get(), &stats);
+            }
         } else {
-            res = Merger::vmerge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
-                                         _input_rs_readers, 
_output_rs_writer.get(), &stats);
+            LOG(FATAL) << "Only support vectorized compaction";
         }
-    } else {
-        LOG(FATAL) << "Only support vectorized compaction";
-    }
 
-    if (!res.ok()) {
-        LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". 
res=" << res
-                     << ", tablet=" << _tablet->full_name()
-                     << ", output_version=" << _output_version;
+        if (!res.ok()) {
+            LOG(WARNING) << "fail to do " << merge_type << compaction_name() 
<< ". res=" << res
+                         << ", tablet=" << _tablet->full_name()
+                         << ", output_version=" << _output_version;
+            return res;
+        }
+        TRACE("merge rowsets finished");
+        TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
+        TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
+
+        _output_rowset = _output_rs_writer->build();
+        if (_output_rowset == nullptr) {
+            LOG(WARNING) << "rowset writer build failed. writer version:"
+                         << ", output_version=" << _output_version;
+            return Status::Error<ROWSET_BUILDER_INIT>();
+        }
         return res;
-    }
-    TRACE("merge rowsets finished");
-    TRACE_COUNTER_INCREMENT("merged_rows", stats.merged_rows);
-    TRACE_COUNTER_INCREMENT("filtered_rows", stats.filtered_rows);
+    };
 
-    _output_rowset = _output_rs_writer->build();
-    if (_output_rowset == nullptr) {
-        LOG(WARNING) << "rowset writer build failed. writer version:"
-                     << ", output_version=" << _output_version;
-        return Status::Error<ROWSET_BUILDER_INIT>();
+    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+        std::shared_lock slock(_tablet->get_remote_files_lock());
+        RETURN_IF_ERROR(build_output_rowset());
+    } else {
+        RETURN_IF_ERROR(build_output_rowset());
     }
+
     TRACE_COUNTER_INCREMENT("output_rowset_data_size", 
_output_rowset->data_disk_size());
     TRACE_COUNTER_INCREMENT("output_row_num", _output_rowset->num_rows());
     TRACE_COUNTER_INCREMENT("output_segments_num", 
_output_rowset->num_segments());
@@ -326,7 +341,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     // TODO(yingchun): do the judge in Tablet class
     if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
         _tablet->set_last_cumu_compaction_success_time(now);
-    } else {
+    } else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
         _tablet->set_last_base_compaction_success_time(now);
     }
 
@@ -364,6 +379,22 @@ Status Compaction::construct_output_rowset_writer(bool 
is_vertical) {
     ctx.segments_overlap = NONOVERLAPPING;
     ctx.tablet_schema = _cur_tablet_schema;
     ctx.newest_write_timestamp = _newest_write_timestamp;
+    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
+        // write output rowset to storage policy resource
+        auto storage_policy = get_storage_policy(_tablet->storage_policy_id());
+        if (storage_policy == nullptr) {
+            return Status::InternalError("could not find storage_policy, 
storage_policy_id={}",
+                                         _tablet->storage_policy_id());
+        }
+        auto resource = get_storage_resource(storage_policy->resource_id);
+        if (resource.fs == nullptr) {
+            return Status::InternalError("could not find resource, 
resouce_id={}",
+                                         storage_policy->resource_id);
+        }
+        DCHECK(atol(resource.fs->id().c_str()) == storage_policy->resource_id);
+        DCHECK(resource.fs->type() != io::FileSystemType::LOCAL);
+        ctx.fs = std::move(resource.fs);
+    }
     if (is_vertical) {
         return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer);
     }
@@ -404,6 +435,12 @@ Status Compaction::modify_rowsets() {
 
 void Compaction::gc_output_rowset() {
     if (_state != CompactionState::SUCCESS && _output_rowset != nullptr) {
+        if (!_output_rowset->is_local()) {
+            _tablet->record_unused_remote_rowset(_output_rowset->rowset_id(),
+                                                 
_output_rowset->rowset_meta()->resource_id(),
+                                                 
_output_rowset->num_segments());
+            return;
+        }
         StorageEngine::instance()->add_unused_rowset(_output_rowset);
     }
 }
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 8501df9ef4..d136650d32 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -42,7 +42,7 @@ class Merger;
 //  4. gc output rowset if failed
 class Compaction {
 public:
-    Compaction(TabletSharedPtr tablet, const std::string& label);
+    Compaction(const TabletSharedPtr& tablet, const std::string& label);
     virtual ~Compaction();
 
     // This is only for http CompactionAction
@@ -64,7 +64,7 @@ protected:
     Status do_compaction(int64_t permits);
     Status do_compaction_impl(int64_t permits);
 
-    Status modify_rowsets();
+    virtual Status modify_rowsets();
     void gc_output_rowset();
 
     Status construct_output_rowset_writer(bool is_vertical = false);
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 765790859c..f6d56d142e 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -24,7 +24,7 @@
 namespace doris {
 using namespace ErrorCode;
 
-CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet)
+CumulativeCompaction::CumulativeCompaction(const TabletSharedPtr& tablet)
         : Compaction(tablet, "CumulativeCompaction:" + 
std::to_string(tablet->tablet_id())) {}
 
 CumulativeCompaction::~CumulativeCompaction() {}
diff --git a/be/src/olap/cumulative_compaction.h 
b/be/src/olap/cumulative_compaction.h
index f228e91975..fdfc9dbe46 100644
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -26,7 +26,7 @@ namespace doris {
 
 class CumulativeCompaction : public Compaction {
 public:
-    CumulativeCompaction(TabletSharedPtr tablet);
+    CumulativeCompaction(const TabletSharedPtr& tablet);
     ~CumulativeCompaction() override;
 
     Status prepare_compact() override;
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index c2735def03..b38271fd1a 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -149,22 +149,21 @@ void 
SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
     }
 }
 
-void SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
-        Tablet* tablet, TabletState state, const 
std::vector<RowsetMetaSharedPtr>& all_metas,
-        int64_t current_cumulative_point, uint32_t* score) {
+uint32_t 
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* 
tablet) {
+    uint32_t score = 0;
     bool base_rowset_exist = false;
-    const int64_t point = current_cumulative_point;
+    const int64_t point = tablet->cumulative_layer_point();
     int64_t promotion_size = 0;
 
     std::vector<RowsetMetaSharedPtr> rowset_to_compact;
     int64_t total_size = 0;
 
-    // check the base rowset and collect the rowsets of cumulative part
-    auto rs_meta_iter = all_metas.begin();
     RowsetMetaSharedPtr first_meta;
     int64_t first_version = INT64_MAX;
-    for (; rs_meta_iter != all_metas.end(); rs_meta_iter++) {
-        auto rs_meta = *rs_meta_iter;
+    // NOTE: tablet._meta_lock is hold
+    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
+    // check the base rowset and collect the rowsets of cumulative part
+    for (auto& rs_meta : rs_metas) {
         if (rs_meta->start_version() < first_version) {
             first_version = rs_meta->start_version();
             first_meta = rs_meta;
@@ -173,20 +172,19 @@ void 
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         if (rs_meta->start_version() == 0) {
             base_rowset_exist = true;
         }
-        if (rs_meta->end_version() < point) {
+        if (rs_meta->end_version() < point || !rs_meta->is_local()) {
             // all_rs_metas() is not sorted, so we use _continue_ other than 
_break_ here.
             continue;
         } else {
             // collect the rowsets of cumulative part
             total_size += rs_meta->total_disk_size();
-            *score += rs_meta->get_compaction_score();
+            score += rs_meta->get_compaction_score();
             rowset_to_compact.push_back(rs_meta);
         }
     }
 
     if (first_meta == nullptr) {
-        *score = 0;
-        return;
+        return 0;
     }
 
     // Use "first"(not base) version to calc promotion size
@@ -195,15 +193,14 @@ void 
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
 
     // If base version does not exist, but its state is RUNNING.
     // It is abnormal, do not select it and set *score = 0
-    if (!base_rowset_exist && state == TABLET_RUNNING) {
+    if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) {
         LOG(WARNING) << "tablet state is running but have no base version";
-        *score = 0;
-        return;
+        return 0;
     }
 
     // if total_size is greater than promotion_size, return total score
     if (total_size >= promotion_size) {
-        return;
+        return score;
     }
 
     // sort the rowsets of cumulative part
@@ -219,11 +216,12 @@ void 
SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         // if current level less then remain level, score contains current 
rowset
         // and process return; otherwise, score does not contains current 
rowset.
         if (current_level <= remain_level) {
-            return;
+            return score;
         }
         total_size -= rs_meta->total_disk_size();
-        *score -= rs_meta->get_compaction_score();
+        score -= rs_meta->get_compaction_score();
     }
+    return score;
 }
 
 int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
diff --git a/be/src/olap/cumulative_compaction_policy.h 
b/be/src/olap/cumulative_compaction_policy.h
index 623688290d..20a9d64b61 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -54,9 +54,7 @@ public:
     /// param all_rowsets, all rowsets in tablet.
     /// param current_cumulative_point, current cumulative point value.
     /// return score, the result score after calculate.
-    virtual void calc_cumulative_compaction_score(
-            Tablet* tablet, TabletState state, const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
-            int64_t current_cumulative_point, uint32_t* score) = 0;
+    virtual uint32_t calc_cumulative_compaction_score(Tablet* tablet) = 0;
 
     /// Pick input rowsets from candidate rowsets for compaction. This 
function is pure virtual function.
     /// Its implementation depends on concrete compaction policy.
@@ -142,10 +140,7 @@ public:
 
     /// Num based cumulative compaction policy implements calc cumulative 
compaction score function.
     /// Its main policy is calculating the accumulative compaction score after 
current cumulative_point in tablet.
-    void calc_cumulative_compaction_score(Tablet* tablet, TabletState state,
-                                          const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
-                                          int64_t current_cumulative_point,
-                                          uint32_t* score) override;
+    uint32_t calc_cumulative_compaction_score(Tablet* tablet) override;
 
     std::string name() override { return CUMULATIVE_SIZE_BASED_POLICY; }
 
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 35179caf6d..a624e59a6d 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -175,6 +175,7 @@ enum ReaderType {
     READER_BASE_COMPACTION = 2,
     READER_CUMULATIVE_COMPACTION = 3,
     READER_CHECKSUM = 4,
+    READER_COLD_DATA_COMPACTION = 5,
 };
 
 constexpr bool field_is_slice_type(const FieldType& field_type) {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 5f61adea46..707b77db5b 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
 #include "io/cache/file_cache_manager.h"
+#include "olap/cold_data_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -88,6 +89,10 @@ Status StorageEngine::start_bg_threads() {
                 .set_max_threads(config::seg_compaction_max_threads)
                 .build(&_seg_compaction_thread_pool);
     }
+    ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+            .set_min_threads(config::cold_data_compaction_thread_num)
+            .set_max_threads(config::cold_data_compaction_thread_num)
+            .build(&_cold_data_compaction_thread_pool);
 
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
@@ -162,6 +167,12 @@ Status StorageEngine::start_bg_threads() {
             &_remove_unused_remote_files_thread));
     LOG(INFO) << "remove unused remote files thread started";
 
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "cold_data_compaction_producer_thread",
+            [this]() { this->_cold_data_compaction_producer_callback(); },
+            &_cold_data_compaction_producer_thread));
+    LOG(INFO) << "cold data compaction producer thread started";
+
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "cache_file_cleaner_tasks_producer_thread",
             [this]() { this->_cache_file_cleaner_tasks_producer_callback(); },
@@ -755,6 +766,107 @@ void 
StorageEngine::_remove_unused_remote_files_callback() {
     }
 }
 
+void StorageEngine::_cold_data_compaction_producer_callback() {
+    std::unordered_set<int64_t> tablet_submitted;
+    std::mutex tablet_submitted_mtx;
+
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
+        if (config::disable_auto_compaction) {
+            continue;
+        }
+
+        std::unordered_set<int64_t> copied_tablet_submitted;
+        {
+            std::lock_guard lock(tablet_submitted_mtx);
+            copied_tablet_submitted = tablet_submitted;
+        }
+        int n = config::cold_data_compaction_thread_num - 
copied_tablet_submitted.size();
+        if (n <= 0) {
+            continue;
+        }
+        auto tablets = 
_tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) {
+            return t->tablet_meta()->cooldown_meta_id().initialized() && 
t->is_used() &&
+                   t->tablet_state() == TABLET_RUNNING &&
+                   !copied_tablet_submitted.count(t->tablet_id()) &&
+                   
!t->tablet_meta()->tablet_schema()->disable_auto_compaction();
+        });
+        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact;
+        tablet_to_compact.reserve(n + 1);
+        std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow;
+        tablet_to_follow.reserve(n + 1);
+
+        for (auto& t : tablets) {
+            if (t->replica_id() == t->cooldown_replica_id()) {
+                auto score = t->calc_cold_data_compaction_score();
+                if (score < 4) {
+                    continue;
+                }
+                tablet_to_compact.emplace_back(t, score);
+                std::sort(tablet_to_compact.begin(), tablet_to_compact.end(),
+                          [](auto& a, auto& b) { return a.second > b.second; 
});
+                if (tablet_to_compact.size() > n) tablet_to_compact.pop_back();
+                continue;
+            }
+            // else, need to follow
+            {
+                std::lock_guard lock(_running_cooldown_mutex);
+                if (_running_cooldown_tablets.count(t->tablet_id())) {
+                    // already in cooldown queue
+                    continue;
+                }
+            }
+            // TODO(plat1ko): some avoidance strategy if failed to follow
+            auto score = t->calc_cold_data_compaction_score();
+            tablet_to_follow.emplace_back(t, score);
+            std::sort(tablet_to_follow.begin(), tablet_to_follow.end(),
+                      [](auto& a, auto& b) { return a.second > b.second; });
+            if (tablet_to_follow.size() > n) tablet_to_follow.pop_back();
+        }
+
+        for (auto& [tablet, score] : tablet_to_compact) {
+            LOG(INFO) << "submit cold data compaction. tablet_id=" << 
tablet->tablet_id()
+                      << " score=" << score;
+            _cold_data_compaction_thread_pool->submit_func([&, t = 
std::move(tablet)]() {
+                auto compaction = std::make_shared<ColdDataCompaction>(t);
+                {
+                    std::lock_guard lock(tablet_submitted_mtx);
+                    tablet_submitted.insert(t->tablet_id());
+                }
+                auto st = compaction->compact();
+                {
+                    std::lock_guard lock(tablet_submitted_mtx);
+                    tablet_submitted.erase(t->tablet_id());
+                }
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to do cold data compaction. 
tablet_id="
+                                 << t->tablet_id() << " err=" << st;
+                }
+            });
+        }
+
+        for (auto& [tablet, score] : tablet_to_follow) {
+            LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << 
tablet->tablet_id()
+                      << " score=" << score;
+            _cold_data_compaction_thread_pool->submit_func([&, t = 
std::move(tablet)]() {
+                {
+                    std::lock_guard lock(tablet_submitted_mtx);
+                    tablet_submitted.insert(t->tablet_id());
+                }
+                auto st = t->cooldown();
+                {
+                    std::lock_guard lock(tablet_submitted_mtx);
+                    tablet_submitted.erase(t->tablet_id());
+                }
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to cooldown. tablet_id=" << 
t->tablet_id()
+                                 << " err=" << st;
+                }
+            });
+        }
+    }
+}
+
 void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
     int64_t interval = config::generate_cache_cleaner_task_interval_sec;
     do {
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 8d59085908..e201d49e68 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -308,6 +308,7 @@ Status TabletReader::_init_return_columns(const 
ReaderParams& read_params) {
         VLOG_NOTICE << "return column is empty, using full column as default.";
     } else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION ||
                 read_params.reader_type == READER_BASE_COMPACTION ||
+                read_params.reader_type == READER_COLD_DATA_COMPACTION ||
                 read_params.reader_type == READER_ALTER_TABLE) &&
                !read_params.return_columns.empty()) {
         _return_columns = read_params.return_columns;
@@ -592,11 +593,12 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
     if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
         return Status::OK();
     }
-    // Only BASE_COMPACTION need set filter_delete = true
+    // Only BASE_COMPACTION and COLD_DATA_COMPACTION need set filter_delete = true
     // other reader type:
     // QUERY will filter the row in query layer to keep right result use where clause.
     // CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
     if (read_params.reader_type == READER_BASE_COMPACTION ||
+        read_params.reader_type == READER_COLD_DATA_COMPACTION ||
         read_params.reader_type == READER_CHECKSUM) {
         _filter_delete = true;
     }
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 768e95daf9..3fe49c3246 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -274,6 +274,7 @@ private:
 
     void _cooldown_tasks_producer_callback();
     void _remove_unused_remote_files_callback();
+    void _cold_data_compaction_producer_callback();
 
     void _cache_file_cleaner_tasks_producer_callback();
 
@@ -372,6 +373,7 @@ private:
     std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
 
     std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
 
@@ -396,6 +398,7 @@ private:
 
     scoped_refptr<Thread> _cooldown_tasks_producer_thread;
     scoped_refptr<Thread> _remove_unused_remote_files_thread;
+    scoped_refptr<Thread> _cold_data_compaction_producer_thread;
 
     scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8748c67c86..a931b6c81b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -418,6 +418,38 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
     return Status::OK();
 }
 
+void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    rs_metas.reserve(to_add.size());
+    for (auto& rs : to_add) {
+        _rs_version_map.emplace(rs->version(), rs);
+        _timestamped_version_tracker.add_version(rs->version());
+        rs_metas.push_back(rs->rowset_meta());
+    }
+    _tablet_meta->modify_rs_metas(rs_metas, {});
+}
+
+void Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    rs_metas.reserve(to_delete.size());
+    for (auto& rs : to_delete) {
+        rs_metas.push_back(rs->rowset_meta());
+        _rs_version_map.erase(rs->version());
+    }
+    _tablet_meta->modify_rs_metas({}, rs_metas, !move_to_stale);
+    if (move_to_stale) {
+        for (auto& rs : to_delete) {
+            _stale_rs_version_map[rs->version()] = rs;
+        }
+        _timestamped_version_tracker.add_stale_path_version(rs_metas);
+    } else {
+        for (auto& rs : to_delete) {
+            _timestamped_version_tracker.delete_version(rs->version());
+            StorageEngine::instance()->add_unused_rowset(rs);
+        }
+    }
+}
+
 // snapshot manager may call this api to check if version exists, so that
 // the version maybe not exist
 const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version,
@@ -854,6 +886,32 @@ uint32_t Tablet::calc_compaction_score(
     }
 }
 
+uint32_t Tablet::calc_cold_data_compaction_score() const {
+    uint32_t score = 0;
+    std::vector<RowsetMetaSharedPtr> cooldowned_rowsets;
+    int64_t max_delete_version = 0;
+    {
+        std::shared_lock rlock(_meta_lock);
+        for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
+            if (!rs_meta->is_local()) {
+                cooldowned_rowsets.push_back(rs_meta);
+                if (rs_meta->has_delete_predicate() &&
+                    rs_meta->end_version() > max_delete_version) {
+                    max_delete_version = rs_meta->end_version();
+                }
+            }
+        }
+    }
+    for (auto& rs_meta : cooldowned_rowsets) {
+        if (rs_meta->end_version() < max_delete_version) {
+            score += rs_meta->num_segments();
+        } else {
+            score += rs_meta->get_compaction_score();
+        }
+    }
+    return (keys_type() != KeysType::DUP_KEYS) ? score * 2 : score;
+}
+
 uint32_t Tablet::_calc_cumulative_compaction_score(
         std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
 #ifndef BE_TEST
@@ -862,10 +920,7 @@ uint32_t Tablet::_calc_cumulative_compaction_score(
         _cumulative_compaction_policy = cumulative_compaction_policy;
     }
 #endif
-    uint32_t score = 0;
-    _cumulative_compaction_policy->calc_cumulative_compaction_score(
-            this, tablet_state(), _tablet_meta->all_rs_metas(), cumulative_layer_point(), &score);
-    return score;
+    return _cumulative_compaction_policy->calc_cumulative_compaction_score(this);
 }
 
 uint32_t Tablet::_calc_base_compaction_score() const {
@@ -876,7 +931,7 @@ uint32_t Tablet::_calc_base_compaction_score() const {
         if (rs_meta->start_version() == 0) {
             base_rowset_exist = true;
         }
-        if (rs_meta->start_version() >= point) {
+        if (rs_meta->start_version() >= point || !rs_meta->is_local()) {
             // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here.
             continue;
         }
@@ -1724,11 +1779,16 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_
     {
         std::unique_lock meta_wlock(_meta_lock);
         if (tablet_state() == TABLET_RUNNING) {
-            modify_rowsets(to_add, to_delete);
+            delete_rowsets({std::move(old_rowset)}, false);
+            add_rowsets({std::move(new_rowset)});
+            // TODO(plat1ko): process primary key
             _tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
-            save_meta();
         }
     }
+    {
+        std::unique_lock meta_rlock(_meta_lock);
+        save_meta();
+    }
     return Status::OK();
 }
 
@@ -1802,66 +1862,59 @@ Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
     std::vector<RowsetSharedPtr> overlap_rowsets;
     bool version_aligned = false;
 
-    std::lock_guard wlock(_meta_lock);
-    if (tablet_state() != TABLET_RUNNING) {
-        return Status::InternalError("tablet not running");
-    }
+    {
+        std::lock_guard wlock(_meta_lock);
+        if (tablet_state() != TABLET_RUNNING) {
+            return Status::InternalError("tablet not running");
+        }
 
-    for (auto& [v, rs] : _rs_version_map) {
-        if (v.second == cooldowned_version) {
-            version_aligned = true;
-            break;
+        for (auto& [v, rs] : _rs_version_map) {
+            if (v.second == cooldowned_version) {
+                version_aligned = true;
+                break;
+            }
         }
-    }
-    if (!version_aligned) {
-        return Status::InternalError("cooldowned version is not aligned");
-    }
-    for (auto& [v, rs] : _rs_version_map) {
-        if (v.second <= cooldowned_version) {
-            overlap_rowsets.push_back(rs);
-        } else if (!rs->is_local()) {
-            return Status::InternalError("cooldowned version larger than that to follow");
+        if (!version_aligned) {
+            return Status::InternalError("cooldowned version is not aligned");
         }
-    }
-    std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator);
-    auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
-    auto rs_it = overlap_rowsets.begin();
-    for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end();
-         ++rs_pb_it, ++rs_it) {
-        // skip cooldowned rowset with same version in BE
-        if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) {
-            break;
+        for (auto& [v, rs] : _rs_version_map) {
+            if (v.second <= cooldowned_version) {
+                overlap_rowsets.push_back(rs);
+            } else if (!rs->is_local()) {
+                return Status::InternalError("cooldowned version larger than that to follow");
+            }
         }
+        std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), Rowset::comparator);
+        auto rs_pb_it = cooldown_meta_pb.rs_metas().begin();
+        auto rs_it = overlap_rowsets.begin();
+        for (; rs_pb_it != cooldown_meta_pb.rs_metas().end() && rs_it != overlap_rowsets.end();
+             ++rs_pb_it, ++rs_it) {
+            // skip cooldowned rowset with same version in BE
+            if ((*rs_it)->is_local() || rs_pb_it->end_version() != (*rs_it)->end_version()) {
+                break;
+            }
+        }
+        std::vector<RowsetSharedPtr> to_delete(rs_it, overlap_rowsets.end());
+        std::vector<RowsetSharedPtr> to_add;
+        to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
+        for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
+            auto rs_meta = std::make_shared<RowsetMeta>();
+            rs_meta->init_from_pb(*rs_pb_it);
+            RowsetSharedPtr rs;
+            RowsetFactory::create_rowset(_schema, _tablet_path, std::move(rs_meta), &rs);
+            to_add.push_back(std::move(rs));
+        }
+        // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly.
+        delete_rowsets(to_delete, false);
+        add_rowsets(to_add);
+        // TODO(plat1ko): process primary key
+        _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
     }
-    // Note: We CANNOT call `modify_rowsets` here because `modify_rowsets` cannot process version graph correctly.
-    std::vector<RowsetMetaSharedPtr> to_delete;
-    to_delete.reserve(overlap_rowsets.end() - rs_it);
-    for (; rs_it != overlap_rowsets.end(); ++rs_it) {
-        _rs_version_map.erase((*rs_it)->version());
-        to_delete.push_back((*rs_it)->rowset_meta());
-        _timestamped_version_tracker.delete_version((*rs_it)->version());
-        StorageEngine::instance()->add_unused_rowset(*rs_it);
-    }
-
-    std::vector<RowsetMetaSharedPtr> to_add;
-    to_add.reserve(cooldown_meta_pb.rs_metas().end() - rs_pb_it);
-    for (; rs_pb_it != cooldown_meta_pb.rs_metas().end(); ++rs_pb_it) {
-        auto rs_meta = std::make_shared<RowsetMeta>();
-        rs_meta->init_from_pb(*rs_pb_it);
-        RowsetSharedPtr rowset;
-        RowsetFactory::create_rowset(_schema, _tablet_path, std::move(rs_meta), &rowset);
-        _rs_version_map.emplace(rowset->version(), rowset);
-        to_add.push_back(rowset->rowset_meta());
-        _timestamped_version_tracker.add_version(rowset->version());
+    {
+        std::lock_guard rlock(_meta_lock);
+        save_meta();
     }
 
-    _tablet_meta->modify_rs_metas(to_add, to_delete);
-
-    // TODO(plat1ko): process primary key
-
-    _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
-    save_meta();
-
     return Status::OK();
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index dc0034f065..f06463dcb3 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -293,9 +293,16 @@ public:
 
     Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset);
 
+    // MUST hold EXCLUSIVE `_meta_lock`
+    void add_rowsets(const std::vector<RowsetSharedPtr>& to_add);
+    // MUST hold EXCLUSIVE `_meta_lock`
+    void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale);
+
     ////////////////////////////////////////////////////////////////////////////
     // begin cooldown functions
     ////////////////////////////////////////////////////////////////////////////
+    int64_t cooldown_replica_id() const { return _cooldown_replica_id; }
+
     // Cooldown to remote fs.
     Status cooldown();
 
@@ -320,6 +327,10 @@ public:
                                      int64_t num_segments);
 
     static void remove_unused_remote_files();
+
+    std::shared_mutex& get_remote_files_lock() { return _remote_files_lock; }
+
+    uint32_t calc_cold_data_compaction_score() const;
     ////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     ////////////////////////////////////////////////////////////////////////////
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 8b147216a8..96adc4baad 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -146,7 +146,7 @@ public:
     bool in_restore_mode() const;
     void set_in_restore_mode(bool in_restore_mode);
 
-    TabletSchemaSPtr tablet_schema() const;
+    const TabletSchemaSPtr& tablet_schema() const;
 
     const TabletSchemaSPtr tablet_schema(Version version) const;
 
@@ -529,7 +529,7 @@ inline void TabletMeta::set_in_restore_mode(bool in_restore_mode) {
     _in_restore_mode = in_restore_mode;
 }
 
-inline TabletSchemaSPtr TabletMeta::tablet_schema() const {
+inline const TabletSchemaSPtr& TabletMeta::tablet_schema() const {
     return _schema;
 }
 
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index f173c6b3da..1f86ca81a2 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/olap/vertical_merge_iterator.h"
 
+#include "olap/olap_common.h"
+
 namespace doris {
 using namespace ErrorCode;
 
@@ -137,6 +139,8 @@ Status RowSourcesBuffer::_create_buffer_file() {
         file_path_ss << "_base";
     } else if (_reader_type == READER_CUMULATIVE_COMPACTION) {
         file_path_ss << "_cumu";
+    } else if (_reader_type == READER_COLD_DATA_COMPACTION) {
+        file_path_ss << "_cold";
     } else {
         DCHECK(false);
         return Status::InternalError("unknown reader type");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to