This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d1449e6b980 Pick "[Bug](ScanNode) Fix potential incorrect query result caused by concurrent NewOlapScanNode initialization and Compaction (#24638)" (#25127)
d1449e6b980 is described below

commit d1449e6b9800f51dd5b3f849f81893efb5a005bf
Author: plat1ko <[email protected]>
AuthorDate: Tue Oct 17 09:37:06 2023 +0800

    Pick "[Bug](ScanNode) Fix potential incorrect query result caused by concurrent NewOlapScanNode initialization and Compaction (#24638)" (#25127)
---
 be/src/olap/merger.cpp                        |  46 +++--
 be/src/olap/reader.cpp                        |  24 ++-
 be/src/olap/reader.h                          |  13 +-
 be/src/olap/schema_change.cpp                 |  13 +-
 be/src/olap/tablet.cpp                        |   4 -
 be/src/olap/tablet.h                          |   5 -
 be/src/olap/tablet_meta.cpp                   |  24 ---
 be/src/olap/tablet_meta.h                     |   2 -
 be/src/olap/tablet_schema.cpp                 |   6 +-
 be/src/olap/tablet_schema.h                   |   2 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp   |  73 +++----
 be/src/vec/exec/scan/new_olap_scanner.cpp     |  70 +++----
 be/src/vec/exec/scan/new_olap_scanner.h       |   3 +-
 be/test/olap/delete_handler_test.cpp          |  22 ++-
 be/test/olap/rowid_conversion_test.cpp        | 167 +++++++---------
 be/test/vec/olap/vertical_compaction_test.cpp | 262 ++++++++++++--------------
 16 files changed, 339 insertions(+), 397 deletions(-)

diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 587d5326a2e..734b81794fb 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -60,24 +60,23 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
     TabletReader::ReaderParams reader_params;
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
-    reader_params.rs_splits.reserve(src_rowset_readers.size());
+
+    TabletReader::ReadSource read_source;
+    read_source.rs_splits.reserve(src_rowset_readers.size());
     for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
-        reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+        read_source.rs_splits.emplace_back(RowSetSplits(rs_reader));
     }
+    read_source.fill_delete_predicates();
+    reader_params.set_read_source(std::move(read_source));
+
     reader_params.version = dst_rowset_writer->version();
 
     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
     merge_tablet_schema->copy_from(*cur_tablet_schema);
-    {
-        std::shared_lock rdlock(tablet->get_header_lock());
-        auto delete_preds = tablet->delete_predicates();
-        std::copy(delete_preds.cbegin(), delete_preds.cend(),
-                  std::inserter(reader_params.delete_predicates,
-                                reader_params.delete_predicates.begin()));
-        // Merge the columns in delete predicate that not in latest schema in to current tablet schema
-        for (auto& del_pred_rs : reader_params.delete_predicates) {
-            merge_tablet_schema->merge_dropped_columns(del_pred_rs->tablet_schema());
-        }
+
+    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
+    for (auto& del_pred_rs : reader_params.delete_predicates) {
+        merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
     }
     reader_params.tablet_schema = merge_tablet_schema;
 
@@ -196,25 +195,24 @@ Status Merger::vertical_compact_one_group(
     reader_params.is_key_column_group = is_key;
     reader_params.tablet = tablet;
     reader_params.reader_type = reader_type;
-    reader_params.rs_splits.reserve(src_rowset_readers.size());
+
+    TabletReader::ReadSource read_source;
+    read_source.rs_splits.reserve(src_rowset_readers.size());
     for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
-        reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+        read_source.rs_splits.emplace_back(RowSetSplits(rs_reader));
     }
+    read_source.fill_delete_predicates();
+    reader_params.set_read_source(std::move(read_source));
+
     reader_params.version = dst_rowset_writer->version();
 
     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
     merge_tablet_schema->copy_from(*tablet_schema);
-    {
-        std::shared_lock rdlock(tablet->get_header_lock());
-        auto delete_preds = tablet->delete_predicates();
-        std::copy(delete_preds.cbegin(), delete_preds.cend(),
-                  std::inserter(reader_params.delete_predicates,
-                                reader_params.delete_predicates.begin()));
-
-        for (auto& del_pred_rs : reader_params.delete_predicates) {
-            merge_tablet_schema->merge_dropped_columns(del_pred_rs->tablet_schema());
-        }
+
+    for (auto& del_pred_rs : reader_params.delete_predicates) {
+        merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
     }
+
     reader_params.tablet_schema = merge_tablet_schema;
 
     if (is_key && stats_output && stats_output->rowid_conversion) {
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 573570bbcbc..43cdf4e637d 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -98,6 +98,16 @@ std::string TabletReader::KeysParam::to_string() const {
     return ss.str();
 }
 
+void TabletReader::ReadSource::fill_delete_predicates() {
+    DCHECK_EQ(delete_predicates.size(), 0);
+    for (auto&& split : rs_splits) {
+        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+        if (rs_meta->has_delete_predicate()) {
+            delete_predicates.push_back(rs_meta);
+        }
+    }
+}
+
 TabletReader::~TabletReader() {
     VLOG_NOTICE << "merged rows:" << _merged_rows;
     _delete_handler.finalize();
@@ -630,11 +640,14 @@ Status TabletReader::init_reader_params_and_create_block(
     reader_params->version =
             Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version());
 
+    ReadSource read_source;
     for (auto& rowset : input_rowsets) {
         RowsetReaderSharedPtr rs_reader;
         RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
-        reader_params->rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
+        read_source.rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
     }
+    read_source.fill_delete_predicates();
+    reader_params->set_read_source(std::move(read_source));
 
     std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
     std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(),
@@ -644,14 +657,9 @@ Status TabletReader::init_reader_params_and_create_block(
     TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
     merge_tablet_schema->copy_from(*read_tablet_schema);
 
-    auto& delete_preds = tablet->delete_predicates();
-    std::copy(delete_preds.cbegin(), delete_preds.cend(),
-              std::inserter(reader_params->delete_predicates,
-                            reader_params->delete_predicates.begin()));
-
     // Merge the columns in delete predicate that not in latest schema in to current tablet schema
-    for (auto& del_pred_pb : reader_params->delete_predicates) {
-        merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_pb->version()));
+    for (auto& del_pred : reader_params->delete_predicates) {
+        merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
     }
     reader_params->tablet_schema = merge_tablet_schema;
     if (tablet->enable_unique_key_merge_on_write()) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 249c1997811..b053f710f05 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -90,6 +90,12 @@ class TabletReader {
     };
 
 public:
+    struct ReadSource {
+        std::vector<RowSetSplits> rs_splits;
+        std::vector<RowsetMetaSharedPtr> delete_predicates;
+        // Fill delete predicates with `rs_splits`
+        void fill_delete_predicates();
+    };
     // Params for Reader,
     // mainly include tablet, data version and fetch range.
     struct ReaderParams {
@@ -103,6 +109,11 @@ public:
                     
!rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping());
         }
 
+        void set_read_source(ReadSource read_source) {
+            rs_splits = std::move(read_source.rs_splits);
+            delete_predicates = std::move(read_source.delete_predicates);
+        }
+
         TabletSharedPtr tablet;
         TabletSchemaSPtr tablet_schema;
         ReaderType reader_type = ReaderType::READER_QUERY;
@@ -126,9 +137,9 @@ public:
         std::vector<FunctionFilter> function_filters;
         std::vector<RowsetMetaSharedPtr> delete_predicates;
 
+        std::vector<RowSetSplits> rs_splits;
         // For unique key table with merge-on-write
         DeleteBitmap* delete_bitmap {nullptr};
-        std::vector<RowSetSplits> rs_splits;
 
         // return_columns is init from query schema
         std::vector<uint32_t> return_columns;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 9650e1a400e..33a9f6fee87 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -822,15 +822,16 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                         versions_to_be_changed.size(), rs_splits.size());
                 break;
             }
-            auto& all_del_preds = base_tablet->delete_predicates();
-            for (auto& delete_pred : all_del_preds) {
-                if (delete_pred->version().first > end_version) {
+            std::vector<RowsetMetaSharedPtr> del_preds;
+            for (auto&& split : rs_splits) {
+                auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+                if (!rs_meta->has_delete_predicate() || 
rs_meta->start_version() > end_version) {
                     continue;
                 }
-                base_tablet_schema->merge_dropped_columns(
-                        base_tablet->tablet_schema(delete_pred->version()));
+                
base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
+                del_preds.push_back(rs_meta);
             }
-            res = delete_handler.init(base_tablet_schema, all_del_preds, 
end_version);
+            res = delete_handler.init(base_tablet_schema, del_preds, 
end_version);
             if (!res) {
                 LOG(WARNING) << "init delete handler failed. base_tablet="
                              << base_tablet->full_name() << ", end_version=" 
<< end_version;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1f46cf2b99f..2ed3571575b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -979,10 +979,6 @@ Status Tablet::capture_rs_readers(const std::vector<Version>& version_path,
     return Status::OK();
 }
 
-bool Tablet::version_for_delete_predicate(const Version& version) {
-    return _tablet_meta->version_for_delete_predicate(version);
-}
-
 bool Tablet::can_do_compaction(size_t path_hash, CompactionType 
compaction_type) {
     if (compaction_type == CompactionType::BASE_COMPACTION && tablet_state() 
!= TABLET_RUNNING) {
         // base compaction can only be done for tablet in TABLET_RUNNING state.
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index bbdeb411f44..d57b043857d 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -191,11 +191,6 @@ public:
     Status capture_rs_readers(const std::vector<Version>& version_path,
                               std::vector<RowSetSplits>* rs_splits) const;
 
-    const std::vector<RowsetMetaSharedPtr> delete_predicates() {
-        return _tablet_meta->delete_predicates();
-    }
-    bool version_for_delete_predicate(const Version& version);
-
     // meta lock
     std::shared_mutex& get_header_lock() { return _meta_lock; }
     std::mutex& get_rowset_update_lock() { return _rowset_update_lock; }
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 58e2876d81a..b407e35a449 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -844,30 +844,6 @@ RowsetMetaSharedPtr TabletMeta::acquire_stale_rs_meta_by_version(const Version&
     return nullptr;
 }
 
-const std::vector<RowsetMetaSharedPtr> TabletMeta::delete_predicates() const {
-    std::vector<RowsetMetaSharedPtr> res;
-    for (auto& del_pred : _rs_metas) {
-        if (del_pred->has_delete_predicate()) {
-            res.push_back(del_pred);
-        }
-    }
-    return res;
-}
-
-bool TabletMeta::version_for_delete_predicate(const Version& version) {
-    if (version.first != version.second) {
-        return false;
-    }
-
-    for (auto& del_pred : _rs_metas) {
-        if (del_pred->version().first == version.first && 
del_pred->has_delete_predicate()) {
-            return true;
-        }
-    }
-
-    return false;
-}
-
 std::string TabletMeta::full_name() const {
     std::stringstream ss;
     ss << _tablet_id << "." << _schema_hash << "." << _tablet_uid.to_string();
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 42fec6489b2..11dc3532514 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -191,8 +191,6 @@ public:
     RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version) 
const;
     void delete_stale_rs_meta_by_version(const Version& version);
     RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version& 
version) const;
-    const std::vector<RowsetMetaSharedPtr> delete_predicates() const;
-    bool version_for_delete_predicate(const Version& version);
 
     std::string full_name() const;
 
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 9fc1c2eb0f3..b93f2fb849f 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -815,12 +815,12 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
     }
 }
 
-void TabletSchema::merge_dropped_columns(TabletSchemaSPtr src_schema) {
+void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
     // If they are the same tablet schema object, then just return
-    if (this == src_schema.get()) {
+    if (this == &src_schema) {
         return;
     }
-    for (const auto& src_col : src_schema->columns()) {
+    for (const auto& src_col : src_schema.columns()) {
         if (_field_id_to_index.find(src_col.unique_id()) == 
_field_id_to_index.end()) {
             CHECK(!src_col.is_key()) << src_col.name() << " is key column, 
should not be dropped.";
             ColumnPB src_col_pb;
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 715af14191e..d04b009cee6 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -291,7 +291,7 @@ public:
     // 7. insert value  4, 5
     // Then the read schema should be ColA, ColB, ColB' because the delete 
predicate need ColB to remove related data.
     // Because they have same name, so that the dropped column should not be 
added to the map, only with unique id.
-    void merge_dropped_columns(std::shared_ptr<TabletSchema> src_schema);
+    void merge_dropped_columns(const TabletSchema& src_schema);
 
     bool is_dropped_column(const TabletColumn& col) const;
 
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 13b8e92119a..5aa179d3006 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -38,6 +38,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "exec/exec_node.h"
+#include "olap/reader.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/storage_engine.h"
@@ -446,15 +447,16 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
 
     bool is_dup_mow_key = false;
     size_t segment_count = 0;
-    std::vector<std::vector<RowSetSplits>> 
rowset_splits_vector(_scan_ranges.size());
-    std::vector<std::vector<size_t>> tablet_rs_seg_count(_scan_ranges.size());
+    std::vector<TabletReader::ReadSource> tablets_read_source;
+    tablets_read_source.reserve(_scan_ranges.size());
+    std::vector<std::vector<size_t>> tablet_rs_seg_count;
+    tablet_rs_seg_count.reserve(_scan_ranges.size());
 
     // Split tablet segment by scanner, only use in pipeline in duplicate key
     // 1. if tablet count lower than scanner thread num, count segment num of 
all tablet ready for scan
     // TODO: some tablet may do not have segment, may need split segment all 
case
     if (_shared_scan_opt && _scan_ranges.size() < 
config::doris_scanner_thread_pool_thread_num) {
-        for (int i = 0; i < _scan_ranges.size(); ++i) {
-            auto& scan_range = _scan_ranges[i];
+        for (auto&& scan_range : _scan_ranges) {
             auto tablet_id = scan_range->tablet_id;
             auto [tablet, status] =
                     
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
@@ -472,24 +474,25 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
             std::from_chars(scan_range->version.c_str(),
                             scan_range->version.c_str() + 
scan_range->version.size(), version);
 
-            std::shared_lock rdlock(tablet->get_header_lock());
-            // acquire tablet rowset readers at the beginning of the scan node
-            // to prevent this case: when there are lots of olap scanners to 
run for example 10000
-            // the rowsets maybe compacted when the last olap scanner starts
-            Status acquire_reader_st =
-                    tablet->capture_rs_readers({0, version}, 
&rowset_splits_vector[i]);
-            if (!acquire_reader_st.ok()) {
-                LOG(WARNING) << "fail to init reader.res=" << 
acquire_reader_st;
-                std::stringstream ss;
-                ss << "failed to initialize storage reader. tablet=" << 
tablet->full_name()
-                   << ", res=" << acquire_reader_st
-                   << ", backend=" << BackendOptions::get_localhost();
-                return Status::InternalError(ss.str());
+            auto& read_source = tablets_read_source.emplace_back();
+            {
+                std::shared_lock rdlock(tablet->get_header_lock());
+                auto st = tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits);
+                if (!st.ok()) {
+                    LOG(WARNING) << "fail to init reader.res=" << st;
+                    return Status::InternalError(
+                            "failed to initialize storage reader. tablet_id={} 
: {}",
+                            tablet->tablet_id(), st.to_string());
+                }
+            }
+            if (!_state->skip_delete_predicate()) {
+                read_source.fill_delete_predicates();
             }
 
-            for (const auto& rowset_splits : rowset_splits_vector[i]) {
+            auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
+            for (const auto& rowset_splits : read_source.rs_splits) {
                 auto num_segments = 
rowset_splits.rs_reader->rowset()->num_segments();
-                tablet_rs_seg_count[i].emplace_back(num_segments);
+                rs_seg_count.emplace_back(num_segments);
                 segment_count += num_segments;
             }
         }
@@ -498,14 +501,14 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     if (is_dup_mow_key) {
         auto build_new_scanner = [&](const TPaloScanRange& scan_range,
                                      const std::vector<OlapScanRange*>& 
key_ranges,
-                                     const std::vector<RowSetSplits>& 
rs_splits) {
+                                     TabletReader::ReadSource read_source) {
             std::shared_ptr<NewOlapScanner> scanner = 
NewOlapScanner::create_shared(
                     _state, this, _limit_per_scanner, 
_olap_scan_node.is_preaggregation, scan_range,
-                    key_ranges, rs_splits, _scanner_profile.get());
+                    key_ranges, std::move(read_source), 
_scanner_profile.get());
 
             RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
             scanner->set_compound_filters(_compound_filters);
-            scanners->push_back(scanner);
+            scanners->push_back(std::move(scanner));
             return Status::OK();
         };
         // 2. Split segment evenly to each scanner (e.g. each scanner need to 
scan `avg_segment_count_per_scanner` segments)
@@ -527,6 +530,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
             size_t segment_idx_to_scan = 0;
             size_t num_segments_assigned = 0;
 
+            auto& read_source = tablets_read_source[i];
             std::vector<RowSetSplits> rs_splits;
 
             while (rowset_idx < rs_seg_count.size()) {
@@ -538,35 +542,35 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 }
 
                 const auto max_add_seg_nums = rs_seg_count[rowset_idx] - 
segment_idx_to_scan;
-                rs_splits.emplace_back(RowSetSplits());
-                rs_splits.back().rs_reader = 
rowset_splits_vector[i][rowset_idx].rs_reader->clone();
+                auto& split = rs_splits.emplace_back();
+                split.rs_reader = 
read_source.rs_splits[rowset_idx].rs_reader->clone();
 
                 // if segments assigned to current scanner are already more 
than the average count,
                 // this scanner will just scan segments equal to the average 
count
                 if (num_segments_assigned + max_add_seg_nums > 
avg_segment_count_by_scanner) {
                     auto need_add_seg_nums = avg_segment_count_by_scanner - 
num_segments_assigned;
-                    rs_splits.back().segment_offsets = {
+                    split.segment_offsets = {
                             segment_idx_to_scan,
                             segment_idx_to_scan + need_add_seg_nums}; // only 
scan need_add_seg_nums
 
-                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_splits));
+                    RETURN_IF_ERROR(build_new_scanner(
+                            *scan_range, scanner_ranges,
+                            {std::move(rs_splits), 
read_source.delete_predicates}));
 
                     segment_idx_to_scan += need_add_seg_nums;
                     num_segments_assigned = 0;
-                    rs_splits.clear();
                 } else if (num_segments_assigned + max_add_seg_nums ==
                            avg_segment_count_by_scanner) {
-                    rs_splits.back().segment_offsets = {segment_idx_to_scan,
-                                                        
rs_seg_count[rowset_idx]};
-                    RETURN_IF_ERROR(build_new_scanner(*scan_range, 
scanner_ranges, rs_splits));
+                    split.segment_offsets = {segment_idx_to_scan, 
rs_seg_count[rowset_idx]};
+                    RETURN_IF_ERROR(build_new_scanner(
+                            *scan_range, scanner_ranges,
+                            {std::move(rs_splits), 
read_source.delete_predicates}));
 
                     segment_idx_to_scan = 0;
                     num_segments_assigned = 0;
-                    rs_splits.clear();
                     rowset_idx++;
                 } else {
-                    rs_splits.back().segment_offsets = {segment_idx_to_scan,
-                                                        
rs_seg_count[rowset_idx]};
+                    split.segment_offsets = {segment_idx_to_scan, 
rs_seg_count[rowset_idx]};
 
                     segment_idx_to_scan = 0;
                     num_segments_assigned += max_add_seg_nums;
@@ -583,7 +587,8 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
 
             // dispose some segment tail
             if (!rs_splits.empty()) {
-                build_new_scanner(*scan_range, scanner_ranges, rs_splits);
+                build_new_scanner(*scan_range, scanner_ranges,
+                                  {std::move(rs_splits), 
read_source.delete_predicates});
             }
         }
     } else {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 232493b66f4..ea4187e556a 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -60,6 +60,8 @@
 
 namespace doris::vectorized {
 
+using ReadSource = TabletReader::ReadSource;
+
 NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, 
int64_t limit,
                                bool aggregation, const TPaloScanRange& 
scan_range,
                                const std::vector<OlapScanRange*>& key_ranges,
@@ -76,13 +78,13 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int
 NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, 
int64_t limit,
                                bool aggregation, const TPaloScanRange& 
scan_range,
                                const std::vector<OlapScanRange*>& key_ranges,
-                               const std::vector<RowSetSplits>& rs_splits, 
RuntimeProfile* profile)
+                               ReadSource read_source, RuntimeProfile* profile)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _aggregation(aggregation),
           _version(-1),
           _scan_range(scan_range),
           _key_ranges(key_ranges) {
-    _tablet_reader_params.rs_splits = rs_splits;
+    _tablet_reader_params.set_read_source(std::move(read_source));
     _tablet_schema = std::make_shared<TabletSchema>();
     _is_init = false;
 }
@@ -175,38 +177,33 @@ Status NewOlapScanner::init() {
             }
         }
 
-        {
-            std::shared_lock rdlock(_tablet->get_header_lock());
-            if (_tablet_reader_params.rs_splits.empty()) {
-                const RowsetSharedPtr rowset = 
_tablet->rowset_with_max_version();
-                if (rowset == nullptr) {
-                    std::stringstream ss;
-                    ss << "fail to get latest version of tablet: " << 
tablet_id;
-                    LOG(WARNING) << ss.str();
-                    return Status::InternalError(ss.str());
-                }
-
-                // acquire tablet rowset readers at the beginning of the scan 
node
-                // to prevent this case: when there are lots of olap scanners 
to run for example 10000
-                // the rowsets maybe compacted when the last olap scanner 
starts
-                Version rd_version(0, _version);
-                Status acquire_reader_st =
-                        _tablet->capture_rs_readers(rd_version, 
&_tablet_reader_params.rs_splits);
-                if (!acquire_reader_st.ok()) {
-                    LOG(WARNING) << "fail to init reader.res=" << 
acquire_reader_st;
-                    std::stringstream ss;
-                    ss << "failed to initialize storage reader. tablet=" << 
_tablet->full_name()
-                       << ", res=" << acquire_reader_st
-                       << ", backend=" << BackendOptions::get_localhost();
-                    return Status::InternalError(ss.str());
+        if (_tablet_reader_params.rs_splits.empty()) {
+            // Non-pipeline mode, Tablet : Scanner = 1 : 1
+            // acquire tablet rowset readers at the beginning of the scan node
+            // to prevent this case: when there are lots of olap scanners to 
run for example 10000
+            // the rowsets maybe compacted when the last olap scanner starts
+            Version rd_version(0, _version);
+            ReadSource read_source;
+            {
+                std::shared_lock rdlock(_tablet->get_header_lock());
+                auto st = _tablet->capture_rs_readers(rd_version, 
&read_source.rs_splits);
+                if (!st.ok()) {
+                    LOG(WARNING) << "fail to init reader.res=" << st;
+                    return Status::InternalError(
+                            "failed to initialize storage reader. tablet_id={} 
: {}",
+                            _tablet->tablet_id(), st.to_string());
                 }
             }
-
-            // Initialize tablet_reader_params
-            RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges, 
parent->_olap_filters,
-                                                       
parent->_filter_predicates,
-                                                       
parent->_push_down_functions));
+            if (!_state->skip_delete_predicate()) {
+                read_source.fill_delete_predicates();
+            }
+            _tablet_reader_params.set_read_source(std::move(read_source));
         }
+
+        // Initialize tablet_reader_params
+        RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges, 
parent->_olap_filters,
+                                                   parent->_filter_predicates,
+                                                   
parent->_push_down_functions));
     }
 
     // add read columns in profile
@@ -306,16 +303,9 @@ Status NewOlapScanner::_init_tablet_reader_params(
               std::inserter(_tablet_reader_params.function_filters,
                             _tablet_reader_params.function_filters.begin()));
 
-    if (!_state->skip_delete_predicate()) {
-        auto& delete_preds = _tablet->delete_predicates();
-        std::copy(delete_preds.cbegin(), delete_preds.cend(),
-                  std::inserter(_tablet_reader_params.delete_predicates,
-                                
_tablet_reader_params.delete_predicates.begin()));
-    }
-
     // Merge the columns in delete predicate that not in latest schema in to 
current tablet schema
-    for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
-        
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_pb->version()));
+    for (auto& del_pred : _tablet_reader_params.delete_predicates) {
+        _tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
     }
 
     // Range
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 41dd4e416c0..6ecb6a1dc5d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "olap/data_dir.h"
 #include "olap/reader.h"
+#include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
@@ -59,7 +60,7 @@ public:
 
     NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t 
limit, bool aggregation,
                    const TPaloScanRange& scan_range, const 
std::vector<OlapScanRange*>& key_ranges,
-                   const std::vector<RowSetSplits>& rs_splits, RuntimeProfile* 
profile);
+                   TabletReader::ReadSource read_source, RuntimeProfile* 
profile);
 
     Status init() override;
 
diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp
index c2aa5f1efeb..792fc18f78c 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -909,6 +909,16 @@ protected:
         tablet->add_rowset(rowset);
     }
 
+    std::vector<RowsetMetaSharedPtr> get_delete_predicates() {
+        std::vector<RowsetMetaSharedPtr> delete_preds;
+        for (auto&& rs_meta : tablet->tablet_meta()->_rs_metas) {
+            if (rs_meta->has_delete_predicate()) {
+                delete_preds.push_back(rs_meta);
+            }
+        }
+        return delete_preds;
+    }
+
     std::string _tablet_path;
     RowCursor _data_row_cursor;
     TabletSharedPtr tablet;
@@ -929,7 +939,7 @@ TEST_F(TestDeleteHandler, ValueWithQuote) {
 
     add_delete_predicate(del_predicate, 2);
 
-    auto res = _delete_handler.init(tablet->tablet_schema(), 
tablet->delete_predicates(), 5);
+    auto res = _delete_handler.init(tablet->tablet_schema(), 
get_delete_predicates(), 5);
     EXPECT_EQ(Status::OK(), res);
     _delete_handler.finalize();
 }
@@ -942,7 +952,7 @@ TEST_F(TestDeleteHandler, ValueWithoutQuote) {
 
     add_delete_predicate(del_predicate, 2);
 
-    auto res = _delete_handler.init(tablet->tablet_schema(), 
tablet->delete_predicates(), 5);
+    auto res = _delete_handler.init(tablet->tablet_schema(), 
get_delete_predicates(), 5);
     EXPECT_EQ(Status::OK(), res);
     _delete_handler.finalize();
 }
@@ -1016,7 +1026,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
     add_delete_predicate(del_pred_4, 5);
 
     // Get delete conditions which version <= 5
-    res = _delete_handler.init(tablet->tablet_schema(), 
tablet->delete_predicates(), 5);
+    res = _delete_handler.init(tablet->tablet_schema(), 
get_delete_predicates(), 5);
     EXPECT_EQ(Status::OK(), res);
     _delete_handler.finalize();
 }
