This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 588e5be  [Bug] Fix bug of cumulative compaction and deletion of stale version (#4593)
588e5be is described below

commit 588e5bee4793cc383dc49e73f06f6cde2e286986
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Oct 21 10:03:55 2020 +0800

    [Bug] Fix bug of cumulative compaction and deletion of stale version (#4593)
    
    When selecting candidate rowsets for cumulative compaction, some rowsets
    may not be selected because their protection time has not yet expired.
    Therefore, we need to find the current longest continuous version path
    in the candidate rowsets.
---
 be/src/olap/base_tablet.cpp                        |  5 ++
 be/src/olap/base_tablet.h                          |  7 ++-
 be/src/olap/compaction.cpp                         | 27 ++++++++++
 be/src/olap/compaction.h                           |  1 +
 be/src/olap/cumulative_compaction.cpp              | 14 +++++-
 be/src/olap/cumulative_compaction_policy.cpp       |  3 +-
 be/test/olap/cumulative_compaction_policy_test.cpp | 57 +++++++++++++++++++++-
 7 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 507bbc66..64a3105 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -34,6 +34,11 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, 
DataDir* data_dir)
           _data_dir(data_dir) {
     _gen_tablet_path();
 
+    std::stringstream ss;
+    ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << 
"."
+       << _tablet_meta->tablet_uid().to_string();
+    _full_name = ss.str();
+
     _metric_entity = 
DorisMetrics::instance()->metric_registry()->register_entity(
         strings::Substitute("Tablet.$0", tablet_id()),
         {{"tablet_id", std::to_string(tablet_id())}},
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 6097d23..bce485d 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -77,6 +77,8 @@ protected:
 
     // metrics of this tablet
     std::shared_ptr<MetricEntity> _metric_entity = nullptr;
+
+    std::string _full_name;
 public:
     IntCounter* query_scan_bytes;
     IntCounter* query_scan_rows;
@@ -110,10 +112,7 @@ inline int64_t BaseTablet::table_id() const {
 }
 
 inline const std::string BaseTablet::full_name() const {
-    std::stringstream ss;
-    ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << 
"."
-       << _tablet_meta->tablet_uid().to_string();
-    return ss.str();
+    return _full_name;
 }
 
 inline int64_t BaseTablet::partition_id() const {
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c552631..7aa6b9f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -177,6 +177,33 @@ OLAPStatus Compaction::gc_unused_rowsets() {
     return OLAP_SUCCESS;
 }
 
+// Find the longest consecutive version path in "rowsets", from the beginning.
+// The two versions before and after the missing version will be saved in missing_version,
+// if missing_version is not null.
+OLAPStatus Compaction::find_longest_consecutive_version(
+        vector<RowsetSharedPtr>* rowsets,
+        vector<Version>* missing_version) {
+    if (rowsets->empty()) {
+        return OLAP_SUCCESS;
+    }
+    RowsetSharedPtr prev_rowset = rowsets->front();
+    size_t i = 1;
+    for (; i < rowsets->size(); ++i) {
+        RowsetSharedPtr rowset = (*rowsets)[i];
+        if (rowset->start_version() != prev_rowset->end_version() + 1) {
+            if (missing_version != nullptr) {
+                missing_version->push_back(prev_rowset->version());
+                missing_version->push_back(rowset->version());
+            }
+            break;
+        }
+        prev_rowset = rowset;
+    }
+
+    rowsets->resize(i);
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus Compaction::check_version_continuity(const vector<RowsetSharedPtr>& 
rowsets) {
     RowsetSharedPtr prev_rowset = rowsets.front();
     for (size_t i = 1; i < rowsets.size(); ++i) {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 12adbd4..ed767a7 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -65,6 +65,7 @@ protected:
 
     OLAPStatus check_version_continuity(const std::vector<RowsetSharedPtr>& 
rowsets);
     OLAPStatus check_correctness(const Merger::Statistics& stats);
+    OLAPStatus find_longest_consecutive_version(std::vector<RowsetSharedPtr>* 
rowsets, std::vector<Version>* missing_version);
 
 private:
     // get num rows from segment group meta of input rowsets.
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index 49d7c0e..25398a0 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -84,7 +84,18 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
         return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
     }
 
-    RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
+    // candidate_rowsets may not be continuous, because some rowsets may not be selected
+    // when their protection time has not expired (config::cumulative_compaction_skip_window_seconds).
+    // So we need to choose the longest continuous path from it.
+    std::vector<Version> missing_versions;
+    RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, 
&missing_versions));
+    if (!missing_versions.empty()) {
+        DCHECK(missing_versions.size() == 2);
+        LOG(WARNING) << "There are missed versions among rowsets. "
+            << "prev rowset verison=" << missing_versions[0]
+            << ", next rowset version=" << missing_versions[1]
+            << ", tablet=" << _tablet->full_name();
+    }
 
     size_t compaction_score = 0;
     int transient_size = 
_tablet->cumulative_compaction_policy()->pick_input_rowsets(
@@ -150,4 +161,3 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
 }
 
 }  // namespace doris
-
diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index dc3257e..ee39fb3 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -436,7 +436,6 @@ void 
CumulativeCompactionPolicy::pick_candidate_rowsets(int64_t skip_window_sec,
         }
     }
     std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), 
Rowset::comparator);
-
 }
 
 std::unique_ptr<CumulativeCompactionPolicy> 
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string
 type) {
@@ -467,4 +466,4 @@ void 
CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std:
         *policy_type = NUM_BASED_POLICY;
     }
 }
