This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c45da40ed7 [refactor-WIP](TaskWorkerPool) add specific classes for 
ALTER_TABLE, CLONE, STORAGE_MEDIUM_MIGRATE task (#20140)
c45da40ed7 is described below

commit c45da40ed71c9eb608a1e1859d84df88395110e7
Author: bobhan1 <[email protected]>
AuthorDate: Sun May 28 19:27:08 2023 +0800

    [refactor-WIP](TaskWorkerPool) add specific classes for ALTER_TABLE, CLONE, 
STORAGE_MEDIUM_MIGRATE task (#20140)
---
 be/src/agent/agent_server.cpp     |  11 +-
 be/src/agent/task_worker_pool.cpp | 545 +++++++++++++++++++-------------------
 be/src/agent/task_worker_pool.h   |  38 ++-
 3 files changed, 318 insertions(+), 276 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 9bbe4d862e..a7f2f9aa09 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -104,11 +104,16 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
                                                 
TaskWorkerPool::ThreadModel::MULTI_THREADS,
                                                 
PushTaskPool::PushWokerType::DELETE));
     _push_delete_workers->start();
+    _alter_tablet_workers.reset(
+            new AlterTableTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+    _alter_tablet_workers->start();
+    _clone_workers.reset(new CloneTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+    _clone_workers->start();
+    _storage_medium_migrate_workers.reset(
+            new StorageMediumMigrateTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+    _storage_medium_migrate_workers->start();
 #endif
-    CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
     CREATE_AND_START_POOL(ALTER_INVERTED_INDEX, _alter_inverted_index_workers);
-    CREATE_AND_START_POOL(CLONE, _clone_workers);
-    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE, 
_storage_medium_migrate_workers);
     CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
     CREATE_AND_START_POOL(UPLOAD, _upload_workers);
     CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 61bdec6862..c3ec02f08d 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -153,21 +153,14 @@ void TaskWorkerPool::start() {
     case TaskWorkerType::DELETE:
         break;
     case TaskWorkerType::ALTER_TABLE:
-        _worker_count = config::alter_tablet_worker_count;
-        _cb = 
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
         break;
     case TaskWorkerType::ALTER_INVERTED_INDEX:
         _worker_count = config::alter_inverted_index_worker_count;
         _cb = 
std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::CLONE:
-        _worker_count = config::clone_worker_count;
-        _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
-        _worker_count = config::storage_medium_migrate_count;
-        _cb = 
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
-                              this);
         break;
     case TaskWorkerType::CHECK_CONSISTENCY:
         _worker_count = config::check_consistency_worker_count;
@@ -395,114 +388,6 @@ void 
TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
     }
 }
 
-void TaskWorkerPool::_alter_tablet_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            _tasks.pop_front();
-        }
-        int64_t signature = agent_task_req.signature;
-        LOG(INFO) << "get alter table task, signature: " << 
agent_task_req.signature;
-        bool is_task_timeout = false;
-        if (agent_task_req.__isset.recv_time) {
-            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
-            if (time_elapsed > config::report_task_interval_seconds * 20) {
-                LOG(INFO) << "task elapsed " << time_elapsed
-                          << " seconds since it is inserted to queue, it is 
timeout";
-                is_task_timeout = true;
-            }
-        }
-        if (!is_task_timeout) {
-            TFinishTaskRequest finish_task_request;
-            TTaskType::type task_type = agent_task_req.task_type;
-            switch (task_type) {
-            case TTaskType::ALTER:
-                _alter_tablet(agent_task_req, signature, task_type, 
&finish_task_request);
-                break;
-            default:
-                // pass
-                break;
-            }
-            _finish_task(finish_task_request);
-        }
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, 
int64_t signature,
-                                   const TTaskType::type task_type,
-                                   TFinishTaskRequest* finish_task_request) {
-    Status status;
-
-    string process_name;
-    switch (task_type) {
-    case TTaskType::ALTER:
-        process_name = "alter tablet";
-        break;
-    default:
-        std::string task_name;
-        EnumToString(TTaskType, task_type, task_name);
-        LOG(WARNING) << "schema change type invalid. type: " << task_name
-                     << ", signature: " << signature;
-        status = Status::NotSupported("Schema change type invalid");
-        break;
-    }
-
-    // Check last schema change status, if failed delete tablet file
-    // Do not need to adjust delete success or not
-    // Because if delete failed create rollup will failed
-    TTabletId new_tablet_id = 0;
-    TSchemaHash new_schema_hash = 0;
-    if (status.ok()) {
-        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
-        new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
-        EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
-        status = _env->storage_engine()->execute_task(&engine_task);
-    }
-
-    if (status.ok()) {
-        ++_s_report_version;
-    }
-
-    // Return result to fe
-    finish_task_request->__set_backend(BackendOptions::get_local_backend());
-    finish_task_request->__set_report_version(_s_report_version);
-    finish_task_request->__set_task_type(task_type);
-    finish_task_request->__set_signature(signature);
-
-    std::vector<TTabletInfo> finish_tablet_infos;
-    if (status.ok()) {
-        TTabletInfo tablet_info;
-        status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, 
&tablet_info);
-        if (status.ok()) {
-            finish_tablet_infos.push_back(tablet_info);
-        }
-    }
-
-    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
-        LOG_WARNING("failed to {}", process_name)
-                .tag("signature", agent_task_req.signature)
-                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                .tag("new_tablet_id", new_tablet_id)
-                .error(status);
-    } else {
-        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
-        LOG_INFO("successfully {}", process_name)
-                .tag("signature", agent_task_req.signature)
-                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                .tag("new_tablet_id", new_tablet_id);
-    }
-    finish_task_request->__set_task_status(status.to_thrift());
-}
-
 void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
