chaoyli closed pull request #437: Move tablet management code from 
StorageEngine to TabletManager
URL: https://github.com/apache/incubator-doris/pull/437
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays
its diff once the pull request is merged, so the diff is supplied below:

diff --git a/be/src/http/action/restore_tablet_action.cpp 
b/be/src/http/action/restore_tablet_action.cpp
index a7bf7629..c07e2810 100644
--- a/be/src/http/action/restore_tablet_action.cpp
+++ b/be/src/http/action/restore_tablet_action.cpp
@@ -178,7 +178,7 @@ Status RestoreTabletAction::_restore(const std::string& 
key, int64_t tablet_id,
 
     std::string root_path = 
OlapStore::get_root_path_from_schema_hash_path_in_trash(latest_tablet_path);
     OlapStore* store = StorageEngine::get_instance()->get_store(root_path);
-    std::string restore_schema_hash_path = 
store->get_tablet_schema_hash_path_from_header(&header);
+    std::string restore_schema_hash_path = 
store->get_absolute_tablet_path(&header, true);
     Status s = FileUtils::create_dir(restore_schema_hash_path);
     if (!s.ok()) {
         LOG(WARNING) << "create tablet path failed:" << 
restore_schema_hash_path;
@@ -198,7 +198,7 @@ Status RestoreTabletAction::_restore(const std::string& 
key, int64_t tablet_id,
         if (link_ret != 0) {
             LOG(WARNING) << "link from:" << from
                     << " to:" << to  << " failed, link ret:" << link_ret;
-            std::string restore_tablet_path = 
store->get_tablet_path_from_header(&header);
+            std::string restore_tablet_path = 
store->get_absolute_tablet_path(&header, false);
             LOG(WARNING) << "remove tablet_path:" << restore_tablet_path;
             Status s = FileUtils::remove_all(restore_tablet_path);
             if (!s.ok()) {
@@ -207,7 +207,7 @@ Status RestoreTabletAction::_restore(const std::string& 
key, int64_t tablet_id,
             return Status("create link path failed");
         }
     }
-    std::string restore_shard_path = 
store->get_shard_path_from_header(std::to_string(header.shard()));
+    std::string restore_shard_path = 
store->get_absolute_shard_path(std::to_string(header.shard()));
     Status status = _reload_tablet(key, restore_shard_path, tablet_id, 
schema_hash);
     return status;
 }
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index a61b7f19..dbfe5d38 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -78,6 +78,7 @@ add_library(Olap STATIC
     stream_index_writer.cpp
     stream_name.cpp
     tablet.cpp
+    tablet_manager.cpp
     tablet_meta.cpp
     tablet_meta_manager.cpp
     txn_manager.cpp
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 47c08679..2b1e1cac 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -39,6 +39,27 @@ typedef int64_t VersionHash;
 typedef __int128 int128_t;
 typedef unsigned __int128 uint128_t;
 
+enum CompactionType {
+    BASE_COMPACTION = 1,
+    CUMULATIVE_COMPACTION = 2
+};
+
+struct RootPathInfo {
+    RootPathInfo():
+            capacity(1),
+            available(0),
+            data_used_capacity(0),
+            is_used(false) { }
+
+    std::string path;
+    int64_t path_hash;
+    int64_t capacity;                  // 总空间,单位字节
+    int64_t available;                 // 可用空间,单位字节
+    int64_t data_used_capacity;
+    bool is_used;                       // 是否可用标识
+    TStorageMedium::type storage_medium;  // 存储介质类型:SSD|HDD
+};
+
 struct TabletInfo {
     TabletInfo(
             TTabletId in_tablet_id,
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index afcbf144..61c49c79 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1687,8 +1687,7 @@ OLAPStatus SchemaChangeHandler::_create_new_tablet(
         // 4. Register tablet into store, so that we can manage tablet from
         // the perspective of root path.
         // Example: unregister all tables when a bad disk found.
-        res = StorageEngine::get_instance()->register_tablet_into_root_path(
-                new_tablet.get());
+        res = new_tablet->register_tablet_into_dir();
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("fail to register tablet into root path. "
                              "[root_path='%s' tablet='%s']",
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 313b40ec..2d69ef03 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -78,10 +78,6 @@ const std::string HTTP_REQUEST_FILE_PARAM = "&file=";
 const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
 const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
 
-bool _sort_tablet_by_create_time(const TabletSharedPtr& a, const 
TabletSharedPtr& b) {
-    return a->creation_time() < b->creation_time();
-}
-
 static Status _validate_options(const EngineOptions& options) {
     if (options.store_paths.empty()) {
         return Status("store paths is empty");;
@@ -112,13 +108,12 @@ StorageEngine::StorageEngine(const EngineOptions& options)
         _effective_cluster_id(-1),
         _is_all_cluster_id_exist(true),
         _is_drop_tables(false),
-        _global_tablet_id(0),
         _index_stream_lru_cache(NULL),
-        _tablet_stat_cache_update_time_ms(0),
         _snapshot_base_id(0),
         _is_report_disk_state_already(false),
         _is_report_tablet_already(false),
-        _txn_mgr() {
+        _txn_mgr(),
+        _tablet_mgr() {
     if (_s_instance == nullptr) {
         _s_instance = this;
     }
@@ -140,7 +135,7 @@ OLAPStatus StorageEngine::_load_store(OlapStore* store) {
     }
     if (is_header_converted) {
         LOG(INFO) << "load header from meta";
-        OLAPStatus s = store->load_tables(this);
+        OLAPStatus s = store->load_tablets(this);
         LOG(INFO) << "load header from meta finished";
         if (s != OLAP_SUCCESS) {
             LOG(WARNING) << "there is failure when loading tablet headers, 
path:" << store_path;
@@ -204,66 +199,8 @@ OLAPStatus StorageEngine::_load_store(OlapStore* store) {
 OLAPStatus StorageEngine::load_one_tablet(
         OlapStore* store, TTabletId tablet_id, SchemaHash schema_hash,
         const string& schema_hash_path, bool force) {
-    stringstream header_name_stream;
-    header_name_stream << schema_hash_path << "/" << tablet_id << ".hdr";
-    string header_path = header_name_stream.str();
-    path boost_schema_hash_path(schema_hash_path);
-
-    if (access(header_path.c_str(), F_OK) != 0) {
-        LOG(WARNING) << "fail to find header file. [header_path=" << 
header_path << "]";
-        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
-        return OLAP_ERR_FILE_NOT_EXIST;
-    }
-
-    auto tablet = Tablet::create_from_header_file(
-            tablet_id, schema_hash, header_path, store);
-    if (tablet == NULL) {
-        LOG(WARNING) << "fail to load tablet. [header_path=" << header_path << 
"]";
-        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
-        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
-    }
-
-    if (tablet->lastest_version() == NULL && !tablet->is_schema_changing()) {
-        OLAP_LOG_WARNING("tablet not in schema change state without delta is 
invalid. "
-                         "[header_path=%s]",
-                         header_path.c_str());
-        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
-        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
-    }
-
-    // 这里不需要SAFE_DELETE(tablet),因为tablet指针已经在add_table中托管到smart pointer中
-    OLAPStatus res = OLAP_SUCCESS;
-    string table_name = tablet->full_name();
-    res = add_tablet(tablet_id, schema_hash, tablet, force);
-    if (res != OLAP_SUCCESS) {
-        // 插入已经存在的table时返回成功
-        if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) {
-            return OLAP_SUCCESS;
-        }
-
-        LOG(WARNING) << "failed to add tablet. [tablet=" << table_name << "]";
-        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
-    }
-
-    if (register_tablet_into_root_path(tablet.get()) != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to register tablet into root path. 
[root_path=%s]",
-                         schema_hash_path.c_str());
-
-        if (StorageEngine::get_instance()->drop_tablet(tablet_id, schema_hash) 
!= OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to drop tablet when create tablet failed. "
-                             "[tablet=%ld schema_hash=%d]",
-                             tablet_id, schema_hash);
-        }
-
-        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
-    }
-
-    // load pending data (for realtime push), will add transaction 
relationship into engine
-    tablet->load_pending_data();
-
-    VLOG(3) << "succeed to add tablet. tablet=" << tablet->full_name()
-            << ", path=" << schema_hash_path;
-    return OLAP_SUCCESS;
+    return _tablet_mgr.load_one_tablet(store, tablet_id, schema_hash,
+        schema_hash_path, force);
 }
 
 void StorageEngine::load_stores(const std::vector<OlapStore*>& stores) {
@@ -286,7 +223,7 @@ OLAPStatus StorageEngine::open() {
     // init store_map
     for (auto& path : _options.store_paths) {
         OlapStore* store = new OlapStore(path.path, path.capacity_bytes);
-        auto st = store->load();
+        auto st = store->init();
         if (!st.ok()) {
             LOG(WARNING) << "Store load failed, path=" << path.path;
             return OLAP_ERR_INVALID_ROOT_PATH;
@@ -305,7 +242,7 @@ OLAPStatus StorageEngine::open() {
     auto cache = new_lru_cache(config::file_descriptor_cache_capacity);
     if (cache == nullptr) {
         OLAP_LOG_WARNING("failed to init file descriptor LRUCache");
-        _tablet_map.clear();
+        _tablet_mgr.clear();
         return OLAP_ERR_INIT_FAILED;
     }
     FileHandler::set_fd_cache(cache);
@@ -315,7 +252,7 @@ OLAPStatus StorageEngine::open() {
     _index_stream_lru_cache = 
new_lru_cache(config::index_stream_cache_capacity);
     if (_index_stream_lru_cache == NULL) {
         OLAP_LOG_WARNING("failed to init index stream LRUCache");
-        _tablet_map.clear();
+        _tablet_mgr.clear();
         return OLAP_ERR_INIT_FAILED;
     }
 
@@ -329,7 +266,7 @@ OLAPStatus StorageEngine::open() {
     auto stores = get_stores();
     load_stores(stores);
     // 取消未完成的SchemaChange任务
-    _cancel_unfinished_schema_change();
+    _tablet_mgr.cancel_unfinished_schema_change();
 
     return OLAP_SUCCESS;
 }
@@ -345,6 +282,7 @@ void StorageEngine::_update_storage_medium_type_count() {
     }
 
     _available_storage_medium_type_count = 
available_storage_medium_types.size();
+    
_tablet_mgr.update_storage_medium_type_count(_available_storage_medium_type_count);
 }
 
 
@@ -441,22 +379,7 @@ OLAPStatus 
StorageEngine::get_all_root_path_info(vector<RootPathInfo>* root_path
 
     // for each tablet, get it's data size, and accumulate the path 
'data_used_capacity'
     // which the tablet belongs to.
-    _tablet_map_lock.rdlock();
-    for (auto& entry : _tablet_map) {
-        TableInstances& instance = entry.second;
-        for (auto& tablet : instance.table_arr) {
-            ++tablet_counter;
-            int64_t data_size = tablet->get_data_size();
-            auto find = path_map.find(tablet->storage_root_path_name()); 
-            if (find == path_map.end()) {
-                continue;
-            }
-            if (find->second.is_used) {
-                find->second.data_used_capacity += data_size;
-            }
-        } 
-    }
-    _tablet_map_lock.unlock();
+    _tablet_mgr.update_root_path_info(&path_map, &tablet_counter);
 
     // add path info to root_paths_info
     for (auto& entry : path_map) {
@@ -476,10 +399,6 @@ OLAPStatus 
StorageEngine::get_all_root_path_info(vector<RootPathInfo>* root_path
     return res;
 }
 
-OLAPStatus StorageEngine::register_tablet_into_root_path(Tablet* tablet) {
-    return tablet->store()->register_tablet(tablet);
-}
-
 void StorageEngine::start_disk_stat_monitor() {
     for (auto& it : _store_map) {
         it.second->health_check();
@@ -565,7 +484,6 @@ std::vector<OlapStore*> 
StorageEngine::get_stores_for_create_tablet(
             }
         }
     }
-
     std::random_device rd;
     srand(rd());
     std::random_shuffle(stores.begin(), stores.end());
@@ -610,7 +528,7 @@ void StorageEngine::_delete_tables_on_unused_root_path() {
         _is_drop_tables = true;
     }
     
-    
StorageEngine::get_instance()->drop_tables_on_error_root_path(tablet_info_vec);
+    _tablet_mgr.drop_tablets_on_error_root_path(tablet_info_vec);
 }
 
 OLAPStatus StorageEngine::_get_path_available_capacity(
@@ -636,172 +554,29 @@ OLAPStatus StorageEngine::clear() {
     FileHandler::set_fd_cache(nullptr);
     SAFE_DELETE(_index_stream_lru_cache);
 
-    _tablet_map.clear();
-    _global_tablet_id = 0;
-
     return OLAP_SUCCESS;
 }
 
-TabletSharedPtr StorageEngine::_get_tablet_with_no_lock(TTabletId tablet_id, 
SchemaHash schema_hash) {
-    VLOG(3) << "begin to get tablet. tablet_id=" << tablet_id;
-    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
-    if (it != _tablet_map.end()) {
-        for (TabletSharedPtr tablet : it->second.table_arr) {
-            if (tablet->equal(tablet_id, schema_hash)) {
-                VLOG(3) << "get tablet success. tablet_id=" << tablet_id;
-                return tablet;
-            }
-        }
-    }
-
-    VLOG(3) << "fail to get tablet. tablet_id=" << tablet_id;
-    // Return empty tablet if fail
-    TabletSharedPtr tablet;
-    return tablet;
-}
-
 TabletSharedPtr StorageEngine::get_tablet(TTabletId tablet_id, SchemaHash 
schema_hash, bool load_tablet) {
-    _tablet_map_lock.rdlock();
-    TabletSharedPtr tablet;
-    tablet = _get_tablet_with_no_lock(tablet_id, schema_hash);
-    _tablet_map_lock.unlock();
-
-    if (tablet.get() != NULL) {
-        if (!tablet->is_used()) {
-            OLAP_LOG_WARNING("tablet cannot be used. [tablet=%ld]", tablet_id);
-            tablet.reset();
-        } else if (load_tablet && !tablet->is_loaded()) {
-            if (tablet->load() != OLAP_SUCCESS) {
-                OLAP_LOG_WARNING("fail to load tablet. [tablet=%ld]", 
tablet_id);
-                tablet.reset();
-            }
-        }
-    }
-
-    return tablet;
+    
+    return _tablet_mgr.get_tablet(tablet_id, schema_hash, load_tablet);
 }
 
 OLAPStatus StorageEngine::get_tables_by_id(
         TTabletId tablet_id,
-        list<TabletSharedPtr>* table_list) {
-    OLAPStatus res = OLAP_SUCCESS;
-    VLOG(3) << "begin to get tables by id. tablet_id=" << tablet_id;
-
-    _tablet_map_lock.rdlock();
-    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
-    if (it != _tablet_map.end()) {
-        for (TabletSharedPtr tablet : it->second.table_arr) {
-            table_list->push_back(tablet);
-        }
-    }
-    _tablet_map_lock.unlock();
-
-    if (table_list->size() == 0) {
-        OLAP_LOG_WARNING("there is no tablet with specified id. [tablet=%ld]", 
tablet_id);
-        return OLAP_ERR_TABLE_NOT_FOUND;
-    }
-
-    for (std::list<TabletSharedPtr>::iterator it = table_list->begin();
-            it != table_list->end();) {
-        if (!(*it)->is_loaded()) {
-            if ((*it)->load() != OLAP_SUCCESS) {
-                OLAP_LOG_WARNING("fail to load tablet. [tablet='%s']",
-                                 (*it)->full_name().c_str());
-                it = table_list->erase(it);
-                continue;
-            }
-        }
-        ++it;
-    }
-
-    VLOG(3) << "success to get tables by id. table_num=" << table_list->size();
-    return res;
+        list<TabletSharedPtr>* tablet_list) {
+    return _tablet_mgr.get_tablets_by_id(tablet_id, tablet_list);
 }
 
 bool StorageEngine::check_tablet_id_exist(TTabletId tablet_id) {
-    bool is_exist = false;
-    _tablet_map_lock.rdlock();
-
-    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
-    if (it != _tablet_map.end() && it->second.table_arr.size() != 0) {
-        is_exist = true;
-    }
 
-    _tablet_map_lock.unlock();
-    return is_exist;
+    return _tablet_mgr.check_tablet_id_exist(tablet_id);
 }
 
 OLAPStatus StorageEngine::add_tablet(TTabletId tablet_id, SchemaHash 
schema_hash,
                                  const TabletSharedPtr& tablet, bool force) {
-    OLAPStatus res = OLAP_SUCCESS;
-    VLOG(3) << "begin to add tablet to StorageEngine. "
-            << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
-            << ", force=" << force;
-    _tablet_map_lock.wrlock();
-
-    tablet->set_id(_global_tablet_id++);
-
-    TabletSharedPtr table_item;
-    for (TabletSharedPtr item : _tablet_map[tablet_id].table_arr) {
-        if (item->equal(tablet_id, schema_hash)) {
-            table_item = item;
-            break;
-        }
-    }
-
-    if (table_item.get() == NULL) {
-        _tablet_map[tablet_id].table_arr.push_back(tablet);
-        _tablet_map[tablet_id].table_arr.sort(_sort_tablet_by_create_time);
-        _tablet_map_lock.unlock();
-
-        return res;
-    }
-    _tablet_map_lock.unlock();
-
-    if (!force) {
-        if (table_item->tablet_path() == tablet->tablet_path()) {
-            LOG(WARNING) << "add the same tablet twice! tablet_id="
-                << tablet_id << " schema_hash=" << tablet_id;
-            return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
-        }
-    }
-
-    table_item->obtain_header_rdlock();
-    int64_t old_time = table_item->lastest_version()->creation_time();
-    int64_t new_time = tablet->lastest_version()->creation_time();
-    int32_t old_version = table_item->lastest_version()->end_version();
-    int32_t new_version = tablet->lastest_version()->end_version();
-    table_item->release_header_lock();
-
-    /*
-     * In restore process, we replace all origin files in tablet dir with
-     * the downloaded snapshot files. Than we try to reload tablet header.
-     * force == true means we forcibly replace the Tablet in _tablet_map
-     * with the new one. But if we do so, the files in the tablet dir will be
-     * dropped when the origin Tablet deconstruct.
-     * So we set keep_files == true to not delete files when the
-     * origin Tablet deconstruct.
-     */
-    bool keep_files = force ? true : false;
-    if (force || (new_version > old_version
-            || (new_version == old_version && new_time > old_time))) {
-        drop_tablet(tablet_id, schema_hash, keep_files);
-        _tablet_map_lock.wrlock();
-        _tablet_map[tablet_id].table_arr.push_back(tablet);
-        _tablet_map[tablet_id].table_arr.sort(_sort_tablet_by_create_time);
-        _tablet_map_lock.unlock();
-    } else {
-        tablet->mark_dropped();
-        res = OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
-    }
-    LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res
-            << ", tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
-            << ", old_version=" << old_version << ", new_version=" << 
new_version
-            << ", old_time=" << old_time << ", new_time=" << new_time
-            << ", old_tablet_path=" << table_item->tablet_path()
-            << ", new_tablet_path=" << tablet->tablet_path();
-
-    return res;
+    
+    return _tablet_mgr.add_tablet(tablet_id, schema_hash, tablet, force);
 }
 
 OLAPStatus StorageEngine::add_transaction(
@@ -1089,172 +864,14 @@ OLAPStatus 
StorageEngine::clone_full_data(TabletSharedPtr tablet, TabletMeta& cl
 //          drop specified tablet and clear schema change info.
 OLAPStatus StorageEngine::drop_tablet(
         TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
-    LOG(INFO) << "begin to process drop tablet."
-        << "tablet=" << tablet_id << ", schema_hash=" << schema_hash;
-    DorisMetrics::drop_tablet_requests_total.increment(1);
-
-    OLAPStatus res = OLAP_SUCCESS;
-
-    // Get tablet which need to be droped
-    _tablet_map_lock.rdlock();
-    TabletSharedPtr dropped_tablet = _get_tablet_with_no_lock(tablet_id, 
schema_hash);
-    _tablet_map_lock.unlock();
-    if (dropped_tablet.get() == NULL) {
-        OLAP_LOG_WARNING("fail to drop not existed tablet. [tablet_id=%ld 
schema_hash=%d]",
-                         tablet_id, schema_hash);
-        return OLAP_ERR_TABLE_NOT_FOUND;
-    }
-
-    // Try to get schema change info
-    AlterTabletType type;
-    TTabletId related_tablet_id;
-    TSchemaHash related_schema_hash;
-    vector<Version> schema_change_versions;
-    dropped_tablet->obtain_header_rdlock();
-    bool ret = dropped_tablet->get_schema_change_request(
-            &related_tablet_id, &related_schema_hash, &schema_change_versions, 
&type);
-    dropped_tablet->release_header_lock();
-
-    // Drop tablet directly when not in schema change
-    if (!ret) {
-        return _drop_tablet_directly(tablet_id, schema_hash, keep_files);
-    }
-
-    // Check tablet is in schema change or not, is base tablet or not
-    bool is_schema_change_finished = true;
-    if (schema_change_versions.size() != 0) {
-        is_schema_change_finished = false;
-    }
-
-    bool is_drop_base_tablet = false;
-    _tablet_map_lock.rdlock();
-    TabletSharedPtr related_tablet = _get_tablet_with_no_lock(
-            related_tablet_id, related_schema_hash);
-    _tablet_map_lock.unlock();
-    if (related_tablet.get() == NULL) {
-        OLAP_LOG_WARNING("drop tablet directly when related tablet not found. "
-                         "[tablet_id=%ld schema_hash=%d]",
-                         related_tablet_id, related_schema_hash);
-        return _drop_tablet_directly(tablet_id, schema_hash, keep_files);
-    }
-
-    if (dropped_tablet->creation_time() < related_tablet->creation_time()) {
-        is_drop_base_tablet = true;
-    }
-
-    if (is_drop_base_tablet && !is_schema_change_finished) {
-        OLAP_LOG_WARNING("base tablet in schema change cannot be droped. 
[tablet=%s]",
-                         dropped_tablet->full_name().c_str());
-        return OLAP_ERR_PREVIOUS_SCHEMA_CHANGE_NOT_FINISHED;
-    }
-
-    // Drop specified tablet and clear schema change info
-    _tablet_map_lock.wrlock();
-    related_tablet->obtain_header_wrlock();
-    related_tablet->clear_schema_change_request();
-    res = related_tablet->save_tablet_meta();
-    if (res != OLAP_SUCCESS) {
-        LOG(FATAL) << "fail to save tablet header. res=" << res
-                   << ", tablet=" << related_tablet->full_name();
-    }
-
-    res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
-    related_tablet->release_header_lock();
-    _tablet_map_lock.unlock();
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to drop tablet which in schema change. 
[tablet=%s]",
-                         dropped_tablet->full_name().c_str());
-        return res;
-    }
-
-    LOG(INFO) << "finish to drop tablet. res=" << res;
-    return res;
-}
-
-OLAPStatus StorageEngine::_drop_tablet_directly(
-        TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
-    _tablet_map_lock.wrlock();
-    OLAPStatus res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, 
keep_files);
-    _tablet_map_lock.unlock();
-    return res;
-}
-
-OLAPStatus StorageEngine::_drop_tablet_directly_unlocked(
-        TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    TabletSharedPtr dropped_tablet = _get_tablet_with_no_lock(tablet_id, 
schema_hash);
-    if (dropped_tablet.get() == NULL) {
-        OLAP_LOG_WARNING("fail to drop not existed tablet. [tablet_id=%ld 
schema_hash=%d]",
-                         tablet_id, schema_hash);
-        return OLAP_ERR_TABLE_NOT_FOUND;
-    }
-
-    for (list<TabletSharedPtr>::iterator it = 
_tablet_map[tablet_id].table_arr.begin();
-            it != _tablet_map[tablet_id].table_arr.end();) {
-        if ((*it)->equal(tablet_id, schema_hash)) {
-            if (!keep_files) {
-                (*it)->mark_dropped();
-            }
-            it = _tablet_map[tablet_id].table_arr.erase(it);
-        } else {
-            ++it;
-        }
-    }
-
-    if (_tablet_map[tablet_id].table_arr.empty()) {
-        _tablet_map.erase(tablet_id);
-    }
-
-    res = dropped_tablet->store()->deregister_tablet(dropped_tablet.get());
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to unregister from root path. [res=%d 
tablet=%ld]",
-                         res, tablet_id);
-    }
-
-    return res;
-}
-
-OLAPStatus StorageEngine::drop_tables_on_error_root_path(
-        const vector<TabletInfo>& tablet_info_vec) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    _tablet_map_lock.wrlock();
-
-    for (const TabletInfo& tablet_info : tablet_info_vec) {
-        TTabletId tablet_id = tablet_info.tablet_id;
-        TSchemaHash schema_hash = tablet_info.schema_hash;
-        VLOG(3) << "drop_tablet begin. tablet_id=" << tablet_id
-                << ", schema_hash=" << schema_hash;
-        TabletSharedPtr dropped_tablet = _get_tablet_with_no_lock(tablet_id, 
schema_hash);
-        if (dropped_tablet.get() == NULL) {
-            OLAP_LOG_WARNING("dropping tablet not exist. [tablet=%ld 
schema_hash=%d]",
-                             tablet_id, schema_hash);
-            continue;
-        } else {
-            for (list<TabletSharedPtr>::iterator it = 
_tablet_map[tablet_id].table_arr.begin();
-                    it != _tablet_map[tablet_id].table_arr.end();) {
-                if ((*it)->equal(tablet_id, schema_hash)) {
-                    it = _tablet_map[tablet_id].table_arr.erase(it);
-                } else {
-                    ++it;
-                }
-            }
-
-            if (_tablet_map[tablet_id].table_arr.empty()) {
-                _tablet_map.erase(tablet_id);
-            }
-        }
-    }
-
-    _tablet_map_lock.unlock();
-
-    return res;
+    
+    return _tablet_mgr.drop_tablet(tablet_id, schema_hash, keep_files);
 }
 
 TabletSharedPtr StorageEngine::create_tablet(
         const TCreateTabletReq& request, const string* ref_root_path, 
         const bool is_schema_change_tablet, const TabletSharedPtr ref_tablet) {
+
     // Get all available stores, use ref_root_path if the caller specified
     std::vector<OlapStore*> stores;
     if (ref_root_path == nullptr) {
@@ -1267,322 +884,32 @@ TabletSharedPtr StorageEngine::create_tablet(
         stores.push_back(ref_tablet->store());
     }
 
-    TabletSharedPtr tablet;
-    // Try to create tablet on each of all_available_root_path, util success
-    for (auto& store : stores) {
-        TabletMeta* header = new TabletMeta();
-        OLAPStatus res = _create_new_tablet_header(request, store, 
is_schema_change_tablet, ref_tablet, header);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to create tablet header. [res=" << res << " 
root=" << store->path();
-            break;
-        }
-
-        tablet = Tablet::create_from_header(header, store);
-        if (tablet == nullptr) {
-            LOG(WARNING) << "fail to load tablet from header. root_path:%s" << 
store->path();
-            break;
-        }
-
-        // commit header finally
-        res = TabletMetaManager::save(store, request.tablet_id, 
request.tablet_schema.schema_hash, header);
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to save header. [res=" << res << " root=" << 
store->path();
-            break;
-        }
-        break;
-    }
-
-    return tablet;
-}
-
-OLAPStatus StorageEngine::create_init_version(TTabletId tablet_id, SchemaHash 
schema_hash,
-                                           Version version, VersionHash 
version_hash) {
-    VLOG(3) << "begin to create init version. "
-            << "begin=" << version.first << ", end=" << version.second;
-    TabletSharedPtr tablet;
-    ColumnDataWriter* writer = NULL;
-    SegmentGroup* new_segment_group = NULL;
-    OLAPStatus res = OLAP_SUCCESS;
-    std::vector<SegmentGroup*> index_vec;
-
-    do {
-        if (version.first > version.second) {
-            OLAP_LOG_WARNING("begin should not larger than end. [begin=%d 
end=%d]",
-                             version.first, version.second);
-            res = OLAP_ERR_INPUT_PARAMETER_ERROR;
-            break;
-        }
-
-        // Get tablet and generate new index
-        tablet = get_tablet(tablet_id, schema_hash);
-        if (tablet.get() == NULL) {
-            OLAP_LOG_WARNING("fail to find tablet. [tablet=%ld]", tablet_id);
-            res = OLAP_ERR_TABLE_NOT_FOUND;
-            break;
-        }
-
-        new_segment_group = new(nothrow) SegmentGroup(tablet.get(), version, 
version_hash, false, 0, 0);
-        if (new_segment_group == NULL) {
-            LOG(WARNING) << "fail to malloc index. [tablet=" << 
tablet->full_name() << "]";
-            res = OLAP_ERR_MALLOC_ERROR;
-            break;
-        }
-
-        // Create writer, which write nothing to tablet, to generate empty 
data file
-        writer = ColumnDataWriter::create(tablet, new_segment_group, false);
-        if (writer == NULL) {
-            LOG(WARNING) << "fail to create writer. [tablet=" << 
tablet->full_name() << "]";
-            res = OLAP_ERR_MALLOC_ERROR;
-            break;
-        }
-
-        res = writer->finalize();
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to finalize writer. [tablet=" << 
tablet->full_name() << "]";
-            break;
-        }
-
-        // Load new index and add to tablet
-        res = new_segment_group->load();
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to load new index. [tablet=" << 
tablet->full_name() << "]";
-            break;
-        }
-
-        WriteLock wrlock(tablet->get_header_lock_ptr());
-        index_vec.push_back(new_segment_group);
-        res = tablet->register_data_source(index_vec);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to register index to data sources. 
[tablet=%s]",
-                             tablet->full_name().c_str());
-            break;
-        }
-
-        res = tablet->save_tablet_meta();
-        if (res != OLAP_SUCCESS) {
-            LOG(WARNING) << "fail to save header. [tablet=" << 
tablet->full_name() << "]";
-            break;
-        }
-    } while (0);
-
-    // Unregister index and delete files(index and data) if failed
-    if (res != OLAP_SUCCESS && tablet.get() != NULL) {
-        std::vector<SegmentGroup*> unused_index;
-        tablet->obtain_header_wrlock();
-        tablet->unregister_data_source(version, &unused_index);
-        tablet->release_header_lock();
-
-        for (SegmentGroup* index : index_vec) {
-            index->delete_all_files();
-            SAFE_DELETE(index);
-        }
-    }
-
-    VLOG(3) << "create init version end. res=" << res;
-    SAFE_DELETE(writer);
-    return res;
+    return _tablet_mgr.create_tablet(request, ref_root_path, 
+        is_schema_change_tablet, ref_tablet, stores);
 }
 
 bool StorageEngine::try_schema_change_lock(TTabletId tablet_id) {
-    bool res = false;
-    VLOG(3) << "try_schema_change_lock begin. table_id=" << tablet_id;
-    _tablet_map_lock.rdlock();
 
-    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
-    if (it == _tablet_map.end()) {
-        OLAP_LOG_WARNING("tablet does not exists. [tablet=%ld]", tablet_id);
-    } else {
-        res = (it->second.schema_change_lock.trylock() == OLAP_SUCCESS);
-    }
-
-    _tablet_map_lock.unlock();
-    VLOG(3) << "try_schema_change_lock end. table_id=" <<  tablet_id;
-    return res;
+    return _tablet_mgr.try_schema_change_lock(tablet_id);
 }
 
 void StorageEngine::release_schema_change_lock(TTabletId tablet_id) {
-    VLOG(3) << "release_schema_change_lock begin. tablet_id=" << tablet_id;
-    _tablet_map_lock.rdlock();
-
-    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
-    if (it == _tablet_map.end()) {
-        OLAP_LOG_WARNING("tablet does not exists. [tablet=%ld]", tablet_id);
-    } else {
-        it->second.schema_change_lock.unlock();
-    }
-
-    _tablet_map_lock.unlock();
-    VLOG(3) << "release_schema_change_lock end. tablet_id=" << tablet_id;
-}
-
-void StorageEngine::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* 
tablet_info) {
-    tablet_info->tablet_id = tablet->tablet_id();
-    tablet_info->schema_hash = tablet->schema_hash();
-
-    tablet->obtain_header_rdlock();
-    tablet_info->row_count = tablet->get_num_rows();
-    tablet_info->data_size = tablet->get_data_size();
-    const PDelta* last_file_version = tablet->lastest_version();
-    if (last_file_version == NULL) {
-        tablet_info->version = -1;
-        tablet_info->version_hash = 0;
-    } else {
-        // report the version before first missing version
-        vector<Version> missing_versions;
-        tablet->get_missing_versions_with_header_locked(
-                last_file_version->end_version(), &missing_versions);
-        const PDelta* least_complete_version =
-            tablet->least_complete_version(missing_versions);
-        if (least_complete_version == NULL) {
-            tablet_info->version = -1;
-            tablet_info->version_hash = 0;
-        } else {
-            tablet_info->version = least_complete_version->end_version();
-            tablet_info->version_hash = least_complete_version->version_hash();
-        }
-    }
-    tablet->release_header_lock();
+    
+    _tablet_mgr.release_schema_change_lock(tablet_id);
 }
 
 OLAPStatus StorageEngine::report_tablet_info(TTabletInfo* tablet_info) {
-    DorisMetrics::report_tablet_requests_total.increment(1);
-    LOG(INFO) << "begin to process report tablet info."
-              << "tablet_id=" << tablet_info->tablet_id
-              << ", schema_hash=" << tablet_info->schema_hash;
-
-    OLAPStatus res = OLAP_SUCCESS;
-
-    TabletSharedPtr tablet = get_tablet(
-            tablet_info->tablet_id, tablet_info->schema_hash);
-    if (tablet.get() == NULL) {
-        OLAP_LOG_WARNING("can't find tablet. [tablet=%ld schema_hash=%d]",
-                         tablet_info->tablet_id, tablet_info->schema_hash);
-        return OLAP_ERR_TABLE_NOT_FOUND;
-    }
-
-    _build_tablet_info(tablet, tablet_info);
-    LOG(INFO) << "success to process report tablet info.";
-    return res;
+    
+    return _tablet_mgr.report_tablet_info(tablet_info);
 }
 
 OLAPStatus StorageEngine::report_all_tablets_info(std::map<TTabletId, 
TTablet>* tablets_info) {
-    LOG(INFO) << "begin to process report all tablets info.";
-    DorisMetrics::report_all_tablets_requests_total.increment(1);
-
-    if (tablets_info == NULL) {
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    _tablet_map_lock.rdlock();
-    for (const auto& item : _tablet_map) {
-        if (item.second.table_arr.size() == 0) {
-            continue;
-        }
-
-        TTablet tablet;
-        for (TabletSharedPtr tablet_ptr : item.second.table_arr) {
-            if (tablet_ptr == NULL) {
-                continue;
-            }
-
-            TTabletInfo tablet_info;
-            _build_tablet_info(tablet_ptr, &tablet_info);
-
-            // report expire transaction
-            vector<int64_t> transaction_ids;
-            tablet_ptr->get_expire_pending_data(&transaction_ids);
-            tablet_info.__set_transaction_ids(transaction_ids);
-
-            if (_available_storage_medium_type_count > 1) {
-                
tablet_info.__set_storage_medium(tablet_ptr->store()->storage_medium());
-            }
-
-            tablet_info.__set_version_count(tablet_ptr->file_delta_size());
-            tablet_info.__set_path_hash(tablet_ptr->store()->path_hash());
-
-            tablet.tablet_infos.push_back(tablet_info);
-        }
-
-        if (tablet.tablet_infos.size() != 0) {
-            tablets_info->insert(pair<TTabletId, 
TTablet>(tablet.tablet_infos[0].tablet_id, tablet));
-        }
-    }
-    _tablet_map_lock.unlock();
-
-    LOG(INFO) << "success to process report all tablets info. tablet_num=" << 
tablets_info->size();
-    return OLAP_SUCCESS;
-}
-
-void StorageEngine::get_tablet_stat(TTabletStatResult& result) {
-    VLOG(3) << "begin to get all tablet stat.";
-
-    // get current time
-    int64_t current_time = UnixMillis();
     
-    _tablet_map_lock.wrlock();
-    // update cache if too old
-    if (current_time - _tablet_stat_cache_update_time_ms > 
-        config::tablet_stat_cache_update_interval_second * 1000) {
-        VLOG(3) << "update tablet stat.";
-        _build_tablet_stat();
-    }
-
-    result.__set_tablets_stats(_tablet_stat_cache);
-
-    _tablet_map_lock.unlock();
+    return _tablet_mgr.report_all_tablets_info(tablets_info);
 }
 
-void StorageEngine::_build_tablet_stat() {
-    _tablet_stat_cache.clear();
-    for (const auto& item : _tablet_map) {
-        if (item.second.table_arr.size() == 0) {
-            continue;
-        }
-
-        TTabletStat stat;
-        stat.tablet_id = item.first;
-        for (TabletSharedPtr tablet : item.second.table_arr) {
-            if (tablet.get() == NULL) {
-                continue;
-            }
-                
-            // we only get base tablet's stat
-            stat.__set_data_size(tablet->get_data_size());
-            stat.__set_row_num(tablet->get_num_rows());
-            VLOG(3) << "tablet_id=" << item.first 
-                    << ", data_size=" << tablet->get_data_size()
-                    << ", row_num:" << tablet->get_num_rows();
-            break;
-        }
-
-        _tablet_stat_cache.emplace(item.first, stat);
-    }
-
-    _tablet_stat_cache_update_time_ms = UnixMillis();
-}
-
-bool StorageEngine::_can_do_compaction(TabletSharedPtr tablet) {
-    // 如果table正在做schema change,则通过选路判断数据是否转换完成
-    // 如果选路成功,则转换完成,可以进行BE
-    // 如果选路失败,则转换未完成,不能进行BE
-    tablet->obtain_header_rdlock();
-    const PDelta* lastest_version = tablet->lastest_version();
-    if (lastest_version == NULL) {
-        tablet->release_header_lock();
-        return false;
-    }
-
-    if (tablet->is_schema_changing()) {
-        Version test_version = Version(0, lastest_version->end_version());
-        vector<Version> path_versions;
-        if (OLAP_SUCCESS != tablet->select_versions_to_span(test_version, 
&path_versions)) {
-            tablet->release_header_lock();
-            return false;
-        }
-    }
-    tablet->release_header_lock();
-
-    return true;
+void StorageEngine::get_tablet_stat(TTabletStatResult& result) {
+    _tablet_mgr.get_tablet_stat(result);
 }
 
 void StorageEngine::start_clean_fd_cache() {
@@ -1592,7 +919,7 @@ void StorageEngine::start_clean_fd_cache() {
 }
 
 void StorageEngine::perform_cumulative_compaction() {
-    TabletSharedPtr best_tablet = 
_find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION);
+    TabletSharedPtr best_tablet = 
_tablet_mgr.find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION);
     if (best_tablet == nullptr) { return; }
 
     CumulativeCompaction cumulative_compaction;
@@ -1610,7 +937,7 @@ void StorageEngine::perform_cumulative_compaction() {
 }
 
 void StorageEngine::perform_base_compaction() {
-    TabletSharedPtr best_tablet = 
_find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION);
+    TabletSharedPtr best_tablet = 
_tablet_mgr.find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION);
     if (best_tablet == nullptr) { return; }
 
     BaseCompaction base_compaction;
@@ -1628,32 +955,6 @@ void StorageEngine::perform_base_compaction() {
     }
 }
 
-TabletSharedPtr StorageEngine::_find_best_tablet_to_compaction(CompactionType 
compaction_type) {
-    ReadLock tablet_map_rdlock(&_tablet_map_lock);
-    uint32_t highest_score = 0;
-    TabletSharedPtr best_tablet;
-    for (tablet_map_t::value_type& table_ins : _tablet_map){
-        for (TabletSharedPtr& table_ptr : table_ins.second.table_arr) {
-            if (!table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) {
-                continue;
-            }
-
-            ReadLock rdlock(table_ptr->get_header_lock_ptr());
-            uint32_t table_score = 0;
-            if (compaction_type == CompactionType::BASE_COMPACTION) {
-                table_score = table_ptr->get_base_compaction_score();
-            } else if (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION) {
-                table_score = table_ptr->get_cumulative_compaction_score();
-            }
-            if (table_score > highest_score) {
-                highest_score = table_score;
-                best_tablet = table_ptr;
-            }
-        }
-    }
-    return best_tablet;
-}
-
 void StorageEngine::get_cache_status(rapidjson::Document* document) const {
     return _index_stream_lru_cache->get_cache_status(document);
 }
@@ -1709,16 +1010,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* 
usage) {
     }
 
     // clear expire incremental segment_group
-    _tablet_map_lock.rdlock();
-    for (const auto& item : _tablet_map) {
-        for (TabletSharedPtr tablet : item.second.table_arr) {
-            if (tablet.get() == NULL) {
-                continue;
-            }
-            tablet->delete_expire_incremental_data();
-        }
-    }
-    _tablet_map_lock.unlock();
+    _tablet_mgr.start_trash_sweep();
 
     return res;
 }
@@ -1763,226 +1055,6 @@ OLAPStatus StorageEngine::_do_sweep(
     return res;
 }
 
-OLAPStatus StorageEngine::_create_new_tablet_header(
-        const TCreateTabletReq& request,
-        OlapStore* store,
-        const bool is_schema_change_tablet,
-        const TabletSharedPtr ref_tablet,
-        TabletMeta* header) {
-    uint64_t shard = 0;
-    OLAPStatus res = store->get_shard(&shard);
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to get root path shard. res=" << res;
-        return res;
-    }
-    stringstream schema_hash_dir_stream;
-    schema_hash_dir_stream << store->path()
-                      << DATA_PREFIX
-                      << "/" << shard
-                      << "/" << request.tablet_id
-                      << "/" << request.tablet_schema.schema_hash;
-    string schema_hash_dir = schema_hash_dir_stream.str();
-    if (check_dir_existed(schema_hash_dir)) {
-        LOG(WARNING) << "failed to create the dir that existed. path=" << 
schema_hash_dir;
-        return OLAP_ERR_CANNOT_CREATE_DIR;
-    }
-    res = create_dirs(schema_hash_dir);
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "create dir fail. [res=" << res << " path:" << 
schema_hash_dir;
-        return res;
-    }
-    
-    // set basic information
-    
header->set_num_short_key_fields(request.tablet_schema.short_key_column_count);
-    header->set_compress_kind(COMPRESS_LZ4);
-    if (request.tablet_schema.keys_type == TKeysType::DUP_KEYS) {
-        header->set_keys_type(KeysType::DUP_KEYS);
-    } else if (request.tablet_schema.keys_type == TKeysType::UNIQUE_KEYS) {
-        header->set_keys_type(KeysType::UNIQUE_KEYS);
-    } else {
-        header->set_keys_type(KeysType::AGG_KEYS);
-    }
-    DCHECK(request.tablet_schema.storage_type == TStorageType::COLUMN);
-    header->set_data_file_type(COLUMN_ORIENTED_FILE);
-    header->set_segment_size(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE);
-    
header->set_num_rows_per_data_block(config::default_num_rows_per_column_file_block);
-
-    // set column information
-    uint32_t i = 0;
-    uint32_t key_count = 0;
-    bool has_bf_columns = false;
-    uint32_t next_unique_id = 0;
-    if (true == is_schema_change_tablet) {
-        next_unique_id = ref_tablet->next_unique_id();
-    }
-    for (TColumn column : request.tablet_schema.columns) {
-        if (column.column_type.type == TPrimitiveType::VARCHAR
-                && i < request.tablet_schema.short_key_column_count - 1) {
-            LOG(WARNING) << "varchar type column should be the last short 
key.";
-            return OLAP_ERR_SCHEMA_SCHEMA_INVALID;
-        }
-        header->add_column();
-        if (true == is_schema_change_tablet) {
-            /*
-             * for schema change, compare old_tablet and new_tablet
-             * 1. if column in both new_tablet and old_tablet,
-             * assign unique_id of old_tablet to the column of new_tablet
-             * 2. if column exists only in new_tablet, assign next_unique_id 
of old_tablet
-             * to the new column
-             *
-            */
-            size_t field_num = ref_tablet->tablet_schema().size();
-            size_t field_off = 0;
-            for (field_off = 0; field_off < field_num; ++field_off) {
-                if (ref_tablet->tablet_schema()[field_off].name == 
column.column_name) {
-                    uint32_t unique_id = 
ref_tablet->tablet_schema()[field_off].unique_id;
-                    header->mutable_column(i)->set_unique_id(unique_id);
-                    break;
-                }
-            }
-            if (field_off == field_num) {
-                header->mutable_column(i)->set_unique_id(next_unique_id++);
-            }
-        } else {
-            header->mutable_column(i)->set_unique_id(i);
-        }
-        header->mutable_column(i)->set_name(column.column_name);
-        header->mutable_column(i)->set_is_root_column(true);
-        string data_type;
-        EnumToString(TPrimitiveType, column.column_type.type, data_type);
-        header->mutable_column(i)->set_type(data_type);
-        if (column.column_type.type == TPrimitiveType::DECIMAL) {
-            if (column.column_type.__isset.precision && 
column.column_type.__isset.scale) {
-                
header->mutable_column(i)->set_precision(column.column_type.precision);
-                header->mutable_column(i)->set_frac(column.column_type.scale);
-            } else {
-                LOG(WARNING) << "decimal type column should set precision and 
frac.";
-                return OLAP_ERR_SCHEMA_SCHEMA_INVALID;
-            }
-        }
-        if (column.column_type.type == TPrimitiveType::CHAR
-                || column.column_type.type == TPrimitiveType::VARCHAR || 
column.column_type.type == TPrimitiveType::HLL) {
-            if (!column.column_type.__isset.len) {
-                LOG(WARNING) << "CHAR or VARCHAR should specify length. type=" 
<< column.column_type.type;
-                return OLAP_ERR_INPUT_PARAMETER_ERROR;
-            }
-        }
-        uint32_t length = FieldInfo::get_field_length_by_type(
-                column.column_type.type, column.column_type.len);
-        header->mutable_column(i)->set_length(length);
-        header->mutable_column(i)->set_index_length(length);
-        if (column.column_type.type == TPrimitiveType::VARCHAR || 
column.column_type.type == TPrimitiveType::HLL) {
-            if (!column.column_type.__isset.index_len) {
-                header->mutable_column(i)->set_index_length(10);
-            } else {
-                
header->mutable_column(i)->set_index_length(column.column_type.index_len);
-            }
-        }
-        if (!column.is_key) {
-            header->mutable_column(i)->set_is_key(false);
-            string aggregation_type;
-            EnumToString(TAggregationType, column.aggregation_type, 
aggregation_type);
-            header->mutable_column(i)->set_aggregation(aggregation_type);
-        } else {
-            ++key_count;
-            header->add_selectivity(1);
-            header->mutable_column(i)->set_is_key(true);
-            header->mutable_column(i)->set_aggregation("NONE");
-        }
-        if (column.__isset.default_value) {
-            header->mutable_column(i)->set_default_value(column.default_value);
-        }
-        if (column.__isset.is_allow_null) {
-            header->mutable_column(i)->set_is_allow_null(column.is_allow_null);
-        } else {
-            header->mutable_column(i)->set_is_allow_null(false);
-        }
-        if (column.__isset.is_bloom_filter_column) {
-            
header->mutable_column(i)->set_is_bf_column(column.is_bloom_filter_column);
-            has_bf_columns = true;
-        }
-        ++i;
-    }
-    if (true == is_schema_change_tablet){
-        /*
-         * for schema change, next_unique_id of new tablet should be greater 
than
-         * next_unique_id of old tablet
-         * */
-        header->set_next_column_unique_id(next_unique_id);
-    } else {
-        header->set_next_column_unique_id(i);
-    }
-    if (has_bf_columns && request.tablet_schema.__isset.bloom_filter_fpp) {
-        header->set_bf_fpp(request.tablet_schema.bloom_filter_fpp);
-    }
-    if (key_count < request.tablet_schema.short_key_column_count) {
-        LOG(WARNING) << "short key num should not large than key num. "
-                << "key_num=" << key_count << " short_key_num=" << 
request.tablet_schema.short_key_column_count;
-        return OLAP_ERR_INPUT_PARAMETER_ERROR;
-    }
-
-    header->set_creation_time(time(NULL));
-    header->set_cumulative_layer_point(-1);
-    header->set_tablet_id(request.tablet_id);
-    header->set_schema_hash(request.tablet_schema.schema_hash);
-    header->set_shard(shard);
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus StorageEngine::_check_existed_or_else_create_dir(const string& 
path) {
-    if (check_dir_existed(path)) {
-        LOG(WARNING) << "failed to create the dir that existed. [path='" << 
path << "']";
-        return OLAP_ERR_CANNOT_CREATE_DIR;
-    }
-
-    return create_dirs(path);
-}
-
-void StorageEngine::_cancel_unfinished_schema_change() {
-    // Schema Change在引擎退出时schemachange信息还保存在在Header里,
-    // 引擎重启后,需清除schemachange信息,上层会重做
-    uint64_t canceled_num = 0;
-    LOG(INFO) << "begin to cancel unfinished schema change.";
-
-    SchemaChangeHandler schema_change_handler;
-    TTabletId tablet_id;
-    TSchemaHash schema_hash;
-    vector<Version> schema_change_versions;
-    AlterTabletType type;
-
-    for (const auto& tablet_instance : _tablet_map) {
-        for (TabletSharedPtr tablet : tablet_instance.second.table_arr) {
-            if (tablet.get() == NULL) {
-                OLAP_LOG_WARNING("get empty TabletSharedPtr. [tablet_id=%ld]", 
tablet_instance.first);
-                continue;
-            }
-
-            bool ret = tablet->get_schema_change_request(
-                    &tablet_id, &schema_hash, &schema_change_versions, &type);
-            if (!ret) {
-                continue;
-            }
-
-            TabletSharedPtr new_tablet = get_tablet(tablet_id, schema_hash, 
false);
-            if (new_tablet.get() == NULL) {
-                OLAP_LOG_WARNING("the tablet referenced by schema change 
cannot be found. "
-                                 "schema change cancelled. [tablet='%s']",
-                                 tablet->full_name().c_str());
-                continue;
-            }
-
-            // DORIS-3741. Upon restart, it should not clear schema change 
request.
-            new_tablet->set_schema_change_status(
-                    ALTER_TABLE_FAILED, new_tablet->schema_hash(), -1);
-            tablet->set_schema_change_status(
-                    ALTER_TABLE_FAILED, tablet->schema_hash(), -1);
-            VLOG(3) << "cancel unfinished schema change. tablet=" << 
tablet->full_name();
-            ++canceled_num;
-        }
-    }
-
-    LOG(INFO) << "finish to cancel unfinished schema change! canceled_num=" << 
canceled_num;
-}
 
 void StorageEngine::start_delete_unused_index() {
     _gc_mutex.lock();
@@ -2021,138 +1093,17 @@ void StorageEngine::add_unused_index(SegmentGroup* 
segment_group) {
     _gc_mutex.unlock();
 }
 
-OLAPStatus StorageEngine::_create_init_version(
-        TabletSharedPtr tablet, const TCreateTabletReq& request) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    if (request.version < 1) {
-        OLAP_LOG_WARNING("init version of tablet should at least 1.");
-        return OLAP_ERR_CE_CMD_PARAMS_ERROR;
-    } else {
-        Version init_base_version(0, request.version);
-        res = create_init_version(
-                request.tablet_id, request.tablet_schema.schema_hash,
-                init_base_version, request.version_hash);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to create init base version. [res=%d 
version=%ld]",
-                    res, request.version);
-            return res;
-        }
-
-        Version init_delta_version(request.version + 1, request.version + 1);
-        res = create_init_version(
-                request.tablet_id, request.tablet_schema.schema_hash,
-                init_delta_version, 0);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to create init delta version. [res=%d 
version=%ld]",
-                    res, request.version + 1);
-            return res;
-        }
-    }
-
-    tablet->obtain_header_wrlock();
-    tablet->set_cumulative_layer_point(request.version + 1);
-    res = tablet->save_tablet_meta();
-    tablet->release_header_lock();
-    if (res != OLAP_SUCCESS) {
-        LOG(WARNING) << "fail to save header. [tablet=" << tablet->full_name() 
<< "]";
-    }
-
-    return res;
-}
-
 // TODO(zc): refactor this funciton
 OLAPStatus StorageEngine::create_tablet(const TCreateTabletReq& request) {
-    OLAPStatus res = OLAP_SUCCESS;
-    bool is_tablet_added = false;
-
-    LOG(INFO) << "begin to process create tablet. tablet=" << request.tablet_id
-              << ", schema_hash=" << request.tablet_schema.schema_hash;
-
-    DorisMetrics::create_tablet_requests_total.increment(1);
-
-    // 1. Make sure create_tablet operation is idempotent:
-    //    return success if tablet with same tablet_id and schema_hash exist,
-    //           false if tablet with same tablet_id but different schema_hash 
exist
-    if (check_tablet_id_exist(request.tablet_id)) {
-        TabletSharedPtr tablet = get_tablet(
-                request.tablet_id, request.tablet_schema.schema_hash);
-        if (tablet.get() != NULL) {
-            LOG(INFO) << "create tablet success for tablet already exist.";
-            return OLAP_SUCCESS;
-        } else {
-            OLAP_LOG_WARNING("tablet with different schema hash already 
exists.");
-            return OLAP_ERR_CE_TABLET_ID_EXIST;
-        }
-    }
-
-    // 2. Lock to ensure that all create_tablet operation execute in serial
-    static Mutex create_tablet_lock;
-    MutexLock auto_lock(&create_tablet_lock);
-
-    TabletSharedPtr tablet;
-    do {
-        // 3. Create tablet with only header, no deltas
-        tablet = create_tablet(request, NULL, false, NULL);
-        if (tablet == NULL) {
-            res = OLAP_ERR_CE_CMD_PARAMS_ERROR;
-            OLAP_LOG_WARNING("fail to create tablet. [res=%d]", res);
-            break;
-        }
-
-        // 4. Add tablet to StorageEngine will make it visiable to user
-        res = add_tablet(
-                request.tablet_id, request.tablet_schema.schema_hash, tablet);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]", 
res);
-            break;
-        }
-        is_tablet_added = true;
-
-        TabletSharedPtr tablet_ptr = get_tablet(
-                request.tablet_id, request.tablet_schema.schema_hash);
-        if (tablet_ptr.get() == NULL) {
-            res = OLAP_ERR_TABLE_NOT_FOUND;
-            OLAP_LOG_WARNING("fail to get tablet. [res=%d]", res);
-            break;
-        }
-
-        // 5. Register tablet into StorageEngine, so that we can manage tablet 
from
-        // the perspective of root path.
-        // Example: unregister all tables when a bad disk found.
-        res = register_tablet_into_root_path(tablet_ptr.get());
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to register tablet into StorageEngine. 
[res=%d, root_path=%s]",
-                    res, tablet_ptr->storage_root_path_name().c_str());
-            break;
-        }
-
-        // 6. Create init version if this is not a restore mode replica and 
request.version is set
-        // bool in_restore_mode = request.__isset.in_restore_mode && 
request.in_restore_mode;
-        // if (!in_restore_mode && request.__isset.version) {
-        res = _create_init_version(tablet_ptr, request);
-        if (res != OLAP_SUCCESS) {
-            OLAP_LOG_WARNING("fail to create initial version for tablet. 
[res=%d]", res);
-        }
-        // }
-    } while (0);
-
-    // 7. clear environment
-    if (res != OLAP_SUCCESS) {
-        DorisMetrics::create_tablet_requests_failed.increment(1);
-        if (is_tablet_added) {
-            OLAPStatus status = drop_tablet(
-                    request.tablet_id, request.tablet_schema.schema_hash);
-            if (status !=  OLAP_SUCCESS) {
-                OLAP_LOG_WARNING("fail to drop tablet when create tablet 
failed. [res=%d]", res);
-            }
-        } else if (NULL != tablet) {
-            tablet->delete_all_files();
-        }
+    
+    // Get all available stores, use ref_root_path if the caller specified
+    std::vector<OlapStore*> stores;
+    stores = get_stores_for_create_tablet(request.storage_medium);
+    if (stores.empty()) {
+        LOG(WARNING) << "there is no available disk that can be used to create 
tablet.";
+        return OLAP_ERR_CE_CMD_PARAMS_ERROR;
     }
-
-    LOG(INFO) << "finish to process create tablet. res=" << res;
-    return res;
+    return _tablet_mgr.create_tablet(request, stores);
 }
 
 OLAPStatus StorageEngine::schema_change(const TAlterTabletReq& request) {
@@ -2181,50 +1132,15 @@ OLAPStatus StorageEngine::schema_change(const 
TAlterTabletReq& request) {
 }
 
 OLAPStatus StorageEngine::create_rollup_tablet(const TAlterTabletReq& request) 
{
-    LOG(INFO) << "begin to create rollup tablet. "
-              << "old_tablet_id=" << request.base_tablet_id
-              << ", new_tablet_id=" << request.new_tablet_req.tablet_id;
-
-    DorisMetrics::create_rollup_requests_total.increment(1);
-
-    OLAPStatus res = OLAP_SUCCESS;
-
-    SchemaChangeHandler handler;
-    res = handler.process_alter_tablet(ALTER_TABLET_CREATE_ROLLUP_TABLE, 
request);
-
-    if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("failed to do rollup. "
-                         "[base_tablet=%ld new_tablet=%ld] [res=%d]",
-                         request.base_tablet_id, 
request.new_tablet_req.tablet_id, res);
-        DorisMetrics::create_rollup_requests_failed.increment(1);
-        return res;
-    }
-
-    LOG(INFO) << "success to create rollup tablet. res=" << res
-              << ", old_tablet_id=" << request.base_tablet_id 
-              << ", new_tablet_id=" << request.new_tablet_req.tablet_id;
-    return res;
+    
+    return _tablet_mgr.create_rollup_tablet(request);
 }
 
 AlterTableStatus StorageEngine::show_alter_tablet_status(
         TTabletId tablet_id,
         TSchemaHash schema_hash) {
-    LOG(INFO) << "begin to process show alter tablet status."
-              << "tablet_id" << tablet_id
-              << ", schema_hash" << schema_hash;
-
-    AlterTableStatus status = ALTER_TABLE_FINISHED;
 
-    TabletSharedPtr tablet = 
StorageEngine::get_instance()->get_tablet(tablet_id, schema_hash);
-    if (tablet.get() == NULL) {
-        OLAP_LOG_WARNING("fail to get tablet. [tablet=%ld schema_hash=%d]",
-                         tablet_id, schema_hash);
-        status = ALTER_TABLE_FAILED;
-    } else {
-        status = tablet->schema_change_status().status;
-    }
-
-    return status;
+    return _tablet_mgr.show_alter_tablet_status(tablet_id, schema_hash);
 }
 
 OLAPStatus StorageEngine::compute_checksum(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index d87db2a2..199c86c6 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -43,6 +43,7 @@
 #include "olap/tablet.h"
 #include "olap/olap_meta.h"
 #include "olap/options.h"
+#include "olap/tablet_manager.h"
 #include "olap/txn_manager.h"
 
 namespace doris {
@@ -50,21 +51,6 @@ namespace doris {
 class Tablet;
 class OlapStore;
 
-struct RootPathInfo {
-    RootPathInfo():
-            capacity(1),
-            available(0),
-            data_used_capacity(0),
-            is_used(false) { }
-
-    std::string path;
-    int64_t path_hash;
-    int64_t capacity;                  // 总空间,单位字节
-    int64_t available;                 // 可用空间,单位字节
-    int64_t data_used_capacity;
-    bool is_used;                       // 是否可用标识
-    TStorageMedium::type storage_medium;  // 存储介质类型:SSD|HDD
-};
 
 // StorageEngine singleton to manage all Table pointers.
 // Providing add/drop/get operations.
@@ -137,13 +123,6 @@ class StorageEngine {
 
     OLAPStatus clone_full_data(TabletSharedPtr tablet, TabletMeta& 
clone_header);
 
-    // Add empty data for Tablet
-    //
-    // Return OLAP_SUCCESS, if run ok
-    OLAPStatus create_init_version(
-            TTabletId tablet_id, SchemaHash schema_hash,
-            Version version, VersionHash version_hash);
-
     // Drop a tablet by description
     // If set keep_files == true, files will NOT be deleted when 
deconstruction.
     // Return OLAP_SUCCESS, if run ok
@@ -152,12 +131,6 @@ class StorageEngine {
     OLAPStatus drop_tablet(
             TTabletId tablet_id, SchemaHash schema_hash, bool keep_files = 
false);
 
-    // Drop tablet directly with check schema change info.
-    OLAPStatus _drop_tablet_directly(TTabletId tablet_id, TSchemaHash 
schema_hash, bool keep_files = false);
-    OLAPStatus _drop_tablet_directly_unlocked(TTabletId tablet_id, TSchemaHash 
schema_hash, bool keep_files = false);
-
-    OLAPStatus drop_tables_on_error_root_path(const std::vector<TabletInfo>& 
tablet_info_vec);
-
     // Prevent schema change executed concurrently.
     bool try_schema_change_lock(TTabletId tablet_id);
     void release_schema_change_lock(TTabletId tablet_id);
@@ -214,8 +187,6 @@ class StorageEngine {
 
     void get_all_available_root_path(std::vector<std::string>* 
available_paths);
 
-    OLAPStatus register_tablet_into_root_path(Tablet* tablet);
-
     // 磁盘状态监测。监测unused_flag路劲新的对应root_path unused标识位,
     // 当检测到有unused标识时,从内存中删除对应表信息,磁盘数据不动。
     // 当磁盘状态为不可用,但未检测到unused标识时,需要从root_path上
@@ -448,18 +419,7 @@ class StorageEngine {
 
     OLAPStatus _start_bg_worker();
 
-    OLAPStatus _create_init_version(TabletSharedPtr tablet, const 
TCreateTabletReq& request);
-
 private:
-    struct TableInstances {
-        Mutex schema_change_lock;
-        std::list<TabletSharedPtr> table_arr;
-    };
-
-    enum CompactionType {
-        BASE_COMPACTION = 1,
-        CUMULATIVE_COMPACTION = 2
-    };
 
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t 
tablet_id_, uint32_t index_) :
@@ -489,38 +449,16 @@ class StorageEngine {
         bool is_used;
     };
 
-    typedef std::map<int64_t, TableInstances> tablet_map_t;
     typedef std::map<std::string, uint32_t> file_system_task_count_t;
 
-    TabletSharedPtr _get_tablet_with_no_lock(TTabletId tablet_id, SchemaHash 
schema_hash);
-
-    // 遍历root所指定目录, 通过dirs返回此目录下所有有文件夹的名字, files返回所有文件的名字
-    OLAPStatus _dir_walk(const std::string& root,
-                     std::set<std::string>* dirs,
-                     std::set<std::string>* files);
-
     // 扫描目录, 加载表
     OLAPStatus _load_store(OlapStore* store);
 
-    OLAPStatus _create_new_tablet_header(const TCreateTabletReq& request,
-                                             OlapStore* store,
-                                             const bool 
is_schema_change_tablet,
-                                             const TabletSharedPtr ref_tablet,
-                                             TabletMeta* header);
-
-    OLAPStatus _check_existed_or_else_create_dir(const std::string& path);
-
     TabletSharedPtr _find_best_tablet_to_compaction(CompactionType 
compaction_type);
-    bool _can_do_compaction(TabletSharedPtr tablet);
-
-    void _cancel_unfinished_schema_change();
 
     OLAPStatus _do_sweep(
             const std::string& scan_root, const time_t& local_tm_now, const 
uint32_t expire);
 
-    void _build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tablet_info);
-    void _build_tablet_stat();
-
     EngineOptions _options;
     std::mutex _store_lock;
     std::map<std::string, OlapStore*> _store_map;
@@ -532,10 +470,6 @@ class StorageEngine {
 
     // 错误磁盘所在百分比,超过设定的值,则engine需要退出运行
     uint32_t _min_percentage_of_error_disk;
-
-    RWMutex _tablet_map_lock;
-    tablet_map_t _tablet_map;
-    size_t _global_tablet_id;
     Cache* _file_descriptor_lru_cache;
     Cache* _index_stream_lru_cache;
     uint32_t _max_base_compaction_task_per_disk;
@@ -545,12 +479,6 @@ class StorageEngine {
     file_system_task_count_t _fs_base_compaction_task_num_map;
     std::vector<CompactionCandidate> _cumulative_compaction_candidate;
 
-    // cache to save tablets' statistics, such as data size and row
-    // TODO(cmy): for now, this is a naive implementation
-    std::map<int64_t, TTabletStat> _tablet_stat_cache;
-    // last update time of tablet stat cache
-    int64_t _tablet_stat_cache_update_time_ms;
-
     static StorageEngine* _s_instance;
 
     // snapshot
@@ -605,6 +533,7 @@ class StorageEngine {
     std::atomic_bool _is_report_disk_state_already;
     std::atomic_bool _is_report_tablet_already;
     TxnManager _txn_mgr;
+    TabletManager _tablet_mgr;
 };
 
 }  // namespace doris
diff --git a/be/src/olap/store.cpp b/be/src/olap/store.cpp
index 97953cd9..501cd3f3 100755
--- a/be/src/olap/store.cpp
+++ b/be/src/olap/store.cpp
@@ -71,7 +71,7 @@ OlapStore::~OlapStore() {
     }
 }
 
-Status OlapStore::load() {
+Status OlapStore::init() {
     _rand_seed = static_cast<uint32_t>(time(NULL));
     if (posix_memalign((void**)&_test_file_write_buf,
                        DIRECT_IO_ALIGNMENT,
@@ -437,18 +437,19 @@ OLAPStatus OlapStore::deregister_tablet(Tablet* tablet) {
     return OLAP_SUCCESS;
 }
 
-std::string OlapStore::get_shard_path_from_header(const std::string& 
shard_string) {
+std::string OlapStore::get_absolute_shard_path(const std::string& 
shard_string) {
     return _path + DATA_PREFIX + "/" + shard_string;
 }
 
-std::string OlapStore::get_tablet_schema_hash_path_from_header(TabletMeta* 
header) {
-    return _path + DATA_PREFIX + "/" + std::to_string(header->shard())
+std::string OlapStore::get_absolute_tablet_path(TabletMeta* header, bool 
with_schema_hash) {
+    if (with_schema_hash) {
+        return _path + DATA_PREFIX + "/" + std::to_string(header->shard())
             + "/" + std::to_string(header->tablet_id()) + "/" + 
std::to_string(header->schema_hash());
-}
 
-std::string OlapStore::get_tablet_path_from_header(TabletMeta* header) {
-    return _path + DATA_PREFIX + "/" + std::to_string(header->shard())
+    } else {
+        return _path + DATA_PREFIX + "/" + std::to_string(header->shard())
             + "/" + std::to_string(header->tablet_id());
+    }
 }
 
 void OlapStore::find_tablet_in_trash(int64_t tablet_id, 
std::vector<std::string>* paths) {
@@ -522,7 +523,7 @@ OLAPStatus 
OlapStore::_load_tablet_from_header(StorageEngine* engine, TTabletId
         LOG(WARNING) << "failed to add tablet. tablet=" << tablet->full_name();
         return res;
     }
-    res = engine->register_tablet_into_root_path(tablet.get());
+    res = tablet->register_tablet_into_dir();
     if (res != OLAP_SUCCESS) {
         LOG(WARNING) << "fail to register tablet into root path. root_path=" 
<< tablet->storage_root_path_name();
 
@@ -539,7 +540,7 @@ OLAPStatus 
OlapStore::_load_tablet_from_header(StorageEngine* engine, TTabletId
     return OLAP_SUCCESS;
 }
 
-OLAPStatus OlapStore::load_tables(StorageEngine* engine) {
+OLAPStatus OlapStore::load_tablets(StorageEngine* engine) {
     auto load_tablet_func = [this, engine](long tablet_id,
             long schema_hash, const std::string& value) -> bool {
         OLAPStatus status = _load_tablet_from_header(engine, tablet_id, 
schema_hash, value);
diff --git a/be/src/olap/store.h b/be/src/olap/store.h
index 5efbc85f..22b04708 100644
--- a/be/src/olap/store.h
+++ b/be/src/olap/store.h
@@ -38,7 +38,7 @@ class OlapStore {
     OlapStore(const std::string& path, int64_t capacity_bytes = -1);
     ~OlapStore();
 
-    Status load();
+    Status init();
 
     const std::string& path() const { return _path; }
     const int64_t path_hash() const { return _path_hash; }
@@ -54,6 +54,9 @@ class OlapStore {
         return info;
     }
 
+    // Save a cluster_id file under the data path to guard against an
+    // invalid BE configuration, for example two BEs sharing the same
+    // data path.
     Status set_cluster_id(int32_t cluster_id);
     void health_check();
 
@@ -64,22 +67,22 @@ class OlapStore {
     bool is_ssd_disk() const {
         return _storage_medium == TStorageMedium::SSD;
     }
+
     TStorageMedium::type storage_medium() const { return _storage_medium; }
 
     OLAPStatus register_tablet(Tablet* tablet);
     OLAPStatus deregister_tablet(Tablet* tablet);
 
-    std::string get_tablet_schema_hash_path_from_header(TabletMeta* header);
-
-    std::string get_tablet_path_from_header(TabletMeta* header);
+    std::string get_absolute_tablet_path(TabletMeta* header, bool 
with_schema_hash);
 
-    std::string get_shard_path_from_header(const std::string& shard_string);
+    std::string get_absolute_shard_path(const std::string& shard_string);
 
     void find_tablet_in_trash(int64_t tablet_id, std::vector<std::string>* 
paths);
 
     static std::string get_root_path_from_schema_hash_path_in_trash(const 
std::string& schema_hash_dir_in_trash);
 
-    OLAPStatus load_tables(StorageEngine* engine);
+    OLAPStatus load_tablets(StorageEngine* engine);
+
 private:
     std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
     Status _init_cluster_id();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8e707cc4..1cabe53a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -298,6 +298,30 @@ OLAPStatus Tablet::load() {
     return res;
 }
 
+// Reports whether this tablet may currently be compacted.
+//
+// Returns false when the tablet has no latest version. While a schema change
+// is in progress, version-path selection decides whether the data conversion
+// has finished: if a path spanning [0, latest end version] can be selected the
+// conversion is complete and compaction may run; otherwise it may not.
+// The header read lock is held for the whole check.
+bool Tablet::can_do_compaction() {
+    bool eligible = false;
+
+    obtain_header_rdlock();
+    const PDelta* newest = lastest_version();
+    if (newest != NULL) {
+        eligible = true;
+        if (is_schema_changing()) {
+            Version full_span = Version(0, newest->end_version());
+            vector<Version> span_path;
+            eligible = (select_versions_to_span(full_span, &span_path) == OLAP_SUCCESS);
+        }
+    }
+    release_header_lock();
+
+    return eligible;
+}
+
 OLAPStatus Tablet::load_indices() {
     OLAPStatus res = OLAP_SUCCESS;
     ReadLock rdlock(&_header_lock);
@@ -2182,4 +2206,8 @@ OLAPStatus Tablet::test_version(const Version& version) {
     return res;
 }
 
+// Registers this tablet with the store it was created on and returns the
+// status reported by the store.
+OLAPStatus Tablet::register_tablet_into_dir() {
+    const OLAPStatus status = _store->register_tablet(this);
+    return status;
+}
+
 }  // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 8d422a61..d3fdfa05 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -69,6 +69,8 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
 
     virtual ~Tablet();
 
+
+    bool can_do_compaction();
     // Initializes tablet and loads indices for all versions.
     // Returns OLAP_SUCCESS on success.
     OLAPStatus load();
@@ -599,6 +601,8 @@ class Tablet : public std::enable_shared_from_this<Tablet> {
 
     OLAPStatus recover_tablet_until_specfic_version(const int64_t& 
until_version,
                                                     const int64_t& 
version_hash);
+    
+    OLAPStatus register_tablet_into_dir();
 private:
     // used for hash-struct of hash_map<Version, SegmentGroup*>.
     struct HashOfVersion {
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
new file mode 100644
index 00000000..814e467d
--- /dev/null
+++ b/be/src/olap/tablet_manager.cpp
@@ -0,0 +1,1242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/tablet_manager.h"
+
+#include <signal.h>
+
+#include <algorithm>
+#include <cstdio>
+#include <new>
+#include <queue>
+#include <set>
+#include <random>
+
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/algorithm/string/split.hpp>
+#include <boost/filesystem.hpp>
+#include <rapidjson/document.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "agent/file_downloader.h"
+#include "olap/base_compaction.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/lru_cache.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_meta_manager.h"
+#include "olap/push_handler.h"
+#include "olap/reader.h"
+#include "olap/schema_change.h"
+#include "olap/store.h"
+#include "olap/utils.h"
+#include "olap/data_writer.h"
+#include "util/time.h"
+#include "util/doris_metrics.h"
+#include "util/pretty_printer.h"
+
+using apache::thrift::ThriftDebugString;
+using boost::filesystem::canonical;
+using boost::filesystem::directory_iterator;
+using boost::filesystem::path;
+using boost::filesystem::recursive_directory_iterator;
+using std::back_inserter;
+using std::copy;
+using std::inserter;
+using std::list;
+using std::map;
+using std::nothrow;
+using std::pair;
+using std::priority_queue;
+using std::set;
+using std::set_difference;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace doris {
+
+// Ordering predicate for tablet lists: `a` sorts before `b` when it was
+// created earlier.
+bool _sort_tablet_by_create_time(const TabletSharedPtr& a, const TabletSharedPtr& b) {
+    const auto created_a = a->creation_time();
+    const auto created_b = b->creation_time();
+    return created_a < created_b;
+}
+
+// Starts with all counters and the stat-cache timestamp at zero.
+TabletManager::TabletManager() :
+        _available_storage_medium_type_count(0),
+        _global_tablet_id(0),
+        _tablet_stat_cache_update_time_ms(0) { }
+
+// Adds `tablet` to the in-memory tablet map under (tablet_id, schema_hash).
+//
+// If no tablet with the same id and schema hash is registered, the new one is
+// inserted and the per-id list is re-sorted by creation time.
+// If one already exists:
+//   - without `force`, a tablet living at the same path is rejected with
+//     OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
+//   - otherwise the new tablet replaces the old one when `force` is set, or
+//     when it is newer (higher end version, or equal end version and later
+//     creation time);
+//   - otherwise the new tablet is marked dropped and
+//     OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE is returned.
+OLAPStatus TabletManager::add_tablet(TTabletId tablet_id, SchemaHash schema_hash,
+                                 const TabletSharedPtr& tablet, bool force) {
+    OLAPStatus res = OLAP_SUCCESS;
+    VLOG(3) << "begin to add tablet to TabletManager. "
+            << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
+            << ", force=" << force;
+    _tablet_map_lock.wrlock();
+
+    tablet->set_id(_global_tablet_id++);
+
+    // Look for an already registered tablet with the same id and schema hash.
+    // Iterate by reference to avoid a shared_ptr copy per element.
+    TabletSharedPtr table_item;
+    for (const TabletSharedPtr& item : _tablet_map[tablet_id].table_arr) {
+        if (item->equal(tablet_id, schema_hash)) {
+            table_item = item;
+            break;
+        }
+    }
+
+    if (table_item.get() == NULL) {
+        _tablet_map[tablet_id].table_arr.push_back(tablet);
+        _tablet_map[tablet_id].table_arr.sort(_sort_tablet_by_create_time);
+        _tablet_map_lock.unlock();
+
+        return res;
+    }
+    _tablet_map_lock.unlock();
+
+    if (!force) {
+        if (table_item->tablet_path() == tablet->tablet_path()) {
+            LOG(WARNING) << "add the same tablet twice! tablet_id="
+                << tablet_id << " schema_hash=" << schema_hash;
+            return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
+        }
+    }
+
+    // lastest_version() may return NULL for a tablet that has no version yet.
+    // Such a tablet is treated as older than any tablet that has one
+    // (end version -1, creation time 0) instead of dereferencing NULL.
+    const PDelta* new_delta = tablet->lastest_version();
+    int64_t new_time = (new_delta != NULL) ? new_delta->creation_time() : 0;
+    int32_t new_version = (new_delta != NULL) ? new_delta->end_version() : -1;
+
+    table_item->obtain_header_rdlock();
+    const PDelta* old_delta = table_item->lastest_version();
+    int64_t old_time = (old_delta != NULL) ? old_delta->creation_time() : 0;
+    int32_t old_version = (old_delta != NULL) ? old_delta->end_version() : -1;
+    table_item->release_header_lock();
+
+    /*
+     * In the restore process, all original files in the tablet dir are replaced
+     * with the downloaded snapshot files, then the tablet header is reloaded.
+     * force == true means the Tablet in _tablet_map is forcibly replaced with
+     * the new one. If that were done naively, the files in the tablet dir would
+     * be dropped when the original Tablet is destructed, so keep_files is set
+     * to true to keep them.
+     */
+    bool keep_files = force;
+    if (force || (new_version > old_version
+            || (new_version == old_version && new_time > old_time))) {
+        drop_tablet(tablet_id, schema_hash, keep_files);
+        _tablet_map_lock.wrlock();
+        _tablet_map[tablet_id].table_arr.push_back(tablet);
+        _tablet_map[tablet_id].table_arr.sort(_sort_tablet_by_create_time);
+        _tablet_map_lock.unlock();
+    } else {
+        tablet->mark_dropped();
+        res = OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
+    }
+    LOG(WARNING) << "add duplicated tablet. force=" << force << ", res=" << res
+            << ", tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
+            << ", old_version=" << old_version << ", new_version=" << new_version
+            << ", old_time=" << old_time << ", new_time=" << new_time
+            << ", old_tablet_path=" << table_item->tablet_path()
+            << ", new_tablet_path=" << tablet->tablet_path();
+
+    return res;
+} // add_tablet
+
+// Marks every schema change still recorded in a tablet header as failed.
+//
+// If the engine exits during a schema change, the schema change info remains
+// in the header. After a restart that state must be invalidated so that the
+// upper layer redoes the job. Per DORIS-3741 the request itself is not
+// cleared; both tablets involved are set to ALTER_TABLE_FAILED instead.
+void TabletManager::cancel_unfinished_schema_change() {
+    LOG(INFO) << "begin to cancel unfinished schema change.";
+
+    uint64_t canceled_num = 0;
+    TTabletId related_tablet_id;
+    TSchemaHash related_schema_hash;
+    vector<Version> pending_versions;
+    AlterTabletType alter_type;
+
+    for (const auto& entry : _tablet_map) {
+        for (TabletSharedPtr tablet : entry.second.table_arr) {
+            if (!tablet) {
+                OLAP_LOG_WARNING("get empty TabletSharedPtr. [tablet_id=%ld]", entry.first);
+                continue;
+            }
+
+            // Tablets without a recorded schema change request need no action.
+            const bool has_request = tablet->get_schema_change_request(
+                    &related_tablet_id, &related_schema_hash, &pending_versions, &alter_type);
+            if (!has_request) {
+                continue;
+            }
+
+            TabletSharedPtr new_tablet = get_tablet(related_tablet_id, related_schema_hash, false);
+            if (!new_tablet) {
+                OLAP_LOG_WARNING("the tablet referenced by schema change cannot be found. "
+                                 "schema change cancelled. [tablet='%s']",
+                                 tablet->full_name().c_str());
+                continue;
+            }
+
+            // DORIS-3741. Upon restart, it should not clear schema change request.
+            new_tablet->set_schema_change_status(
+                    ALTER_TABLE_FAILED, new_tablet->schema_hash(), -1);
+            tablet->set_schema_change_status(
+                    ALTER_TABLE_FAILED, tablet->schema_hash(), -1);
+            VLOG(3) << "cancel unfinished schema change. tablet=" << tablet->full_name();
+            ++canceled_num;
+        }
+    }
+
+    LOG(INFO) << "finish to cancel unfinished schema change! canceled_num=" << canceled_num;
+}
+
+// Returns true when at least one tablet (of any schema hash) is registered
+// under `tablet_id`. The tablet map is read under its read lock.
+bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) {
+    _tablet_map_lock.rdlock();
+    tablet_map_t::iterator found = _tablet_map.find(tablet_id);
+    const bool exists = (found != _tablet_map.end()) && !found->second.table_arr.empty();
+    _tablet_map_lock.unlock();
+
+    return exists;
+} // check_tablet_id_exist
+
+// Removes every entry from the tablet map.
+// NOTE(review): no lock is taken here; presumably only called when no other
+// thread touches the map (e.g. shutdown) -- confirm against callers.
+void TabletManager::clear() {
+    _tablet_map.clear();
+} // clear
+
+// Adds an empty version [version.first, version.second] to a tablet.
+//
+// A SegmentGroup is created for the version, an empty data file is generated
+// through a ColumnDataWriter that writes no rows, the segment group is loaded
+// and registered as a data source of the tablet, and the tablet meta is saved.
+//
+// Returns OLAP_SUCCESS on success; OLAP_ERR_INPUT_PARAMETER_ERROR when
+// version.first > version.second; OLAP_ERR_TABLE_NOT_FOUND when the tablet
+// does not exist; OLAP_ERR_MALLOC_ERROR when the segment group or the writer
+// cannot be created; otherwise the error of the failing step.
+// On failure (with the tablet found) the version is unregistered and the
+// segment group's files and memory are released.
+OLAPStatus TabletManager::create_init_version(TTabletId tablet_id, SchemaHash schema_hash,
+                                           Version version, VersionHash version_hash) {
+    VLOG(3) << "begin to create init version. "
+            << "begin=" << version.first << ", end=" << version.second;
+    TabletSharedPtr tablet;
+    ColumnDataWriter* writer = NULL;
+    SegmentGroup* new_segment_group = NULL;
+    OLAPStatus res = OLAP_SUCCESS;
+    std::vector<SegmentGroup*> index_vec;
+
+    do {
+        if (version.first > version.second) {
+            OLAP_LOG_WARNING("begin should not larger than end. [begin=%d end=%d]",
+                             version.first, version.second);
+            res = OLAP_ERR_INPUT_PARAMETER_ERROR;
+            break;
+        }
+
+        // Get tablet and generate new index
+        tablet = get_tablet(tablet_id, schema_hash);
+        if (tablet.get() == NULL) {
+            OLAP_LOG_WARNING("fail to find tablet. [tablet=%ld]", tablet_id);
+            res = OLAP_ERR_TABLE_NOT_FOUND;
+            break;
+        }
+
+        new_segment_group = new(nothrow) SegmentGroup(tablet.get(), version, version_hash, false, 0, 0);
+        if (new_segment_group == NULL) {
+            LOG(WARNING) << "fail to malloc index. [tablet=" << tablet->full_name() << "]";
+            res = OLAP_ERR_MALLOC_ERROR;
+            break;
+        }
+        // Track the segment group for cleanup right away. Previously it was
+        // only added just before registration, so a failure in the writer,
+        // finalize or load steps leaked it.
+        index_vec.push_back(new_segment_group);
+
+        // Create writer, which write nothing to tablet, to generate empty data file
+        writer = ColumnDataWriter::create(tablet, new_segment_group, false);
+        if (writer == NULL) {
+            LOG(WARNING) << "fail to create writer. [tablet=" << tablet->full_name() << "]";
+            res = OLAP_ERR_MALLOC_ERROR;
+            break;
+        }
+
+        res = writer->finalize();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to finalize writer. [tablet=" << tablet->full_name() << "]";
+            break;
+        }
+
+        // Load new index and add to tablet
+        res = new_segment_group->load();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to load new index. [tablet=" << tablet->full_name() << "]";
+            break;
+        }
+
+        // The header write lock covers registration and meta save; it is
+        // released when this scope is left, before the cleanup below.
+        WriteLock wrlock(tablet->get_header_lock_ptr());
+        res = tablet->register_data_source(index_vec);
+        if (res != OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to register index to data sources. [tablet=%s]",
+                             tablet->full_name().c_str());
+            break;
+        }
+
+        res = tablet->save_tablet_meta();
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to save header. [tablet=" << tablet->full_name() << "]";
+            break;
+        }
+    } while (0);
+
+    // Unregister index and delete files(index and data) if failed
+    if (res != OLAP_SUCCESS && tablet.get() != NULL) {
+        std::vector<SegmentGroup*> unused_index;
+        tablet->obtain_header_wrlock();
+        tablet->unregister_data_source(version, &unused_index);
+        tablet->release_header_lock();
+
+        for (SegmentGroup* index : index_vec) {
+            index->delete_all_files();
+            SAFE_DELETE(index);
+        }
+    }
+
+    VLOG(3) << "create init version end. res=" << res;
+    SAFE_DELETE(writer);
+    return res;
+} // create_init_version
+
+// Handles a create-tablet request end to end.
+//
+// The operation is idempotent: if a tablet with the same id and schema hash
+// already exists OLAP_SUCCESS is returned; if the id exists with a different
+// schema hash OLAP_ERR_CE_TABLET_ID_EXIST is returned. Otherwise a tablet is
+// created on one of `stores`, added to the tablet map, registered with its
+// store, and given its initial version. On any failure the partially created
+// tablet is dropped (or its files deleted if it was never added) and the
+// failing status is returned.
+OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request,
+    std::vector<OlapStore*> stores) {
+    OLAPStatus res = OLAP_SUCCESS;
+    bool is_tablet_added = false;
+
+    LOG(INFO) << "begin to process create tablet. tablet=" << request.tablet_id
+              << ", schema_hash=" << request.tablet_schema.schema_hash;
+
+    DorisMetrics::create_tablet_requests_total.increment(1);
+
+    // 1. Lock to ensure that all create_tablet operation execute in serial.
+    //    The lock is taken before the existence check so that two concurrent
+    //    requests for the same tablet cannot both pass the check and then both
+    //    try to create it.
+    static Mutex create_tablet_lock;
+    MutexLock auto_lock(&create_tablet_lock);
+
+    // 2. Make sure create_tablet operation is idempotent:
+    //    return success if tablet with same tablet_id and schema_hash exist,
+    //           false if tablet with same tablet_id but different schema_hash exist
+    if (check_tablet_id_exist(request.tablet_id)) {
+        TabletSharedPtr tablet = get_tablet(
+                request.tablet_id, request.tablet_schema.schema_hash);
+        if (tablet.get() != NULL) {
+            LOG(INFO) << "create tablet success for tablet already exist.";
+            return OLAP_SUCCESS;
+        } else {
+            OLAP_LOG_WARNING("tablet with different schema hash already exists.");
+            return OLAP_ERR_CE_TABLET_ID_EXIST;
+        }
+    }
+
+    TabletSharedPtr tablet;
+    do {
+        // 3. Create tablet with only header, no deltas
+        tablet = create_tablet(request, NULL, false, NULL, stores);
+        if (tablet == NULL) {
+            res = OLAP_ERR_CE_CMD_PARAMS_ERROR;
+            OLAP_LOG_WARNING("fail to create tablet. [res=%d]", res);
+            break;
+        }
+
+        // 4. Add tablet to StorageEngine will make it visiable to user
+        res = add_tablet(
+                request.tablet_id, request.tablet_schema.schema_hash, tablet);
+        if (res != OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to add tablet to StorageEngine. [res=%d]", res);
+            break;
+        }
+        is_tablet_added = true;
+
+        TabletSharedPtr tablet_ptr = get_tablet(
+                request.tablet_id, request.tablet_schema.schema_hash);
+        if (tablet_ptr.get() == NULL) {
+            res = OLAP_ERR_TABLE_NOT_FOUND;
+            OLAP_LOG_WARNING("fail to get tablet. [res=%d]", res);
+            break;
+        }
+
+        // 5. Register tablet into StorageEngine, so that we can manage tablet from
+        // the perspective of root path.
+        // Example: unregister all tables when a bad disk found.
+        res = tablet_ptr->register_tablet_into_dir();
+        if (res != OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to register tablet into StorageEngine. [res=%d, root_path=%s]",
+                    res, tablet_ptr->storage_root_path_name().c_str());
+            break;
+        }
+
+        // 6. Create init version if this is not a restore mode replica and request.version is set
+        // bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode;
+        // if (!in_restore_mode && request.__isset.version) {
+        res = _create_init_version(tablet_ptr, request);
+        if (res != OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to create initial version for tablet. [res=%d]", res);
+        }
+        // }
+    } while (0);
+
+    // 7. clear environment
+    if (res != OLAP_SUCCESS) {
+        DorisMetrics::create_tablet_requests_failed.increment(1);
+        if (is_tablet_added) {
+            OLAPStatus status = drop_tablet(
+                    request.tablet_id, request.tablet_schema.schema_hash);
+            if (status !=  OLAP_SUCCESS) {
+                // Report the status of the failed drop, not the earlier error.
+                OLAP_LOG_WARNING("fail to drop tablet when create tablet failed. [res=%d]", status);
+            }
+        } else if (NULL != tablet) {
+            tablet->delete_all_files();
+        }
+    }
+
+    LOG(INFO) << "finish to process create tablet. res=" << res;
+    return res;
+} // create_tablet
+
+// Builds a new tablet (header only) on a store and persists its header.
+//
+// A TabletMeta is created for `request` on the store, a Tablet is constructed
+// from it, and the header is saved through TabletMetaManager. Returns the
+// tablet, or an empty pointer if the header or the tablet could not be created.
+//
+// NOTE(review): every path leaves the loop after the first store, so only
+// stores[0] is ever tried, although the intent appears to be to try each
+// store until one succeeds -- confirm before changing.
+// NOTE(review): when saving the header fails, the (unsaved) tablet is still
+// returned to the caller -- confirm this is intended.
+// NOTE(review): `ref_root_path` is not used here.
+TabletSharedPtr TabletManager::create_tablet(
+        const TCreateTabletReq& request, const string* ref_root_path,
+        const bool is_schema_change_tablet, const TabletSharedPtr ref_tablet,
+        std::vector<OlapStore*> stores) {
+
+    TabletSharedPtr tablet;
+    for (auto& store : stores) {
+        TabletMeta* header = new TabletMeta();
+        OLAPStatus res = _create_new_tablet_header(request, store, is_schema_change_tablet, ref_tablet, header);
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to create tablet header. [res=" << res
+                         << " root=" << store->path() << "]";
+            // Nothing has taken ownership of the header yet; free it.
+            delete header;
+            break;
+        }
+
+        tablet = Tablet::create_from_header(header, store);
+        if (tablet == nullptr) {
+            // NOTE(review): ownership of `header` on this failure path is not
+            // visible from here, so it is deliberately left untouched.
+            LOG(WARNING) << "fail to load tablet from header. root_path:" << store->path();
+            break;
+        }
+
+        // commit header finally
+        res = TabletMetaManager::save(store, request.tablet_id, request.tablet_schema.schema_hash, header);
+        if (res != OLAP_SUCCESS) {
+            LOG(WARNING) << "fail to save header. [res=" << res
+                         << " root=" << store->path() << "]";
+            break;
+        }
+        break;
+    }
+
+    return tablet;
+} // create_tablet
+
+// Creates a rollup tablet by running the alter-tablet request through a
+// SchemaChangeHandler in ALTER_TABLET_CREATE_ROLLUP_TABLE mode. Updates the
+// rollup request metrics and returns the handler's status.
+OLAPStatus TabletManager::create_rollup_tablet(const TAlterTabletReq& request) {
+    LOG(INFO) << "begin to create rollup tablet. "
+              << "old_tablet_id=" << request.base_tablet_id
+              << ", new_tablet_id=" << request.new_tablet_req.tablet_id;
+
+    DorisMetrics::create_rollup_requests_total.increment(1);
+
+    SchemaChangeHandler rollup_handler;
+    OLAPStatus res = rollup_handler.process_alter_tablet(ALTER_TABLET_CREATE_ROLLUP_TABLE, request);
+
+    if (res == OLAP_SUCCESS) {
+        LOG(INFO) << "success to create rollup tablet. res=" << res
+                  << ", old_tablet_id=" << request.base_tablet_id
+                  << ", new_tablet_id=" << request.new_tablet_req.tablet_id;
+        return res;
+    }
+
+    OLAP_LOG_WARNING("failed to do rollup. "
+                     "[base_tablet=%ld new_tablet=%ld] [res=%d]",
+                     request.base_tablet_id, request.new_tablet_req.tablet_id, res);
+    DorisMetrics::create_rollup_requests_failed.increment(1);
+    return res;
+} // create_rollup_tablet
+
+// Drop tablet specified, the main logical is as follows:
+// 1. tablet not in schema change:
+//      drop specified tablet directly;
+// 2. tablet in schema change:
+//      a. schema change not finished && dropped tablet is base :
+//          base tablet cannot be dropped;
+//      b. other cases:
+//          drop specified tablet and clear schema change info.
+OLAPStatus TabletManager::drop_tablet(
+        TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
+    LOG(INFO) << "begin to process drop tablet."
+        << "tablet=" << tablet_id << ", schema_hash=" << schema_hash;
+    DorisMetrics::drop_tablet_requests_total.increment(1);
+
+    OLAPStatus res = OLAP_SUCCESS;
+
+    // Get tablet which need to be droped
+    _tablet_map_lock.rdlock();
+    TabletSharedPtr dropped_tablet = _get_tablet_with_no_lock(tablet_id, 
schema_hash);
+    _tablet_map_lock.unlock();
+    if (dropped_tablet.get() == NULL) {
+        OLAP_LOG_WARNING("fail to drop not existed tablet. [tablet_id=%ld 
schema_hash=%d]",
+                         tablet_id, schema_hash);
+        return OLAP_ERR_TABLE_NOT_FOUND;
+    }
+
+    // Try to get schema change info
+    AlterTabletType type;
+    TTabletId related_tablet_id;
+    TSchemaHash related_schema_hash;
+    vector<Version> schema_change_versions;
+    dropped_tablet->obtain_header_rdlock();
+    bool ret = dropped_tablet->get_schema_change_request(
+            &related_tablet_id, &related_schema_hash, &schema_change_versions, 
&type);
+    dropped_tablet->release_header_lock();
+
+    // Drop tablet directly when not in schema change
+    if (!ret) {
+        return _drop_tablet_directly(tablet_id, schema_hash, keep_files);
+    }
+
+    // Check tablet is in schema change or not, is base tablet or not
+    bool is_schema_change_finished = true;
+    if (schema_change_versions.size() != 0) {
+        is_schema_change_finished = false;
+    }
+
+    bool is_drop_base_tablet = false;
+    _tablet_map_lock.rdlock();
+    TabletSharedPtr related_tablet = _get_tablet_with_no_lock(
+            related_tablet_id, related_schema_hash);
+    _tablet_map_lock.unlock();
+    if (related_tablet.get() == NULL) {
+        OLAP_LOG_WARNING("drop tablet directly when related tablet not found. "
+                         "[tablet_id=%ld schema_hash=%d]",
+                         related_tablet_id, related_schema_hash);
+        return _drop_tablet_directly(tablet_id, schema_hash, keep_files);
+    }
+
+    if (dropped_tablet->creation_time() < related_tablet->creation_time()) {
+        is_drop_base_tablet = true;
+    }
+
+    if (is_drop_base_tablet && !is_schema_change_finished) {
+        OLAP_LOG_WARNING("base tablet in schema change cannot be droped. 
[tablet=%s]",
+                         dropped_tablet->full_name().c_str());
+        return OLAP_ERR_PREVIOUS_SCHEMA_CHANGE_NOT_FINISHED;
+    }
+
+    // Drop specified tablet and clear schema change info
+    _tablet_map_lock.wrlock();
+    related_tablet->obtain_header_wrlock();
+    related_tablet->clear_schema_change_request();
+    res = related_tablet->save_tablet_meta();
+    if (res != OLAP_SUCCESS) {
+        LOG(FATAL) << "fail to save tablet header. res=" << res
+                   << ", tablet=" << related_tablet->full_name();
+    }
+
+    res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, keep_files);
+    related_tablet->release_header_lock();
+    _tablet_map_lock.unlock();
+    if (res != OLAP_SUCCESS) {
+        OLAP_LOG_WARNING("fail to drop tablet which in schema change. 
[tablet=%s]",
+                         dropped_tablet->full_name().c_str());
+        return res;
+    }
+
+    LOG(INFO) << "finish to drop tablet. res=" << res;
+    return res;
+} // drop_tablet
+
+OLAPStatus TabletManager::drop_tablets_on_error_root_path(
+        const vector<TabletInfo>& tablet_info_vec) {
+    OLAPStatus res = OLAP_SUCCESS;
+
+    _tablet_map_lock.wrlock();
+
+    for (const TabletInfo& tablet_info : tablet_info_vec) {
+        TTabletId tablet_id = tablet_info.tablet_id;
+        TSchemaHash schema_hash = tablet_info.schema_hash;
+        VLOG(3) << "drop_tablet begin. tablet_id=" << tablet_id
+                << ", schema_hash=" << schema_hash;
+        TabletSharedPtr dropped_tablet = _get_tablet_with_no_lock(tablet_id, 
schema_hash);
+        if (dropped_tablet.get() == NULL) {
+            OLAP_LOG_WARNING("dropping tablet not exist. [tablet=%ld 
schema_hash=%d]",
+                             tablet_id, schema_hash);
+            continue;
+        } else {
+            for (list<TabletSharedPtr>::iterator it = 
_tablet_map[tablet_id].table_arr.begin();
+                    it != _tablet_map[tablet_id].table_arr.end();) {
+                if ((*it)->equal(tablet_id, schema_hash)) {
+                    it = _tablet_map[tablet_id].table_arr.erase(it);
+                } else {
+                    ++it;
+                }
+            }
+
+            if (_tablet_map[tablet_id].table_arr.empty()) {
+                _tablet_map.erase(tablet_id);
+            }
+        }
+    }
+
+    _tablet_map_lock.unlock();
+
+    return res;
+} // drop_tablets_on_error_root_path
+
+TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash 
schema_hash, bool load_tablet) {
+    _tablet_map_lock.rdlock();
+    TabletSharedPtr tablet;
+    tablet = _get_tablet_with_no_lock(tablet_id, schema_hash);
+    _tablet_map_lock.unlock();
+
+    if (tablet.get() != NULL) {
+        if (!tablet->is_used()) {
+            OLAP_LOG_WARNING("tablet cannot be used. [tablet=%ld]", tablet_id);
+            tablet.reset();
+        } else if (load_tablet && !tablet->is_loaded()) {
+            if (tablet->load() != OLAP_SUCCESS) {
+                OLAP_LOG_WARNING("fail to load tablet. [tablet=%ld]", 
tablet_id);
+                tablet.reset();
+            }
+        }
+    }
+
+    return tablet;
+} // get_tablet
+
+OLAPStatus TabletManager::get_tablets_by_id(
+        TTabletId tablet_id,
+        list<TabletSharedPtr>* tablet_list) {
+    OLAPStatus res = OLAP_SUCCESS;
+    VLOG(3) << "begin to get tables by id. tablet_id=" << tablet_id;
+
+    _tablet_map_lock.rdlock();
+    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
+    if (it != _tablet_map.end()) {
+        for (TabletSharedPtr tablet : it->second.table_arr) {
+            tablet_list->push_back(tablet);
+        }
+    }
+    _tablet_map_lock.unlock();
+
+    if (tablet_list->size() == 0) {
+        OLAP_LOG_WARNING("there is no tablet with specified id. [tablet=%ld]", 
tablet_id);
+        return OLAP_ERR_TABLE_NOT_FOUND;
+    }
+
+    for (std::list<TabletSharedPtr>::iterator it = tablet_list->begin();
+            it != tablet_list->end();) {
+        if (!(*it)->is_loaded()) {
+            if ((*it)->load() != OLAP_SUCCESS) {
+                OLAP_LOG_WARNING("fail to load tablet. [tablet='%s']",
+                                 (*it)->full_name().c_str());
+                it = tablet_list->erase(it);
+                continue;
+            }
+        }
+        ++it;
+    }
+
+    VLOG(3) << "success to get tables by id. table_num=" << 
tablet_list->size();
+    return res;
+}
+
+void TabletManager::get_tablet_stat(TTabletStatResult& result) {
+    VLOG(3) << "begin to get all tablet stat.";
+
+    // get current time
+    int64_t current_time = UnixMillis();
+    
+    _tablet_map_lock.wrlock();
+    // update cache if too old
+    if (current_time - _tablet_stat_cache_update_time_ms > 
+        config::tablet_stat_cache_update_interval_second * 1000) {
+        VLOG(3) << "update tablet stat.";
+        _build_tablet_stat();
+    }
+
+    result.__set_tablets_stats(_tablet_stat_cache);
+
+    _tablet_map_lock.unlock();
+} // get_tablet_stat
+
+TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType 
compaction_type) {
+    ReadLock tablet_map_rdlock(&_tablet_map_lock);
+    uint32_t highest_score = 0;
+    TabletSharedPtr best_tablet;
+    for (tablet_map_t::value_type& table_ins : _tablet_map){
+        for (TabletSharedPtr& table_ptr : table_ins.second.table_arr) {
+            if (!table_ptr->is_loaded() || !table_ptr->can_do_compaction()) {
+                continue;
+            }
+
+            ReadLock rdlock(table_ptr->get_header_lock_ptr());
+            uint32_t table_score = 0;
+            if (compaction_type == CompactionType::BASE_COMPACTION) {
+                table_score = table_ptr->get_base_compaction_score();
+            } else if (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION) {
+                table_score = table_ptr->get_cumulative_compaction_score();
+            }
+            if (table_score > highest_score) {
+                highest_score = table_score;
+                best_tablet = table_ptr;
+            }
+        }
+    }
+    return best_tablet;
+}
+
+OLAPStatus TabletManager::load_one_tablet(
+        OlapStore* store, TTabletId tablet_id, SchemaHash schema_hash,
+        const string& schema_hash_path, bool force) {
+    stringstream header_name_stream;
+    header_name_stream << schema_hash_path << "/" << tablet_id << ".hdr";
+    string header_path = header_name_stream.str();
+    path boost_schema_hash_path(schema_hash_path);
+
+    if (access(header_path.c_str(), F_OK) != 0) {
+        LOG(WARNING) << "fail to find header file. [header_path=" << 
header_path << "]";
+        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
+        return OLAP_ERR_FILE_NOT_EXIST;
+    }
+
+    auto tablet = Tablet::create_from_header_file(
+            tablet_id, schema_hash, header_path, store);
+    if (tablet == NULL) {
+        LOG(WARNING) << "fail to load tablet. [header_path=" << header_path << 
"]";
+        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
+        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
+    }
+
+    if (tablet->lastest_version() == NULL && !tablet->is_schema_changing()) {
+        OLAP_LOG_WARNING("tablet not in schema change state without delta is 
invalid. "
+                         "[header_path=%s]",
+                         header_path.c_str());
+        move_to_trash(boost_schema_hash_path, boost_schema_hash_path);
+        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
+    }
+
+    // 这里不需要SAFE_DELETE(tablet),因为tablet指针已经在add_table中托管到smart pointer中
+    OLAPStatus res = OLAP_SUCCESS;
+    string table_name = tablet->full_name();
+    res = add_tablet(tablet_id, schema_hash, tablet, force);
+    if (res != OLAP_SUCCESS) {
+        // 插入已经存在的table时返回成功
+        if (res == OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE) {
+            return OLAP_SUCCESS;
+        }
+
+        LOG(WARNING) << "failed to add tablet. [tablet=" << table_name << "]";
+        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
+    }
+
+    if (tablet->register_tablet_into_dir() != OLAP_SUCCESS) {
+        OLAP_LOG_WARNING("fail to register tablet into root path. 
[root_path=%s]",
+                         schema_hash_path.c_str());
+
+        if (StorageEngine::get_instance()->drop_tablet(tablet_id, schema_hash) 
!= OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to drop tablet when create tablet failed. "
+                             "[tablet=%ld schema_hash=%d]",
+                             tablet_id, schema_hash);
+        }
+
+        return OLAP_ERR_ENGINE_LOAD_INDEX_TABLE_ERROR;
+    }
+
+    // load pending data (for realtime push), will add transaction 
relationship into engine
+    tablet->load_pending_data();
+
+    VLOG(3) << "succeed to add tablet. tablet=" << tablet->full_name()
+            << ", path=" << schema_hash_path;
+    return OLAP_SUCCESS;
+} // load_one_tablet
+
+void TabletManager::release_schema_change_lock(TTabletId tablet_id) {
+    VLOG(3) << "release_schema_change_lock begin. tablet_id=" << tablet_id;
+    _tablet_map_lock.rdlock();
+
+    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
+    if (it == _tablet_map.end()) {
+        OLAP_LOG_WARNING("tablet does not exists. [tablet=%ld]", tablet_id);
+    } else {
+        it->second.schema_change_lock.unlock();
+    }
+
+    _tablet_map_lock.unlock();
+    VLOG(3) << "release_schema_change_lock end. tablet_id=" << tablet_id;
+} // release_schema_change_lock
+
+OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) {
+    DorisMetrics::report_tablet_requests_total.increment(1);
+    LOG(INFO) << "begin to process report tablet info."
+              << "tablet_id=" << tablet_info->tablet_id
+              << ", schema_hash=" << tablet_info->schema_hash;
+
+    OLAPStatus res = OLAP_SUCCESS;
+
+    TabletSharedPtr tablet = get_tablet(
+            tablet_info->tablet_id, tablet_info->schema_hash);
+    if (tablet.get() == NULL) {
+        OLAP_LOG_WARNING("can't find tablet. [tablet=%ld schema_hash=%d]",
+                         tablet_info->tablet_id, tablet_info->schema_hash);
+        return OLAP_ERR_TABLE_NOT_FOUND;
+    }
+
+    _build_tablet_info(tablet, tablet_info);
+    LOG(INFO) << "success to process report tablet info.";
+    return res;
+} // report_tablet_info
+
+OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, 
TTablet>* tablets_info) {
+    LOG(INFO) << "begin to process report all tablets info.";
+    DorisMetrics::report_all_tablets_requests_total.increment(1);
+
+    if (tablets_info == NULL) {
+        return OLAP_ERR_INPUT_PARAMETER_ERROR;
+    }
+
+    _tablet_map_lock.rdlock();
+    for (const auto& item : _tablet_map) {
+        if (item.second.table_arr.size() == 0) {
+            continue;
+        }
+
+        TTablet tablet;
+        for (TabletSharedPtr tablet_ptr : item.second.table_arr) {
+            if (tablet_ptr == NULL) {
+                continue;
+            }
+
+            TTabletInfo tablet_info;
+            _build_tablet_info(tablet_ptr, &tablet_info);
+
+            // report expire transaction
+            vector<int64_t> transaction_ids;
+            tablet_ptr->get_expire_pending_data(&transaction_ids);
+            tablet_info.__set_transaction_ids(transaction_ids);
+
+            if (_available_storage_medium_type_count > 1) {
+                
tablet_info.__set_storage_medium(tablet_ptr->store()->storage_medium());
+            }
+
+            tablet_info.__set_version_count(tablet_ptr->file_delta_size());
+            tablet_info.__set_path_hash(tablet_ptr->store()->path_hash());
+
+            tablet.tablet_infos.push_back(tablet_info);
+        }
+
+        if (tablet.tablet_infos.size() != 0) {
+            tablets_info->insert(pair<TTabletId, 
TTablet>(tablet.tablet_infos[0].tablet_id, tablet));
+        }
+    }
+    _tablet_map_lock.unlock();
+
+    LOG(INFO) << "success to process report all tablets info. tablet_num=" << 
tablets_info->size();
+    return OLAP_SUCCESS;
+} // report_all_tablets_info
+
+AlterTableStatus TabletManager::show_alter_tablet_status(
+        TTabletId tablet_id,
+        TSchemaHash schema_hash) {
+    LOG(INFO) << "begin to process show alter tablet status."
+              << "tablet_id" << tablet_id
+              << ", schema_hash" << schema_hash;
+
+    AlterTableStatus status = ALTER_TABLE_FINISHED;
+
+    TabletSharedPtr tablet = get_tablet(tablet_id, schema_hash);
+    if (tablet.get() == NULL) {
+        OLAP_LOG_WARNING("fail to get tablet. [tablet=%ld schema_hash=%d]",
+                         tablet_id, schema_hash);
+        status = ALTER_TABLE_FAILED;
+    } else {
+        status = tablet->schema_change_status().status;
+    }
+
+    return status;
+} // show_alter_tablet_status
+
+OLAPStatus TabletManager::start_trash_sweep() {
+    _tablet_map_lock.rdlock();
+    for (const auto& item : _tablet_map) {
+        for (TabletSharedPtr tablet : item.second.table_arr) {
+            if (tablet.get() == NULL) {
+                continue;
+            }
+            tablet->delete_expire_incremental_data();
+        }
+    }
+    _tablet_map_lock.unlock();
+    return OLAP_SUCCESS;
+} // start_trash_sweep
+
+bool TabletManager::try_schema_change_lock(TTabletId tablet_id) {
+    bool res = false;
+    VLOG(3) << "try_schema_change_lock begin. table_id=" << tablet_id;
+    _tablet_map_lock.rdlock();
+
+    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
+    if (it == _tablet_map.end()) {
+        OLAP_LOG_WARNING("tablet does not exists. [tablet=%ld]", tablet_id);
+    } else {
+        res = (it->second.schema_change_lock.trylock() == OLAP_SUCCESS);
+    }
+
+    _tablet_map_lock.unlock();
+    VLOG(3) << "try_schema_change_lock end. table_id=" <<  tablet_id;
+    return res;
+} // try_schema_change_lock
+
+void TabletManager::update_root_path_info(std::map<std::string, RootPathInfo>* 
path_map, 
+    int* tablet_counter) {
+    _tablet_map_lock.rdlock();
+    for (auto& entry : _tablet_map) {
+        TableInstances& instance = entry.second;
+        for (auto& tablet : instance.table_arr) {
+            (*tablet_counter) ++ ;
+            int64_t data_size = tablet->get_data_size();
+            auto find = path_map->find(tablet->storage_root_path_name()); 
+            if (find == path_map->end()) {
+                continue;
+            }
+            if (find->second.is_used) {
+                find->second.data_used_capacity += data_size;
+            }
+        } 
+    }
+    _tablet_map_lock.unlock();
+} // update_root_path_info
+
+void TabletManager::update_storage_medium_type_count(uint32_t 
storage_medium_type_count) {
+    _available_storage_medium_type_count = storage_medium_type_count;
+}
+
+void TabletManager::_build_tablet_info(TabletSharedPtr tablet, TTabletInfo* 
tablet_info) {
+    tablet_info->tablet_id = tablet->tablet_id();
+    tablet_info->schema_hash = tablet->schema_hash();
+
+    tablet->obtain_header_rdlock();
+    tablet_info->row_count = tablet->get_num_rows();
+    tablet_info->data_size = tablet->get_data_size();
+    const PDelta* last_file_version = tablet->lastest_version();
+    if (last_file_version == NULL) {
+        tablet_info->version = -1;
+        tablet_info->version_hash = 0;
+    } else {
+        // report the version before first missing version
+        vector<Version> missing_versions;
+        tablet->get_missing_versions_with_header_locked(
+                last_file_version->end_version(), &missing_versions);
+        const PDelta* least_complete_version =
+            tablet->least_complete_version(missing_versions);
+        if (least_complete_version == NULL) {
+            tablet_info->version = -1;
+            tablet_info->version_hash = 0;
+        } else {
+            tablet_info->version = least_complete_version->end_version();
+            tablet_info->version_hash = least_complete_version->version_hash();
+        }
+    }
+    tablet->release_header_lock();
+}
+
+void TabletManager::_build_tablet_stat() {
+    _tablet_stat_cache.clear();
+    for (const auto& item : _tablet_map) {
+        if (item.second.table_arr.size() == 0) {
+            continue;
+        }
+
+        TTabletStat stat;
+        stat.tablet_id = item.first;
+        for (TabletSharedPtr tablet : item.second.table_arr) {
+            if (tablet.get() == NULL) {
+                continue;
+            }
+                
+            // we only get base tablet's stat
+            stat.__set_data_size(tablet->get_data_size());
+            stat.__set_row_num(tablet->get_num_rows());
+            VLOG(3) << "tablet_id=" << item.first 
+                    << ", data_size=" << tablet->get_data_size()
+                    << ", row_num:" << tablet->get_num_rows();
+            break;
+        }
+
+        _tablet_stat_cache.emplace(item.first, stat);
+    }
+
+    _tablet_stat_cache_update_time_ms = UnixMillis();
+}
+
+OLAPStatus TabletManager::_create_init_version(
+        TabletSharedPtr tablet, const TCreateTabletReq& request) {
+    OLAPStatus res = OLAP_SUCCESS;
+
+    if (request.version < 1) {
+        OLAP_LOG_WARNING("init version of tablet should at least 1.");
+        return OLAP_ERR_CE_CMD_PARAMS_ERROR;
+    } else {
+        Version init_base_version(0, request.version);
+        res = create_init_version(
+                request.tablet_id, request.tablet_schema.schema_hash,
+                init_base_version, request.version_hash);
+        if (res != OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to create init base version. [res=%d 
version=%ld]",
+                    res, request.version);
+            return res;
+        }
+
+        Version init_delta_version(request.version + 1, request.version + 1);
+        res = create_init_version(
+                request.tablet_id, request.tablet_schema.schema_hash,
+                init_delta_version, 0);
+        if (res != OLAP_SUCCESS) {
+            OLAP_LOG_WARNING("fail to create init delta version. [res=%d 
version=%ld]",
+                    res, request.version + 1);
+            return res;
+        }
+    }
+
+    tablet->obtain_header_wrlock();
+    tablet->set_cumulative_layer_point(request.version + 1);
+    res = tablet->save_tablet_meta();
+    tablet->release_header_lock();
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to save header. [tablet=" << tablet->full_name() 
<< "]";
+    }
+
+    return res;
+}
+
+OLAPStatus TabletManager::_create_new_tablet_header(
+        const TCreateTabletReq& request,
+        OlapStore* store,
+        const bool is_schema_change_tablet,
+        const TabletSharedPtr ref_tablet,
+        TabletMeta* header) {
+    uint64_t shard = 0;
+    OLAPStatus res = store->get_shard(&shard);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to get root path shard. res=" << res;
+        return res;
+    }
+    stringstream schema_hash_dir_stream;
+    schema_hash_dir_stream << store->path()
+                      << DATA_PREFIX
+                      << "/" << shard
+                      << "/" << request.tablet_id
+                      << "/" << request.tablet_schema.schema_hash;
+    string schema_hash_dir = schema_hash_dir_stream.str();
+    if (check_dir_existed(schema_hash_dir)) {
+        LOG(WARNING) << "failed to create the dir that existed. path=" << 
schema_hash_dir;
+        return OLAP_ERR_CANNOT_CREATE_DIR;
+    }
+    res = create_dirs(schema_hash_dir);
+    if (res != OLAP_SUCCESS) {
+        LOG(WARNING) << "create dir fail. [res=" << res << " path:" << 
schema_hash_dir;
+        return res;
+    }
+    
+    // set basic information
+    
header->set_num_short_key_fields(request.tablet_schema.short_key_column_count);
+    header->set_compress_kind(COMPRESS_LZ4);
+    if (request.tablet_schema.keys_type == TKeysType::DUP_KEYS) {
+        header->set_keys_type(KeysType::DUP_KEYS);
+    } else if (request.tablet_schema.keys_type == TKeysType::UNIQUE_KEYS) {
+        header->set_keys_type(KeysType::UNIQUE_KEYS);
+    } else {
+        header->set_keys_type(KeysType::AGG_KEYS);
+    }
+    DCHECK(request.tablet_schema.storage_type == TStorageType::COLUMN);
+    header->set_data_file_type(COLUMN_ORIENTED_FILE);
+    header->set_segment_size(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE);
+    
header->set_num_rows_per_data_block(config::default_num_rows_per_column_file_block);
+
+    // set column information
+    uint32_t i = 0;
+    uint32_t key_count = 0;
+    bool has_bf_columns = false;
+    uint32_t next_unique_id = 0;
+    if (true == is_schema_change_tablet) {
+        next_unique_id = ref_tablet->next_unique_id();
+    }
+    for (TColumn column : request.tablet_schema.columns) {
+        if (column.column_type.type == TPrimitiveType::VARCHAR
+                && i < request.tablet_schema.short_key_column_count - 1) {
+            LOG(WARNING) << "varchar type column should be the last short 
key.";
+            return OLAP_ERR_SCHEMA_SCHEMA_INVALID;
+        }
+        header->add_column();
+        if (true == is_schema_change_tablet) {
+            /*
+             * for schema change, compare old_tablet and new_tablet
+             * 1. if column in both new_tablet and old_tablet,
+             * assign unique_id of old_tablet to the column of new_tablet
+             * 2. if column exists only in new_tablet, assign next_unique_id 
of old_tablet
+             * to the new column
+             *
+            */
+            size_t field_num = ref_tablet->tablet_schema().size();
+            size_t field_off = 0;
+            for (field_off = 0; field_off < field_num; ++field_off) {
+                if (ref_tablet->tablet_schema()[field_off].name == 
column.column_name) {
+                    uint32_t unique_id = 
ref_tablet->tablet_schema()[field_off].unique_id;
+                    header->mutable_column(i)->set_unique_id(unique_id);
+                    break;
+                }
+            }
+            if (field_off == field_num) {
+                header->mutable_column(i)->set_unique_id(next_unique_id++);
+            }
+        } else {
+            header->mutable_column(i)->set_unique_id(i);
+        }
+        header->mutable_column(i)->set_name(column.column_name);
+        header->mutable_column(i)->set_is_root_column(true);
+        string data_type;
+        EnumToString(TPrimitiveType, column.column_type.type, data_type);
+        header->mutable_column(i)->set_type(data_type);
+        if (column.column_type.type == TPrimitiveType::DECIMAL) {
+            if (column.column_type.__isset.precision && 
column.column_type.__isset.scale) {
+                
header->mutable_column(i)->set_precision(column.column_type.precision);
+                header->mutable_column(i)->set_frac(column.column_type.scale);
+            } else {
+                LOG(WARNING) << "decimal type column should set precision and 
frac.";
+                return OLAP_ERR_SCHEMA_SCHEMA_INVALID;
+            }
+        }
+        if (column.column_type.type == TPrimitiveType::CHAR
+                || column.column_type.type == TPrimitiveType::VARCHAR || 
column.column_type.type == TPrimitiveType::HLL) {
+            if (!column.column_type.__isset.len) {
+                LOG(WARNING) << "CHAR or VARCHAR should specify length. type=" 
<< column.column_type.type;
+                return OLAP_ERR_INPUT_PARAMETER_ERROR;
+            }
+        }
+        uint32_t length = FieldInfo::get_field_length_by_type(
+                column.column_type.type, column.column_type.len);
+        header->mutable_column(i)->set_length(length);
+        header->mutable_column(i)->set_index_length(length);
+        if (column.column_type.type == TPrimitiveType::VARCHAR || 
column.column_type.type == TPrimitiveType::HLL) {
+            if (!column.column_type.__isset.index_len) {
+                header->mutable_column(i)->set_index_length(10);
+            } else {
+                
header->mutable_column(i)->set_index_length(column.column_type.index_len);
+            }
+        }
+        if (!column.is_key) {
+            header->mutable_column(i)->set_is_key(false);
+            string aggregation_type;
+            EnumToString(TAggregationType, column.aggregation_type, 
aggregation_type);
+            header->mutable_column(i)->set_aggregation(aggregation_type);
+        } else {
+            ++key_count;
+            header->add_selectivity(1);
+            header->mutable_column(i)->set_is_key(true);
+            header->mutable_column(i)->set_aggregation("NONE");
+        }
+        if (column.__isset.default_value) {
+            header->mutable_column(i)->set_default_value(column.default_value);
+        }
+        if (column.__isset.is_allow_null) {
+            header->mutable_column(i)->set_is_allow_null(column.is_allow_null);
+        } else {
+            header->mutable_column(i)->set_is_allow_null(false);
+        }
+        if (column.__isset.is_bloom_filter_column) {
+            
header->mutable_column(i)->set_is_bf_column(column.is_bloom_filter_column);
+            has_bf_columns = true;
+        }
+        ++i;
+    }
+    if (true == is_schema_change_tablet){
+        /*
+         * for schema change, next_unique_id of new tablet should be greater 
than
+         * next_unique_id of old tablet
+         * */
+        header->set_next_column_unique_id(next_unique_id);
+    } else {
+        header->set_next_column_unique_id(i);
+    }
+    if (has_bf_columns && request.tablet_schema.__isset.bloom_filter_fpp) {
+        header->set_bf_fpp(request.tablet_schema.bloom_filter_fpp);
+    }
+    if (key_count < request.tablet_schema.short_key_column_count) {
+        LOG(WARNING) << "short key num should not large than key num. "
+                << "key_num=" << key_count << " short_key_num=" << 
request.tablet_schema.short_key_column_count;
+        return OLAP_ERR_INPUT_PARAMETER_ERROR;
+    }
+
+    header->set_creation_time(time(NULL));
+    header->set_cumulative_layer_point(-1);
+    header->set_tablet_id(request.tablet_id);
+    header->set_schema_hash(request.tablet_schema.schema_hash);
+    header->set_shard(shard);
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus TabletManager::_drop_tablet_directly(
+        TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
+    _tablet_map_lock.wrlock();
+    OLAPStatus res = _drop_tablet_directly_unlocked(tablet_id, schema_hash, 
keep_files);
+    _tablet_map_lock.unlock();
+    return res;
+} // _drop_tablet_directly
+
+OLAPStatus TabletManager::_drop_tablet_directly_unlocked(
+        TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) {
+    OLAPStatus res = OLAP_SUCCESS;
+
+    TabletSharedPtr dropped_tablet = _get_tablet_with_no_lock(tablet_id, 
schema_hash);
+    if (dropped_tablet.get() == NULL) {
+        OLAP_LOG_WARNING("fail to drop not existed tablet. [tablet_id=%ld 
schema_hash=%d]",
+                         tablet_id, schema_hash);
+        return OLAP_ERR_TABLE_NOT_FOUND;
+    }
+
+    for (list<TabletSharedPtr>::iterator it = 
_tablet_map[tablet_id].table_arr.begin();
+            it != _tablet_map[tablet_id].table_arr.end();) {
+        if ((*it)->equal(tablet_id, schema_hash)) {
+            if (!keep_files) {
+                (*it)->mark_dropped();
+            }
+            it = _tablet_map[tablet_id].table_arr.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    if (_tablet_map[tablet_id].table_arr.empty()) {
+        _tablet_map.erase(tablet_id);
+    }
+
+    res = dropped_tablet->store()->deregister_tablet(dropped_tablet.get());
+    if (res != OLAP_SUCCESS) {
+        OLAP_LOG_WARNING("fail to unregister from root path. [res=%d 
tablet=%ld]",
+                         res, tablet_id);
+    }
+
+    return res;
+} // _drop_tablet_directly_unlocked
+
+TabletSharedPtr TabletManager::_get_tablet_with_no_lock(TTabletId tablet_id, 
SchemaHash schema_hash) {
+    VLOG(3) << "begin to get tablet. tablet_id=" << tablet_id;
+    tablet_map_t::iterator it = _tablet_map.find(tablet_id);
+    if (it != _tablet_map.end()) {
+        for (TabletSharedPtr tablet : it->second.table_arr) {
+            if (tablet->equal(tablet_id, schema_hash)) {
+                VLOG(3) << "get tablet success. tablet_id=" << tablet_id;
+                return tablet;
+            }
+        }
+    }
+
+    VLOG(3) << "fail to get tablet. tablet_id=" << tablet_id;
+    // Return empty tablet if fail
+    TabletSharedPtr tablet;
+    return tablet;
+} // _get_tablet_with_no_lock
+
+} // doris
\ No newline at end of file
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
new file mode 100644
index 00000000..b2f5b7c9
--- /dev/null
+++ b/be/src/olap/tablet_manager.h
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_TABLET_MANAGER_H
+#define DORIS_BE_SRC_OLAP_TABLET_MANAGER_H
+
+#include <ctime>
+#include <list>
+#include <map>
+#include <mutex>
+#include <condition_variable>
+#include <set>
+#include <string>
+#include <vector>
+#include <thread>
+
+#include <rapidjson/document.h>
+#include <pthread.h>
+
+#include "agent/status.h"
+#include "common/status.h"
+#include "gen_cpp/AgentService_types.h"
+#include "gen_cpp/BackendService_types.h"
+#include "gen_cpp/MasterService_types.h"
+#include "olap/atomic.h"
+#include "olap/lru_cache.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/tablet.h"
+#include "olap/olap_meta.h"
+#include "olap/options.h"
+
+namespace doris {
+
+class Tablet;
+class OlapStore;
+
+// TabletManager provides get, add, create and drop tablet methods for the storage engine.
+class TabletManager {
+public:
+    TabletManager();
+    ~TabletManager() {
+        _tablet_map.clear();
+        _global_tablet_id = 0;
+    }
+
+    // Add a tablet pointer (this comment predates the move out of StorageEngine)
+    // If force, drop the existing tablet and add this new one
+    //
+    // Return OLAP_SUCCESS, if run ok
+    //        OLAP_ERR_TABLE_INSERT_DUPLICATION_ERROR, if find duplication
+    //        OLAP_ERR_NOT_INITED, if not inited
+    OLAPStatus add_tablet(TTabletId tablet_id, SchemaHash schema_hash,
+                         const TabletSharedPtr& tablet, bool force = false);
+
+    void cancel_unfinished_schema_change();
+
+    bool check_tablet_id_exist(TTabletId tablet_id);
+
+    void clear();
+    
+    // Add empty data for Tablet
+    //
+    // Return OLAP_SUCCESS, if run ok
+    OLAPStatus create_init_version(
+            TTabletId tablet_id, SchemaHash schema_hash,
+            Version version, VersionHash version_hash);
+
+    OLAPStatus create_tablet(const TCreateTabletReq& request, 
+                             std::vector<OlapStore*> stores);
+
+    // Create new tablet (this comment predates the move out of StorageEngine)
+    //
+    // Return the new tablet if succeeded. NOTE(review): the return type is TabletSharedPtr, so failure is presumably an empty pointer - confirm
+    TabletSharedPtr create_tablet(const TCreateTabletReq& request,
+                              const std::string* ref_root_path, 
+                              const bool is_schema_change_tablet,
+                              const TabletSharedPtr ref_tablet, 
+                              std::vector<OlapStore*> stores);
+
+    // ######################### ALTER TABLE BEGIN #########################
+    // The following interfaces are all about the alter tablet operation; 
+    // the main logic is generating a new tablet with a different
+    // schema on the base tablet.
+    
+    // Create rollup tablet on base tablet, after create_rollup_tablet,
+    // both the base tablet and the new tablet are effective.
+    //
+    // @param [in] request specify base tablet, new tablet and its schema
+    // @return OLAP_SUCCESS if submit success
+    OLAPStatus create_rollup_tablet(const TAlterTabletReq& request);
+
+    // Show the alter tablet status of one tablet.
+    // 
+    // @param [in] tablet_id & schema_hash specify a tablet
+    // @return alter tablet status
+    AlterTableStatus show_alter_tablet_status(TTabletId tablet_id, TSchemaHash 
schema_hash);
+
+    // Drop a tablet by description
+    // If set keep_files == true, files will NOT be deleted when 
deconstruction.
+    // Return OLAP_SUCCESS, if run ok
+    //        OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist (NOTE(review): _drop_tablet_directly_unlocked returns OLAP_ERR_TABLE_NOT_FOUND for a missing tablet - confirm which code callers see)
+    //        OLAP_ERR_NOT_INITED, if not inited
+    OLAPStatus drop_tablet(
+            TTabletId tablet_id, SchemaHash schema_hash, bool keep_files = 
false);
+
+    OLAPStatus drop_tablets_on_error_root_path(const std::vector<TabletInfo>& 
tablet_info_vec);
+
+    TabletSharedPtr find_best_tablet_to_compaction(CompactionType 
compaction_type);
+
+    // Get tablet pointer for (tablet_id, schema_hash)
+    TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash, 
bool load_tablet = true);
+
+    OLAPStatus get_tablets_by_id(TTabletId tablet_id, 
std::list<TabletSharedPtr>* tablet_list);  
+
+    void get_tablet_stat(TTabletStatResult& result);
+
+    OLAPStatus load_one_tablet(OlapStore* store,
+                               TTabletId tablet_id,
+                               SchemaHash schema_hash,
+                               const std::string& schema_hash_path,
+                               bool force = false);
+    
+    void release_schema_change_lock(TTabletId tablet_id);
+    
+    // Get the names of all tables (translated from Chinese; NOTE(review): looks stale for report_tablet_info - confirm)
+    //
+    // Return OLAP_SUCCESS, if run ok
+    //        OLAP_ERR_INPUT_PARAMETER_ERROR, if tables is null
+    OLAPStatus report_tablet_info(TTabletInfo* tablet_info);
+
+    OLAPStatus report_all_tablets_info(std::map<TTabletId, TTablet>* 
tablets_info);
+
+    OLAPStatus start_trash_sweep();
+    // Prevent schema change executed concurrently.
+    bool try_schema_change_lock(TTabletId tablet_id);
+
+    void update_root_path_info(std::map<std::string, RootPathInfo>* path_map, 
int* tablet_counter);
+
+    void update_storage_medium_type_count(uint32_t storage_medium_type_count);
+
+private:
+    void _build_tablet_info(TabletSharedPtr tablet, TTabletInfo* tablet_info);
+    
+    void _build_tablet_stat();
+    
+    OLAPStatus _create_init_version(TabletSharedPtr tablet, const 
TCreateTabletReq& request);
+    
+
+    OLAPStatus _create_new_tablet_header(const TCreateTabletReq& request,
+                                             OlapStore* store,
+                                             const bool 
is_schema_change_tablet,
+                                             const TabletSharedPtr ref_tablet,
+                                             TabletMeta* header);
+
+    // Drop tablet directly, checking schema change info.
+    OLAPStatus _drop_tablet_directly(TTabletId tablet_id, TSchemaHash 
schema_hash, bool keep_files = false);
+    
+    OLAPStatus _drop_tablet_directly_unlocked(TTabletId tablet_id, TSchemaHash 
schema_hash, bool keep_files = false);
+
+    TabletSharedPtr _get_tablet_with_no_lock(TTabletId tablet_id, SchemaHash 
schema_hash);
+
+private:
+    struct TableInstances {
+        Mutex schema_change_lock;  // presumably used by try_/release_schema_change_lock - confirm
+        std::list<TabletSharedPtr> table_arr;  // tablets sharing one tablet_id; lookups match on schema_hash
+    };
+    typedef std::map<int64_t, TableInstances> tablet_map_t;
+    RWMutex _tablet_map_lock;  // presumably guards _tablet_map - confirm
+    tablet_map_t _tablet_map;  // tablet_id -> TableInstances
+    size_t _global_tablet_id;
+    std::map<std::string, OlapStore*> _store_map;
+
+    // cache to save tablets' statistics, such as data size and row
+    // TODO(cmy): for now, this is a naive implementation
+    std::map<int64_t, TTabletStat> _tablet_stat_cache;
+    // last update time of tablet stat cache
+    int64_t _tablet_stat_cache_update_time_ms;
+
+    uint32_t _available_storage_medium_type_count;
+};
+
+}  // namespace doris
+
+#endif // DORIS_BE_SRC_OLAP_TABLET_MANAGER_H
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 336703bf..d5003b2b 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -76,7 +76,7 @@ OLAPStatus TxnManager::add_txn(
 
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash);
-    WriteLock wrlock(&_transaction_tablet_map_lock);
+    WriteLock wrlock(&_txn_map_lock);
     auto it = _transaction_tablet_map.find(key);
     if (it != _transaction_tablet_map.end()) {
         auto load_info = it->second.find(tablet_info);
@@ -107,7 +107,7 @@ OLAPStatus TxnManager::delete_txn(
 
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash);
-    WriteLock wrlock(&_transaction_tablet_map_lock);
+    WriteLock wrlock(&_txn_map_lock);
 
     auto it = _transaction_tablet_map.find(key);
     if (it != _transaction_tablet_map.end()) {
@@ -144,7 +144,7 @@ void TxnManager::get_tablet_related_txns(TabletSharedPtr 
tablet, int64_t* partit
     }
 
     TabletInfo tablet_info(tablet->tablet_id(), tablet->schema_hash());
-    ReadLock rdlock(&_transaction_tablet_map_lock);
+    ReadLock rdlock(&_txn_map_lock);
     for (auto& it : _transaction_tablet_map) {
         if (it.second.find(tablet_info) != it.second.end()) {
             *partition_id = it.first.first;
@@ -163,16 +163,16 @@ void TxnManager::get_txn_related_tablets(const 
TTransactionId transaction_id,
     // get tablets in this transaction
     pair<int64_t, int64_t> key(partition_id, transaction_id);
 
-    _transaction_tablet_map_lock.rdlock();
+    _txn_map_lock.rdlock();
     auto it = _transaction_tablet_map.find(key);
     if (it == _transaction_tablet_map.end()) {
         OLAP_LOG_WARNING("could not find tablet for [partition_id=%ld 
transaction_id=%ld]",
                             partition_id, transaction_id);
-        _transaction_tablet_map_lock.unlock();
+        _txn_map_lock.unlock();
         return;
     }
     std::map<TabletInfo, std::vector<PUniqueId>> load_info_map = it->second;
-    _transaction_tablet_map_lock.unlock();
+    _txn_map_lock.unlock();
 
     // each tablet
     for (auto& load_info : load_info_map) {
@@ -188,11 +188,11 @@ bool TxnManager::has_txn(TPartitionId partition_id, 
TTransactionId transaction_i
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, schema_hash);
 
-    _transaction_tablet_map_lock.rdlock();
+    _txn_map_lock.rdlock();
     auto it = _transaction_tablet_map.find(key);
     bool found = it != _transaction_tablet_map.end()
                  && it->second.find(tablet_info) != it->second.end();
-    _transaction_tablet_map_lock.unlock();
+    _txn_map_lock.unlock();
 
     return found;
 }
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index d65d4d3a..77cc3c0f 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -73,7 +73,7 @@ class TxnManager {
                          TTabletId tablet_id, SchemaHash schema_hash);
 
 private:
-    RWMutex _transaction_tablet_map_lock;
+    RWMutex _txn_map_lock;
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, 
transaction_id;
     std::map<TxnKey, std::map<TabletInfo, std::vector<PUniqueId>>> 
_transaction_tablet_map;
 };  // TxnManager
diff --git a/be/src/tools/meta_tool.cpp b/be/src/tools/meta_tool.cpp
index 35f32266..dc6cf61a 100644
--- a/be/src/tools/meta_tool.cpp
+++ b/be/src/tools/meta_tool.cpp
@@ -88,7 +88,7 @@ int main(int argc, char** argv) {
         std::cout << "new store failed" << std::endl;
         return -1;
     }
-    Status st = store->load();
+    Status st = store->init();
     if (!st.ok()) {
         std::cout << "store load failed" << std::endl;
         return -1;
diff --git a/be/test/olap/tablet_meta_manager_test.cpp 
b/be/test/olap/tablet_meta_manager_test.cpp
index c7108be6..af5a044d 100755
--- a/be/test/olap/tablet_meta_manager_test.cpp
+++ b/be/test/olap/tablet_meta_manager_test.cpp
@@ -47,7 +47,7 @@ class TabletMetaManagerTest : public testing::Test {
         ASSERT_TRUE(boost::filesystem::create_directory(root_path));
         _store = new(std::nothrow) OlapStore(root_path);
         ASSERT_NE(nullptr, _store);
-        Status st = _store->load();
+        Status st = _store->init();
         ASSERT_TRUE(st.ok());
         ASSERT_TRUE(boost::filesystem::exists("./store/meta"));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to