-}
\ No newline at end of file
+}
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp 
b/be/test/olap/cumulative_compaction_policy_test.cpp
index 7819cc2..ca80100 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -20,6 +20,7 @@
 
 #include "olap/tablet_meta.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/cumulative_compaction.h"
 #include "olap/cumulative_compaction_policy.h"
 
 namespace doris {
@@ -638,6 +639,24 @@ public:
         rs_metas->push_back(ptr5);
     }
 
+    void init_rs_meta_missing_version(std::vector<RowsetMetaSharedPtr>* 
rs_metas) {
+        RowsetMetaSharedPtr ptr1(new RowsetMeta());
+        init_rs_meta(ptr1, 0, 0);
+        rs_metas->push_back(ptr1);
+
+        RowsetMetaSharedPtr ptr2(new RowsetMeta());
+        init_rs_meta(ptr2, 1, 1);
+        rs_metas->push_back(ptr2);
+
+        RowsetMetaSharedPtr ptr3(new RowsetMeta());
+        init_rs_meta(ptr3, 2, 2);
+        rs_metas->push_back(ptr3);
+
+        RowsetMetaSharedPtr ptr5(new RowsetMeta());
+        init_rs_meta(ptr5, 4, 4);
+        rs_metas->push_back(ptr5);
+    }
+
 protected:
     std::string _json_rowset_meta;
     TabletMetaSharedPtr _tablet_meta;
@@ -1013,10 +1032,46 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
_level_size) {
     ASSERT_EQ(134217728, policy->_levels[2]);
     ASSERT_EQ(67108864, policy->_levels[3]);
 }
+
+TEST_F(TestSizeBasedCumulativeCompactionPolicy, 
_pick_missing_version_cumulative_compaction) {
+    std::vector<RowsetMetaSharedPtr> rs_metas;
+    init_rs_meta_missing_version(&rs_metas);
+
+    for (auto &rowset : rs_metas) {
+        _tablet_meta->add_rs_meta(rowset);
+    }
+
+    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, 
CUMULATIVE_SIZE_BASED_POLICY));
+    _tablet->init();
+
+    // has miss version
+    std::vector<RowsetSharedPtr> rowsets;
+    rowsets.push_back(_tablet->get_rowset_by_version({0, 0}));
+    rowsets.push_back(_tablet->get_rowset_by_version({1, 1}));
+    rowsets.push_back(_tablet->get_rowset_by_version({2, 2}));
+    rowsets.push_back(_tablet->get_rowset_by_version({4, 4}));
+    std::shared_ptr<MemTracker> mem_tracker(new MemTracker());
+    CumulativeCompaction compaction(_tablet, "label", mem_tracker);
+    compaction.find_longest_consecutive_version(&rowsets);
+    ASSERT_EQ(3, rowsets.size());
+    ASSERT_EQ(2, rowsets[2]->end_version());
+
+    // no miss version
+    std::vector<RowsetSharedPtr> rowsets2;
+    rowsets2.push_back(_tablet->get_rowset_by_version({0, 0}));
+    compaction.find_longest_consecutive_version(&rowsets2);
+    ASSERT_EQ(1, rowsets2.size());
+    ASSERT_EQ(0, rowsets[0]->end_version());
+
+    // no version
+    std::vector<RowsetSharedPtr> rowsets3;
+    compaction.find_longest_consecutive_version(&rowsets3);
+    ASSERT_EQ(0, rowsets3.size());
+}
 }
 
 // @brief Test Stub
 int main(int argc, char** argv) {
     testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS(); 
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to