@@ -592,157 +477,6 @@ void 
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
     }
 }
 
-void TaskWorkerPool::_clone_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TCloneReq clone_req;
-
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            clone_req = agent_task_req.clone_req;
-            _tasks.pop_front();
-        }
-
-        DorisMetrics::instance()->clone_requests_total->increment(1);
-        LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
-
-        std::vector<TTabletInfo> tablet_infos;
-        EngineCloneTask engine_task(clone_req, _master_info, 
agent_task_req.signature,
-                                    &tablet_infos);
-        auto status = _env->storage_engine()->execute_task(&engine_task);
-        // Return result to fe
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-
-        if (!status.ok()) {
-            DorisMetrics::instance()->clone_requests_failed->increment(1);
-            LOG_WARNING("failed to clone tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", clone_req.tablet_id)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully clone tablet")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", clone_req.tablet_id);
-            finish_task_request.__set_finish_tablet_infos(tablet_infos);
-        }
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TStorageMediumMigrateReq storage_medium_migrate_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            storage_medium_migrate_req = 
agent_task_req.storage_medium_migrate_req;
-            _tasks.pop_front();
-        }
-
-        // check request and get info
-        TabletSharedPtr tablet;
-        DataDir* dest_store = nullptr;
-
-        auto status = _check_migrate_request(storage_medium_migrate_req, 
tablet, &dest_store);
-        if (status.ok()) {
-            EngineStorageMigrationTask engine_task(tablet, dest_store);
-            status = _env->storage_engine()->execute_task(&engine_task);
-        }
-        if (!status.ok()) {
-            LOG_WARNING("failed to migrate storage medium")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", storage_medium_migrate_req.tablet_id)
-                    .error(status);
-        } else {
-            LOG_INFO("successfully migrate storage medium")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", storage_medium_migrate_req.tablet_id);
-        }
-
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(status.to_thrift());
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq& 
req,
-                                              TabletSharedPtr& tablet, 
DataDir** dest_store) {
-    int64_t tablet_id = req.tablet_id;
-    tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
-    if (tablet == nullptr) {
-        return Status::InternalError("could not find tablet {}", tablet_id);
-    }
-
-    if (req.__isset.data_dir) {
-        // request specify the data dir
-        *dest_store = StorageEngine::instance()->get_store(req.data_dir);
-        if (*dest_store == nullptr) {
-            return Status::InternalError("could not find data dir {}", 
req.data_dir);
-        }
-    } else {
-        // this is a storage medium
-        // get data dir by storage medium
-
-        // judge case when no need to migrate
-        uint32_t count = 
StorageEngine::instance()->available_storage_medium_type_count();
-        if (count <= 1) {
-            return Status::InternalError("available storage medium type count 
is less than 1");
-        }
-        // check current tablet storage medium
-        TStorageMedium::type storage_medium = req.storage_medium;
-        TStorageMedium::type src_storage_medium = 
tablet->data_dir()->storage_medium();
-        if (src_storage_medium == storage_medium) {
-            return Status::InternalError("tablet is already on specified 
storage medium {}",
-                                         storage_medium);
-        }
-        // get a random store of specified storage medium
-        auto stores = 
StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
-        if (stores.empty()) {
-            return Status::InternalError("failed to get root path for create 
tablet");
-        }
-
-        *dest_store = stores[0];
-    }
-    if (tablet->data_dir()->path() == (*dest_store)->path()) {
-        return Status::InternalError("tablet is already on specified path {}",
-                                     tablet->data_dir()->path());
-    }
-
-    // check local disk capacity
-    int64_t tablet_size = tablet->tablet_local_size();
-    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
-        return Status::InternalError("reach the capacity limit of path {}, 
tablet_size={}",
-                                     (*dest_store)->path(), tablet_size);
-    }
-
-    return Status::OK();
-}
-
 void TaskWorkerPool::_check_consistency_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
@@ -1538,7 +1272,7 @@ void 
DropTableTaskPool::_drop_tablet_worker_thread_callback() {
             agent_task_req = _tasks.front();
             _tasks.pop_front();
         }
-        TDropTabletReq drop_tablet_req = agent_task_req.drop_tablet_req;
+        const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req;
         Status status;
         TabletSharedPtr dropped_tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
                 drop_tablet_req.tablet_id, false);