@@ -1048,7 +1058,7 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
     add_delete_predicate(del_pred, 2);
 
     // 指定版本号为10以载入Header中的所有过滤条件(在这个case中,只有过滤条件1)
-    res = _delete_handler.init(tablet->tablet_schema(), 
tablet->delete_predicates(), 4);
+    res = _delete_handler.init(tablet->tablet_schema(), 
get_delete_predicates(), 4);
     EXPECT_EQ(Status::OK(), res);
 
     // 构造一行测试数据
@@ -1133,7 +1143,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
     add_delete_predicate(del_pred_3, 4);
 
     // 指定版本号为4以载入meta中的所有过滤条件(在这个case中,只有过滤条件1)
-    res = _delete_handler.init(tablet->tablet_schema(), 
tablet->delete_predicates(), 4);
+    res = _delete_handler.init(tablet->tablet_schema(), 
get_delete_predicates(), 4);
     EXPECT_EQ(Status::OK(), res);
 
     std::vector<string> data_str;
@@ -1196,7 +1206,7 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
     add_delete_predicate(del_pred_2, 4);
 
     // 指定版本号为4以载入meta中的所有过滤条件(过滤条件1,过滤条件2)
-    res = _delete_handler.init(tablet->tablet_schema(), 
tablet->delete_predicates(), 4);
+    res = _delete_handler.init(tablet->tablet_schema(), 
get_delete_predicates(), 4);
     EXPECT_EQ(Status::OK(), res);
 
     // 构造一行测试数据
