This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6a5b590873 [refactor-WIP](TaskWorkerPool) add CreateTableTaskPool class for CREATE_TABLE task (#19734)
6a5b590873 is described below
commit 6a5b590873312b743acabd1e0f0df4fab66e4240
Author: bobhan1 <[email protected]>
AuthorDate: Thu May 18 11:43:09 2023 +0800
[refactor-WIP](TaskWorkerPool) add CreateTableTaskPool class for CREATE_TABLE task (#19734)
---
be/src/agent/agent_server.cpp | 4 +-
be/src/agent/task_worker_pool.cpp | 201 +++++++++++++++++++-------------------
be/src/agent/task_worker_pool.h | 14 ++-
3 files changed, 115 insertions(+), 104 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index cb84e8b2e0..174baa0323 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -82,7 +82,9 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
#define CREATE_AND_START_THREAD(type, pool_name)
#endif // BE_TEST
- CREATE_AND_START_POOL(CREATE_TABLE, _create_tablet_workers);
+ _create_tablet_workers.reset(
+ new CreateTableTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+ _create_tablet_workers->start();
CREATE_AND_START_POOL(DROP_TABLE, _drop_tablet_workers);
// Both PUSH and REALTIME_PUSH type use _push_workers
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index aaa4222faa..1544071ae7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -135,99 +135,98 @@ void TaskWorkerPool::start() {
if (_thread_model == ThreadModel::SINGLE_THREAD) {
_worker_count = 1;
}
- std::function<void()> cb;
switch (_task_worker_type) {
case TaskWorkerType::CREATE_TABLE:
- _worker_count = config::create_tablet_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_create_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::DROP_TABLE:
_worker_count = config::drop_tablet_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_drop_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::PUSH:
case TaskWorkerType::REALTIME_PUSH:
_worker_count =
config::push_worker_count_normal_priority +
config::push_worker_count_high_priority;
- cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback,
this);
+ _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback,
this);
break;
case TaskWorkerType::PUBLISH_VERSION:
_worker_count = config::publish_version_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
break;
case TaskWorkerType::CLEAR_TRANSACTION_TASK:
_worker_count = config::clear_transaction_task_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
+ this);
break;
case TaskWorkerType::DELETE:
_worker_count = config::delete_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback,
this);
+ _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback,
this);
break;
case TaskWorkerType::ALTER_TABLE:
_worker_count = config::alter_tablet_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_alter_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::ALTER_INVERTED_INDEX:
_worker_count = config::alter_inverted_index_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_alter_inverted_index_worker_thread_callback,
this);
break;
case TaskWorkerType::CLONE:
_worker_count = config::clone_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback,
this);
+ _cb = std::bind<void>(&TaskWorkerPool::_clone_worker_thread_callback,
this);
break;
case TaskWorkerType::STORAGE_MEDIUM_MIGRATE:
_worker_count = config::storage_medium_migrate_count;
- cb =
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_worker_thread_callback,
+ this);
break;
case TaskWorkerType::CHECK_CONSISTENCY:
_worker_count = config::check_consistency_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_check_consistency_worker_thread_callback,
this);
break;
case TaskWorkerType::REPORT_TASK:
- cb =
std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_report_task_worker_thread_callback, this);
break;
case TaskWorkerType::REPORT_DISK_STATE:
- cb =
std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_report_disk_state_worker_thread_callback,
this);
break;
case TaskWorkerType::REPORT_OLAP_TABLE:
- cb =
std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_report_tablet_worker_thread_callback, this);
break;
case TaskWorkerType::UPLOAD:
_worker_count = config::upload_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback,
this);
+ _cb = std::bind<void>(&TaskWorkerPool::_upload_worker_thread_callback,
this);
break;
case TaskWorkerType::DOWNLOAD:
_worker_count = config::download_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_download_worker_thread_callback, this);
break;
case TaskWorkerType::MAKE_SNAPSHOT:
_worker_count = config::make_snapshot_worker_count;
- cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback,
this);
+ _cb = std::bind<void>(&TaskWorkerPool::_make_snapshot_thread_callback,
this);
break;
case TaskWorkerType::RELEASE_SNAPSHOT:
_worker_count = config::release_snapshot_worker_count;
- cb =
std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_release_snapshot_thread_callback, this);
break;
case TaskWorkerType::MOVE:
_worker_count = 1;
- cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback, this);
+ _cb = std::bind<void>(&TaskWorkerPool::_move_dir_thread_callback,
this);
break;
case TaskWorkerType::UPDATE_TABLET_META_INFO:
_worker_count = 1;
- cb =
std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_update_tablet_meta_worker_thread_callback,
this);
break;
case TaskWorkerType::SUBMIT_TABLE_COMPACTION:
_worker_count = 1;
- cb =
std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
- this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
+ this);
break;
case TaskWorkerType::PUSH_STORAGE_POLICY:
_worker_count = 1;
- cb =
std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_push_storage_policy_worker_thread_callback,
this);
break;
case TaskWorkerType::PUSH_COOLDOWN_CONF:
_worker_count = 1;
- cb =
std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback,
this);
+ _cb =
std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback,
this);
break;
default:
// pass
@@ -242,7 +241,7 @@ void TaskWorkerPool::start() {
.build(&_thread_pool);
for (int i = 0; i < _worker_count; i++) {
- auto st = _thread_pool->submit_func(cb);
+ auto st = _thread_pool->submit_func(_cb);
CHECK(st.ok()) << st;
}
#endif
@@ -368,80 +367,6 @@ uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count,
return index;
}
-void TaskWorkerPool::_create_tablet_worker_thread_callback() {
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- TCreateTabletReq create_tablet_req;
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- agent_task_req = _tasks.front();
- create_tablet_req = agent_task_req.create_tablet_req;
- _tasks.pop_front();
- }
-
- scoped_refptr<Trace> trace(new Trace);
- MonotonicStopWatch watch;
- watch.start();
- SCOPED_CLEANUP({
- if (watch.elapsed_time() / 1e9 >
config::agent_task_trace_threshold_sec) {
- LOG(WARNING) << "Trace:" << std::endl <<
trace->DumpToString(Trace::INCLUDE_ALL);
- }
- });
- ADOPT_TRACE(trace.get());
-
- DorisMetrics::instance()->create_tablet_requests_total->increment(1);
- TRACE("start to create tablet $0", create_tablet_req.tablet_id);
-
- std::vector<TTabletInfo> finish_tablet_infos;
- VLOG_NOTICE << "create tablet: " << create_tablet_req;
- Status status =
_env->storage_engine()->create_tablet(create_tablet_req);
- if (!status.ok()) {
-
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
- LOG_WARNING("failed to create tablet")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", create_tablet_req.tablet_id)
- .error(status);
- } else {
- ++_s_report_version;
- // get path hash of the created tablet
- TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
- create_tablet_req.tablet_id);
- DCHECK(tablet != nullptr);
- TTabletInfo tablet_info;
- tablet_info.tablet_id = tablet->table_id();
- tablet_info.schema_hash = tablet->schema_hash();
- tablet_info.version = create_tablet_req.version;
- // Useless but it is a required field in TTabletInfo
- tablet_info.version_hash = 0;
- tablet_info.row_count = 0;
- tablet_info.data_size = 0;
- tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
- tablet_info.__set_replica_id(tablet->replica_id());
- finish_tablet_infos.push_back(tablet_info);
- LOG_INFO("successfully create tablet")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", create_tablet_req.tablet_id);
- }
-
- TFinishTaskRequest finish_task_request;
- finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
- finish_task_request.__set_backend(BackendOptions::get_local_backend());
- finish_task_request.__set_report_version(_s_report_version);
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
- finish_task_request.__set_task_status(status.to_thrift());
-
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
@@ -1880,4 +1805,78 @@ void TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
}
}
+CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, ThreadModel
thread_model)
+ : TaskWorkerPool(TaskWorkerType::CREATE_TABLE, env,
*env->master_info(), thread_model) {
+ _worker_count = config::create_tablet_worker_count;
+ _cb = [this]() { _create_tablet_worker_thread_callback(); };
+}
+
+void CreateTableTaskPool::_create_tablet_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ TCreateTabletReq create_tablet_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+ agent_task_req = _tasks.front();
+ create_tablet_req = agent_task_req.create_tablet_req;
+ _tasks.pop_front();
+ }
+ scoped_refptr<Trace> trace(new Trace);
+ MonotonicStopWatch watch;
+ watch.start();
+ SCOPED_CLEANUP({
+ if (watch.elapsed_time() / 1e9 >
config::agent_task_trace_threshold_sec) {
+ LOG(WARNING) << "Trace:" << std::endl <<
trace->DumpToString(Trace::INCLUDE_ALL);
+ }
+ });
+ ADOPT_TRACE(trace.get());
+ DorisMetrics::instance()->create_tablet_requests_total->increment(1);
+ TRACE("start to create tablet $0", create_tablet_req.tablet_id);
+ std::vector<TTabletInfo> finish_tablet_infos;
+ VLOG_NOTICE << "create tablet: " << create_tablet_req;
+ Status status =
_env->storage_engine()->create_tablet(create_tablet_req);
+ if (!status.ok()) {
+
DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
+ LOG_WARNING("failed to create tablet")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", create_tablet_req.tablet_id)
+ .error(status);
+ } else {
+ ++_s_report_version;
+ // get path hash of the created tablet
+ TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
+ create_tablet_req.tablet_id);
+ DCHECK(tablet != nullptr);
+ TTabletInfo tablet_info;
+ tablet_info.tablet_id = tablet->table_id();
+ tablet_info.schema_hash = tablet->schema_hash();
+ tablet_info.version = create_tablet_req.version;
+ // Useless but it is a required field in TTabletInfo
+ tablet_info.version_hash = 0;
+ tablet_info.row_count = 0;
+ tablet_info.data_size = 0;
+ tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
+ tablet_info.__set_replica_id(tablet->replica_id());
+ finish_tablet_infos.push_back(tablet_info);
+ LOG_INFO("successfully create tablet")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", create_tablet_req.tablet_id);
+ }
+ TFinishTaskRequest finish_task_request;
+ finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
+ finish_task_request.__set_backend(BackendOptions::get_local_backend());
+ finish_task_request.__set_report_version(_s_report_version);
+ finish_task_request.__set_task_type(agent_task_req.task_type);
+ finish_task_request.__set_signature(agent_task_req.signature);
+ finish_task_request.__set_task_status(status.to_thrift());
+ _finish_task(finish_task_request);
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 2f90f7b685..651a0a6873 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -178,7 +178,7 @@ public:
// notify the worker. currently for task/disk/tablet report thread
void notify_thread();
-private:
+protected:
bool _register_task_info(const TTaskType::type task_type, int64_t
signature);
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
void _finish_task(const TFinishTaskRequest& finish_task_request);
@@ -228,7 +228,7 @@ private:
// random sleep 1~second seconds
void _random_sleep(int second);
-private:
+protected:
std::string _name;
// Reference to the ExecEnv::_master_info
@@ -255,6 +255,7 @@ private:
// Always 1 when _thread_model is SINGLE_THREAD
uint32_t _worker_count;
TaskWorkerType _task_worker_type;
+ std::function<void()> _cb;
static std::atomic_ulong _s_report_version;
@@ -263,4 +264,13 @@ private:
DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
}; // class TaskWorkerPool
+
+class CreateTableTaskPool : public TaskWorkerPool {
+public:
+ CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model);
+ void _create_tablet_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(CreateTableTaskPool);
+};
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]