@@ -1846,4 +1580,281 @@ void 
ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback()
     }
 }
 
+AlterTableTaskPool::AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model)
+        : TaskWorkerPool(TaskWorkerType::ALTER_TABLE, env, 
*env->master_info(), thread_model) {
+    _worker_count = config::alter_tablet_worker_count;
+    _cb = [this]() { _alter_tablet_worker_thread_callback(); };
+}
+
+void AlterTableTaskPool::_alter_tablet_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+        int64_t signature = agent_task_req.signature;
+        LOG(INFO) << "get alter table task, signature: " << signature;
+        bool is_task_timeout = false;
+        if (agent_task_req.__isset.recv_time) {
+            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
+            if (time_elapsed > config::report_task_interval_seconds * 20) {
+                LOG(INFO) << "task elapsed " << time_elapsed
+                          << " seconds since it is inserted to queue, it is 
timeout";
+                is_task_timeout = true;
+            }
+        }
+        if (!is_task_timeout) {
+            TFinishTaskRequest finish_task_request;
+            TTaskType::type task_type = agent_task_req.task_type;
+            switch (task_type) {
+            case TTaskType::ALTER:
+                _alter_tablet(agent_task_req, signature, task_type, 
&finish_task_request);
+                break;
+            default:
+                // pass
+                break;
+            }
+            _finish_task(finish_task_request);
+        }
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
+void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& 
agent_task_req, int64_t signature,
+                                       const TTaskType::type task_type,
+                                       TFinishTaskRequest* 
finish_task_request) {
+    Status status;
+
+    string process_name;
+    switch (task_type) {
+    case TTaskType::ALTER:
+        process_name = "alter tablet";
+        break;
+    default:
+        std::string task_name;
+        EnumToString(TTaskType, task_type, task_name);
+        LOG(WARNING) << "schema change type invalid. type: " << task_name
+                     << ", signature: " << signature;
+        status = Status::NotSupported("Schema change type invalid");
+        break;
+    }
+
+    // Check last schema change status, if failed delete tablet file
+    // Do not need to adjust delete success or not
+    // Because if delete failed create rollup will failed
+    TTabletId new_tablet_id = 0;
+    TSchemaHash new_schema_hash = 0;
+    if (status.ok()) {
+        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
+        new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
+        EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
+        status = _env->storage_engine()->execute_task(&engine_task);
+    }
+
+    if (status.ok()) {
+        ++_s_report_version;
+    }
+
+    // Return result to fe
+    finish_task_request->__set_backend(BackendOptions::get_local_backend());
+    finish_task_request->__set_report_version(_s_report_version);
+    finish_task_request->__set_task_type(task_type);
+    finish_task_request->__set_signature(signature);
+
+    std::vector<TTabletInfo> finish_tablet_infos;
+    if (status.ok()) {
+        TTabletInfo tablet_info;
+        status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, 
&tablet_info);
+        if (status.ok()) {
+            finish_tablet_infos.push_back(tablet_info);
+        }
+    }
+
+    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
+        LOG_WARNING("failed to {}", process_name)
+                .tag("signature", agent_task_req.signature)
+                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                .tag("new_tablet_id", new_tablet_id)
+                .error(status);
+    } else {
+        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
+        LOG_INFO("successfully {}", process_name)
+                .tag("signature", agent_task_req.signature)
+                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                .tag("new_tablet_id", new_tablet_id);
+    }
+    finish_task_request->__set_task_status(status.to_thrift());
+}
+
+CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model)
+        : TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(), 
thread_model) {
+    _worker_count = config::clone_worker_count;
+    _cb = [this]() { _clone_worker_thread_callback(); };
+}
+
+void CloneTaskPool::_clone_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+        const TCloneReq& clone_req = agent_task_req.clone_req;
+
+        DorisMetrics::instance()->clone_requests_total->increment(1);
+        LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
+
+        std::vector<TTabletInfo> tablet_infos;
+        EngineCloneTask engine_task(clone_req, _master_info, 
agent_task_req.signature,
+                                    &tablet_infos);
+        auto status = _env->storage_engine()->execute_task(&engine_task);
+        // Return result to fe
+        TFinishTaskRequest finish_task_request;
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_task_type(agent_task_req.task_type);
+        finish_task_request.__set_signature(agent_task_req.signature);
+        finish_task_request.__set_task_status(status.to_thrift());
+
+        if (!status.ok()) {
+            DorisMetrics::instance()->clone_requests_failed->increment(1);
+            LOG_WARNING("failed to clone tablet")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", clone_req.tablet_id)
+                    .error(status);
+        } else {
+            LOG_INFO("successfully clone tablet")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", clone_req.tablet_id);
+            finish_task_request.__set_finish_tablet_infos(tablet_infos);
+        }
+
+        _finish_task(finish_task_request);
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
+StorageMediumMigrateTaskPool::StorageMediumMigrateTaskPool(ExecEnv* env, 
ThreadModel thread_model)
+        : TaskWorkerPool(TaskWorkerType::STORAGE_MEDIUM_MIGRATE, env, 
*env->master_info(),
+                         thread_model) {
+    _worker_count = config::storage_medium_migrate_count;
+    _cb = [this]() { _storage_medium_migrate_worker_thread_callback(); };
+}
+
+void 
StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+        const TStorageMediumMigrateReq& storage_medium_migrate_req =
+                agent_task_req.storage_medium_migrate_req;
+
+        // check request and get info
+        TabletSharedPtr tablet;
+        DataDir* dest_store = nullptr;
+
+        auto status = _check_migrate_request(storage_medium_migrate_req, 
tablet, &dest_store);
+        if (status.ok()) {
+            EngineStorageMigrationTask engine_task(tablet, dest_store);
+            status = _env->storage_engine()->execute_task(&engine_task);
+        }
+        if (!status.ok()) {
+            LOG_WARNING("failed to migrate storage medium")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", storage_medium_migrate_req.tablet_id)
+                    .error(status);
+        } else {
+            LOG_INFO("successfully migrate storage medium")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", storage_medium_migrate_req.tablet_id);
+        }
+
+        TFinishTaskRequest finish_task_request;
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_task_type(agent_task_req.task_type);
+        finish_task_request.__set_signature(agent_task_req.signature);
+        finish_task_request.__set_task_status(status.to_thrift());
+
+        _finish_task(finish_task_request);
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
+Status StorageMediumMigrateTaskPool::_check_migrate_request(const 
TStorageMediumMigrateReq& req,
+                                                            TabletSharedPtr& 
tablet,
+                                                            DataDir** 
dest_store) {
+    int64_t tablet_id = req.tablet_id;
+    tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+    if (tablet == nullptr) {
+        return Status::InternalError("could not find tablet {}", tablet_id);
+    }
+
+    if (req.__isset.data_dir) {
+        // request specify the data dir
+        *dest_store = StorageEngine::instance()->get_store(req.data_dir);
+        if (*dest_store == nullptr) {
+            return Status::InternalError("could not find data dir {}", 
req.data_dir);
+        }
+    } else {
+        // this is a storage medium
+        // get data dir by storage medium
+
+        // judge case when no need to migrate
+        uint32_t count = 
StorageEngine::instance()->available_storage_medium_type_count();
+        if (count <= 1) {
+            return Status::InternalError("available storage medium type count 
is less than 1");
+        }
+        // check current tablet storage medium
+        TStorageMedium::type storage_medium = req.storage_medium;
+        TStorageMedium::type src_storage_medium = 
tablet->data_dir()->storage_medium();
+        if (src_storage_medium == storage_medium) {
+            return Status::InternalError("tablet is already on specified 
storage medium {}",
+                                         storage_medium);
+        }
+        // get a random store of specified storage medium
+        auto stores = 
StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
+        if (stores.empty()) {
+            return Status::InternalError("failed to get root path for create 
tablet");
+        }
+
+        *dest_store = stores[0];
+    }
+    if (tablet->data_dir()->path() == (*dest_store)->path()) {
+        return Status::InternalError("tablet is already on specified path {}",
+                                     tablet->data_dir()->path());
+    }
+
+    // check local disk capacity
+    int64_t tablet_size = tablet->tablet_local_size();
+    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
+        return Status::InternalError("reach the capacity limit of path {}, 
tablet_size={}",
+                                     (*dest_store)->path(), tablet_size);
+    }
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index d24ba647cf..a33704480d 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -183,10 +183,7 @@ protected:
     void _remove_task_info(const TTaskType::type task_type, int64_t signature);
     void _finish_task(const TFinishTaskRequest& finish_task_request);
 
-    void _alter_tablet_worker_thread_callback();
     void _alter_inverted_index_worker_thread_callback();
-    void _clone_worker_thread_callback();
-    void _storage_medium_migrate_worker_thread_callback();
     void _check_consistency_worker_thread_callback();
     void _report_task_worker_thread_callback();
     void _report_disk_state_worker_thread_callback();
@@ -203,6 +200,10 @@ protected:
 
     void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t 
signature,
                        const TTaskType::type task_type, TFinishTaskRequest* 
finish_task_request);
+    void _alter_inverted_index(const TAgentTaskRequest& 
alter_inverted_index_request,
+                               int64_t signature, const TTaskType::type 
task_type,
+                               TFinishTaskRequest* finish_task_request);
+
     void _handle_report(const TReportRequest& request, ReportType type);
 
     Status _get_tablet_info(const TTabletId tablet_id, const TSchemaHash 
schema_hash,
@@ -211,9 +212,6 @@ protected:
     Status _move_dir(const TTabletId tablet_id, const std::string& src, 
int64_t job_id,
                      bool overwrite);
 
-    Status _check_migrate_request(const TStorageMediumMigrateReq& req, 
TabletSharedPtr& tablet,
-                                  DataDir** dest_store);
-
     // random sleep 1~second seconds
     void _random_sleep(int second);
 
@@ -298,4 +296,32 @@ public:
     DISALLOW_COPY_AND_ASSIGN(ClearTransactionTaskPool);
 };
 
+class AlterTableTaskPool : public TaskWorkerPool {
+public:
+    AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model);
+    void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t 
signature,
+                       const TTaskType::type task_type, TFinishTaskRequest* 
finish_task_request);
+    void _alter_tablet_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(AlterTableTaskPool);
+};
+
+class CloneTaskPool : public TaskWorkerPool {
+public:
+    CloneTaskPool(ExecEnv* env, ThreadModel thread_model);
+    void _clone_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(CloneTaskPool);
+};
+
+class StorageMediumMigrateTaskPool : public TaskWorkerPool {
+public:
+    StorageMediumMigrateTaskPool(ExecEnv* env, ThreadModel thread_model);
+    Status _check_migrate_request(const TStorageMediumMigrateReq& req, 
TabletSharedPtr& tablet,
+                                  DataDir** dest_store);
+    void _storage_medium_migrate_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(StorageMediumMigrateTaskPool);
+};
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to