diff --git a/be/test/olap/rowid_conversion_test.cpp 
b/be/test/olap/rowid_conversion_test.cpp
index 95b34cf0f88..e6bb0efeb46 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -65,6 +65,18 @@ using namespace ErrorCode;
 static const uint32_t MAX_PATH_LEN = 1024;
 static StorageEngine* k_engine = nullptr;
 
+static constexpr int64_t tablet_id = 10005;
+
+static RowsetId next_rowset_id() {
+    // FIXME(plat1ko): If `inc_id` is set to 1000 and 
`VerticalCompactionTest` runs before `TestRowIdConversion`,
+    // then `TestRowIdConversion` will fail. There may be some strange global 
state here.
+    static int64_t inc_id = 0;
+    RowsetId rowset_id;
+    inc_id++;
+    rowset_id.init(inc_id);
+    return rowset_id;
+}
+
 class TestRowIdConversion : public testing::TestWithParam<std::tuple<KeysType, 
bool, bool, bool>> {
 protected:
     void SetUp() override {
@@ -137,22 +149,21 @@ protected:
         return tablet_schema;
     }
 
-    void create_rowset_writer_context(TabletSchemaSPtr tablet_schema,
-                                      const SegmentsOverlapPB& overlap,
-                                      uint32_t max_rows_per_segment,
-                                      RowsetWriterContext* 
rowset_writer_context) {
-        static int64_t inc_id = 0;
-        RowsetId rowset_id;
-        rowset_id.init(inc_id);
-        rowset_writer_context->rowset_id = rowset_id;
-        rowset_writer_context->rowset_type = BETA_ROWSET;
-        rowset_writer_context->rowset_state = VISIBLE;
-        rowset_writer_context->tablet_schema = tablet_schema;
-        rowset_writer_context->rowset_dir = absolute_dir + "/tablet_path";
-        rowset_writer_context->version = Version(inc_id, inc_id);
-        rowset_writer_context->segments_overlap = overlap;
-        rowset_writer_context->max_rows_per_segment = max_rows_per_segment;
-        inc_id++;
+    RowsetWriterContext create_rowset_writer_context(TabletSchemaSPtr 
tablet_schema,
+                                                     const SegmentsOverlapPB& 
overlap,
+                                                     uint32_t 
max_rows_per_segment,
+                                                     Version version) {
+        RowsetWriterContext rowset_writer_context;
+        rowset_writer_context.tablet_id = tablet_id;
+        rowset_writer_context.rowset_id = next_rowset_id();
+        rowset_writer_context.rowset_type = BETA_ROWSET;
+        rowset_writer_context.rowset_state = VISIBLE;
+        rowset_writer_context.tablet_schema = tablet_schema;
+        rowset_writer_context.rowset_dir = absolute_dir + "/tablet_path";
+        rowset_writer_context.version = version;
+        rowset_writer_context.segments_overlap = overlap;
+        rowset_writer_context.max_rows_per_segment = max_rows_per_segment;
+        return rowset_writer_context;
     }
 
     void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& 
context,
@@ -167,8 +178,7 @@ protected:
 
     RowsetSharedPtr create_rowset(
             TabletSchemaSPtr tablet_schema, const SegmentsOverlapPB& overlap,
-            std::vector<std::vector<std::tuple<int64_t, int64_t>>> 
rowset_data) {
-        RowsetWriterContext writer_context;
+            std::vector<std::vector<std::tuple<int64_t, int64_t>>> 
rowset_data, int64_t version) {
         if (overlap == NONOVERLAPPING) {
             for (auto i = 1; i < rowset_data.size(); i++) {
                 auto& last_seg_data = rowset_data[i - 1];
@@ -178,8 +188,8 @@ protected:
                 EXPECT_LT(last_seg_max, cur_seg_min);
             }
         }
-        create_rowset_writer_context(tablet_schema, overlap, UINT32_MAX, 
&writer_context);
-
+        auto writer_context = create_rowset_writer_context(tablet_schema, 
overlap, UINT32_MAX,
+                                                           {version, version});
         std::unique_ptr<RowsetWriter> rowset_writer;
         Status s = RowsetFactory::create_rowset_writer(writer_context, false, 
&rowset_writer);
         EXPECT_TRUE(s.ok());
@@ -214,51 +224,21 @@ protected:
         return rowset;
     }
 
-    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
-        std::string json_rowset_meta = R"({
-            "rowset_id": 540081,
-            "tablet_id": 15673,
-            "txn_id": 4042,
-            "tablet_schema_hash": 567997577,
-            "rowset_type": "BETA_ROWSET",
-            "rowset_state": "VISIBLE",
-            "start_version": 2,
-            "end_version": 2,
-            "num_rows": 3929,
-            "total_disk_size": 84699,
-            "data_disk_size": 84464,
-            "index_disk_size": 235,
-            "empty": false,
-            "load_id": {
-                "hi": -5350970832824939812,
-                "lo": -6717994719194512122
-            },
-            "creation_time": 1553765670
-        })";
-        RowsetMetaPB rowset_meta_pb;
-        json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb);
-        rowset_meta_pb.set_start_version(start);
-        rowset_meta_pb.set_end_version(end);
-        rowset_meta_pb.set_creation_time(10000);
-        pb1->init_from_pb(rowset_meta_pb);
-    }
-
-    void add_delete_predicate(TabletSharedPtr tablet, DeletePredicatePB& 
del_pred,
-                              int64_t version) {
-        RowsetMetaSharedPtr rsm(new RowsetMeta());
-        init_rs_meta(rsm, version, version);
-        RowsetId id;
-        id.init(version * 1000);
-        rsm->set_rowset_id(id);
-        rsm->set_delete_predicate(del_pred);
-        rsm->set_tablet_schema(tablet->tablet_schema());
-        RowsetSharedPtr rowset = 
std::make_shared<BetaRowset>(tablet->tablet_schema(), "", rsm);
-        tablet->add_rowset(rowset);
+    RowsetSharedPtr create_delete_predicate(const TabletSchemaSPtr& schema,
+                                            DeletePredicatePB del_pred, 
int64_t version) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_tablet_id(tablet_id);
+        rs_meta->set_rowset_id(next_rowset_id());
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_rowset_state(VISIBLE);
+        rs_meta->set_delete_predicate(std::move(del_pred));
+        rs_meta->set_tablet_schema(schema);
+        rs_meta->set_version({version, version});
+        return std::make_shared<BetaRowset>(schema, "", std::move(rs_meta));
     }
 
     TabletSharedPtr create_tablet(const TabletSchema& tablet_schema,
-                                  bool enable_unique_key_merge_on_write, 
int64_t version,
-                                  bool has_delete_handler) {
+                                  bool enable_unique_key_merge_on_write) {
         std::vector<TColumn> cols;
         std::unordered_map<uint32_t, uint32_t> col_ordinal_to_unique_id;
         for (auto i = 0; i < tablet_schema.num_columns(); i++) {
@@ -281,29 +261,13 @@ protected:
         }
         t_tablet_schema.__set_storage_type(TStorageType::COLUMN);
         t_tablet_schema.__set_columns(cols);
-        TabletMetaSharedPtr tablet_meta(
-                new TabletMeta(1, 1, 1, 1, 1, 1, t_tablet_schema, 1, 
col_ordinal_to_unique_id,
-                               UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
-                               TCompressionType::LZ4F, 0, 
enable_unique_key_merge_on_write));
+        TabletMetaSharedPtr tablet_meta(new TabletMeta(
+                1, 1, tablet_id, 1, 1, 1, t_tablet_schema, 1, 
col_ordinal_to_unique_id,
+                UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK, 
TCompressionType::LZ4F, 0,
+                enable_unique_key_merge_on_write));
 
         TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
         tablet->init();
-        if (has_delete_handler) {
-            // delete data with key < 1000
-            std::vector<TCondition> conditions;
-            TCondition condition;
-            condition.column_name = tablet_schema.column(0).name();
-            condition.condition_op = "<";
-            condition.condition_values.clear();
-            condition.condition_values.push_back("1000");
-            conditions.push_back(condition);
-
-            DeletePredicatePB del_pred;
-            Status st =
-                    DeleteHandler::generate_delete_predicate(tablet_schema, 
conditions, &del_pred);
-            EXPECT_EQ(Status::OK(), st);
-            add_delete_predicate(tablet, del_pred, version);
-        }
         return tablet;
     }
 
