morningman commented on a change in pull request #4212:
URL: https://github.com/apache/incubator-doris/pull/4212#discussion_r465747771



##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}

Review comment:
       Is a raw pointer safe here? Why not use a shared_ptr?

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);
+    // check universal_promotion_size must be greater than 
universal_compaction_lower_bound_size twice 
+    CHECK(universal_promotion_size >= 2 * 
universal_compaction_lower_bound_size);
+
+    // init _levels by divide 2 between universal_promotion_size and 
universal_compaction_lower_bound_size
+    int64_t i_size = universal_promotion_size / 2;
+
+    while (i_size >= universal_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, const int64_t 
kInvalidCumulativePoint,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+
+    *ret_cumulative_point = kInvalidCumulativePoint;
+    if (current_cumulative_point != kInvalidCumulativePoint) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+    int64_t promotion_size = 0;
+    _calc_promotion_size(*base_rowset_meta, &promotion_size);
+
+    int64_t prev_version = -1;
+    for (const RowsetMetaSharedPtr& rs : existing_rss) {
+        if (rs->version().first > prev_version + 1) {
+            // There is a hole, do not continue
+            break;
+        }
+
+        // break the loop if segments in this rowset is overlapping, or is a 
singleton.
+        if (rs->is_segments_overlapping() || rs->is_singleton_delta()) {

Review comment:
       Should the loop continue if the singleton rowset is a delete version?

##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculative the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do  cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candicate rowsets for compaction. 
+    /// This base class gives a unified implemention. Its derived classes also 
can overide this function each other.

Review comment:
       ```suggestion
       /// This base class gives a unified implementation. Its derived classes 
also can override this function each other.
   ```

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);
+    // check universal_promotion_size must be greater than 
universal_compaction_lower_bound_size twice 
+    CHECK(universal_promotion_size >= 2 * 
universal_compaction_lower_bound_size);
+
+    // init _levels by divide 2 between universal_promotion_size and 
universal_compaction_lower_bound_size
+    int64_t i_size = universal_promotion_size / 2;
+
+    while (i_size >= universal_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, const int64_t 
kInvalidCumulativePoint,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+
+    *ret_cumulative_point = kInvalidCumulativePoint;
+    if (current_cumulative_point != kInvalidCumulativePoint) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+    int64_t promotion_size = 0;
+    _calc_promotion_size(*base_rowset_meta, &promotion_size);
+
+    int64_t prev_version = -1;
+    for (const RowsetMetaSharedPtr& rs : existing_rss) {
+        if (rs->version().first > prev_version + 1) {
+            // There is a hole, do not continue
+            break;
+        }
+
+        // break the loop if segments in this rowset is overlapping, or is a 
singleton.
+        if (rs->is_segments_overlapping() || rs->is_singleton_delta()) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        // check the rowset is whether less than promotion size
+        if (rs->version().first != 0 && rs->total_disk_size() < 
promotion_size) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        prev_version = rs->version().second;
+        *ret_cumulative_point = prev_version + 1;
+    }
+    LOG(INFO) << "cumulative compaction universal policy, calculate cumulative 
point value = "
+              << *ret_cumulative_point << ", calc promotion size value = " << 
promotion_size
+              << " tablet = " << _tablet->full_name();
+}
+
+void 
UniversalCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr 
base_rowset_meta,
+                                                               int64_t* 
promotion_size) {
+    int64_t base_size = base_rowset_meta->total_disk_size();
+    *promotion_size = base_size * _universal_promotion_ratio;
+
+    // promotion_size is between _universal_promotion_size and 
_universal_promotion_min_size
+    if (*promotion_size >= _universal_promotion_size) {
+        *promotion_size = _universal_promotion_size;
+    } else if (*promotion_size <= _universal_promotion_min_size) {
+        *promotion_size = _universal_promotion_min_size;
+    }
+    _refresh_tablet_universal_promotion_size(*promotion_size);
+}
+
+void 
UniversalCumulativeCompactionPolicy::_refresh_tablet_universal_promotion_size(
+        int64_t promotion_size) {
+    _tablet_universal_promotion_size = promotion_size;
+}
+
+void UniversalCumulativeCompactionPolicy::update_cumulative_point(
+        std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr 
output_rowset,
+        Version& last_delete_version) {
+
+    // if rowsets have delete version, move to the last directly
+    if (last_delete_version.first != -1) {
+        _tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
+    } else {
+        // if rowsets have not delete version, check output_rowset total disk 
size 
+        // satisfies promotion size.
+        size_t total_size = output_rowset->rowset_meta()->total_disk_size();
+        if (total_size >= _tablet_universal_promotion_size) {
+            _tablet->set_cumulative_layer_point(output_rowset->end_version() + 
1);
+        }
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calc_cumulative_compaction_score(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, int64_t 
current_cumulative_point,
+        uint32_t* score) {
+
+    bool base_rowset_exist = false;
+    const int64_t point = current_cumulative_point;
+    int64_t promotion_size = 0;
+    
+    std::vector<RowsetMetaSharedPtr> rowset_to_compact;
+    int64_t total_size = 0;
+
+    // check the base rowset and collect the rowsets of cumulative part 
+    auto rs_meta_iter = all_metas.begin();
+    for (; rs_meta_iter != all_metas.end(); rs_meta_iter++) {
+        auto rs_meta = *rs_meta_iter;
+        // check base rowset
+        if (rs_meta->start_version() == 0) {
+            base_rowset_exist = true;
+            _calc_promotion_size(rs_meta, &promotion_size);
+        }
+        if (rs_meta->start_version() < point) {

Review comment:
       Would it be better to use `end_version()` for this check?

##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculative the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do  cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candicate rowsets for compaction. 
+    /// This base class gives a unified implemention. Its derived classes also 
can overide this function each other.
+    /// param skip_window_sec, it means skipping the rowsets which use create 
time plus skip_window_sec is greater than now.
+    /// param rs_version_map, mapping from version to rowset
+    /// param cumulative_point,  current cumulative point of tablet
+    /// return candidate_rowsets, the container of candidate rowsets 
+    virtual void pick_candicate_rowsets(
+            int64_t skip_window_sec,
+            std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,

Review comment:
       ```suggestion
               const std::unordered_map<Version, RowsetSharedPtr, 
HashOfVersion>& rs_version_map,
   ```

##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculative the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do  cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candicate rowsets for compaction. 
+    /// This base class gives a unified implemention. Its derived classes also 
can overide this function each other.
+    /// param skip_window_sec, it means skipping the rowsets which use create 
time plus skip_window_sec is greater than now.
+    /// param rs_version_map, mapping from version to rowset
+    /// param cumulative_point,  current cumulative point of tablet
+    /// return candidate_rowsets, the container of candidate rowsets 
+    virtual void pick_candicate_rowsets(
+            int64_t skip_window_sec,
+            std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
+            int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
+    
+    /// Pick input rowsets from candidate rowsets for compaction. This 
function is pure virtual function. 
+    /// Its implemention depands on concrete compaction policy.
+    /// param candidate_rowsets, the candidate_rowsets vector container to 
pick input rowsets
+    /// return input_rowsets, the vector container as return
+    /// return last_delete_version, if has delete rowset, record the delete 
version from input_rowsets
+    /// return compaction_score, calculate the compaction score of picked 
input rowset
+    virtual int pick_input_rowsets(std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                                    const int64_t max_compaction_score,
+                                    const int64_t min_compaction_score,
+                                    std::vector<RowsetSharedPtr>* 
input_rowsets,
+                                    Version* last_delete_version, size_t* 
compaction_score) = 0;
+
+    /// Update tablet's cumulative point after cumulative compaction finished. 
This function is pure virtual function.
+    /// Each derived has its own update policy which deponds on its concrete 
algorithm. When the cumulative point moves 
+    /// after output rowset, then output rowset will do base compaction next 
time.
+    /// param input_rowsets, the picked input rowset to do compaction just now
+    /// param output_rowset, the result rowset after compaction
+    virtual void update_cumulative_point(std::vector<RowsetSharedPtr>& 
input_rowsets,
+                                         RowsetSharedPtr output_rowset,
+                                         Version& last_delete_version) = 0;
+
+    /// Calculate tablet's cumulatvie point before compaction. This 
calculation just executes once when the tablet compacts
+    /// first time after BE initialization and then motion of cumulatvie point 
depends on update_cumulative_point policy.
+    /// This function is pure virtual function. In genaral, the cumulative 
point splits the rowsets into two parts:
+    /// base rowsets, cumulative rowsets.
+    /// param all_rowsets, all rowsets in the tablet
+    /// param kInvalidCumulativePoint, the value to represent whether the 
cumulative point is initialized
+    /// param current_cumulative_point, current cumulative position
+    /// return cumulative_point, the result of calculating cumulative point 
position
+    virtual void calculate_cumulative_point(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                            const int64_t 
kInvalidCumulativePoint,

Review comment:
       The parameter name `kInvalidCumulativePoint` is weird.

##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculative the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do  cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candicate rowsets for compaction. 
+    /// This base class gives a unified implemention. Its derived classes also 
can overide this function each other.
+    /// param skip_window_sec, it means skipping the rowsets which use create 
time plus skip_window_sec is greater than now.
+    /// param rs_version_map, mapping from version to rowset
+    /// param cumulative_point,  current cumulative point of tablet
+    /// return candidate_rowsets, the container of candidate rowsets 
+    virtual void pick_candicate_rowsets(
+            int64_t skip_window_sec,
+            std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
+            int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
+    
+    /// Pick input rowsets from candidate rowsets for compaction. This 
function is pure virtual function. 
+    /// Its implemention depands on concrete compaction policy.
+    /// param candidate_rowsets, the candidate_rowsets vector container to 
pick input rowsets
+    /// return input_rowsets, the vector container as return
+    /// return last_delete_version, if has delete rowset, record the delete 
version from input_rowsets
+    /// return compaction_score, calculate the compaction score of picked 
input rowset
+    virtual int pick_input_rowsets(std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                                    const int64_t max_compaction_score,
+                                    const int64_t min_compaction_score,
+                                    std::vector<RowsetSharedPtr>* 
input_rowsets,
+                                    Version* last_delete_version, size_t* 
compaction_score) = 0;
+
+    /// Update tablet's cumulative point after cumulative compaction finished. 
This function is pure virtual function.
+    /// Each derived has its own update policy which deponds on its concrete 
algorithm. When the cumulative point moves 
+    /// after output rowset, then output rowset will do base compaction next 
time.
+    /// param input_rowsets, the picked input rowset to do compaction just now
+    /// param output_rowset, the result rowset after compaction
+    virtual void update_cumulative_point(std::vector<RowsetSharedPtr>& 
input_rowsets,
+                                         RowsetSharedPtr output_rowset,
+                                         Version& last_delete_version) = 0;
+
+    /// Calculate tablet's cumulatvie point before compaction. This 
calculation just executes once when the tablet compacts
+    /// first time after BE initialization and then motion of cumulatvie point 
depends on update_cumulative_point policy.
+    /// This function is pure virtual function. In genaral, the cumulative 
point splits the rowsets into two parts:
+    /// base rowsets, cumulative rowsets.
+    /// param all_rowsets, all rowsets in the tablet
+    /// param kInvalidCumulativePoint, the value to represent whether the 
cumulative point is initialized
+    /// param current_cumulative_point, current cumulative position
+    /// return cumulative_point, the result of calculating cumulative point 
position
+    virtual void calculate_cumulative_point(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                            const int64_t 
kInvalidCumulativePoint,
+                                            int64_t current_cumulative_point,
+                                            int64_t* cumulative_point) = 0;
+
+protected:
+    /// tablet pointer
+    Tablet* _tablet;
+};
+
+/// Original cumulative compcation policy implemention. Original policy which 
derives CumulativeCompactionPolicy is early 
+/// basic algorithm. This policy uses linear structure to compact rowsets. The 
cumulative rowsets compact only once and 
+/// then the output will do base compaction. It can make segments of rowsets 
in order and compact small rowsets to a bigger one.
+class OriginalCumulativeCompactionPolicy : public CumulativeCompactionPolicy {
+    
+public:
+    /// Constructor function of OriginalCumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    OriginalCumulativeCompactionPolicy(Tablet* tablet)
+            : CumulativeCompactionPolicy(tablet){}
+
+    /// Destructor function of OriginalCumulativeCompactionPolicy.
+    ~OriginalCumulativeCompactionPolicy() {}
+
+    /// Original cumulative compaction policy implements pick input rowsets 
function.
+    /// Its main policy is picking rowsets from candidate rowsets by comparing 
accumulative compaction_score and
+    /// max_cumulative_compaction_num_singleton_deltas or checking whether 
there is delete version rowset.
+    int pick_input_rowsets(std::vector<RowsetSharedPtr>& candidate_rowsets,
+                            const int64_t max_compaction_score,
+                            const int64_t min_compaction_score,
+                            std::vector<RowsetSharedPtr>* input_rowsets,
+                            Version* last_delete_version, size_t* 
compaction_score) override;
+
+    /// Original cumulative compaction policy implements update cumulative 
point function.
+    /// Its main policy is using the last input version to update the 
cumulative point. It aims that every rowsets only 
+    /// do compact once.
+    void update_cumulative_point(std::vector<RowsetSharedPtr>& input_rowsets,
+                                 RowsetSharedPtr _output_rowset,
+                                 Version& last_delete_version) override;
+
+    /// Original cumulative compaction policy implements calculate cumulative 
point function.
+    /// When the first time the tablet does compact, this calculation is 
executed. Its main policy is to find first rowset
+    /// which is segments_overlapping type, it represent this rowset is not 
compacted and use this version as cumulative point. 
+    void calculate_cumulative_point(const std::vector<RowsetMetaSharedPtr>& 
all_rowsets,
+                                     const int64_t kInvalidCumulativePoint,
+                                     int64_t current_cumulative_point,
+                                     int64_t* cumulative_point) override;
+
+    /// Original cumulative compaction policy implements calc cumulative 
compaction score function.
+    /// Its main policy is calculating the accumulative compaction score after 
current cumulative_point in tablet.
+    void calc_cumulative_compaction_score(const 
std::vector<RowsetMetaSharedPtr>& all_rowsets,
+                                          int64_t current_cumulative_point,
+                                          uint32_t* score) override;
+
+};
+
+/// Universal cumulative compcation policy implemention. Universal policy 
which derives CumulativeCompactionPolicy is a optimized
+/// version of Original cumulative compcation policy. This policy alos uses 
linear structure to compact rowsets. The cumulative rowsets 

Review comment:
       ```suggestion
   /// version of Original cumulative compaction policy. This policy also uses 
linear structure to compact rowsets. The cumulative rowsets 
   ```

##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculative the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do  cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candicate rowsets for compaction. 
+    /// This base class gives a unified implemention. Its derived classes also 
can overide this function each other.
+    /// param skip_window_sec, it means skipping the rowsets which use create 
time plus skip_window_sec is greater than now.
+    /// param rs_version_map, mapping from version to rowset
+    /// param cumulative_point,  current cumulative point of tablet
+    /// return candidate_rowsets, the container of candidate rowsets 
+    virtual void pick_candicate_rowsets(
+            int64_t skip_window_sec,
+            std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
+            int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
+    
+    /// Pick input rowsets from candidate rowsets for compaction. This 
function is pure virtual function. 
+    /// Its implemention depands on concrete compaction policy.
+    /// param candidate_rowsets, the candidate_rowsets vector container to 
pick input rowsets
+    /// return input_rowsets, the vector container as return
+    /// return last_delete_version, if has delete rowset, record the delete 
version from input_rowsets
+    /// return compaction_score, calculate the compaction score of picked 
input rowset
+    virtual int pick_input_rowsets(std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+                                    const int64_t max_compaction_score,
+                                    const int64_t min_compaction_score,
+                                    std::vector<RowsetSharedPtr>* 
input_rowsets,
+                                    Version* last_delete_version, size_t* 
compaction_score) = 0;
+
+    /// Update tablet's cumulative point after cumulative compaction finished. 
This function is pure virtual function.
+    /// Each derived has its own update policy which deponds on its concrete 
algorithm. When the cumulative point moves 
+    /// after output rowset, then output rowset will do base compaction next 
time.
+    /// param input_rowsets, the picked input rowset to do compaction just now
+    /// param output_rowset, the result rowset after compaction
+    virtual void update_cumulative_point(std::vector<RowsetSharedPtr>& 
input_rowsets,

Review comment:
       ```suggestion
       virtual void update_cumulative_point(const std::vector<RowsetSharedPtr>& 
input_rowsets,
   ```

##########
File path: be/src/olap/cumulative_compaction_policy.h
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+#define DORIS_BE_SRC_OLAP_CUMULATIVE_COMPACTION_POLICY_H
+
+#include <string>
+
+#include "olap/utils.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+class Tablet;
+
+/// This CompactionPolicyType enum is used to represent the type of compaction 
policy.
+/// Now it has two values, CUMULATIVE_ORIGINAL_POLICY and 
CUMULATIVE_UNIVERSAL_POLICY.
+/// CUMULATIVE_ORIGINAL_POLICY means current compaction policy implemented by 
original policy.
+/// CUMULATIVE_UNIVERSAL_POLICY means current comapction policy implemented by 
universal policy.
+enum CompactionPolicyType {
+    CUMULATIVE_ORIGINAL_POLICY = 0,
+    CUMULATIVE_UNIVERSAL_POLICY = 1,
+};
+
+const static std::string CUMULATIVE_ORIGINAL_POLICY_TYPE = "ORIGINAL";
+const static std::string CUMULATIVE_UNIVERSAL_POLICY_TYPE = "UNIVERSAL";
+/// This class CumulativeCompactionPolicy is the base class of cumulative 
compaction policy.
+/// It defines the policy to do cumulative compaction. It has different 
derived classes, which implements 
+/// concrete cumulative compaction algorithm. The policy is configured by 
conf::cumulative_compaction_policy.
+/// The policy functions is the main steps to do cumulative compaction. For 
example, how to pick candicate 
+/// rowsets from tablet using current policy, how to calculate the cumulative 
point and how to calculate
+/// the tablet cumulative compcation score and so on.
+class CumulativeCompactionPolicy {
+
+public:
+    /// Constructor function of CumulativeCompactionPolicy, 
+    /// it needs tablet pointer to access tablet method. 
+    /// param tablet, the shared pointer of tablet
+    CumulativeCompactionPolicy(Tablet* tablet) : _tablet(tablet){}
+
+    /// Destructor function of CumulativeCompactionPolicy.
+    virtual ~CumulativeCompactionPolicy() {}
+
+    /// Calculate the cumulative compaction score of the tablet. This function 
uses rowsets meta and current 
+    /// cumulative point to calculative the score of tablet. The score depends 
on the concrete algorithm of policy.
+    /// In general, the score represents the segments nums to do cumulative 
compaction in total rowsets. The more
+    /// score tablet gets, the earlier it can do  cumulative compaction.
+    /// param all_rowsets, all rowsets in tablet.
+    /// param current_cumulative_point, current cumulative point value.
+    /// return score, the result score after calculate.
+    virtual void calc_cumulative_compaction_score(
+            const std::vector<RowsetMetaSharedPtr>& all_rowsets, int64_t 
current_cumulative_point,
+            uint32_t* score) = 0;
+
+    /// This function implements the policy which represents how to pick the 
candicate rowsets for compaction. 
+    /// This base class gives a unified implemention. Its derived classes also 
can overide this function each other.
+    /// param skip_window_sec, it means skipping the rowsets which use create 
time plus skip_window_sec is greater than now.
+    /// param rs_version_map, mapping from version to rowset
+    /// param cumulative_point,  current cumulative point of tablet
+    /// return candidate_rowsets, the container of candidate rowsets 
+    virtual void pick_candicate_rowsets(
+            int64_t skip_window_sec,
+            std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& 
rs_version_map,
+            int64_t cumulative_point, std::vector<RowsetSharedPtr>* 
candidate_rowsets);
+    
+    /// Pick input rowsets from candidate rowsets for compaction. This 
function is pure virtual function. 
+    /// Its implemention depands on concrete compaction policy.
+    /// param candidate_rowsets, the candidate_rowsets vector container to 
pick input rowsets
+    /// return input_rowsets, the vector container as return
+    /// return last_delete_version, if has delete rowset, record the delete 
version from input_rowsets
+    /// return compaction_score, calculate the compaction score of picked 
input rowset
+    virtual int pick_input_rowsets(std::vector<RowsetSharedPtr>& 
candidate_rowsets,

Review comment:
       ```suggestion
       virtual int pick_input_rowsets(const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
   ```

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);

Review comment:
       This check should be done when starting the BE process.

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);
+    // check universal_promotion_size must be greater than 
universal_compaction_lower_bound_size twice 
+    CHECK(universal_promotion_size >= 2 * 
universal_compaction_lower_bound_size);
+
+    // init _levels by divide 2 between universal_promotion_size and 
universal_compaction_lower_bound_size
+    int64_t i_size = universal_promotion_size / 2;
+
+    while (i_size >= universal_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, const int64_t 
kInvalidCumulativePoint,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+
+    *ret_cumulative_point = kInvalidCumulativePoint;
+    if (current_cumulative_point != kInvalidCumulativePoint) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();

Review comment:
       How can we make sure that the first rowset is the base rowset, i.e. the one whose 
version starts from 0?
   I suggest adding a check.

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);
+    // check universal_promotion_size must be greater than 
universal_compaction_lower_bound_size twice 
+    CHECK(universal_promotion_size >= 2 * 
universal_compaction_lower_bound_size);
+
+    // init _levels by divide 2 between universal_promotion_size and 
universal_compaction_lower_bound_size
+    int64_t i_size = universal_promotion_size / 2;
+
+    while (i_size >= universal_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, const int64_t 
kInvalidCumulativePoint,

Review comment:
       `kInvalidCumulativePoint` can be a const static field; it does not need to be an 
input param.

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);
+    // check universal_promotion_size must be greater than 
universal_compaction_lower_bound_size twice 
+    CHECK(universal_promotion_size >= 2 * 
universal_compaction_lower_bound_size);
+
+    // init _levels by divide 2 between universal_promotion_size and 
universal_compaction_lower_bound_size
+    int64_t i_size = universal_promotion_size / 2;
+
+    while (i_size >= universal_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, const int64_t 
kInvalidCumulativePoint,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+
+    *ret_cumulative_point = kInvalidCumulativePoint;
+    if (current_cumulative_point != kInvalidCumulativePoint) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+    int64_t promotion_size = 0;
+    _calc_promotion_size(*base_rowset_meta, &promotion_size);
+
+    int64_t prev_version = -1;
+    for (const RowsetMetaSharedPtr& rs : existing_rss) {
+        if (rs->version().first > prev_version + 1) {
+            // There is a hole, do not continue
+            break;
+        }
+
+        // break the loop if segments in this rowset is overlapping, or is a 
singleton.
+        if (rs->is_segments_overlapping() || rs->is_singleton_delta()) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        // check the rowset is whether less than promotion size
+        if (rs->version().first != 0 && rs->total_disk_size() < 
promotion_size) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        prev_version = rs->version().second;
+        *ret_cumulative_point = prev_version + 1;
+    }
+    LOG(INFO) << "cumulative compaction universal policy, calculate cumulative 
point value = "
+              << *ret_cumulative_point << ", calc promotion size value = " << 
promotion_size
+              << " tablet = " << _tablet->full_name();
+}
+
+void 
UniversalCumulativeCompactionPolicy::_calc_promotion_size(RowsetMetaSharedPtr 
base_rowset_meta,
+                                                               int64_t* 
promotion_size) {
+    int64_t base_size = base_rowset_meta->total_disk_size();
+    *promotion_size = base_size * _universal_promotion_ratio;
+
+    // promotion_size is between _universal_promotion_size and 
_universal_promotion_min_size
+    if (*promotion_size >= _universal_promotion_size) {
+        *promotion_size = _universal_promotion_size;
+    } else if (*promotion_size <= _universal_promotion_min_size) {
+        *promotion_size = _universal_promotion_min_size;
+    }
+    _refresh_tablet_universal_promotion_size(*promotion_size);
+}
+
+void 
UniversalCumulativeCompactionPolicy::_refresh_tablet_universal_promotion_size(
+        int64_t promotion_size) {
+    _tablet_universal_promotion_size = promotion_size;
+}
+
+void UniversalCumulativeCompactionPolicy::update_cumulative_point(
+        std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr 
output_rowset,
+        Version& last_delete_version) {
+
+    // if rowsets have delete version, move to the last directly
+    if (last_delete_version.first != -1) {
+        _tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
+    } else {
+        // if rowsets have not delete version, check output_rowset total disk 
size 
+        // satisfies promotion size.
+        size_t total_size = output_rowset->rowset_meta()->total_disk_size();
+        if (total_size >= _tablet_universal_promotion_size) {

Review comment:
       What is the difference between `_tablet_universal_promotion_size` and 
the `promotion_size` calculated in method `_calc_promotion_size()`?

##########
File path: be/src/common/config.h
##########
@@ -265,6 +265,27 @@ namespace config {
     CONF_mInt64(base_compaction_interval_seconds_since_last_operation, 
"86400");
     CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
 
+    // config the cumulative compaction policy
+    // Valid configs: ordinary, universal
+    // ordinary policy, the original version of cumulative compaction, 
cumulative version compaction once.
+    // universal policy, a optimization version of cumulative compaction, 
targeting the use cases requiring 
+    // lower write amplification, trading off read amplification and space 
amplification.
+    CONF_String(cumulative_compaction_policy, "original");
+
+    // In universal policy, output rowset of cumulative compaction total disk 
size exceed this config size, 
+    // this rowset will be given to base compaction, unit is m byte.
+    CONF_mInt64(cumulative_compaction_universal_promotion_size_mbytes, "1024");

Review comment:
       These compaction configs are not mutable at runtime because you init the 
compaction policy only once, when initializing the tablet. So modifying these 
configs has no effect at runtime.
   But I think we should find a way to make them mutable at runtime.

##########
File path: be/src/olap/tablet.cpp
##########
@@ -53,14 +53,16 @@ TabletSharedPtr 
Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
     return std::make_shared<Tablet>(tablet_meta, data_dir);
 }
 
-Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
+Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
+               std::string cumulative_compaction_type) :

Review comment:
       ```suggestion
                  const std::string& cumulative_compaction_type) :
   ```

##########
File path: be/src/olap/cumulative_compaction_policy.cpp
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "olap/cumulative_compaction_policy.h"
+#include "util/time.h"
+
+#include <boost/algorithm/string.hpp>
+
+namespace doris {
+
+UniversalCumulativeCompactionPolicy::UniversalCumulativeCompactionPolicy(
+        Tablet* tablet, int64_t universal_promotion_size, double 
universal_promotion_ratio,
+        int64_t universal_promotion_min_size, int64_t 
universal_compaction_lower_bound_size)
+        : CumulativeCompactionPolicy(tablet),
+          _universal_promotion_size(universal_promotion_size),
+          _universal_promotion_ratio(universal_promotion_ratio),
+          _universal_promotion_min_size(universal_promotion_min_size),
+          
_universal_compaction_lower_bound_size(universal_compaction_lower_bound_size) {
+
+    // check universal_promotion_size must be greater than 
universal_promotion_min_size
+    CHECK(universal_promotion_size >= universal_promotion_min_size);
+    // check universal_promotion_size must be greater than 
universal_compaction_lower_bound_size twice 
+    CHECK(universal_promotion_size >= 2 * 
universal_compaction_lower_bound_size);
+
+    // init _levels by divide 2 between universal_promotion_size and 
universal_compaction_lower_bound_size
+    int64_t i_size = universal_promotion_size / 2;
+
+    while (i_size >= universal_compaction_lower_bound_size) {
+        _levels.push_back(i_size);
+        i_size /= 2;
+    }
+}
+
+void UniversalCumulativeCompactionPolicy::calculate_cumulative_point(
+        const std::vector<RowsetMetaSharedPtr>& all_metas, const int64_t 
kInvalidCumulativePoint,
+        int64_t current_cumulative_point, int64_t* ret_cumulative_point) {
+
+    *ret_cumulative_point = kInvalidCumulativePoint;
+    if (current_cumulative_point != kInvalidCumulativePoint) {
+        // only calculate the point once.
+        // after that, cumulative point will be updated along with compaction 
process.
+        return;
+    }
+    // empty return
+    if (all_metas.empty()) {
+        return;
+    }
+
+    std::list<RowsetMetaSharedPtr> existing_rss;
+    for (auto& rs : all_metas) {
+        existing_rss.emplace_back(rs);
+    }
+
+    // sort the existing rowsets by version in ascending order
+    existing_rss.sort([](const RowsetMetaSharedPtr& a, const 
RowsetMetaSharedPtr& b) {
+        // simple because 2 versions are certainly not overlapping
+        return a->version().first < b->version().first;
+    });
+
+    // calculate promotion size
+    auto base_rowset_meta = existing_rss.begin();
+    int64_t promotion_size = 0;
+    _calc_promotion_size(*base_rowset_meta, &promotion_size);
+
+    int64_t prev_version = -1;
+    for (const RowsetMetaSharedPtr& rs : existing_rss) {
+        if (rs->version().first > prev_version + 1) {
+            // There is a hole, do not continue
+            break;
+        }
+
+        // break the loop if segments in this rowset is overlapping, or is a 
singleton.
+        if (rs->is_segments_overlapping() || rs->is_singleton_delta()) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        // check the rowset is whether less than promotion size
+        if (rs->version().first != 0 && rs->total_disk_size() < 
promotion_size) {
+            *ret_cumulative_point = rs->version().first;
+            break;
+        }
+
+        prev_version = rs->version().second;
+        *ret_cumulative_point = prev_version + 1;
+    }
+    LOG(INFO) << "cumulative compaction universal policy, calculate cumulative 
point value = "

Review comment:
       There may be hundreds of thousands of tablets on a BE, so logging this 
at INFO level for every tablet will produce too many logs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to