This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c45da40ed7 [refactor-WIP](TaskWorkerPool) add specific classes for
ALTER_TABLE, CLONE, STORAGE_MEDIUM_MIGRATE task (#20140)
c45da40ed7 is described below
commit c45da40ed71c9eb608a1e1859d84df88395110e7
Author: bobhan1 <[email protected]>
AuthorDate: Sun May 28 19:27:08 2023 +0800
[refactor-WIP](TaskWorkerPool) add specific classes for ALTER_TABLE, CLONE,
STORAGE_MEDIUM_MIGRATE task (#20140)
---
be/src/agent/agent_server.cpp | 11 +-
be/src/agent/task_worker_pool.cpp | 545 +++++++++++++++++++-------------------
be/src/agent/task_worker_pool.h | 38 ++-
3 files changed, 318 insertions(+), 276 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 9bbe4d862e..a7f2f9aa09 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -104,11 +104,16 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
TaskWorkerPool::ThreadModel::MULTI_THREADS,
PushTaskPool::PushWokerType::DELETE));
_push_delete_workers->start();
+ _alter_tablet_workers.reset(
+ new AlterTableTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+ _alter_tablet_workers->start();
+ _clone_workers.reset(new CloneTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+ _clone_workers->start();
+ _storage_medium_migrate_workers.reset(
+ new StorageMediumMigrateTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+ _storage_medium_migrate_workers->start();
#endif
- CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
CREATE_AND_START_POOL(ALTER_INVERTED_INDEX, _alter_inverted_index_workers);
- CREATE_AND_START_POOL(CLONE, _clone_workers);
- CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE,
_storage_medium_migrate_workers);
CREATE_AND_START_POOL(CHECK_CONSISTENCY, _check_consistency_workers);
CREATE_AND_START_POOL(UPLOAD, _upload_workers);
CREATE_AND_START_POOL(DOWNLOAD, _download_workers);
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 61bdec6862..c3ec02f08d 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -153,21 +153,14 @@ void TaskWorkerPool::start() {
case TaskWorkerType::DELETE:
break;
case TaskWorkerType::ALTER_TABLE:
- _worker_count = config::alter_tablet_worker_count;
- _cb =
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::ALTER_INVERTED_INDEX:
_worker_count = config::alter_inverted_index_worker_count;
_cb =
std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback,
this);
break;
case TaskWorkerType::CLONE:
- _worker_count = config::clone_worker_count;
- _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback,
this);
break;
case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
- _worker_count = config::storage_medium_migrate_count;
- _cb =
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
- this);
break;
case TaskWorkerType::CHECK_CONSISTENCY:
_worker_count = config::check_consistency_worker_count;
@@ -395,114 +388,6 @@ void
TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
}
}
-void TaskWorkerPool::_alter_tablet_worker_thread_callback() {
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- agent_task_req = _tasks.front();
- _tasks.pop_front();
- }
- int64_t signature = agent_task_req.signature;
- LOG(INFO) << "get alter table task, signature: " <<
agent_task_req.signature;
- bool is_task_timeout = false;
- if (agent_task_req.__isset.recv_time) {
- int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
- if (time_elapsed > config::report_task_interval_seconds * 20) {
- LOG(INFO) << "task elapsed " << time_elapsed
- << " seconds since it is inserted to queue, it is
timeout";
- is_task_timeout = true;
- }
- }
- if (!is_task_timeout) {
- TFinishTaskRequest finish_task_request;
- TTaskType::type task_type = agent_task_req.task_type;
- switch (task_type) {
- case TTaskType::ALTER:
- _alter_tablet(agent_task_req, signature, task_type,
&finish_task_request);
- break;
- default:
- // pass
- break;
- }
- _finish_task(finish_task_request);
- }
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
-void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req,
int64_t signature,
- const TTaskType::type task_type,
- TFinishTaskRequest* finish_task_request) {
- Status status;
-
- string process_name;
- switch (task_type) {
- case TTaskType::ALTER:
- process_name = "alter tablet";
- break;
- default:
- std::string task_name;
- EnumToString(TTaskType, task_type, task_name);
- LOG(WARNING) << "schema change type invalid. type: " << task_name
- << ", signature: " << signature;
- status = Status::NotSupported("Schema change type invalid");
- break;
- }
-
- // Check last schema change status, if failed delete tablet file
- // Do not need to adjust delete success or not
- // Because if delete failed create rollup will failed
- TTabletId new_tablet_id = 0;
- TSchemaHash new_schema_hash = 0;
- if (status.ok()) {
- new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
- new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
- EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
- status = _env->storage_engine()->execute_task(&engine_task);
- }
-
- if (status.ok()) {
- ++_s_report_version;
- }
-
- // Return result to fe
- finish_task_request->__set_backend(BackendOptions::get_local_backend());
- finish_task_request->__set_report_version(_s_report_version);
- finish_task_request->__set_task_type(task_type);
- finish_task_request->__set_signature(signature);
-
- std::vector<TTabletInfo> finish_tablet_infos;
- if (status.ok()) {
- TTabletInfo tablet_info;
- status = _get_tablet_info(new_tablet_id, new_schema_hash, signature,
&tablet_info);
- if (status.ok()) {
- finish_tablet_infos.push_back(tablet_info);
- }
- }
-
- if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
- LOG_WARNING("failed to {}", process_name)
- .tag("signature", agent_task_req.signature)
- .tag("base_tablet_id",
agent_task_req.alter_tablet_req_v2.base_tablet_id)
- .tag("new_tablet_id", new_tablet_id)
- .error(status);
- } else {
- finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
- LOG_INFO("successfully {}", process_name)
- .tag("signature", agent_task_req.signature)
- .tag("base_tablet_id",
agent_task_req.alter_tablet_req_v2.base_tablet_id)
- .tag("new_tablet_id", new_tablet_id);
- }
- finish_task_request->__set_task_status(status.to_thrift());
-}
-
void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
@@ -592,157 +477,6 @@ void
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
}
}
-void TaskWorkerPool::_clone_worker_thread_callback() {
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- TCloneReq clone_req;
-
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- agent_task_req = _tasks.front();
- clone_req = agent_task_req.clone_req;
- _tasks.pop_front();
- }
-
- DorisMetrics::instance()->clone_requests_total->increment(1);
- LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
-
- std::vector<TTabletInfo> tablet_infos;
- EngineCloneTask engine_task(clone_req, _master_info,
agent_task_req.signature,
- &tablet_infos);
- auto status = _env->storage_engine()->execute_task(&engine_task);
- // Return result to fe
- TFinishTaskRequest finish_task_request;
- finish_task_request.__set_backend(BackendOptions::get_local_backend());
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
- finish_task_request.__set_task_status(status.to_thrift());
-
- if (!status.ok()) {
- DorisMetrics::instance()->clone_requests_failed->increment(1);
- LOG_WARNING("failed to clone tablet")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", clone_req.tablet_id)
- .error(status);
- } else {
- LOG_INFO("successfully clone tablet")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", clone_req.tablet_id);
- finish_task_request.__set_finish_tablet_infos(tablet_infos);
- }
-
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
-void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- TStorageMediumMigrateReq storage_medium_migrate_req;
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- agent_task_req = _tasks.front();
- storage_medium_migrate_req =
agent_task_req.storage_medium_migrate_req;
- _tasks.pop_front();
- }
-
- // check request and get info
- TabletSharedPtr tablet;
- DataDir* dest_store = nullptr;
-
- auto status = _check_migrate_request(storage_medium_migrate_req,
tablet, &dest_store);
- if (status.ok()) {
- EngineStorageMigrationTask engine_task(tablet, dest_store);
- status = _env->storage_engine()->execute_task(&engine_task);
- }
- if (!status.ok()) {
- LOG_WARNING("failed to migrate storage medium")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", storage_medium_migrate_req.tablet_id)
- .error(status);
- } else {
- LOG_INFO("successfully migrate storage medium")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", storage_medium_migrate_req.tablet_id);
- }
-
- TFinishTaskRequest finish_task_request;
- finish_task_request.__set_backend(BackendOptions::get_local_backend());
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
- finish_task_request.__set_task_status(status.to_thrift());
-
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
-Status TaskWorkerPool::_check_migrate_request(const TStorageMediumMigrateReq&
req,
- TabletSharedPtr& tablet,
DataDir** dest_store) {
- int64_t tablet_id = req.tablet_id;
- tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
- if (tablet == nullptr) {
- return Status::InternalError("could not find tablet {}", tablet_id);
- }
-
- if (req.__isset.data_dir) {
- // request specify the data dir
- *dest_store = StorageEngine::instance()->get_store(req.data_dir);
- if (*dest_store == nullptr) {
- return Status::InternalError("could not find data dir {}",
req.data_dir);
- }
- } else {
- // this is a storage medium
- // get data dir by storage medium
-
- // judge case when no need to migrate
- uint32_t count =
StorageEngine::instance()->available_storage_medium_type_count();
- if (count <= 1) {
- return Status::InternalError("available storage medium type count
is less than 1");
- }
- // check current tablet storage medium
- TStorageMedium::type storage_medium = req.storage_medium;
- TStorageMedium::type src_storage_medium =
tablet->data_dir()->storage_medium();
- if (src_storage_medium == storage_medium) {
- return Status::InternalError("tablet is already on specified
storage medium {}",
- storage_medium);
- }
- // get a random store of specified storage medium
- auto stores =
StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
- if (stores.empty()) {
- return Status::InternalError("failed to get root path for create
tablet");
- }
-
- *dest_store = stores[0];
- }
- if (tablet->data_dir()->path() == (*dest_store)->path()) {
- return Status::InternalError("tablet is already on specified path {}",
- tablet->data_dir()->path());
- }
-
- // check local disk capacity
- int64_t tablet_size = tablet->tablet_local_size();
- if ((*dest_store)->reach_capacity_limit(tablet_size)) {
- return Status::InternalError("reach the capacity limit of path {},
tablet_size={}",
- (*dest_store)->path(), tablet_size);
- }
-
- return Status::OK();
-}
-
void TaskWorkerPool::_check_consistency_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
@@ -1538,7 +1272,7 @@ void
DropTableTaskPool::_drop_tablet_worker_thread_callback() {
agent_task_req = _tasks.front();
_tasks.pop_front();
}
- TDropTabletReq drop_tablet_req = agent_task_req.drop_tablet_req;
+ const TDropTabletReq& drop_tablet_req = agent_task_req.drop_tablet_req;
Status status;
TabletSharedPtr dropped_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
drop_tablet_req.tablet_id, false);
@@ -1846,4 +1580,281 @@ void
ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback()
}
}
+AlterTableTaskPool::AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model)
+ : TaskWorkerPool(TaskWorkerType::ALTER_TABLE, env,
*env->master_info(), thread_model) {
+ _worker_count = config::alter_tablet_worker_count;
+ _cb = [this]() { _alter_tablet_worker_thread_callback(); };
+}
+
+void AlterTableTaskPool::_alter_tablet_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ int64_t signature = agent_task_req.signature;
+ LOG(INFO) << "get alter table task, signature: " << signature;
+ bool is_task_timeout = false;
+ if (agent_task_req.__isset.recv_time) {
+ int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
+ if (time_elapsed > config::report_task_interval_seconds * 20) {
+ LOG(INFO) << "task elapsed " << time_elapsed
+ << " seconds since it is inserted to queue, it is
timeout";
+ is_task_timeout = true;
+ }
+ }
+ if (!is_task_timeout) {
+ TFinishTaskRequest finish_task_request;
+ TTaskType::type task_type = agent_task_req.task_type;
+ switch (task_type) {
+ case TTaskType::ALTER:
+ _alter_tablet(agent_task_req, signature, task_type,
&finish_task_request);
+ break;
+ default:
+ // pass
+ break;
+ }
+ _finish_task(finish_task_request);
+ }
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
+void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest&
agent_task_req, int64_t signature,
+ const TTaskType::type task_type,
+ TFinishTaskRequest*
finish_task_request) {
+ Status status;
+
+ string process_name;
+ switch (task_type) {
+ case TTaskType::ALTER:
+ process_name = "alter tablet";
+ break;
+ default:
+ std::string task_name;
+ EnumToString(TTaskType, task_type, task_name);
+ LOG(WARNING) << "schema change type invalid. type: " << task_name
+ << ", signature: " << signature;
+ status = Status::NotSupported("Schema change type invalid");
+ break;
+ }
+
+ // Check last schema change status, if failed delete tablet file
+ // Do not need to adjust delete success or not
+ // Because if delete failed create rollup will failed
+ TTabletId new_tablet_id = 0;
+ TSchemaHash new_schema_hash = 0;
+ if (status.ok()) {
+ new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
+ new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
+ EngineAlterTabletTask engine_task(agent_task_req.alter_tablet_req_v2);
+ status = _env->storage_engine()->execute_task(&engine_task);
+ }
+
+ if (status.ok()) {
+ ++_s_report_version;
+ }
+
+ // Return result to fe
+ finish_task_request->__set_backend(BackendOptions::get_local_backend());
+ finish_task_request->__set_report_version(_s_report_version);
+ finish_task_request->__set_task_type(task_type);
+ finish_task_request->__set_signature(signature);
+
+ std::vector<TTabletInfo> finish_tablet_infos;
+ if (status.ok()) {
+ TTabletInfo tablet_info;
+ status = _get_tablet_info(new_tablet_id, new_schema_hash, signature,
&tablet_info);
+ if (status.ok()) {
+ finish_tablet_infos.push_back(tablet_info);
+ }
+ }
+
+ if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
+ LOG_WARNING("failed to {}", process_name)
+ .tag("signature", agent_task_req.signature)
+ .tag("base_tablet_id",
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+ .tag("new_tablet_id", new_tablet_id)
+ .error(status);
+ } else {
+ finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
+ LOG_INFO("successfully {}", process_name)
+ .tag("signature", agent_task_req.signature)
+ .tag("base_tablet_id",
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+ .tag("new_tablet_id", new_tablet_id);
+ }
+ finish_task_request->__set_task_status(status.to_thrift());
+}
+
+CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model)
+ : TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(),
thread_model) {
+ _worker_count = config::clone_worker_count;
+ _cb = [this]() { _clone_worker_thread_callback(); };
+}
+
+void CloneTaskPool::_clone_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ const TCloneReq& clone_req = agent_task_req.clone_req;
+
+ DorisMetrics::instance()->clone_requests_total->increment(1);
+ LOG(INFO) << "get clone task. signature=" << agent_task_req.signature;
+
+ std::vector<TTabletInfo> tablet_infos;
+ EngineCloneTask engine_task(clone_req, _master_info,
agent_task_req.signature,
+ &tablet_infos);
+ auto status = _env->storage_engine()->execute_task(&engine_task);
+ // Return result to fe
+ TFinishTaskRequest finish_task_request;
+ finish_task_request.__set_backend(BackendOptions::get_local_backend());
+ finish_task_request.__set_task_type(agent_task_req.task_type);
+ finish_task_request.__set_signature(agent_task_req.signature);
+ finish_task_request.__set_task_status(status.to_thrift());
+
+ if (!status.ok()) {
+ DorisMetrics::instance()->clone_requests_failed->increment(1);
+ LOG_WARNING("failed to clone tablet")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", clone_req.tablet_id)
+ .error(status);
+ } else {
+ LOG_INFO("successfully clone tablet")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", clone_req.tablet_id);
+ finish_task_request.__set_finish_tablet_infos(tablet_infos);
+ }
+
+ _finish_task(finish_task_request);
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
+StorageMediumMigrateTaskPool::StorageMediumMigrateTaskPool(ExecEnv* env,
ThreadModel thread_model)
+ : TaskWorkerPool(TaskWorkerType::STORAGE_MEDIUM_MIGRATE, env,
*env->master_info(),
+ thread_model) {
+ _worker_count = config::storage_medium_migrate_count;
+ _cb = [this]() { _storage_medium_migrate_worker_thread_callback(); };
+}
+
+void
StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ const TStorageMediumMigrateReq& storage_medium_migrate_req =
+ agent_task_req.storage_medium_migrate_req;
+
+ // check request and get info
+ TabletSharedPtr tablet;
+ DataDir* dest_store = nullptr;
+
+ auto status = _check_migrate_request(storage_medium_migrate_req,
tablet, &dest_store);
+ if (status.ok()) {
+ EngineStorageMigrationTask engine_task(tablet, dest_store);
+ status = _env->storage_engine()->execute_task(&engine_task);
+ }
+ if (!status.ok()) {
+ LOG_WARNING("failed to migrate storage medium")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", storage_medium_migrate_req.tablet_id)
+ .error(status);
+ } else {
+ LOG_INFO("successfully migrate storage medium")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", storage_medium_migrate_req.tablet_id);
+ }
+
+ TFinishTaskRequest finish_task_request;
+ finish_task_request.__set_backend(BackendOptions::get_local_backend());
+ finish_task_request.__set_task_type(agent_task_req.task_type);
+ finish_task_request.__set_signature(agent_task_req.signature);
+ finish_task_request.__set_task_status(status.to_thrift());
+
+ _finish_task(finish_task_request);
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
+Status StorageMediumMigrateTaskPool::_check_migrate_request(const
TStorageMediumMigrateReq& req,
+ TabletSharedPtr&
tablet,
+ DataDir**
dest_store) {
+ int64_t tablet_id = req.tablet_id;
+ tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+ if (tablet == nullptr) {
+ return Status::InternalError("could not find tablet {}", tablet_id);
+ }
+
+ if (req.__isset.data_dir) {
+ // request specify the data dir
+ *dest_store = StorageEngine::instance()->get_store(req.data_dir);
+ if (*dest_store == nullptr) {
+ return Status::InternalError("could not find data dir {}",
req.data_dir);
+ }
+ } else {
+ // this is a storage medium
+ // get data dir by storage medium
+
+ // judge case when no need to migrate
+ uint32_t count =
StorageEngine::instance()->available_storage_medium_type_count();
+ if (count <= 1) {
+ return Status::InternalError("available storage medium type count
is less than 1");
+ }
+ // check current tablet storage medium
+ TStorageMedium::type storage_medium = req.storage_medium;
+ TStorageMedium::type src_storage_medium =
tablet->data_dir()->storage_medium();
+ if (src_storage_medium == storage_medium) {
+ return Status::InternalError("tablet is already on specified
storage medium {}",
+ storage_medium);
+ }
+ // get a random store of specified storage medium
+ auto stores =
StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
+ if (stores.empty()) {
+ return Status::InternalError("failed to get root path for create
tablet");
+ }
+
+ *dest_store = stores[0];
+ }
+ if (tablet->data_dir()->path() == (*dest_store)->path()) {
+ return Status::InternalError("tablet is already on specified path {}",
+ tablet->data_dir()->path());
+ }
+
+ // check local disk capacity
+ int64_t tablet_size = tablet->tablet_local_size();
+ if ((*dest_store)->reach_capacity_limit(tablet_size)) {
+ return Status::InternalError("reach the capacity limit of path {},
tablet_size={}",
+ (*dest_store)->path(), tablet_size);
+ }
+
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index d24ba647cf..a33704480d 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -183,10 +183,7 @@ protected:
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
void _finish_task(const TFinishTaskRequest& finish_task_request);
- void _alter_tablet_worker_thread_callback();
void _alter_inverted_index_worker_thread_callback();
- void _clone_worker_thread_callback();
- void _storage_medium_migrate_worker_thread_callback();
void _check_consistency_worker_thread_callback();
void _report_task_worker_thread_callback();
void _report_disk_state_worker_thread_callback();
@@ -203,6 +200,10 @@ protected:
void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t
signature,
const TTaskType::type task_type, TFinishTaskRequest*
finish_task_request);
+ void _alter_inverted_index(const TAgentTaskRequest&
alter_inverted_index_request,
+ int64_t signature, const TTaskType::type
task_type,
+ TFinishTaskRequest* finish_task_request);
+
void _handle_report(const TReportRequest& request, ReportType type);
Status _get_tablet_info(const TTabletId tablet_id, const TSchemaHash
schema_hash,
@@ -211,9 +212,6 @@ protected:
Status _move_dir(const TTabletId tablet_id, const std::string& src,
int64_t job_id,
bool overwrite);
- Status _check_migrate_request(const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
- DataDir** dest_store);
-
// random sleep 1~second seconds
void _random_sleep(int second);
@@ -298,4 +296,32 @@ public:
DISALLOW_COPY_AND_ASSIGN(ClearTransactionTaskPool);
};
+class AlterTableTaskPool : public TaskWorkerPool {
+public:
+ AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model);
+ void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t
signature,
+ const TTaskType::type task_type, TFinishTaskRequest*
finish_task_request);
+ void _alter_tablet_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(AlterTableTaskPool);
+};
+
+class CloneTaskPool : public TaskWorkerPool {
+public:
+ CloneTaskPool(ExecEnv* env, ThreadModel thread_model);
+ void _clone_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(CloneTaskPool);
+};
+
+class StorageMediumMigrateTaskPool : public TaskWorkerPool {
+public:
+ StorageMediumMigrateTaskPool(ExecEnv* env, ThreadModel thread_model);
+ Status _check_migrate_request(const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
+ DataDir** dest_store);
+ void _storage_medium_migrate_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(StorageMediumMigrateTaskPool);
+};
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org