@@ -327,13 +291,30 @@ protected:
                     new_overlap = OVERLAPPING;
                 }
             }
-            RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
+            RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i], i);
             input_rowsets.push_back(rowset);
         }
+        if (has_delete_handler) {
+            // delete data with key < 1000
+            std::vector<TCondition> conditions;
+            TCondition condition;
+            condition.column_name = tablet_schema->column(0).name();
+            condition.condition_op = "<";
+            condition.condition_values.clear();
+            condition.condition_values.push_back("1000");
+            conditions.push_back(condition);
+
+            DeletePredicatePB del_pred;
+            Status st =
+                    DeleteHandler::generate_delete_predicate(*tablet_schema, 
conditions, &del_pred);
+            ASSERT_TRUE(st.ok()) << st;
+            input_rowsets.push_back(
+                    create_delete_predicate(tablet_schema, del_pred, 
num_input_rowset));
+        }
 
         // create output rowset writer
-        RowsetWriterContext writer_context;
-        create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+        auto writer_context = create_rowset_writer_context(
+                tablet_schema, NONOVERLAPPING, 3456, {0, 
input_rowsets.back()->end_version()});
         std::unique_ptr<RowsetWriter> output_rs_writer;
         Status s;
         if (is_vertical_merger) {
@@ -341,12 +322,10 @@ protected:
         } else {
             s = RowsetFactory::create_rowset_writer(writer_context, false, 
&output_rs_writer);
         }
