chaoyli closed pull request #516: Modify compaction code to be adapted to Rowset interface
URL: https://github.com/apache/incubator-doris/pull/516
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (opened from a fork), the diff is
supplied below; GitHub would not otherwise display it:

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index a33959b9..06491efb 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -551,7 +551,7 @@ add_subdirectory(${SRC_DIR}/util)
 add_subdirectory(${SRC_DIR}/gen_cpp)
 add_subdirectory(${SRC_DIR}/gutil)
 add_subdirectory(${SRC_DIR}/olap)
-add_subdirectory(${SRC_DIR}/agent)
+#add_subdirectory(${SRC_DIR}/agent)
 add_subdirectory(${SRC_DIR}/http)
 add_subdirectory(${SRC_DIR}/service)
 add_subdirectory(${SRC_DIR}/exec)
@@ -559,7 +559,7 @@ add_subdirectory(${SRC_DIR}/exprs)
 add_subdirectory(${SRC_DIR}/udf)
 add_subdirectory(${SRC_DIR}/runtime)
 add_subdirectory(${SRC_DIR}/testutil)
-add_subdirectory(${SRC_DIR}/tools)
+#add_subdirectory(${SRC_DIR}/tools)
 
 # Utility CMake function to make specifying tests and benchmarks less verbose
 FUNCTION(ADD_BE_TEST TEST_NAME)
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index ddaf685e..383d5bf4 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -52,10 +52,10 @@ add_library(Olap STATIC
     options.cpp
     out_stream.cpp
     push_handler.cpp
-    reader.cpp
+    #reader.cpp
     row_block.cpp
     row_cursor.cpp
-    schema_change.cpp
+    #schema_change.cpp
     serialize.cpp
     storage_engine.cpp
     data_dir.cpp
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 7eca9ffc..9f2dc2dc 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -104,9 +104,9 @@ OLAPStatus BaseCompaction::run() {
     VLOG(10) << "new_base_version_hash" << new_base_version_hash;
 
     // 2. 获取生成新base需要的data sources
-    vector<ColumnData*> base_data_sources;
-    _tablet->acquire_data_sources_by_versions(_need_merged_versions, 
&base_data_sources);
-    if (base_data_sources.empty()) {
+    vector<RowsetSharedPtr> rowsets;
+    _tablet->capture_consistent_rowsets(_need_merged_versions, &rowsets);
+    if (rowsets.empty()) {
         OLAP_LOG_WARNING("fail to acquire need data sources. [tablet=%s; 
version=%d]",
                          _tablet->full_name().c_str(),
                          _new_base_version.second);
@@ -117,8 +117,8 @@ OLAPStatus BaseCompaction::run() {
     {
         
DorisMetrics::base_compaction_deltas_total.increment(_need_merged_versions.size());
         int64_t merge_bytes = 0;
-        for (ColumnData* i_data : base_data_sources) {
-            merge_bytes += i_data->segment_group()->data_size();
+        for (auto& rowset : rowsets) {
+            merge_bytes += rowset->data_disk_size();
         }
         DorisMetrics::base_compaction_bytes_total.increment(merge_bytes);
     }
@@ -129,11 +129,8 @@ OLAPStatus BaseCompaction::run() {
     // 3. 执行base compaction
     //    执行过程可能会持续比较长时间
     stage_watch.reset();
-    res = _do_base_compaction(new_base_version_hash,
-                             &base_data_sources,
-                             &row_count);
+    res = _do_base_compaction(new_base_version_hash, rowsets, &row_count);
     // 释放不再使用的ColumnData对象
-    _tablet->release_data_sources(&base_data_sources);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to do base version. [tablet=%s; version=%d]",
                          _tablet->full_name().c_str(),
@@ -142,38 +139,28 @@ OLAPStatus BaseCompaction::run() {
         return res;
     }
 
+    //  validate that delete action is right
+    //  if error happened, sleep 1 hour. Report a fatal log every 1 minute
+    if (_validate_delete_file_action() != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to do base compaction. delete action has 
error.";
+        _garbage_collection();
+        return OLAP_ERR_BE_ERROR_DELETE_ACTION;
+    }
+
     VLOG(3) << "elapsed time of doing base compaction:" << 
stage_watch.get_elapse_time_us();
 
     // 4. make new versions visable.
     //    If success, remove files belong to old versions;
     //    If fail, gc files belong to new versions.
-    vector<SegmentGroup*> unused_olap_indices;
-    res = _update_header(row_count, &unused_olap_indices);
+    vector<RowsetSharedPtr> unused_rowsets;
+    res = _update_header(row_count, &unused_rowsets);
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "fail to update header. tablet=" << 
_tablet->full_name()
                      << ", version=" << _new_base_version.first << "-" << 
_new_base_version.second;
         _garbage_collection();
         return res;
     }
-    _delete_old_files(&unused_olap_indices);
-
-    //  validate that delete action is right
-    //  if error happened, sleep 1 hour. Report a fatal log every 1 minute
-    if (_validate_delete_file_action() != OLAP_SUCCESS) {
-        int sleep_count = 0;
-        while (true) {
-            if (sleep_count >= 60) {
-                break;
-            }
-
-            ++sleep_count;
-            LOG(FATAL) << "base compaction's delete action has error.sleep 1 
minute...";
-            sleep(60);
-        }
-
-        _garbage_collection();
-        return OLAP_ERR_BE_ERROR_DELETE_ACTION;
-    }
+    _delete_old_files(&unused_rowsets);
 
     _release_base_compaction_lock();
 
@@ -319,21 +306,29 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool 
is_manual_trigger,
 }
 
 OLAPStatus BaseCompaction::_do_base_compaction(VersionHash 
new_base_version_hash,
-                                               vector<ColumnData*>* 
base_data_sources,
+                                               const vector<RowsetSharedPtr>& 
rowsets,
                                                uint64_t* row_count) {
     // 1. 生成新base文件对应的olap index
-    /*
-    SegmentGroup* new_base = new (std::nothrow) SegmentGroup(_tablet.get(),
-                                                       _new_base_version,
-                                                       new_base_version_hash,
-                                                       false, 0, 0);
-    */
-
-    SegmentGroup* new_base = nullptr;
-    if (new_base == NULL) {
-        OLAP_LOG_WARNING("fail to new SegmentGroup.");
+    RowsetId rowset_id = 0;
+    RowsetIdGenerator::instance()->get_next_id(_tablet->data_dir(), 
&rowset_id);
+    RowsetBuilderContext context = {_tablet->partition_id(), 
_tablet->tablet_id(),
+                                    _tablet->schema_hash(), rowset_id, 
+                                    RowsetTypePB::ALPHA_ROWSET, 
_tablet->rowset_path_prefix(),
+                                    _tablet->tablet_schema(), 
_tablet->num_key_fields(),
+                                    _tablet->num_short_key_fields(), 
_tablet->num_rows_per_row_block(),
+                                    _tablet->compress_kind(), 
_tablet->bloom_filter_fpp()};
+    RowsetBuilder* builder = new AlphaRowsetBuilder(); 
+    if (builder == nullptr) {
+        LOG(WARNING) << "fail to new rowset.";
         return OLAP_ERR_MALLOC_ERROR;
     }
+    builder->init(context);
+
+    vector<RowsetReaderSharedPtr> rs_readers;
+    for (auto& rowset : rowsets) {
+        RowsetReaderSharedPtr rs_reader(rowset->create_reader());
+        rs_readers.push_back(rs_reader);
+    }
 
     LOG(INFO) << "start merge new base. tablet=" << _tablet->full_name()
               << ", version=" << _new_base_version.second;
@@ -349,12 +344,13 @@ OLAPStatus 
BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
     uint64_t merged_rows = 0;
     uint64_t filted_rows = 0;
     OLAPStatus res = OLAP_SUCCESS;
+    RowsetSharedPtr new_base = builder->build();
     if (_tablet->data_file_type() == COLUMN_ORIENTED_FILE) {
         _tablet->obtain_header_rdlock();
         _tablet->release_header_lock();
 
-        Merger merger(_tablet, new_base, READER_BASE_COMPACTION);
-        res = merger.merge(*base_data_sources, &merged_rows, &filted_rows);
+        Merger merger(_tablet, builder, READER_BASE_COMPACTION);
+        res = merger.merge(rs_readers, _new_base_version, &merged_rows, 
&filted_rows);
         if (res == OLAP_SUCCESS) {
             *row_count = merger.row_count();
         }
@@ -372,19 +368,17 @@ OLAPStatus 
BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
                          _new_base_version.second,
                          res);
 
-        new_base->delete_all_files();
-        SAFE_DELETE(new_base);
-
+        StorageEngine::get_instance()->add_unused_rowset(new_base);
         return OLAP_ERR_BE_MERGE_ERROR;
     }
 
     // 4. 如果merge成功,则将新base文件对应的olap index载入
-    _new_olap_indices.push_back(new_base);
+    _new_rowsets.push_back(new_base);
 
     VLOG(10) << "merge new base success, start load index. tablet=" << 
_tablet->full_name()
              << ", version=" << _new_base_version.second;
 
-    res = new_base->load();
+    res = new_base->init();
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to load index. [version='%d-%d' 
version_hash=%ld tablet='%s']",
                          new_base->version().first,
@@ -396,8 +390,8 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash 
new_base_version_hash
 
     // Check row num changes
     uint64_t source_rows = 0;
-    for (ColumnData* i_data : *base_data_sources) {
-        source_rows += i_data->segment_group()->num_rows();
+    for (auto& rowset : rowsets) {
+        source_rows += rowset->num_rows();
     }
     bool row_nums_check = config::row_nums_check;
     if (row_nums_check) {
@@ -419,19 +413,20 @@ OLAPStatus 
BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
 
     LOG(INFO) << "succeed to do base compaction. tablet=" << 
_tablet->full_name()
               << ", base_version=" << _new_base_version.first << "-" << 
_new_base_version.second;
+    _tablet->release_rs_readers(&rs_readers);
     return OLAP_SUCCESS;
 }
 
-OLAPStatus BaseCompaction::_update_header(uint64_t row_count, 
vector<SegmentGroup*>* unused_olap_indices) {
+OLAPStatus BaseCompaction::_update_header(uint64_t row_count, 
vector<RowsetSharedPtr>* unused_rowsets) {
     WriteLock wrlock(_tablet->get_header_lock_ptr());
     vector<Version> unused_versions;
     _get_unused_versions(&unused_versions);
 
     OLAPStatus res = OLAP_SUCCESS;
-    // 由于在replace_data_sources中可能会发生很小概率的非事务性失败, 因此这里定位FATAL错误
-    res = _tablet->replace_data_sources(&unused_versions,
-                                       &_new_olap_indices,
-                                       unused_olap_indices);
+    // 由于在modify_rowsets中可能会发生很小概率的非事务性失败, 因此这里定位FATAL错误
+    res = _tablet->modify_rowsets(&unused_versions,
+                                  &_new_rowsets,
+                                  unused_rowsets);
     if (res != OLAP_SUCCESS) {
         LOG(FATAL) << "fail to replace data sources. res" << res
                    << ", tablet=" << _tablet->full_name()
@@ -456,30 +451,30 @@ OLAPStatus BaseCompaction::_update_header(uint64_t 
row_count, vector<SegmentGrou
                    << ", old_base_version=" << _old_base_version.second;
         return OLAP_ERR_BE_SAVE_HEADER_ERROR;
     }
-    _new_olap_indices.clear();
+    _new_rowsets.clear();
 
     return OLAP_SUCCESS;
 }
 
-void BaseCompaction::_delete_old_files(vector<SegmentGroup*>* unused_indices) {
+void BaseCompaction::_delete_old_files(vector<RowsetSharedPtr>* 
unused_indices) {
     if (!unused_indices->empty()) {
-        StorageEngine* unused_index = StorageEngine::get_instance();
+        StorageEngine* storage_engine = StorageEngine::get_instance();
 
-        for (vector<SegmentGroup*>::iterator it = unused_indices->begin();
+        for (vector<RowsetSharedPtr>::iterator it = unused_indices->begin();
                 it != unused_indices->end(); ++it) {
-            unused_index->add_unused_index(*it);
+            storage_engine->add_unused_rowset(*it);
         }
     }
 }
 
 void BaseCompaction::_garbage_collection() {
     // 清理掉已生成的版本文件
-    for (vector<SegmentGroup*>::iterator it = _new_olap_indices.begin();
-            it != _new_olap_indices.end(); ++it) {
-        (*it)->delete_all_files();
-        SAFE_DELETE(*it);
+    StorageEngine* storage_engine = StorageEngine::get_instance();
+    for (vector<RowsetSharedPtr>::iterator it = _new_rowsets.begin();
+            it != _new_rowsets.end(); ++it) {
+        storage_engine->add_unused_rowset(*it);
     }
-    _new_olap_indices.clear();
+    _new_rowsets.clear();
 
     _release_base_compaction_lock();
 }
@@ -526,17 +521,17 @@ OLAPStatus BaseCompaction::_validate_delete_file_action() 
{
     // 1. acquire the latest version to make sure all is right after deleting 
files
     ReadLock rdlock(_tablet->get_header_lock_ptr());
     const PDelta* lastest_version = _tablet->lastest_version();
-    Version test_version = Version(0, lastest_version->end_version());
-    vector<ColumnData*> test_sources;
-    _tablet->acquire_data_sources(test_version, &test_sources);
+    Version spec_version = Version(0, lastest_version->end_version());
+    vector<RowsetReaderSharedPtr> rs_readers;
+    _tablet->capture_rs_readers(spec_version, &rs_readers);
 
-    if (test_sources.size() == 0) {
+    if (rs_readers.empty()) {
         LOG(INFO) << "acquire data sources failed. version="
-           << test_version.first << "-" << test_version.second; 
+           << spec_version.first << "-" << spec_version.second; 
         return OLAP_ERR_BE_ERROR_DELETE_ACTION;
     }
 
-    _tablet->release_data_sources(&test_sources);
+    _tablet->release_rs_readers(&rs_readers);
     VLOG(3) << "delete file action is OK";
 
     return OLAP_SUCCESS;
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index b2bfb119..42bb2699 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -24,11 +24,13 @@
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/tablet.h"
+#include "rowset/rowset_id_generator.h"
+#include "rowset/alpha_rowset_builder.h"
 
 namespace doris {
 
-class ColumnData;
-class SegmentGroup;
+class Rowset;
+class RowsetReader;
 
 // @brief 实现对START_BASE_COMPACTION命令的处理逻辑,并返回处理结果
 class BaseCompaction {
@@ -82,35 +84,35 @@ class BaseCompaction {
     // 
     // 输入参数:
     // - new_base_version_hash: 新Base的VersionHash
-    // - base_data_sources: 生成新Base需要的ColumnData*
+    // - rs_readers : 生成新Base需要的RowsetReaders*
     // - row_count: 生成Base过程中产生的row_count
     //
     // 返回值:
     // - 如果执行成功,则返回OLAP_SUCCESS;
     // - 其它情况下,返回相应的错误码
     OLAPStatus _do_base_compaction(VersionHash new_base_version_hash,
-                                  std::vector<ColumnData*>* base_data_sources,
-                                  uint64_t* row_count);
+                                   const std::vector<RowsetSharedPtr>& rowsets,
+                                   uint64_t* row_count);
    
     // 更新Header使得修改对外可见
     // 输出参数:
-    // - unused_olap_indices: 需要被物理删除的SegmentGroup*
+    // - unused_rowsets: 需要被物理删除的Rowset*
     //
     // 返回值:
     // - 如果执行成功,则返回OLAP_SUCCESS;
     // - 其它情况下,返回相应的错误码
     OLAPStatus _update_header(uint64_t row_count,
-                              std::vector<SegmentGroup*>* unused_olap_indices);
+                              std::vector<RowsetSharedPtr>* unused_rowsets);
 
-    // 删除不再使用的SegmentGroup文件
+    // 删除不再使用的Rowset
     // 
     // 输入参数:
-    // - unused_olap_indices: 需要被物理删除的SegmentGroup*
+    // - unused_rowsets: 需要被物理删除的Rowset*
     //
     // 返回值:
     // - 如果执行成功,则返回OLAP_SUCCESS;
     // - 其它情况下,返回相应的错误码
-    void _delete_old_files(std::vector<SegmentGroup*>* unused_indices);
+    void _delete_old_files(std::vector<RowsetSharedPtr>* unused_indices);
 
     // 其它函数执行失败时,调用该函数进行清理工作
     void _garbage_collection();
@@ -173,8 +175,8 @@ class BaseCompaction {
     Version _latest_cumulative;
     // 在此次base compaction执行过程中,将被合并的cumulative文件版本
     std::vector<Version> _need_merged_versions;
-    // 需要新增的版本对应的SegmentGroup
-    std::vector<SegmentGroup*> _new_olap_indices;
+    // 需要新增的版本对应Rowset的
+    std::vector<RowsetSharedPtr> _new_rowsets;
 
     bool _base_compaction_locked;
 
diff --git a/be/src/olap/cumulative_compaction.cpp 
b/be/src/olap/cumulative_compaction.cpp
index f29a55c0..027d3481 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -115,8 +115,8 @@ OLAPStatus CumulativeCompaction::run() {
     }
 
     // 2. 获取待合并的delta文件对应的data文件
-    _tablet->acquire_data_sources_by_versions(_need_merged_versions, 
&_data_source);
-    if (_data_source.size() == 0) {
+    _tablet->capture_consistent_rowsets(_need_merged_versions, &_rowsets);
+    if (_rowsets.empty()) {
         _tablet->release_cumulative_lock();
         OLAP_LOG_WARNING("failed to acquire data source. [tablet=%s]",
                          _tablet->full_name().c_str());
@@ -126,32 +126,31 @@ OLAPStatus CumulativeCompaction::run() {
     {
         
DorisMetrics::cumulative_compaction_deltas_total.increment(_need_merged_versions.size());
         int64_t merge_bytes = 0;
-        for (ColumnData* i_data : _data_source) {
-            merge_bytes += i_data->segment_group()->data_size();
+        for (auto& rowset : _rowsets) {
+            merge_bytes += rowset->data_disk_size();
         }
         DorisMetrics::cumulative_compaction_bytes_total.increment(merge_bytes);
     }
 
     do {
         // 3. 生成新cumulative文件对应的olap index
-        /*
-        _new_segment_group = new (nothrow) SegmentGroup(_tablet.get(),
-                                                        _cumulative_version,
-                                                        
_cumulative_version_hash,
-                                                        false, 0, 0);
-        */
-        _new_segment_group = nullptr;
-        if (_new_segment_group == NULL) {
-            OLAP_LOG_WARNING("failed to malloc new cumulative olap index. "
-                             "[tablet=%s; cumulative_version=%d-%d]",
-                             _tablet->full_name().c_str(),
-                             _cumulative_version.first,
-                             _cumulative_version.second);
-            break;
-        }
+        RowsetId rowset_id = 0;
+        RowsetIdGenerator::instance()->get_next_id(_tablet->data_dir(), 
&rowset_id);
+        RowsetBuilderContext context = {_tablet->partition_id(), 
_tablet->tablet_id(),
+                                        _tablet->schema_hash(), rowset_id, 
+                                        RowsetTypePB::ALPHA_ROWSET, 
_tablet->rowset_path_prefix(),
+                                        _tablet->tablet_schema(), 
_tablet->num_key_fields(),
+                                        _tablet->num_short_key_fields(), 
_tablet->num_rows_per_row_block(),
+                                        _tablet->compress_kind(), 
_tablet->bloom_filter_fpp()};
+        _builder->init(context);
 
         // 4. 执行cumulative compaction合并过程
+        for (auto& rowset : _rowsets) {
+            RowsetReaderSharedPtr rs_reader(rowset->create_reader());
+            _rs_readers.push_back(rs_reader);
+        }
         res = _do_cumulative_compaction();
+        _rowset = _builder->build();
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("failed to do cumulative compaction. "
                              "[tablet=%s; cumulative_version=%d-%d]",
@@ -163,13 +162,12 @@ OLAPStatus CumulativeCompaction::run() {
     } while (0);
 
     // 5. 如果出现错误,执行清理工作
-    if (res != OLAP_SUCCESS && _new_segment_group != NULL) {
-        _new_segment_group->delete_all_files();
-        SAFE_DELETE(_new_segment_group);
+    if (res != OLAP_SUCCESS && _rowset != NULL) {
+        StorageEngine::get_instance()->add_unused_rowset(_rowset);
     }
     
-    if (_data_source.size() != 0) {
-        _tablet->release_data_sources(&_data_source);
+    if (_rs_readers.empty()) {
+        _tablet->release_rs_readers(&_rs_readers);
     }
 
     _tablet->release_cumulative_lock();
@@ -372,12 +370,12 @@ bool CumulativeCompaction::_find_previous_version(const 
Version current_version,
 
 OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
     OLAPStatus res = OLAP_SUCCESS;
-    Merger merger(_tablet, _new_segment_group, READER_CUMULATIVE_COMPACTION);
+    Merger merger(_tablet, _builder, READER_CUMULATIVE_COMPACTION);
 
     // 1. merge delta files into new cumulative file
     uint64_t merged_rows = 0;
     uint64_t filted_rows = 0;
-    res = merger.merge(_data_source, &merged_rows, &filted_rows);
+    res = merger.merge(_rs_readers, _cumulative_version, &merged_rows, 
&filted_rows);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("failed to do cumulative merge. [tablet=%s; 
cumulative_version=%d-%d]",
                          _tablet->full_name().c_str(),
@@ -387,7 +385,7 @@ OLAPStatus 
CumulativeCompaction::_do_cumulative_compaction() {
     }
 
     // 2. load new cumulative file
-    res = _new_segment_group->load();
+    res = _rowset->init();
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("failed to load cumulative index. [tablet=%s; 
cumulative_version=%d-%d]",
                          _tablet->full_name().c_str(),
@@ -398,30 +396,30 @@ OLAPStatus 
CumulativeCompaction::_do_cumulative_compaction() {
 
     // Check row num changes
     uint64_t source_rows = 0;
-    for (ColumnData* i_data : _data_source) {
-        source_rows += i_data->segment_group()->num_rows();
+    for (auto rowset : _rowsets) {
+        source_rows += rowset->num_rows();
     }
     bool row_nums_check = config::row_nums_check;
     if (row_nums_check) {
-        if (source_rows != _new_segment_group->num_rows() + merged_rows + 
filted_rows) {
+        if (source_rows != _rowset->num_rows() + merged_rows + filted_rows) {
             LOG(FATAL) << "fail to check row num! "
                        << "source_rows=" << source_rows
                        << ", merged_rows=" << merged_rows
                        << ", filted_rows=" << filted_rows
-                       << ", new_index_rows=" << 
_new_segment_group->num_rows();
+                       << ", new_index_rows=" << _rowset->num_rows();
             return OLAP_ERR_CHECK_LINES_ERROR;
         }
     } else {
         LOG(INFO) << "all row nums. source_rows=" << source_rows
                   << ", merged_rows=" << merged_rows
                   << ", filted_rows=" << filted_rows
-                  << ", new_index_rows=" << _new_segment_group->num_rows();
+                  << ", new_index_rows=" << _rowset->num_rows();
     }
 
     // 3. add new cumulative file into tablet
-    vector<SegmentGroup*> unused_indices;
+    vector<RowsetSharedPtr> unused_rowsets;
     _obtain_header_wrlock();
-    res = _update_header(&unused_indices);
+    res = _update_header(&unused_rowsets);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("failed to update header for new cumulative."
                          "[tablet=%s; cumulative_version=%d-%d]",
@@ -440,7 +438,7 @@ OLAPStatus 
CumulativeCompaction::_do_cumulative_compaction() {
                    << ", cumulative_version=" << _cumulative_version.first 
                    << "-" << _cumulative_version.second;
         // if error happened, roll back
-        OLAPStatus ret = _roll_back(unused_indices);
+        OLAPStatus ret = _roll_back(unused_rowsets);
         if (ret != OLAP_SUCCESS) {
             LOG(FATAL) << "roll back failed. [tablet=" <<  
_tablet->full_name() << "]";
         }
@@ -454,7 +452,7 @@ OLAPStatus 
CumulativeCompaction::_do_cumulative_compaction() {
     _release_header_lock();
 
     // 6. delete delta files which have been merged into new cumulative file
-    _delete_unused_delta_files(&unused_indices);
+    _delete_unused_delta_files(&unused_rowsets);
 
     LOG(INFO) << "succeed to do cumulative compaction. tablet=" << 
_tablet->full_name()
               << ", cumulative_version=" << _cumulative_version.first << "-"
@@ -462,12 +460,12 @@ OLAPStatus 
CumulativeCompaction::_do_cumulative_compaction() {
     return res;
 }
 
-OLAPStatus CumulativeCompaction::_update_header(vector<SegmentGroup*>* 
unused_indices) {
-    vector<SegmentGroup*> new_indices;
-    new_indices.push_back(_new_segment_group);
+OLAPStatus CumulativeCompaction::_update_header(vector<RowsetSharedPtr>* 
unused_rowsets) {
+    vector<RowsetSharedPtr> new_rowsets;
+    new_rowsets.push_back(_rowset);
 
     OLAPStatus res = OLAP_SUCCESS;
-    res = _tablet->replace_data_sources(&_need_merged_versions, &new_indices, 
unused_indices);
+    res = _tablet->modify_rowsets(&_need_merged_versions, &new_rowsets, 
unused_rowsets);
     if (res != OLAP_SUCCESS) {
         LOG(FATAL) << "failed to replace data sources. res=" << res
                    << ", tablet=" << _tablet->full_name();
@@ -484,13 +482,13 @@ OLAPStatus 
CumulativeCompaction::_update_header(vector<SegmentGroup*>* unused_in
     return res;
 }
 
-void CumulativeCompaction::_delete_unused_delta_files(vector<SegmentGroup*>* 
unused_indices) {
-    if (!unused_indices->empty()) {
-        StorageEngine* unused_index = StorageEngine::get_instance();
+void CumulativeCompaction::_delete_unused_delta_files(vector<RowsetSharedPtr>* 
unused_rowsets) {
+    if (!unused_rowsets->empty()) {
+        StorageEngine* storage_engine = StorageEngine::get_instance();
 
-        for (vector<SegmentGroup*>::iterator it = unused_indices->begin();
-                it != unused_indices->end(); ++it) {
-            unused_index->add_unused_index(*it);
+        for (vector<RowsetSharedPtr>::iterator it = unused_rowsets->begin();
+                it != unused_rowsets->end(); ++it) {
+            storage_engine->add_unused_rowset(*it);
         }
     }
 }
@@ -515,28 +513,28 @@ bool 
CumulativeCompaction::_validate_need_merged_versions() {
 
 OLAPStatus CumulativeCompaction::_validate_delete_file_action() {
     // 1. acquire the new cumulative version to make sure that all is right 
after deleting files
-    Version test_version = Version(0, _cumulative_version.second);
-    vector<ColumnData*> test_sources;
-    _tablet->acquire_data_sources(test_version, &test_sources);
-    if (test_sources.size() == 0) {
-        OLAP_LOG_WARNING("acquire data source failed. [test_verison=%d-%d]",
-                         test_version.first, test_version.second);
+    Version spec_version = Version(0, _cumulative_version.second);
+    vector<RowsetReaderSharedPtr> rs_readers;
+    _tablet->capture_rs_readers(spec_version, &rs_readers);
+    if (rs_readers.empty()) {
+        LOG(WARNING) << "acquire data source failed. "
+            << "spec_verison=" << spec_version.first << "-" << 
spec_version.second;
         return OLAP_ERR_CUMULATIVE_ERROR_DELETE_ACTION;
     }
 
-    _tablet->release_data_sources(&test_sources);
+    _tablet->release_rs_readers(&rs_readers);
     return OLAP_SUCCESS;
 }
 
-OLAPStatus CumulativeCompaction::_roll_back(const vector<SegmentGroup*>& 
old_olap_indices) {
+OLAPStatus CumulativeCompaction::_roll_back(vector<RowsetSharedPtr>& 
old_olap_indices) {
     vector<Version> need_remove_version;
     need_remove_version.push_back(_cumulative_version);
-    // unused_indices will only contain new cumulative index
+    // unused_rowsets will only contain new cumulative index
     // we don't need to delete it here; we will delete new cumulative index in 
the end.
-    vector<SegmentGroup*> unused_indices;
+    vector<RowsetSharedPtr> unused_rowsets;
 
     OLAPStatus res = OLAP_SUCCESS;
-    res = _tablet->replace_data_sources(&need_remove_version, 
&old_olap_indices, &unused_indices);
+    res = _tablet->modify_rowsets(&need_remove_version, &old_olap_indices, 
&unused_rowsets);
     if (res != OLAP_SUCCESS) {
         LOG(FATAL) << "failed to replace data sources. [tablet=" << 
_tablet->full_name() << "]";
         return res;
diff --git a/be/src/olap/cumulative_compaction.h 
b/be/src/olap/cumulative_compaction.h
index 89c74e27..f55b24f0 100755
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -24,13 +24,14 @@
 #include <vector>
 
 #include "olap/merger.h"
-#include "olap/rowset/column_data.h"
 #include "olap/olap_define.h"
 #include "olap/tablet.h"
+#include "rowset/rowset_id_generator.h"
+#include "rowset/alpha_rowset_builder.h"
 
 namespace doris {
 
-class SegmentGroup;
+class Rowset;
 
 class CumulativeCompaction {
 public:
@@ -40,7 +41,7 @@ class CumulativeCompaction {
             _old_cumulative_layer_point(0),
             _new_cumulative_layer_point(0),
             _max_delta_file_size(0),
-            _new_segment_group(NULL) {}
+            _rowset(nullptr) {}
 
     ~CumulativeCompaction() {}
     
@@ -105,18 +106,18 @@ class CumulativeCompaction {
     // 将合并得到的新cumulative文件载入tablet
     //
     // 输出参数:
-    // - unused_indices: 返回不再使用的delta文件对应的olap index
+    // - unused_rowsets: 返回不再使用的delta文件对应的olap index
     //
     // 返回值:
     // - 如果成功,返回OLAP_SUCCESS
     // - 如果不成功,返回相应错误码
-    OLAPStatus _update_header(std::vector<SegmentGroup*>* unused_indices);
+    OLAPStatus _update_header(std::vector<RowsetSharedPtr>* unused_rowsets);
 
     // 删除不再使用的delta文件
     //
     // 输入输出参数
-    // - unused_indices: 待删除的不再使用的delta文件对应的olap index
-    void _delete_unused_delta_files(std::vector<SegmentGroup*>* 
unused_indices);
+    // - unused_rowsets: 待删除的不再使用的delta文件对应的olap index
+    void _delete_unused_delta_files(std::vector<RowsetSharedPtr>* 
unused_rowsets);
 
     // 验证得到的m_need_merged_versions是否正确
     //
@@ -133,7 +134,7 @@ class CumulativeCompaction {
     OLAPStatus _validate_delete_file_action();
 
     // 恢复header头文件的文件版本和table的data source
-    OLAPStatus _roll_back(const std::vector<SegmentGroup*>& old_olap_indices);
+    OLAPStatus _roll_back(std::vector<RowsetSharedPtr>& old_olap_indices);
 
     void _obtain_header_rdlock() {
         _tablet->obtain_header_rdlock();
@@ -170,9 +171,11 @@ class CumulativeCompaction {
     // 新cumulative文件的version hash
     VersionHash _cumulative_version_hash;
     // 新cumulative文件对应的olap index
-    SegmentGroup* _new_segment_group;
+    RowsetSharedPtr _rowset;
+    RowsetBuilder* _builder;
     // 可合并的delta文件的data文件
-    std::vector<ColumnData*> _data_source;
+    std::vector<RowsetSharedPtr> _rowsets;
+    std::vector<RowsetReaderSharedPtr> _rs_readers;
     // 可合并的delta文件的版本
     std::vector<Version> _need_merged_versions;
 
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index d995c82c..a4f60957 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -20,13 +20,11 @@
 #include <memory>
 #include <vector>
 
-#include "olap/rowset/column_data.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/segment_group.h"
 #include "olap/tablet.h"
 #include "olap/reader.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/column_data_writer.h"
 
 using std::list;
 using std::string;
@@ -35,24 +33,24 @@ using std::vector;
 
 namespace doris {
 
-Merger::Merger(TabletSharedPtr tablet, SegmentGroup* segment_group, ReaderType 
type) : 
+Merger::Merger(TabletSharedPtr tablet, RowsetBuilder* builder, ReaderType 
type) : 
         _tablet(tablet),
-        _segment_group(segment_group),
+        _builder(builder),
         _reader_type(type),
         _row_count(0) {}
 
-OLAPStatus Merger::merge(const vector<ColumnData*>& olap_data_arr,
-                         uint64_t* merged_rows, uint64_t* filted_rows) {
+OLAPStatus Merger::merge(const vector<RowsetReaderSharedPtr>& rs_readers,
+                         const Version& version, uint64_t* merged_rows, 
uint64_t* filted_rows) {
     // Create and initiate reader for scanning and multi-merging specified
     // OLAPDatas.
     Reader reader;
     ReaderParams reader_params;
     reader_params.tablet = _tablet;
     reader_params.reader_type = _reader_type;
-    reader_params.olap_data_arr = olap_data_arr;
+    reader_params.rs_readers = rs_readers;
 
     if (_reader_type == READER_BASE_COMPACTION) {
-        reader_params.version = _segment_group->version();
+        reader_params.version = version;
     }
 
     if (OLAP_SUCCESS != reader.init(reader_params)) {
@@ -61,15 +59,6 @@ OLAPStatus Merger::merge(const vector<ColumnData*>& 
olap_data_arr,
         return OLAP_ERR_INIT_FAILED;
     }
 
-    // create and initiate writer for generating new index and data files.
-    unique_ptr<ColumnDataWriter> 
writer(ColumnDataWriter::create(_segment_group, false, 
_tablet->compress_kind(), _tablet->bloom_filter_fpp()));
-    DCHECK(writer != nullptr) << "memory error occur when creating writer";
-
-    if (NULL == writer) {
-        OLAP_LOG_WARNING("fail to allocate writer.");
-        return OLAP_ERR_MALLOC_ERROR;
-    }
-
     bool has_error = false;
     RowCursor row_cursor;
 
@@ -83,13 +72,13 @@ OLAPStatus Merger::merge(const vector<ColumnData*>& 
olap_data_arr,
     while (!has_error) {
         // Attach row cursor to the memory position of the row block being
         // written in writer.
-        if (OLAP_SUCCESS != writer->attached_by(&row_cursor)) {
-            OLAP_LOG_WARNING("attach row failed. [tablet='%s']",
-                    _tablet->full_name().c_str());
+        if (OLAP_SUCCESS != _builder->add_row(&row_cursor)) {
+            LOG(WARNING) << "add row to builder failed. tablet=" << 
_tablet->full_name();
             has_error = true;
             break;
+
         }
-        row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema(), 
writer->mem_pool());
+        row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema(), 
_builder->mem_pool());
 
         // Read one row into row_cursor
         OLAPStatus res = reader.next_row_with_aggregation(&row_cursor, &eof);
@@ -103,11 +92,10 @@ OLAPStatus Merger::merge(const vector<ColumnData*>& 
olap_data_arr,
         }
 
         // Goto next row position in the row block being written
-        writer->next(row_cursor);
         ++_row_count;
     }
 
-    if (OLAP_SUCCESS != writer->finalize()) {
+    if (OLAP_SUCCESS != _builder->flush()) {
         OLAP_LOG_WARNING("fail to finalize writer. [tablet='%s']",
                 _tablet->full_name().c_str());
         has_error = true;
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index c754be04..ccabf7f3 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -29,15 +29,15 @@ class ColumnData;
 class Merger {
 public:
     // parameter index is created by caller, and it is empty.
-    Merger(TabletSharedPtr tablet, SegmentGroup* index, ReaderType type);
+    Merger(TabletSharedPtr tablet, RowsetBuilder* builder, ReaderType type);
 
     virtual ~Merger() {};
 
     // @brief read from multiple OLAPData and SegmentGroup, then write into 
single OLAPData and SegmentGroup
     // @return  OLAPStatus: OLAP_SUCCESS or FAIL
     // @note it will take long time to finish.
-    OLAPStatus merge(const std::vector<ColumnData*>& olap_data_arr, 
-                     uint64_t* merged_rows, uint64_t* filted_rows);
+    OLAPStatus merge(const std::vector<RowsetReaderSharedPtr>& rs_readers, 
+                     const Version& version, uint64_t* merged_rows, uint64_t* 
filted_rows);
 
     // 获取在做merge过程中累积的行数
     uint64_t row_count() {
@@ -45,7 +45,7 @@ class Merger {
     }
 private:
     TabletSharedPtr _tablet;
-    SegmentGroup* _segment_group;
+    RowsetBuilder* _builder;
     ReaderType _reader_type;
     uint64_t _row_count;
     Version _simple_merge_version;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index a2f99106..3626e93b 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -308,13 +308,13 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
         return res;
     }
 
-    res = _acquire_data_sources(read_params);
+    res = _capture_rs_readers(read_params);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("fail to init reader when acquire data 
sources.[res=%d]", res);      
         return res;
     }
 
-    for (auto i_data: _data_sources) {
+    for (auto i_data : _rs_readers) {
         i_data->set_stats(&_stats);
     }
 
@@ -468,7 +468,7 @@ void Reader::close() {
     VLOG(3) << "merged rows:" << _merged_rows;
     _conditions.finalize();
     _delete_handler.finalize();
-    _tablet->release_data_sources(&_own_data_sources);
+    _tablet->release_rs_readers(&_own_rs_readers);
 
     for (auto pred : _col_predicates) {
         delete pred;
@@ -477,23 +477,23 @@ void Reader::close() {
     delete _collect_iter;
 }
 
-OLAPStatus Reader::_acquire_data_sources(const ReaderParams& read_params) {
-    const std::vector<ColumnData*>* data_sources;
+OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
+    const std::vector<RowsetReaderSharedPtr>* rs_readers;
     if (read_params.reader_type == READER_ALTER_TABLE
             || read_params.reader_type == READER_BASE_COMPACTION
             || read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
-        data_sources = &read_params.olap_data_arr;
+        rs_readers = &read_params.rs_readers;
     } else {
         _tablet->obtain_header_rdlock();
-        _tablet->acquire_data_sources(_version, &_own_data_sources);
+        _tablet->capture_rs_readers(_version, &_own_rs_readers);
         _tablet->release_header_lock();
 
-        if (_own_data_sources.size() < 1) {
+        if (_own_rs_readers.size() < 1) {
             LOG(WARNING) << "fail to acquire data sources. [table_name='" << 
_tablet->full_name()
                          << "' version=" << _version.first << "-" << 
_version.second << "]";
             return OLAP_ERR_VERSION_NOT_EXIST;
         }
-        data_sources = &_own_data_sources;
+        rs_readers = &_own_rs_readers;
     }
     
     // do not use index stream cache when be/ce/alter/checksum,
@@ -503,7 +503,7 @@ OLAPStatus Reader::_acquire_data_sources(const 
ReaderParams& read_params) {
         is_using_cache = false;
     }
 
-    for (auto i_data: *data_sources) {
+    for (auto i_data: *rs_readers) {
         // skip empty version
         if (i_data->empty() || i_data->zero_num_rows()) {
             continue;
@@ -538,7 +538,7 @@ OLAPStatus Reader::_acquire_data_sources(const 
ReaderParams& read_params) {
                     << i_data->version().first << ", " << 
i_data->version().second;
             i_data->set_delete_status(DEL_NOT_SATISFIED);
         }
-        _data_sources.push_back(i_data);
+        _rs_readers.push_back(i_data);
     }
 
     return OLAP_SUCCESS;
@@ -712,7 +712,7 @@ OLAPStatus Reader::_attach_data_to_merge_set(bool first, 
bool *eof) {
             return res;
         }
 
-        for (auto data : _data_sources) {
+        for (auto data : _rs_readers) {
             RowBlock* block = nullptr;
             auto res = data->prepare_block_read(
                 start_key, find_last_row, end_key, end_key_find_last_row, 
&block);
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 53b1fe6d..4605bbb4 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -60,7 +60,7 @@ struct ReaderParams {
     std::vector<OlapTuple> end_key;
     std::vector<TCondition> conditions;
     // The ColumnData will be set when using Merger, eg Cumulative, BE.
-    std::vector<ColumnData*> olap_data_arr;
+    std::vector<RowsetReaderSharedPtr> rs_readers;
     std::vector<uint32_t> return_columns;
     RuntimeProfile* profile;
     RuntimeState* runtime_state;
@@ -73,7 +73,7 @@ struct ReaderParams {
         start_key.clear();
         end_key.clear();
         conditions.clear();
-        olap_data_arr.clear();
+        rs_readers.clear();
     }
 
     std::string to_string() {
@@ -165,7 +165,7 @@ class Reader {
 
     OLAPStatus _init_params(const ReaderParams& read_params);
 
-    OLAPStatus _acquire_data_sources(const ReaderParams& read_params);
+    OLAPStatus _capture_rs_readers(const ReaderParams& read_params);
 
     OLAPStatus _init_keys_param(const ReaderParams& read_params);
 
@@ -202,10 +202,10 @@ class Reader {
 
     TabletSharedPtr _tablet;
 
-    // _own_data_sources is data source that reader aquire from tablet, so we 
need to
+    // _own_rs_readers is data source that reader aquire from tablet, so we 
need to
     // release these when reader closing
-    std::vector<ColumnData*> _own_data_sources;
-    std::vector<ColumnData*> _data_sources;
+    std::vector<RowsetReaderSharedPtr> _own_rs_readers;
+    std::vector<RowsetReaderSharedPtr> _rs_readers;
 
     KeysParam _keys_param;
     int32_t _next_key_index;
diff --git a/be/src/olap/rowset/alpha_rowset.h 
b/be/src/olap/rowset/alpha_rowset.h
index 87d4af06..620af910 100644
--- a/be/src/olap/rowset/alpha_rowset.h
+++ b/be/src/olap/rowset/alpha_rowset.h
@@ -62,6 +62,15 @@ class AlphaRowset : public Rowset {
 
     virtual size_t num_rows() const;
 
+    virtual Version version() const;
+
+    virtual VersionHash version_hash() const;
+
+    virtual bool in_use() const;
+
+    virtual RowsetId rowset_id() const;
+
+    virtual bool delete_files() const;
 private:
     OLAPStatus _init_segment_groups();
 
diff --git a/be/src/olap/rowset/alpha_rowset_builder.cpp 
b/be/src/olap/rowset/alpha_rowset_builder.cpp
index 27590889..d6bc2a07 100644
--- a/be/src/olap/rowset/alpha_rowset_builder.cpp
+++ b/be/src/olap/rowset/alpha_rowset_builder.cpp
@@ -32,17 +32,29 @@ OLAPStatus AlphaRowsetBuilder::init(const 
RowsetBuilderContext& rowset_builder_c
     _init();
     _current_rowset_meta->set_rowset_id(_rowset_builder_context.rowset_id);
     _current_rowset_meta->set_tablet_id(_rowset_builder_context.tablet_id);
-    _current_rowset_meta->set_txn_id(_rowset_builder_context.txn_id);
     
_current_rowset_meta->set_tablet_schema_hash(_rowset_builder_context.tablet_schema_hash);
     _current_rowset_meta->set_rowset_type(_rowset_builder_context.rowset_type);
     _current_rowset_meta->set_rowset_state(PREPARING);
     
_current_rowset_meta->set_rowset_path(_rowset_builder_context.rowset_path_prefix);
-    _current_rowset_meta->set_version(_rowset_builder_context.version);
-    
_current_rowset_meta->set_version_hash(_rowset_builder_context.version_hash);
-    _current_rowset_meta->set_load_id(_rowset_builder_context.load_id);
     return OLAP_SUCCESS;
 }
 
+void AlphaRowsetBuilder::set_txn_id(const int64_t& txn_id) {
+    _current_rowset_meta->set_txn_id(txn_id);  // formerly copied from RowsetBuilderContext in init()
+}
+
+void AlphaRowsetBuilder::set_load_id(const PUniqueId& load_id) {
+    _current_rowset_meta->set_load_id(load_id);  // formerly copied from RowsetBuilderContext in init()
+}
+
+void AlphaRowsetBuilder::set_version(const Version& version) {
+    _current_rowset_meta->set_version(version);  // formerly copied from RowsetBuilderContext in init()
+}
+
+void AlphaRowsetBuilder::set_version_hash(const VersionHash& version_hash) {
+    _current_rowset_meta->set_version_hash(version_hash);  // formerly copied from RowsetBuilderContext in init()
+}
+
 OLAPStatus AlphaRowsetBuilder::add_row(RowCursor* row) {
     OLAPStatus status = _column_data_writer->attached_by(row);
     if (status != OLAP_SUCCESS) {
@@ -66,9 +78,9 @@ std::shared_ptr<Rowset> AlphaRowsetBuilder::build() {
         PendingSegmentGroupPB pending_segment_group_pb;
         
pending_segment_group_pb.set_pending_segment_group_id(segment_group->segment_group_id());
         
pending_segment_group_pb.set_num_segments(segment_group->num_segments());
-        PUniqueId* unique_id = pending_segment_group_pb.mutable_load_id();
-        unique_id->set_hi(_rowset_builder_context.load_id.hi());
-        unique_id->set_lo(_rowset_builder_context.load_id.lo());
+        //PUniqueId* unique_id = pending_segment_group_pb.mutable_load_id();
+        //unique_id->set_hi(_rowset_builder_context.load_id.hi());
+        //unique_id->set_lo(_rowset_builder_context.load_id.lo());
         pending_segment_group_pb.set_empty(segment_group->empty());
         const std::vector<KeyRange>* column_statistics = 
&(segment_group->get_column_statistics());
         if (column_statistics != nullptr) {
@@ -100,10 +112,10 @@ void AlphaRowsetBuilder::_init() {
             _rowset_builder_context.num_rows_per_row_block,
             _rowset_builder_context.rowset_path_prefix,
             false, _segment_group_id, 0, true,
-            _rowset_builder_context.partition_id, 
_rowset_builder_context.txn_id);
+            _rowset_builder_context.partition_id, 0);
     DCHECK(_cur_segment_group != nullptr) << "failed to malloc SegmentGroup";
     _cur_segment_group->acquire();
-    _cur_segment_group->set_load_id(_rowset_builder_context.load_id);
+    //_cur_segment_group->set_load_id(_rowset_builder_context.load_id);
     _segment_groups.push_back(_cur_segment_group);
 
     _column_data_writer = ColumnDataWriter::create(_cur_segment_group, true,
diff --git a/be/src/olap/rowset/alpha_rowset_builder.h 
b/be/src/olap/rowset/alpha_rowset_builder.h
index 6cb431c4..f9d7f231 100644
--- a/be/src/olap/rowset/alpha_rowset_builder.h
+++ b/be/src/olap/rowset/alpha_rowset_builder.h
@@ -32,6 +32,10 @@ class AlphaRowsetBuilder : public RowsetBuilder {
     AlphaRowsetBuilder();
 
     virtual OLAPStatus init(const RowsetBuilderContext& 
rowset_builder_context);
+    virtual void set_txn_id(const int64_t& txn_id);
+    virtual void set_load_id(const PUniqueId& load_id);
+    virtual void set_version(const Version& version);
+    virtual void set_version_hash(const VersionHash& version_hash);
 
     // add a row block to rowset
     virtual OLAPStatus add_row(RowCursor* row);
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 26c55501..4ae4353a 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -58,6 +58,16 @@ class Rowset {
     virtual bool zero_num_rows() const = 0;
 
     virtual size_t num_rows() const = 0;
+
+    virtual Version version() const = 0;
+
+    virtual VersionHash version_hash() const = 0;
+
+    virtual bool in_use() const = 0;
+
+    virtual RowsetId rowset_id() const = 0;
+
+    virtual bool delete_files() const = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_builder.h 
b/be/src/olap/rowset/rowset_builder.h
index ec942e02..cb4baf65 100644
--- a/be/src/olap/rowset/rowset_builder.h
+++ b/be/src/olap/rowset/rowset_builder.h
@@ -23,26 +23,23 @@
 #include "olap/schema.h"
 #include "olap/row_block.h"
 #include "gen_cpp/types.pb.h"
+#include "runtime/mem_pool.h"
 
 namespace doris {
 
 class Rowset;
 
 struct RowsetBuilderContext {
+    int64_t partition_id;
     int64_t tablet_id;
-    int tablet_schema_hash;
+    int64_t tablet_schema_hash;
     int64_t rowset_id;
     RowsetTypePB rowset_type;
     std::string rowset_path_prefix;
     RowFields tablet_schema;
-    int64_t partition_id;
-    int64_t txn_id;
-    int num_key_fields;
-    int num_short_key_fields;
-    int num_rows_per_row_block;
-    Version version;
-    VersionHash version_hash;
-    PUniqueId load_id;
+    size_t num_key_fields;
+    size_t num_short_key_fields;
+    size_t num_rows_per_row_block;
     CompressKind compress_kind;
     double bloom_filter_fpp;
 };
@@ -52,6 +49,10 @@ class RowsetBuilder {
     virtual ~RowsetBuilder() { }
     
     virtual OLAPStatus init(const RowsetBuilderContext& 
rowset_builder_context) = 0;
+    virtual void set_txn_id(const int64_t& txn_id) = 0;
+    virtual void set_load_id(const PUniqueId& load_id) = 0; 
+    virtual void set_version(const Version& version) = 0; 
+    virtual void set_version_hash(const VersionHash& version_hash) = 0;
 
     // add a row to rowset
     virtual OLAPStatus add_row(RowCursor* row_block) = 0;
@@ -60,6 +61,7 @@ class RowsetBuilder {
 
     // get a rowset
     virtual std::shared_ptr<Rowset> build() = 0;
+    virtual MemPool* mem_pool();
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_id_generator.cpp 
b/be/src/olap/rowset/rowset_id_generator.cpp
index 85257e12..495a77bb 100644
--- a/be/src/olap/rowset/rowset_id_generator.cpp
+++ b/be/src/olap/rowset/rowset_id_generator.cpp
@@ -34,7 +34,7 @@ RowsetIdGenerator RowsetIdGenerator::instance() {
     return _s_instance;
 }
 
-OLAPStatus RowsetIdGenerator::get_next_id(DataDir* dir, RowsetId& 
gen_rowset_id) {
+OLAPStatus RowsetIdGenerator::get_next_id(DataDir* dir, RowsetId* 
gen_rowset_id) {
     WriteLock wrlock(&_ids_lock);
     // if could not find the dir in map, then load the start id from meta
     RowsetId batch_end_id = 10000;
@@ -71,9 +71,9 @@ OLAPStatus RowsetIdGenerator::get_next_id(DataDir* dir, 
RowsetId& gen_rowset_id)
     } 
 
     start_end_id.first = start_end_id.first + 1;
-    gen_rowset_id = start_end_id.first;
+    *gen_rowset_id = start_end_id.first;
 
     return OLAP_SUCCESS;
 } // get_next_id
 
-} // doris
\ No newline at end of file
+} // doris
diff --git a/be/src/olap/rowset/rowset_id_generator.h 
b/be/src/olap/rowset/rowset_id_generator.h
index e8e2cbbc..6c365a58 100644
--- a/be/src/olap/rowset/rowset_id_generator.h
+++ b/be/src/olap/rowset/rowset_id_generator.h
@@ -33,7 +33,7 @@ class RowsetIdGenerator {
     // generator a id according to data dir
     // rowsetid is not globally unique, it is dir level
     // it saves the batch end id into meta env
-    OLAPStatus get_next_id(DataDir* dir, RowsetId& rowset_id); 
+    OLAPStatus get_next_id(DataDir* dir, RowsetId* rowset_id); 
 
 private:
     RowsetIdGenerator(){}
@@ -43,7 +43,7 @@ class RowsetIdGenerator {
     std::map<DataDir*, std::pair<RowsetId,RowsetId>> _dir_ids; 
     static RowsetIdGenerator* _s_instance;
     static std::mutex _mlock;
-} // RowsetIdGenerator
+}; // RowsetIdGenerator
 
 } // namespace doris
-#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_ID_GENERATOR_H
\ No newline at end of file
+#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_ID_GENERATOR_H
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 387e553a..838c660a 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -32,7 +32,7 @@ namespace doris {
 
 class RowsetMeta;
 using RowsetMetaSharedPtr = std::shared_ptr<RowsetMeta>;
-typedef uint64_t RowsetId;
+typedef int64_t RowsetId;
 
 class RowsetMeta {
 public:
diff --git a/be/src/olap/rowset/rowset_reader.h 
b/be/src/olap/rowset/rowset_reader.h
index 64f8c1d6..c6ba7788 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -33,6 +33,9 @@
 
 namespace doris {
 
+class RowsetReader;
+using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;
+
 struct ReaderContext {
        const RowFields& tablet_schema;
     // projection columns
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index f16ba161..5d786f9b 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1177,7 +1177,7 @@ bool SchemaChangeWithSorting::_external_sorting(
 
     uint64_t merged_rows = 0;
     uint64_t filted_rows = 0;
-    vector<ColumnData*> olap_data_arr;
+    vector<RowsetReaderSharedPtr> rs_readers;
 
     for (vector<SegmentGroup*>::iterator it = src_segment_groups.begin();
             it != src_segment_groups.end(); ++it) {
@@ -1187,7 +1187,7 @@ bool SchemaChangeWithSorting::_external_sorting(
             goto EXTERNAL_SORTING_ERR;
         }
 
-        olap_data_arr.push_back(olap_data);
+        rs_readers.push_back(olap_data);
 
         if (OLAP_SUCCESS != olap_data->init()) {
             OLAP_LOG_WARNING("fail to initial olap data. [version='%d-%d' 
tablet='%s']",
@@ -1198,7 +1198,7 @@ bool SchemaChangeWithSorting::_external_sorting(
         }
     }
 
-    if (OLAP_SUCCESS != merger.merge(olap_data_arr, &merged_rows, 
&filted_rows)) {
+    if (OLAP_SUCCESS != merger.merge(rs_readers, &merged_rows, &filted_rows)) {
         OLAP_LOG_WARNING("fail to merge deltas. [tablet='%s' version='%d-%d']",
                          _tablet->full_name().c_str(),
                          dest_segment_group->version().first,
@@ -1216,16 +1216,16 @@ bool SchemaChangeWithSorting::_external_sorting(
         goto EXTERNAL_SORTING_ERR;
     }
 
-    for (vector<ColumnData*>::iterator it = olap_data_arr.begin();
-            it != olap_data_arr.end(); ++it) {
+    for (vector<RowsetReaderSharedPtr>::iterator it = rs_readers.begin();
+            it != rs_readers.end(); ++it) {
         SAFE_DELETE(*it);
     }
 
     return true;
 
 EXTERNAL_SORTING_ERR:
-    for (vector<ColumnData*>::iterator it = olap_data_arr.begin();
-            it != olap_data_arr.end(); ++it) {
+    for (vector<RowsetReaderSharedPtr>::iterator it = rs_readers.begin();
+            it != rs_readers.end(); ++it) {
         SAFE_DELETE(*it);
     }
 
@@ -1498,7 +1498,7 @@ OLAPStatus SchemaChangeHandler::_do_alter_tablet(
     }
 
     vector<Version> versions_to_be_changed;
-    vector<ColumnData*> olap_data_arr;
+    vector<RowsetReaderSharedPtr> rs_readers;
     // delete handlers for new tablet
     DeleteHandler delete_handler;
     do {
@@ -1523,22 +1523,20 @@ OLAPStatus SchemaChangeHandler::_do_alter_tablet(
         }
 
         // acquire data sources correspond to history versions
-        ref_tablet->acquire_data_sources_by_versions(
-                versions_to_be_changed, &olap_data_arr);
-        if (olap_data_arr.size() < 1) {
-            OLAP_LOG_WARNING("fail to acquire all data sources."
-                             "[version_num=%d data_source_num=%d]",
-                             versions_to_be_changed.size(),
-                             olap_data_arr.size());
+        ref_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers);
+        if (rs_readers.size() < 1) {
+            LOG(WARNING) << "fail to acquire all data sources. "
+                         << "version_num=" << versions_to_be_changed.size()
+                         << ", data_source_num=" << rs_readers.size();
             res = OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS;
             break;
         }
 
         // init one delete handler
         int32_t end_version = -1;
-        for (size_t i = 0; i < olap_data_arr.size(); ++i) {
-            if (olap_data_arr[i]->version().second > end_version) {
-                end_version = olap_data_arr[i]->version().second;
+        for (size_t i = 0; i < rs_readers.size(); ++i) {
+            if (rs_readers[i]->version().second > end_version) {
+                end_version = rs_readers[i]->version().second;
             }
         }
 
@@ -1563,7 +1561,7 @@ OLAPStatus SchemaChangeHandler::_do_alter_tablet(
         sc_params.alter_tablet_type = type;
         sc_params.ref_tablet = ref_tablet;
         sc_params.new_tablet = new_tablet;
-        sc_params.ref_olap_data_arr = olap_data_arr;
+        sc_params.ref_olap_data_arr = rs_readers;
         sc_params.delete_handler = delete_handler;
 
 
@@ -2132,8 +2130,8 @@ OLAPStatus 
SchemaChangeHandler::_alter_tablet(SchemaChangeParams* sc_params) {
             << ", version=" << (*it)->version().first << "-" << 
(*it)->version().second;
 
         // 释放ColumnData
-        vector<ColumnData*> olap_data_to_be_released(it, it + 1);
-        sc_params->ref_tablet->release_data_sources(&olap_data_to_be_released);
+        vector<RowsetReaderSharedPtr> olap_data_to_be_released(it, it + 1);
+        sc_params->ref_tablet->release_rs_readers(&olap_data_to_be_released);
 
         it = sc_params->ref_olap_data_arr.erase(it); // after erasing, it will 
point to end()
     }
@@ -2179,7 +2177,7 @@ OLAPStatus 
SchemaChangeHandler::_alter_tablet(SchemaChangeParams* sc_params) {
                 << "status=" << 
sc_params->ref_tablet->schema_change_status().status;
     }
 
-    
sc_params->ref_tablet->release_data_sources(&(sc_params->ref_olap_data_arr));
+    sc_params->ref_tablet->release_rs_readers(&(sc_params->ref_olap_data_arr));
     SAFE_DELETE(sc_procedure);
 
     LOG(INFO) << "finish to process alter tablet job. res=" << res;
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index f058b81f..47efde1a 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -298,7 +298,7 @@ class SchemaChangeHandler {
         AlterTabletType alter_tablet_type;
         TabletSharedPtr ref_tablet;
         TabletSharedPtr new_tablet;
-        std::vector<ColumnData*> ref_olap_data_arr;
+        std::vector<RowsetReaderSharedPtr> ref_olap_data_arr;
         std::string debug_message;
         DeleteHandler delete_handler;
         // TODO(zc): fuck me please, I don't add mutable here, but no where
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index d9e84ede..c39e51d5 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -264,7 +264,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
     ref_tablet->obtain_header_rdlock();
     header_locked = true;
 
-    vector<ColumnData*> olap_data_sources;
+    vector<RowsetReaderSharedPtr> rs_readers;
     TabletMeta* new_tablet_meta = nullptr;
     do {
         // get latest version
@@ -309,8 +309,8 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
         }
 
         // get data source and add reference count for prevent to delete data 
files
-        ref_tablet->acquire_data_sources_by_versions(shortest_path, 
&olap_data_sources);
-        if (olap_data_sources.size() == 0) {
+        ref_tablet->capture_rs_readers(shortest_path, &rs_readers);
+        if (rs_readers.empty()) {
             OLAP_LOG_WARNING("failed to acquire data sources. [tablet='%s', 
version=%d]",
                     ref_tablet->full_name().c_str(), version);
             res = OLAP_ERR_OTHER_ERROR;
@@ -384,7 +384,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(
 
     if (ref_tablet.get() != NULL) {
         VLOG(10) << "release data sources.";
-        ref_tablet->release_data_sources(&olap_data_sources);
+        ref_tablet->release_rs_readers(&rs_readers);
     }
 
     if (res != OLAP_SUCCESS) {
@@ -596,5 +596,5 @@ OLAPStatus SnapshotManager::_create_hard_link(const string& 
from_path, const str
         return OLAP_ERR_OTHER_ERROR;
     }
 }
-  
+
 }  // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index e80c78c2..bb1205db 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -819,7 +819,6 @@ OLAPStatus StorageEngine::_do_sweep(
     return res;
 }
 
-
 void StorageEngine::start_delete_unused_index() {
     _gc_mutex.lock();
 
@@ -856,6 +855,31 @@ void StorageEngine::add_unused_index(SegmentGroup* 
segment_group) {
     _gc_mutex.unlock();
 }
 
+void StorageEngine::start_delete_unused_rowset() {
+    _gc_mutex.lock();
+    // Delete the files of every queued rowset that is no longer in use and forget it.
+    auto it = _unused_rowsets.begin();
+    while (it != _unused_rowsets.end()) {
+        if (it->second->in_use()) {
+            ++it;
+        } else {
+            it->second->delete_files();  // NOTE(review): a false result is ignored
+            it = _unused_rowsets.erase(it);  // erase() invalidates `it`; continue from its return value
+        }
+    }
+
+    _gc_mutex.unlock();
+}
+
+void StorageEngine::add_unused_rowset(RowsetSharedPtr rowset) {
+    _gc_mutex.lock();
+    // Queue the rowset for GC keyed by its id; emplace() leaves an already
+    // queued id untouched, so the first registered rowset is kept.
+    const int64_t id = rowset->rowset_id();
+    _unused_rowsets.emplace(id, rowset);
+    _gc_mutex.unlock();
+}
+
 // TODO(zc): refactor this funciton
 OLAPStatus StorageEngine::create_tablet(const TCreateTabletReq& request) {
     
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index ed315387..19e70824 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -153,8 +153,10 @@ class StorageEngine {
     }
 
     void start_delete_unused_index();
+    void start_delete_unused_rowset();
 
     void add_unused_index(SegmentGroup* olap_index);
+    void add_unused_rowset(RowsetSharedPtr rowset);
 
     OLAPStatus recover_tablet_until_specfic_version(
         const TRecoverTabletReq& recover_tablet_req);
@@ -301,6 +303,7 @@ class StorageEngine {
     static StorageEngine* _s_instance;
 
     std::unordered_map<SegmentGroup*, std::vector<std::string>> _gc_files;
+    std::unordered_map<int64_t, RowsetSharedPtr> _unused_rowsets;
     Mutex _gc_mutex;
 
     // thread to monitor snapshot expiry
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 0ae56f73..aff4290d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -105,56 +105,101 @@ bool Tablet::can_do_compaction() {
     return true;
 }
 
-OLAPStatus Tablet::capture_consistent_versions(
-                        const Version& version, vector<Version>* 
span_versions) const {
-    OLAPStatus status = _rs_graph->capture_consistent_versions(version, 
span_versions);
-    if (status != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to generate shortest version path. tablet=" << 
full_name()
-                     << ", version='" << version.first << "-" << 
version.second;
+OLAPStatus Tablet::compute_all_versions_hash(const vector<Version>& versions,
+                                             VersionHash* version_hash) const {
+    DCHECK(version_hash != nullptr) << "invalid parameter, version_hash is 
nullptr";
+    // XOR together the version_hash of the rowset registered for each version.
+    int64_t combined_hash = 0L;
+    for (const auto& cur_version : versions) {
+        const auto rs_it = _rs_version_map.find(cur_version);
+        if (rs_it == _rs_version_map.end()) {
+            LOG(WARNING) << "fail to find Rowset. "
+                << "version=" << cur_version.first << "-" << cur_version.second;
+            return OLAP_ERR_TABLE_VERSION_INDEX_MISMATCH_ERROR;
+        }
+        combined_hash ^= rs_it->second->version_hash();
     }
-    return status;
+    *version_hash = combined_hash;
+    return OLAP_SUCCESS;
 }
 
 OLAPStatus Tablet::capture_consistent_rowsets(const Version& spec_version,
-                                             
vector<std::shared_ptr<RowsetReader>>* rs_readers) {
+                                              vector<RowsetSharedPtr>* 
rowsets) const {
     vector<Version> version_path;
     _rs_graph->capture_consistent_versions(spec_version, &version_path);
 
-    acquire_rs_reader_by_version(version_path, rs_readers);
+    capture_consistent_rowsets(version_path, rowsets);
     return OLAP_SUCCESS;
 }
 
-void Tablet::acquire_rs_reader_by_version(const vector<Version>& version_vec,
-                                          
vector<std::shared_ptr<RowsetReader>>* rs_readers) const {
-    DCHECK(rs_readers != NULL && rs_readers->empty());
-    for (auto version : version_vec) {
-        auto it2 = _rs_version_map.find(version);
-        if (it2 == _rs_version_map.end()) {
+OLAPStatus Tablet::capture_consistent_rowsets(const vector<Version>& version_path,
+                                              vector<RowsetSharedPtr>* rowsets) const {
+    DCHECK(rowsets != nullptr && rowsets->empty());
+    for (const auto& version : version_path) {
+        auto it = _rs_version_map.find(version);
+        if (it == _rs_version_map.end()) {
             LOG(WARNING) << "fail to find Rowset for version. tablet=" << 
full_name()
                          << ", version='" << version.first << "-" << 
version.second;
-            release_rs_readers(rs_readers);
-            return;
+            release_rowsets(rowsets);  // never hand back a partial version path
+            return OLAP_ERR_VERSION_NOT_EXIST;  // previously reported OLAP_SUCCESS
         }
 
-        std::shared_ptr<RowsetReader> rs_reader(RowsetReader::create());
-        OLAPStatus status = rs_reader->init(nullptr);
-        if (status != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to init rowset_reader. tablet=" << 
full_name()
-                         << ", version=" << version.first << "-" << 
version.second;
+        rowsets->push_back(it->second);
+    }
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus Tablet::release_rowsets(vector<RowsetSharedPtr>* rowsets) const {
+    DCHECK(rowsets != nullptr) << "rowsets is null. tablet=" << full_name();
+    rowsets->clear();  // drops only this caller's shared_ptr references
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus Tablet::capture_rs_readers(const Version& spec_version,
+                                      vector<RowsetReaderSharedPtr>* rs_readers) const {
+    vector<Version> version_path;
+    // capture_consistent_versions() logs its own failure; propagate that status (and the
+    // one from building readers) instead of always reporting OLAP_SUCCESS.
+    OLAPStatus res = capture_consistent_versions(spec_version, &version_path);
+    return res != OLAP_SUCCESS ? res : capture_rs_readers(version_path, rs_readers);
+}
+
+OLAPStatus Tablet::capture_rs_readers(const vector<Version>& version_path,
+                                vector<RowsetReaderSharedPtr>* rs_readers) const {
+    DCHECK(rs_readers != NULL && rs_readers->empty());
+    for (auto version : version_path) {
+        auto it = _rs_version_map.find(version);
+        if (it == _rs_version_map.end()) {
+            LOG(WARNING) << "fail to find Rowset for version. tablet=" << full_name()
+                         << ", version='" << version.first << "-" << version.second;
             release_rs_readers(rs_readers);
+            return OLAP_SUCCESS;
         }
-        rs_readers->push_back(std::move(rs_reader));
+
+        std::shared_ptr<RowsetReader> rs_reader(it->second->create_reader());
+        rs_readers->push_back(rs_reader);
     }
+    return OLAP_SUCCESS;
 }
 
-OLAPStatus Tablet::release_rs_readers(vector<std::shared_ptr<RowsetReader>>* rs_readers) const {
+OLAPStatus Tablet::release_rs_readers(vector<RowsetReaderSharedPtr>* rs_readers) const {
     DCHECK(rs_readers != nullptr) << "rs_readers is null. tablet=" << full_name();
     rs_readers->clear();
     return OLAP_SUCCESS;
 }
 
+OLAPStatus Tablet::capture_consistent_versions(
+                        const Version& version, vector<Version>* span_versions) const {
+    OLAPStatus status = _rs_graph->capture_consistent_versions(version, span_versions);
+    if (status != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to generate shortest version path. tablet=" << full_name()
+                     << ", version='" << version.first << "-" << version.second;
+    }
+    return status;
+}
+
 OLAPStatus Tablet::add_inc_rowset(const Rowset& rowset) {
-    return _tablet_meta.add_inc_rs_meta(rowset.get_rs_meta());
+    return _tablet_meta.add_inc_rs_meta(rowset.rowset_meta());
 }
 
 OLAPStatus Tablet::delete_expired_inc_rowset() {
@@ -219,8 +264,9 @@ void Tablet::calc_missed_versions(int64_t spec_version,
     }
 }
 
-OLAPStatus Tablet::modify_rowsets(vector<RowsetSharedPtr>& to_add,
-                                 vector<RowsetSharedPtr>& to_delete) {
+OLAPStatus Tablet::modify_rowsets(std::vector<Version>* old_version,
+                                  vector<RowsetSharedPtr>* to_add,
+                                  vector<RowsetSharedPtr>* to_delete) {
     return OLAP_SUCCESS;
 }
 
@@ -232,8 +278,8 @@ RowsetSharedPtr Tablet::rowset_with_largest_size() {
         if (largest_rowset->empty() || largest_rowset->zero_num_rows()) {
             continue;
         }
-        if (it.second->get_rs_meta()->get_index_disk_size()
-                > largest_rowset->get_rs_meta()->get_index_disk_size()) {
+        if (it.second->rowset_meta()->index_disk_size()
+                > largest_rowset->rowset_meta()->index_disk_size()) {
             largest_rowset = it.second;
         }
     }
@@ -431,15 +477,15 @@ size_t Tablet::get_row_size() const {
 size_t Tablet::get_data_size() const {
     size_t total_size = 0;
     for (auto& it : _rs_version_map) {
-        total_size += it.second->get_data_disk_size();
+        total_size += it.second->data_disk_size();
     }
     return total_size;
 }
 
-size_t Tablet::get_num_rows() const {
+size_t Tablet::num_rows() const {
     size_t num_rows = 0;
     for (auto& it : _rs_version_map) {
-        num_rows += it.second->get_num_rows();
+        num_rows += it.second->num_rows();
     }
     return num_rows;
 }
@@ -531,7 +577,7 @@ VersionEntity Tablet::get_version_entity_by_version(const Version& version) {
 
 size_t Tablet::get_version_data_size(const Version& version) {
     RowsetSharedPtr rowset = _rs_version_map[version];
-    return rowset->get_data_disk_size();
+    return rowset->data_disk_size();
 }
 
 OLAPStatus Tablet::recover_tablet_until_specfic_version(const int64_t& spec_version,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index b4046131..ef7ae1bc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -76,10 +76,6 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
 
     OLAPStatus select_versions_to_span(const Version& version,
                                        std::vector<Version>* span_versions) const;
-    void acquire_data_sources(const Version& version, std::vector<ColumnData*>* sources) const;
-    void acquire_data_sources_by_versions(const std::vector<Version>& version_list,
-                                          std::vector<ColumnData*>* sources) const;
-    OLAPStatus release_data_sources(std::vector<ColumnData*>* data_sources) const;
     OLAPStatus register_data_source(const std::vector<SegmentGroup*>& segment_group_vec);
     OLAPStatus unregister_data_source(const Version& version, std::vector<SegmentGroup*>* segment_group_vec);
     OLAPStatus add_pending_version(int64_t partition_id, int64_t transaction_id,
@@ -99,9 +95,6 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
     OLAPStatus clone_data(const TabletMeta& clone_header,
                           const std::vector<const PDelta*>& clone_deltas,
                           const std::vector<Version>& versions_to_delete);
-    OLAPStatus replace_data_sources(const std::vector<Version>* old_versions,
-                                const std::vector<SegmentGroup*>* new_data_sources,
-                                std::vector<SegmentGroup*>* old_data_sources);
     OLAPStatus compute_all_versions_hash(const std::vector<Version>& versions,
                                          VersionHash* version_hash) const;
     OLAPStatus merge_tablet_meta(const TabletMeta& hdr, int to_version);
@@ -207,13 +200,19 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
 
     OLAPStatus init_once();
     OLAPStatus capture_consistent_rowsets(const Version& spec_version,
-                                          vector<std::shared_ptr<RowsetReader>>* rs_readers);
+                                          vector<RowsetSharedPtr>* rowsets) const;
+    OLAPStatus capture_consistent_rowsets(const vector<Version>& version_vec,
+                                          vector<RowsetSharedPtr>* rowsets) const;
+    OLAPStatus release_rowsets(vector<RowsetSharedPtr>* rowsets) const;
+    OLAPStatus capture_rs_readers(const Version& spec_version,
+                                  vector<RowsetReaderSharedPtr>* rs_readers) const;
+    OLAPStatus capture_rs_readers(const vector<Version>& version_path,
+                                  vector<RowsetReaderSharedPtr>* rs_readers) const;
+    OLAPStatus release_rs_readers(vector<RowsetReaderSharedPtr>* rs_readers) const;
     OLAPStatus capture_consistent_versions(const Version& version, vector<Version>* span_versions) const;
-    void acquire_rs_reader_by_version(const vector<Version>& version_vec,
-                                      vector<std::shared_ptr<RowsetReader>>* rs_readers) const;
-    OLAPStatus release_rs_readers(vector<std::shared_ptr<RowsetReader>>* rs_readers) const;
-    OLAPStatus modify_rowsets(vector<RowsetSharedPtr>& to_add,
-                             vector<RowsetSharedPtr>& to_delete);
+    OLAPStatus modify_rowsets(std::vector<Version>* old_version,
+                              vector<RowsetSharedPtr>* to_add,
+                              vector<RowsetSharedPtr>* to_delete);
 
     const int64_t table_id() const;
     const std::string table_name() const;
@@ -240,7 +239,7 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
     size_t get_index_size() const;
     size_t all_rowsets_size() const;
     size_t get_data_size() const;
-    size_t get_num_rows() const;
+    size_t num_rows() const;
     size_t get_rowset_size(const Version& version);
 
     AlterTabletState alter_tablet_state();
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index dd3d6950..2fbc6e19 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -974,7 +974,7 @@ void TabletManager::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tabl
     tablet_info->schema_hash = tablet->schema_hash();
 
     tablet->obtain_header_rdlock();
-    tablet_info->row_count = tablet->get_num_rows();
+    tablet_info->row_count = tablet->num_rows();
     tablet_info->data_size = tablet->get_data_size();
     const PDelta* last_file_version = tablet->lastest_version();
     if (last_file_version == NULL) {
@@ -1014,10 +1014,10 @@ void TabletManager::_build_tablet_stat() {
                 
             // we only get base tablet's stat
             stat.__set_data_size(tablet->get_data_size());
-            stat.__set_row_num(tablet->get_num_rows());
+            stat.__set_row_num(tablet->num_rows());
             VLOG(3) << "tablet_id=" << item.first 
                     << ", data_size=" << tablet->get_data_size()
-                    << ", row_num:" << tablet->get_num_rows();
+                    << ", row_num:" << tablet->num_rows();
             break;
         }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to