-        EXPECT_TRUE(s.ok());
+        ASSERT_TRUE(s.ok()) << s;
 
         // merge input rowset
-        TabletSharedPtr tablet =
-                create_tablet(*tablet_schema, enable_unique_key_merge_on_write,
-                              output_rs_writer->version().first - 1, 
has_delete_handler);
+        TabletSharedPtr tablet = create_tablet(*tablet_schema, 
enable_unique_key_merge_on_write);
 
         // create input rowset reader
         vector<RowsetReaderSharedPtr> input_rs_readers;
@@ -390,15 +369,15 @@ protected:
                 output_data.emplace_back(columns[0].column->get_int(i),
                                          columns[1].column->get_int(i));
             }
-        } while (s == Status::OK());
-        EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+        } while (s.ok());
+        EXPECT_TRUE(s.is<END_OF_FILE>()) << s;
         EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
         std::vector<uint32_t> segment_num_rows;
         
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
         if (has_delete_handler) {
             // All keys less than 1000 are deleted by delete handler
             for (auto& item : output_data) {
-                EXPECT_GE(std::get<0>(item), 1000);
+                ASSERT_GE(std::get<0>(item), 1000);
             }
         }
 
@@ -579,7 +558,6 @@ TEST_P(TestRowIdConversion, Conversion) {
         {
             uint32_t num_segments = 1;
             SegmentsOverlapPB overlap = NONOVERLAPPING;
-            std::vector<std::vector<std::vector<std::tuple<int64_t, 
int64_t>>>> input_data;
             check_rowid_conversion(keys_type, 
enable_unique_key_merge_on_write, num_input_rowset,
                                    num_segments, rows_per_segment, overlap, 
has_delete_handler,
                                    is_vertical_merger);
@@ -588,7 +566,6 @@ TEST_P(TestRowIdConversion, Conversion) {
         {
             uint32_t num_segments = 2;
             SegmentsOverlapPB overlap = OVERLAPPING;
-            std::vector<std::vector<std::vector<std::tuple<int64_t, 
int64_t>>>> input_data;
             check_rowid_conversion(keys_type, 
enable_unique_key_merge_on_write, num_input_rowset,
                                    num_segments, rows_per_segment, overlap, 
has_delete_handler,
                                    is_vertical_merger);
@@ -597,7 +574,6 @@ TEST_P(TestRowIdConversion, Conversion) {
         {
             uint32_t num_segments = 2;
             SegmentsOverlapPB overlap = NONOVERLAPPING;
-            std::vector<std::vector<std::vector<std::tuple<int64_t, 
int64_t>>>> input_data;
             check_rowid_conversion(keys_type, 
enable_unique_key_merge_on_write, num_input_rowset,
                                    num_segments, rows_per_segment, overlap, 
has_delete_handler,
                                    is_vertical_merger);
@@ -606,7 +582,6 @@ TEST_P(TestRowIdConversion, Conversion) {
         {
             uint32_t num_segments = 2;
             SegmentsOverlapPB overlap = OVERLAP_UNKNOWN;
-            std::vector<std::vector<std::vector<std::tuple<int64_t, 
int64_t>>>> input_data;
             check_rowid_conversion(keys_type, 
enable_unique_key_merge_on_write, num_input_rowset,
                                    num_segments, rows_per_segment, overlap, 
has_delete_handler,
                                    is_vertical_merger);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp 
b/be/test/vec/olap/vertical_compaction_test.cpp
index 14a202845ae..963b5241800 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -188,22 +188,24 @@ protected:
         return tablet_schema;
     }
 
-    void create_rowset_writer_context(TabletSchemaSPtr tablet_schema,
-                                      const SegmentsOverlapPB& overlap,
-                                      uint32_t max_rows_per_segment,
-                                      RowsetWriterContext* 
rowset_writer_context) {
+    RowsetWriterContext create_rowset_writer_context(TabletSchemaSPtr 
tablet_schema,
+                                                     const SegmentsOverlapPB& 
overlap,
+                                                     uint32_t 
max_rows_per_segment,
+                                                     Version version) {
         static int64_t inc_id = 1000;
+        RowsetWriterContext rowset_writer_context;
         RowsetId rowset_id;
         rowset_id.init(inc_id);
-        rowset_writer_context->rowset_id = rowset_id;
-        rowset_writer_context->rowset_type = BETA_ROWSET;
-        rowset_writer_context->rowset_state = VISIBLE;
-        rowset_writer_context->tablet_schema = tablet_schema;
-        rowset_writer_context->rowset_dir = absolute_dir + "/tablet_path";
-        rowset_writer_context->version = Version(inc_id, inc_id);
-        rowset_writer_context->segments_overlap = overlap;
-        rowset_writer_context->max_rows_per_segment = max_rows_per_segment;
+        rowset_writer_context.rowset_id = rowset_id;
+        rowset_writer_context.rowset_type = BETA_ROWSET;
+        rowset_writer_context.rowset_state = VISIBLE;
+        rowset_writer_context.tablet_schema = tablet_schema;
+        rowset_writer_context.rowset_dir = absolute_dir + "/tablet_path";
+        rowset_writer_context.version = version;
+        rowset_writer_context.segments_overlap = overlap;
+        rowset_writer_context.max_rows_per_segment = max_rows_per_segment;
         inc_id++;
+        return rowset_writer_context;
     }
 
     void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& 
context,
@@ -218,8 +220,7 @@ protected:
 
     RowsetSharedPtr create_rowset(
             TabletSchemaSPtr tablet_schema, const SegmentsOverlapPB& overlap,
-            std::vector<std::vector<std::tuple<int64_t, int64_t>>> 
rowset_data) {
-        RowsetWriterContext writer_context;
+            std::vector<std::vector<std::tuple<int64_t, int64_t>>> 
rowset_data, int64_t version) {
         if (overlap == NONOVERLAPPING) {
             for (auto i = 1; i < rowset_data.size(); i++) {
                 auto& last_seg_data = rowset_data[i - 1];
@@ -229,7 +230,8 @@ protected:
                 EXPECT_LT(last_seg_max, cur_seg_min);
             }
         }
-        create_rowset_writer_context(tablet_schema, overlap, UINT32_MAX, 
&writer_context);
+        auto writer_context = create_rowset_writer_context(tablet_schema, 
overlap, UINT32_MAX,
+                                                           {version, version});
 
         std::unique_ptr<RowsetWriter> rowset_writer;
         Status s = RowsetFactory::create_rowset_writer(writer_context, false, 
&rowset_writer);
@@ -265,51 +267,37 @@ protected:
         return rowset;
     }
 
-    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
+    void init_rs_meta(RowsetMetaSharedPtr& rs_meta, int64_t start, int64_t 
end) {
         std::string json_rowset_meta = R"({
-            "rowset_id": 540085,
-            "tablet_id": 15674,
-            "txn_id": 4045,
-            "tablet_schema_hash": 567997588,
+            "rowset_id": 540081,
+            "tablet_id": 15673,
+            "tablet_schema_hash": 567997577,
             "rowset_type": "BETA_ROWSET",
             "rowset_state": "VISIBLE",
-            "start_version": 2,
-            "end_version": 2,
-            "num_rows": 3929,
-            "total_disk_size": 84699,
-            "data_disk_size": 84464,
-            "index_disk_size": 235,
-            "empty": false,
-            "load_id": {
-                "hi": -5350970832824939812,
-                "lo": -6717994719194512122
-            },
-            "creation_time": 1553765670
+            "empty": false
         })";
         RowsetMetaPB rowset_meta_pb;
         json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb);
         rowset_meta_pb.set_start_version(start);
         rowset_meta_pb.set_end_version(end);
         rowset_meta_pb.set_creation_time(10000);
-        pb1->init_from_pb(rowset_meta_pb);
+        rs_meta->init_from_pb(rowset_meta_pb);
     }
 
-    void add_delete_predicate(TabletSharedPtr tablet, DeletePredicatePB& 
del_pred,
-                              int64_t version) {
+    RowsetSharedPtr create_delete_predicate(const TabletSchemaSPtr& schema,
+                                            DeletePredicatePB del_pred, 
int64_t version) {
         RowsetMetaSharedPtr rsm(new RowsetMeta());
         init_rs_meta(rsm, version, version);
         RowsetId id;
         id.init(version * 1000);
         rsm->set_rowset_id(id);
-        rsm->set_delete_predicate(del_pred);
-        rsm->set_tablet_schema(tablet->tablet_schema());
-        RowsetSharedPtr rowset = 
std::make_shared<BetaRowset>(tablet->tablet_schema(), "", rsm);
-        tablet->add_rowset(rowset);
+        rsm->set_delete_predicate(std::move(del_pred));
+        rsm->set_tablet_schema(schema);
+        return std::make_shared<BetaRowset>(schema, "", rsm);
     }
 
     TabletSharedPtr create_tablet(const TabletSchema& tablet_schema,
-                                  bool enable_unique_key_merge_on_write, 
int64_t version,
-                                  bool has_delete_handler) {
+                                  bool enable_unique_key_merge_on_write) {
         std::vector<TColumn> cols;
         std::unordered_map<uint32_t, uint32_t> col_ordinal_to_unique_id;
         for (auto i = 0; i < tablet_schema.num_columns(); i++) {
@@ -346,23 +334,6 @@ protected:
         EXPECT_TRUE(res.ok() && !exists);
         res = 
io::global_local_filesystem()->create_directory(tablet->tablet_path());
         EXPECT_TRUE(res.ok());
-
-        if (has_delete_handler) {
-            // delete data with key < 1000
-            std::vector<TCondition> conditions;
-            TCondition condition;
-            condition.column_name = tablet_schema.column(0).name();
-            condition.condition_op = "<";
-            condition.condition_values.clear();
-            condition.condition_values.push_back("100");
-            conditions.push_back(condition);
-
-            DeletePredicatePB del_pred;
-            Status st =
-                    DeleteHandler::generate_delete_predicate(tablet_schema, 
conditions, &del_pred);
-            EXPECT_EQ(Status::OK(), st);
-            add_delete_predicate(tablet, del_pred, version);
-        }
         return tablet;
     }
 
@@ -491,35 +462,34 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
                 new_overlap = OVERLAPPING;
             }
         }
-        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
+        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i], i);
         input_rowsets.push_back(rowset);
     }
     // create input rowset reader
     vector<RowsetReaderSharedPtr> input_rs_readers;
     for (auto& rowset : input_rowsets) {
         RowsetReaderSharedPtr rs_reader;
-        EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
         input_rs_readers.push_back(std::move(rs_reader));
     }
 
     // create output rowset writer
-    RowsetWriterContext writer_context;
-    create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
     std::unique_ptr<RowsetWriter> output_rs_writer;
     Status s = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
-    EXPECT_TRUE(s.ok());
+    ASSERT_TRUE(s.ok()) << s;
 
     // merge input rowset
-    bool has_delete_handler = false;
-    TabletSharedPtr tablet = create_tablet(
-            *tablet_schema, false, output_rs_writer->version().first - 1, 
has_delete_handler);
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
     Merger::Statistics stats;
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION, tablet_schema,
                                        input_rs_readers, 
output_rs_writer.get(), 100, &stats);
-    EXPECT_TRUE(s.ok());
+    ASSERT_TRUE(s.ok()) << s;
     RowsetSharedPtr out_rowset = output_rs_writer->build();
+    ASSERT_TRUE(out_rowset);
 
     // create output rowset reader
     RowsetReaderContext reader_context;
@@ -598,7 +568,7 @@ TEST_F(VerticalCompactionTest, 
TestDupWithoutKeyVerticalMerge) {
                 new_overlap = OVERLAPPING;
             }
         }
-        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
+        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i], i);
         input_rowsets.push_back(rowset);
     }
     // create input rowset reader
@@ -610,22 +580,20 @@ TEST_F(VerticalCompactionTest, 
TestDupWithoutKeyVerticalMerge) {
     }
 
     // create output rowset writer
-    RowsetWriterContext writer_context;
-    create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
     std::unique_ptr<RowsetWriter> output_rs_writer;
     Status s = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
     EXPECT_TRUE(s.ok());
 
     // merge input rowset
-    bool has_delete_handler = false;
-    TabletSharedPtr tablet = create_tablet(
-            *tablet_schema, false, output_rs_writer->version().first - 1, 
has_delete_handler);
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
     Merger::Statistics stats;
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION, tablet_schema,
                                        input_rs_readers, 
output_rs_writer.get(), 100, &stats);
-    EXPECT_TRUE(s.ok());
+    ASSERT_TRUE(s.ok()) << s;
     RowsetSharedPtr out_rowset = output_rs_writer->build();
 
     // create output rowset reader
@@ -706,7 +674,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
                 new_overlap = OVERLAPPING;
             }
         }
-        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
+        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i], i);
         input_rowsets.push_back(rowset);
     }
     // create input rowset reader
@@ -718,16 +686,14 @@ TEST_F(VerticalCompactionTest, 
TestUniqueKeyVerticalMerge) {
     }
 
     // create output rowset writer
-    RowsetWriterContext writer_context;
-    create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
     std::unique_ptr<RowsetWriter> output_rs_writer;
     Status s = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
     EXPECT_TRUE(s.ok());
 
     // merge input rowset
-    bool has_delete_handler = false;
-    TabletSharedPtr tablet = create_tablet(
-            *tablet_schema, false, output_rs_writer->version().first - 1, 
has_delete_handler);
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
     Merger::Statistics stats;
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
@@ -795,46 +761,55 @@ TEST_F(VerticalCompactionTest, 
TestDupKeyVerticalMergeWithDelete) {
     }
 
     TabletSchemaSPtr tablet_schema = create_schema(DUP_KEYS);
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
     // create input rowset
-    vector<RowsetSharedPtr> input_rowsets;
     SegmentsOverlapPB new_overlap = overlap;
+    std::vector<RowsetSharedPtr> input_rowsets;
     for (auto i = 0; i < num_input_rowset; i++) {
         if (overlap == OVERLAP_UNKNOWN) {
-            if (i == 0) {
-                new_overlap = NONOVERLAPPING;
-            } else {
-                new_overlap = OVERLAPPING;
-            }
+            new_overlap = (i == 0) ? NONOVERLAPPING : OVERLAPPING;
         }
-        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
-        input_rowsets.push_back(rowset);
+        input_rowsets.push_back(create_rowset(tablet_schema, new_overlap, 
input_data[i], i));
     }
+
+    // delete data with key < 100
+    std::vector<TCondition> conditions;
+    TCondition condition;
+    condition.column_name = tablet->tablet_schema()->column(0).name();
+    condition.condition_op = "<";
+    condition.condition_values.clear();
+    condition.condition_values.push_back("100");
+    conditions.push_back(condition);
+    DeletePredicatePB del_pred;
+    auto st = 
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), conditions,
+                                                       &del_pred);
+    ASSERT_TRUE(st.ok()) << st;
+    input_rowsets.push_back(create_delete_predicate(tablet->tablet_schema(), 
std::move(del_pred),
+                                                    num_input_rowset));
+
     // create input rowset reader
     vector<RowsetReaderSharedPtr> input_rs_readers;
     for (auto& rowset : input_rowsets) {
         RowsetReaderSharedPtr rs_reader;
-        EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
         input_rs_readers.push_back(std::move(rs_reader));
     }
 
     // create output rowset writer
-    RowsetWriterContext writer_context;
-    create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
     std::unique_ptr<RowsetWriter> output_rs_writer;
-    Status s = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
-    EXPECT_TRUE(s.ok());
-
+    st = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
+    ASSERT_TRUE(st.ok()) << st;
     // merge input rowset
-    bool has_delete_handler = true;
-    TabletSharedPtr tablet = create_tablet(*tablet_schema, false, 
output_rs_writer->version().first,
-                                           has_delete_handler);
     Merger::Statistics stats;
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
-    s = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, 
output_rs_writer.get(), 100, &stats);
-    EXPECT_TRUE(s.ok());
+    st = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION, tablet_schema,
+                                        input_rs_readers, 
output_rs_writer.get(), 100, &stats);
+    ASSERT_TRUE(st.ok()) << st;
     RowsetSharedPtr out_rowset = output_rs_writer->build();
+    ASSERT_TRUE(out_rowset);
 
     // create output rowset reader
     RowsetReaderContext reader_context;
@@ -851,24 +826,22 @@ TEST_F(VerticalCompactionTest, 
TestDupKeyVerticalMergeWithDelete) {
     std::vector<std::tuple<int64_t, int64_t>> output_data;
     do {
         block_create(tablet_schema, &output_block);
-        s = output_rs_reader->next_block(&output_block);
+        st = output_rs_reader->next_block(&output_block);
         auto columns = output_block.get_columns_with_type_and_name();
         EXPECT_EQ(columns.size(), 2);
         for (auto i = 0; i < output_block.rows(); i++) {
             output_data.emplace_back(columns[0].column->get_int(i), 
columns[1].column->get_int(i));
         }
-    } while (s == Status::OK());
-    EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+    } while (st.ok());
+    EXPECT_TRUE(st.is<END_OF_FILE>()) << st;
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(),
               num_input_rowset * num_segments * rows_per_segment - 
num_input_rowset * 100);
     std::vector<uint32_t> segment_num_rows;
     
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
-    if (has_delete_handler) {
-        // All keys less than 1000 are deleted by delete handler
-        for (auto& item : output_data) {
-            EXPECT_GE(std::get<0>(item), 100);
-        }
+    // All keys less than 100 are deleted by delete handler
+    for (auto& item : output_data) {
+        ASSERT_GE(std::get<0>(item), 100);
     }
 }
 
@@ -889,46 +862,55 @@ TEST_F(VerticalCompactionTest, 
TestDupWithoutKeyVerticalMergeWithDelete) {
     }
 
     TabletSchemaSPtr tablet_schema = create_schema(DUP_KEYS, true);
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
     // create input rowset
-    vector<RowsetSharedPtr> input_rowsets;
     SegmentsOverlapPB new_overlap = overlap;
+    std::vector<RowsetSharedPtr> input_rowsets;
     for (auto i = 0; i < num_input_rowset; i++) {
         if (overlap == OVERLAP_UNKNOWN) {
-            if (i == 0) {
-                new_overlap = NONOVERLAPPING;
-            } else {
-                new_overlap = OVERLAPPING;
-            }
+            new_overlap = (i == 0) ? NONOVERLAPPING : OVERLAPPING;
         }
-        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
-        input_rowsets.push_back(rowset);
+        input_rowsets.push_back(create_rowset(tablet_schema, new_overlap, 
input_data[i], i));
     }
+
+    // delete data with key < 100
+    std::vector<TCondition> conditions;
+    TCondition condition;
+    condition.column_name = tablet->tablet_schema()->column(0).name();
+    condition.condition_op = "<";
+    condition.condition_values.clear();
+    condition.condition_values.push_back("100");
+    conditions.push_back(condition);
+    DeletePredicatePB del_pred;
+    auto st = 
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), conditions,
+                                                       &del_pred);
+    ASSERT_TRUE(st.ok()) << st;
+    input_rowsets.push_back(create_delete_predicate(tablet->tablet_schema(), 
std::move(del_pred),
+                                                    num_input_rowset));
+
     // create input rowset reader
     vector<RowsetReaderSharedPtr> input_rs_readers;
     for (auto& rowset : input_rowsets) {
         RowsetReaderSharedPtr rs_reader;
-        EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+        ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
         input_rs_readers.push_back(std::move(rs_reader));
     }
 
     // create output rowset writer
-    RowsetWriterContext writer_context;
-    create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
     std::unique_ptr<RowsetWriter> output_rs_writer;
-    Status s = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
-    EXPECT_TRUE(s.ok());
-
+    st = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
+    ASSERT_TRUE(st.ok()) << st;
     // merge input rowset
-    bool has_delete_handler = true;
-    TabletSharedPtr tablet = create_tablet(*tablet_schema, false, 
output_rs_writer->version().first,
-                                           has_delete_handler);
     Merger::Statistics stats;
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
-    s = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, 
output_rs_writer.get(), 100, &stats);
-    EXPECT_TRUE(s.ok());
+    st = Merger::vertical_merge_rowsets(tablet, 
ReaderType::READER_BASE_COMPACTION, tablet_schema,
+                                        input_rs_readers, 
output_rs_writer.get(), 100, &stats);
+    ASSERT_TRUE(st.ok()) << st;
     RowsetSharedPtr out_rowset = output_rs_writer->build();
+    ASSERT_TRUE(out_rowset);
 
     // create output rowset reader
     RowsetReaderContext reader_context;
@@ -945,24 +927,22 @@ TEST_F(VerticalCompactionTest, 
TestDupWithoutKeyVerticalMergeWithDelete) {
     std::vector<std::tuple<int64_t, int64_t>> output_data;
     do {
         block_create(tablet_schema, &output_block);
-        s = output_rs_reader->next_block(&output_block);
+        st = output_rs_reader->next_block(&output_block);
         auto columns = output_block.get_columns_with_type_and_name();
         EXPECT_EQ(columns.size(), 2);
         for (auto i = 0; i < output_block.rows(); i++) {
             output_data.emplace_back(columns[0].column->get_int(i), 
columns[1].column->get_int(i));
         }
-    } while (s == Status::OK());
-    EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+    } while (st.ok());
+    EXPECT_TRUE(st.is<END_OF_FILE>()) << st;
     EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
     EXPECT_EQ(output_data.size(),
               num_input_rowset * num_segments * rows_per_segment - 
num_input_rowset * 100);
     std::vector<uint32_t> segment_num_rows;
     
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
-    if (has_delete_handler) {
-        // All keys less than 1000 are deleted by delete handler
-        for (auto& item : output_data) {
-            EXPECT_GE(std::get<0>(item), 100);
-        }
+    // All keys less than 100 are deleted by delete handler
+    for (auto& item : output_data) {
+        ASSERT_GE(std::get<0>(item), 100);
     }
 }
 
@@ -994,7 +974,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
                 new_overlap = OVERLAPPING;
             }
         }
-        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i]);
+        RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap, 
input_data[i], i);
         input_rowsets.push_back(rowset);
     }
     // create input rowset reader
@@ -1006,16 +986,14 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
     }
 
     // create output rowset writer
-    RowsetWriterContext writer_context;
-    create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, 
&writer_context);
+    auto writer_context = create_rowset_writer_context(tablet_schema, 
NONOVERLAPPING, 3456,
+                                                       {0, 
input_rowsets.back()->end_version()});
     std::unique_ptr<RowsetWriter> output_rs_writer;
     Status s = RowsetFactory::create_rowset_writer(writer_context, true, 
&output_rs_writer);
     EXPECT_TRUE(s.ok());
 
     // merge input rowset
-    bool has_delete_handler = false;
-    TabletSharedPtr tablet = create_tablet(
-            *tablet_schema, false, output_rs_writer->version().first - 1, 
has_delete_handler);
+    TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
     Merger::Statistics stats;
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to