This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0434c6a738 [refactor-WIP](TaskWorkerPool) add specific classes for
PUSH, PUBLISH_VERSION, CLEAR_TRANSACTION tasks (#19822)
0434c6a738 is described below
commit 0434c6a7381eca16d460ae48e5dce60b5e6532bf
Author: bobhan1 <[email protected]>
AuthorDate: Sat May 27 22:47:45 2023 +0800
[refactor-WIP](TaskWorkerPool) add specific classes for PUSH,
PUBLISH_VERSION, CLEAR_TRANSACTION tasks (#19822)
---
be/src/agent/agent_server.cpp | 25 +-
be/src/agent/agent_server.h | 4 +-
be/src/agent/task_worker_pool.cpp | 561 +++++++++++------------
be/src/agent/task_worker_pool.h | 33 +-
be/src/olap/task/engine_publish_version_task.cpp | 6 +-
be/src/olap/task/engine_publish_version_task.h | 2 +-
6 files changed, 325 insertions(+), 306 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 199770ba4a..9bbe4d862e 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -82,6 +82,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
#define CREATE_AND_START_THREAD(type, pool_name)
#endif // BE_TEST
+#ifndef BE_TEST
_create_tablet_workers.reset(
new CreateTableTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
_create_tablet_workers->start();
@@ -89,11 +90,21 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
new DropTableTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
_drop_tablet_workers->start();
- // Both PUSH and REALTIME_PUSH type use _push_workers
- CREATE_AND_START_POOL(PUSH, _push_workers);
- CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
- CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK,
_clear_transaction_task_workers);
- CREATE_AND_START_POOL(DELETE, _delete_workers);
+ // Both PUSH and REALTIME_PUSH type use _push_load_workers
+ _push_load_workers.reset(new PushTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS,
+
PushTaskPool::PushWokerType::LOAD_V2));
+ _push_load_workers->start();
+ _publish_version_workers.reset(
+ new PublishVersionTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+ _publish_version_workers->start();
+ _clear_transaction_task_workers.reset(
+ new ClearTransactionTaskPool(exec_env,
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+ _clear_transaction_task_workers->start();
+ _push_delete_workers.reset(new PushTaskPool(exec_env,
+
TaskWorkerPool::ThreadModel::MULTI_THREADS,
+
PushTaskPool::PushWokerType::DELETE));
+ _push_delete_workers->start();
+#endif
CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
CREATE_AND_START_POOL(ALTER_INVERTED_INDEX, _alter_inverted_index_workers);
CREATE_AND_START_POOL(CLONE, _clone_workers);
@@ -185,9 +196,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
break;
}
if (task.push_req.push_type == TPushType::LOAD_V2) {
- _push_workers->submit_task(task);
+ _push_load_workers->submit_task(task);
} else if (task.push_req.push_type == TPushType::DELETE) {
- _delete_workers->submit_task(task);
+ _push_delete_workers->submit_task(task);
} else {
ret_st = Status::InvalidArgument(
"task(signature={}, type={}, push_type={}) has wrong
push_type", signature,
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 3aebc3d1f7..3d98fd025d 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -60,10 +60,10 @@ private:
std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
- std::unique_ptr<TaskWorkerPool> _push_workers;
+ std::unique_ptr<TaskWorkerPool> _push_load_workers;
std::unique_ptr<TaskWorkerPool> _publish_version_workers;
std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
- std::unique_ptr<TaskWorkerPool> _delete_workers;
+ std::unique_ptr<TaskWorkerPool> _push_delete_workers;
std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
std::unique_ptr<TaskWorkerPool> _alter_inverted_index_workers;
std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index ea8cbc633a..ff49f04ffe 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -144,22 +144,12 @@ void TaskWorkerPool::start() {
break;
case TaskWorkerType::PUSH:
case TaskWorkerType::REALTIME_PUSH:
- _worker_count =
- config::push_worker_count_normal_priority +
config::push_worker_count_high_priority;
- _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback,
this);
break;
case TaskWorkerType::PUBLISH_VERSION:
- _worker_count = config::publish_version_worker_count;
- _cb =
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
break;
case TaskWorkerType::CLEAR_TRANSACTION_TASK:
- _worker_count = config::clear_transaction_task_worker_count;
- _cb =
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
- this);
break;
case TaskWorkerType::DELETE:
- _worker_count = config::delete_worker_count;
- _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback,
this);
break;
case TaskWorkerType::ALTER_TABLE:
_worker_count = config::alter_tablet_worker_count;
@@ -341,32 +331,6 @@ void TaskWorkerPool::_finish_task(const
TFinishTaskRequest& finish_task_request)
TRACE("finish task");
}
-uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count,
- std::deque<TAgentTaskRequest>&
tasks,
- TPriority::type priority) {
- int32_t index = -1;
- std::deque<TAgentTaskRequest>::size_type task_count = tasks.size();
- for (uint32_t i = 0; i < task_count; ++i) {
- TAgentTaskRequest task = tasks[i];
- if (priority == TPriority::HIGH) {
- if (task.__isset.priority && task.priority == TPriority::HIGH) {
- index = i;
- break;
- }
- }
- }
-
- if (index == -1) {
- if (priority == TPriority::HIGH) {
- return index;
- }
-
- index = 0;
- }
-
- return index;
-}
-
void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
@@ -590,253 +554,6 @@ void TaskWorkerPool::_alter_tablet(const
TAgentTaskRequest& agent_task_req, int6
finish_task_request->__set_task_status(status.to_thrift());
}
-void TaskWorkerPool::_push_worker_thread_callback() {
- // gen high priority worker thread
- TPriority::type priority = TPriority::NORMAL;
- int32_t push_worker_count_high_priority =
config::push_worker_count_high_priority;
- static uint32_t s_worker_count = 0;
- {
- std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
- if (s_worker_count < push_worker_count_high_priority) {
- ++s_worker_count;
- priority = TPriority::HIGH;
- }
- }
-
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- TPushReq push_req;
- int32_t index = 0;
- do {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- index =
_get_next_task_index(config::push_worker_count_normal_priority +
-
config::push_worker_count_high_priority,
- _tasks, priority);
-
- if (index < 0) {
- // there is no high priority task. notify other thread to
handle normal task
- _worker_thread_condition_variable.notify_all();
- break;
- }
-
- agent_task_req = _tasks[index];
- push_req = agent_task_req.push_req;
- _tasks.erase(_tasks.begin() + index);
- } while (false);
-
- if (index < 0) {
- // there is no high priority task in queue
- sleep(1);
- continue;
- }
-
- LOG(INFO) << "get push task. signature=" << agent_task_req.signature
- << ", priority=" << priority << " push_type=" <<
push_req.push_type;
- std::vector<TTabletInfo> tablet_infos;
-
- EngineBatchLoadTask engine_task(push_req, &tablet_infos);
- auto status = _env->storage_engine()->execute_task(&engine_task);
-
- // Return result to fe
- TFinishTaskRequest finish_task_request;
- finish_task_request.__set_backend(BackendOptions::get_local_backend());
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
- if (push_req.push_type == TPushType::DELETE) {
- finish_task_request.__set_request_version(push_req.version);
- }
-
- if (status.ok()) {
- LOG_INFO("successfully execute push task")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", push_req.tablet_id)
- .tag("push_type", push_req.push_type);
- ++_s_report_version;
- finish_task_request.__set_finish_tablet_infos(tablet_infos);
- } else {
- LOG_WARNING("failed to execute push task")
- .tag("signature", agent_task_req.signature)
- .tag("tablet_id", push_req.tablet_id)
- .tag("push_type", push_req.push_type)
- .error(status);
- }
- finish_task_request.__set_task_status(status.to_thrift());
- finish_task_request.__set_report_version(_s_report_version);
-
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
-void TaskWorkerPool::_publish_version_worker_thread_callback() {
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- TPublishVersionRequest publish_version_req;
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- agent_task_req = _tasks.front();
- publish_version_req = agent_task_req.publish_version_req;
- _tasks.pop_front();
- }
-
- DorisMetrics::instance()->publish_task_request_total->increment(1);
- VLOG_NOTICE << "get publish version task. signature=" <<
agent_task_req.signature;
-
- std::vector<TTabletId> error_tablet_ids;
- std::vector<TTabletId> succ_tablet_ids;
- uint32_t retry_time = 0;
- Status status;
- bool is_task_timeout = false;
- while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
- error_tablet_ids.clear();
- EnginePublishVersionTask engine_task(publish_version_req,
&error_tablet_ids,
- &succ_tablet_ids);
- status = _env->storage_engine()->execute_task(&engine_task);
- if (status.ok()) {
- break;
- } else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
- int64_t time_elapsed = time(nullptr) -
agent_task_req.recv_time;
- if (time_elapsed > PUBLISH_TIMEOUT_SEC) {
- LOG(INFO) << "task elapsed " << time_elapsed
- << " seconds since it is inserted to queue, it
is timeout";
- is_task_timeout = true;
- } else {
- // version not continuous, put to queue and wait pre
version publish
- // task execute
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _tasks.push_back(agent_task_req);
- _worker_thread_condition_variable.notify_one();
- }
- LOG_EVERY_SECOND(INFO) << "wait for previous publish version
task to be done"
- << "transaction_id: " <<
publish_version_req.transaction_id;
- break;
- } else {
- LOG_WARNING("failed to publish version")
- .tag("transaction_id",
publish_version_req.transaction_id)
- .tag("error_tablets_num", error_tablet_ids.size())
- .tag("retry_time", retry_time)
- .error(status);
- ++retry_time;
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- }
- if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
- continue;
- }
-
- TFinishTaskRequest finish_task_request;
- if (!status) {
- DorisMetrics::instance()->publish_task_failed_total->increment(1);
- // if publish failed, return failed, FE will ignore this error and
- // check error tablet ids and FE will also republish this task
- LOG_WARNING("failed to publish version")
- .tag("signature", agent_task_req.signature)
- .tag("transaction_id", publish_version_req.transaction_id)
- .tag("error_tablets_num", error_tablet_ids.size())
- .error(status);
- finish_task_request.__set_error_tablet_ids(error_tablet_ids);
- } else {
- if (!config::disable_auto_compaction) {
- for (int i = 0; i < succ_tablet_ids.size(); i++) {
- TabletSharedPtr tablet =
-
StorageEngine::instance()->tablet_manager()->get_tablet(
- succ_tablet_ids[i]);
- if (tablet != nullptr) {
- tablet->publised_count++;
- if (tablet->publised_count % 10 == 0) {
- StorageEngine::instance()->submit_compaction_task(
- tablet,
CompactionType::CUMULATIVE_COMPACTION, true);
- LOG(INFO) << "trigger compaction succ, tabletid:"
<< succ_tablet_ids[i]
- << ", publised:" <<
tablet->publised_count;
- }
- } else {
- LOG(WARNING)
- << "trigger compaction failed, tabletid:" <<
succ_tablet_ids[i];
- }
- }
- }
- LOG_INFO("successfully publish version")
- .tag("signature", agent_task_req.signature)
- .tag("transaction_id", publish_version_req.transaction_id)
- .tag("tablets_num", succ_tablet_ids.size());
- }
-
- status.to_thrift(&finish_task_request.task_status);
- finish_task_request.__set_backend(BackendOptions::get_local_backend());
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
- finish_task_request.__set_report_version(_s_report_version);
- finish_task_request.__set_error_tablet_ids(error_tablet_ids);
-
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
-void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
- while (_is_work) {
- TAgentTaskRequest agent_task_req;
- TClearTransactionTaskRequest clear_transaction_task_req;
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- _worker_thread_condition_variable.wait(
- worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
- if (!_is_work) {
- return;
- }
-
- agent_task_req = _tasks.front();
- clear_transaction_task_req =
agent_task_req.clear_transaction_task_req;
- _tasks.pop_front();
- }
- LOG(INFO) << "get clear transaction task. signature=" <<
agent_task_req.signature
- << ", transaction_id=" <<
clear_transaction_task_req.transaction_id
- << ", partition_id_size=" <<
clear_transaction_task_req.partition_id.size();
-
- Status status;
-
- if (clear_transaction_task_req.transaction_id > 0) {
- // transaction_id should be greater than zero.
- // If it is not greater than zero, no need to execute
- // the following clear_transaction_task() function.
- if (!clear_transaction_task_req.partition_id.empty()) {
- _env->storage_engine()->clear_transaction_task(
- clear_transaction_task_req.transaction_id,
- clear_transaction_task_req.partition_id);
- } else {
- _env->storage_engine()->clear_transaction_task(
- clear_transaction_task_req.transaction_id);
- }
- LOG(INFO) << "finish to clear transaction task. signature=" <<
agent_task_req.signature
- << ", transaction_id=" <<
clear_transaction_task_req.transaction_id;
- } else {
- LOG(WARNING) << "invalid transaction id " <<
clear_transaction_task_req.transaction_id
- << ". signature= " << agent_task_req.signature;
- }
-
- TFinishTaskRequest finish_task_request;
- finish_task_request.__set_task_status(status.to_thrift());
- finish_task_request.__set_backend(BackendOptions::get_local_backend());
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
-
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
- }
-}
-
void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
@@ -1788,7 +1505,6 @@ CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env,
ThreadModel thread_model)
void CreateTableTaskPool::_create_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TCreateTabletReq create_tablet_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1797,9 +1513,9 @@ void
CreateTableTaskPool::_create_tablet_worker_thread_callback() {
return;
}
agent_task_req = _tasks.front();
- create_tablet_req = agent_task_req.create_tablet_req;
_tasks.pop_front();
}
+ const TCreateTabletReq& create_tablet_req =
agent_task_req.create_tablet_req;
scoped_refptr<Trace> trace(new Trace);
MonotonicStopWatch watch;
watch.start();
@@ -1862,7 +1578,6 @@ DropTableTaskPool::DropTableTaskPool(ExecEnv* env,
ThreadModel thread_model)
void DropTableTaskPool::_drop_tablet_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
- TDropTabletReq drop_tablet_req;
{
std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
_worker_thread_condition_variable.wait(
@@ -1872,10 +1587,9 @@ void
DropTableTaskPool::_drop_tablet_worker_thread_callback() {
}
agent_task_req = _tasks.front();
- drop_tablet_req = agent_task_req.drop_tablet_req;
_tasks.pop_front();
}
-
+ TDropTabletReq drop_tablet_req = agent_task_req.drop_tablet_req;
Status status;
TabletSharedPtr dropped_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
drop_tablet_req.tablet_id, false);
@@ -1912,4 +1626,275 @@ void
DropTableTaskPool::_drop_tablet_worker_thread_callback() {
}
}
+PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model,
PushWokerType type)
+ : TaskWorkerPool(
+ type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH :
TaskWorkerType::DELETE,
+ env, *env->master_info(), thread_model),
+ _push_worker_type(type) {
+ if (_push_worker_type == PushWokerType::LOAD_V2) {
+ _worker_count =
+ config::push_worker_count_normal_priority +
config::push_worker_count_high_priority;
+
+ } else {
+ _worker_count = config::delete_worker_count;
+ }
+ _cb = [this]() { _push_worker_thread_callback(); };
+}
+
+void PushTaskPool::_push_worker_thread_callback() {
+ // gen high priority worker thread
+ TPriority::type priority = TPriority::NORMAL;
+ int32_t push_worker_count_high_priority =
config::push_worker_count_high_priority;
+ if (_push_worker_type == PushWokerType::LOAD_V2) {
+ static uint32_t s_worker_count = 0;
+ std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
+ if (s_worker_count < push_worker_count_high_priority) {
+ ++s_worker_count;
+ priority = TPriority::HIGH;
+ }
+ }
+
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+
+ if (priority == TPriority::HIGH) {
+ const auto it = std::find_if(
+ _tasks.cbegin(), _tasks.cend(), [](const
TAgentTaskRequest& req) {
+ return req.__isset.priority && req.priority ==
TPriority::HIGH;
+ });
+
+ if (it == _tasks.cend()) {
+ // there is no high priority task. notify other thread to
handle normal task
+ _worker_thread_condition_variable.notify_all();
+ sleep(1);
+ continue;
+ }
+ agent_task_req = *it;
+ _tasks.erase(it);
+ } else {
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ }
+ TPushReq& push_req = agent_task_req.push_req;
+
+ LOG(INFO) << "get push task. signature=" << agent_task_req.signature
+ << ", priority=" << priority << " push_type=" <<
push_req.push_type;
+ std::vector<TTabletInfo> tablet_infos;
+
+ EngineBatchLoadTask engine_task(push_req, &tablet_infos);
+ auto status = _env->storage_engine()->execute_task(&engine_task);
+
+ // Return result to fe
+ TFinishTaskRequest finish_task_request;
+ finish_task_request.__set_backend(BackendOptions::get_local_backend());
+ finish_task_request.__set_task_type(agent_task_req.task_type);
+ finish_task_request.__set_signature(agent_task_req.signature);
+ if (push_req.push_type == TPushType::DELETE) {
+ finish_task_request.__set_request_version(push_req.version);
+ }
+
+ if (status.ok()) {
+ LOG_INFO("successfully execute push task")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", push_req.tablet_id)
+ .tag("push_type", push_req.push_type);
+ ++_s_report_version;
+ finish_task_request.__set_finish_tablet_infos(tablet_infos);
+ } else {
+ LOG_WARNING("failed to execute push task")
+ .tag("signature", agent_task_req.signature)
+ .tag("tablet_id", push_req.tablet_id)
+ .tag("push_type", push_req.push_type)
+ .error(status);
+ }
+ finish_task_request.__set_task_status(status.to_thrift());
+ finish_task_request.__set_report_version(_s_report_version);
+
+ _finish_task(finish_task_request);
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
+PublishVersionTaskPool::PublishVersionTaskPool(ExecEnv* env, ThreadModel
thread_model)
+ : TaskWorkerPool(TaskWorkerType::PUBLISH_VERSION, env,
*env->master_info(), thread_model) {
+ _worker_count = config::publish_version_worker_count;
+ _cb = [this]() { _publish_version_worker_thread_callback(); };
+}
+
+void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ const TPublishVersionRequest& publish_version_req =
agent_task_req.publish_version_req;
+ DorisMetrics::instance()->publish_task_request_total->increment(1);
+ VLOG_NOTICE << "get publish version task. signature=" <<
agent_task_req.signature;
+
+ std::vector<TTabletId> error_tablet_ids;
+ std::vector<TTabletId> succ_tablet_ids;
+ uint32_t retry_time = 0;
+ Status status;
+ bool is_task_timeout = false;
+ while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+ error_tablet_ids.clear();
+ EnginePublishVersionTask engine_task(publish_version_req,
&error_tablet_ids,
+ &succ_tablet_ids);
+ status = _env->storage_engine()->execute_task(&engine_task);
+ if (status.ok()) {
+ break;
+ } else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
+ int64_t time_elapsed = time(nullptr) -
agent_task_req.recv_time;
+ if (time_elapsed > PUBLISH_TIMEOUT_SEC) {
+ LOG(INFO) << "task elapsed " << time_elapsed
+ << " seconds since it is inserted to queue, it
is timeout";
+ is_task_timeout = true;
+ } else {
+ // version not continuous, put to queue and wait pre
version publish
+ // task execute
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _tasks.push_back(agent_task_req);
+ _worker_thread_condition_variable.notify_one();
+ }
+ LOG(INFO) << "wait for previous publish version task to be
done"
+ << "transaction_id: " <<
publish_version_req.transaction_id;
+ break;
+ } else {
+ LOG_WARNING("failed to publish version")
+ .tag("transaction_id",
publish_version_req.transaction_id)
+ .tag("error_tablets_num", error_tablet_ids.size())
+ .tag("retry_time", retry_time)
+ .error(status);
+ ++retry_time;
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ }
+ if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
+ continue;
+ }
+
+ TFinishTaskRequest finish_task_request;
+ if (!status) {
+ DorisMetrics::instance()->publish_task_failed_total->increment(1);
+ // if publish failed, return failed, FE will ignore this error and
+ // check error tablet ids and FE will also republish this task
+ LOG_WARNING("failed to publish version")
+ .tag("signature", agent_task_req.signature)
+ .tag("transaction_id", publish_version_req.transaction_id)
+ .tag("error_tablets_num", error_tablet_ids.size())
+ .error(status);
+ finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+ } else {
+ if (!config::disable_auto_compaction) {
+ for (int i = 0; i < succ_tablet_ids.size(); i++) {
+ TabletSharedPtr tablet =
+
StorageEngine::instance()->tablet_manager()->get_tablet(
+ succ_tablet_ids[i]);
+ if (tablet != nullptr) {
+ tablet->publised_count++;
+ if (tablet->publised_count % 10 == 0) {
+ StorageEngine::instance()->submit_compaction_task(
+ tablet,
CompactionType::CUMULATIVE_COMPACTION, true);
+ LOG(INFO) << "trigger compaction succ, tabletid:"
<< succ_tablet_ids[i]
+ << ", publised:" <<
tablet->publised_count;
+ }
+ } else {
+ LOG(WARNING)
+ << "trigger compaction failed, tabletid:" <<
succ_tablet_ids[i];
+ }
+ }
+ }
+ LOG_INFO("successfully publish version")
+ .tag("signature", agent_task_req.signature)
+ .tag("transaction_id", publish_version_req.transaction_id)
+ .tag("tablets_num", succ_tablet_ids.size());
+ }
+
+ status.to_thrift(&finish_task_request.task_status);
+ finish_task_request.__set_backend(BackendOptions::get_local_backend());
+ finish_task_request.__set_task_type(agent_task_req.task_type);
+ finish_task_request.__set_signature(agent_task_req.signature);
+ finish_task_request.__set_report_version(_s_report_version);
+ finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+
+ _finish_task(finish_task_request);
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
+ClearTransactionTaskPool::ClearTransactionTaskPool(ExecEnv* env, ThreadModel
thread_model)
+ : TaskWorkerPool(TaskWorkerType::CLEAR_TRANSACTION_TASK, env,
*env->master_info(),
+ thread_model) {
+ _worker_count = config::clear_transaction_task_worker_count;
+ _cb = [this]() { _clear_transaction_task_worker_thread_callback(); };
+}
+
+void
ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ _worker_thread_condition_variable.wait(
+ worker_thread_lock, [this]() { return !_is_work ||
!_tasks.empty(); });
+ if (!_is_work) {
+ return;
+ }
+
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ const TClearTransactionTaskRequest& clear_transaction_task_req =
+ agent_task_req.clear_transaction_task_req;
+ LOG(INFO) << "get clear transaction task. signature=" <<
agent_task_req.signature
+ << ", transaction_id=" <<
clear_transaction_task_req.transaction_id
+ << ", partition_id_size=" <<
clear_transaction_task_req.partition_id.size();
+
+ Status status;
+
+ if (clear_transaction_task_req.transaction_id > 0) {
+ // transaction_id should be greater than zero.
+ // If it is not greater than zero, no need to execute
+ // the following clear_transaction_task() function.
+ if (!clear_transaction_task_req.partition_id.empty()) {
+ _env->storage_engine()->clear_transaction_task(
+ clear_transaction_task_req.transaction_id,
+ clear_transaction_task_req.partition_id);
+ } else {
+ _env->storage_engine()->clear_transaction_task(
+ clear_transaction_task_req.transaction_id);
+ }
+ LOG(INFO) << "finish to clear transaction task. signature=" <<
agent_task_req.signature
+ << ", transaction_id=" <<
clear_transaction_task_req.transaction_id;
+ } else {
+ LOG(WARNING) << "invalid transaction id " <<
clear_transaction_task_req.transaction_id
+ << ". signature= " << agent_task_req.signature;
+ }
+
+ TFinishTaskRequest finish_task_request;
+ finish_task_request.__set_task_status(status.to_thrift());
+ finish_task_request.__set_backend(BackendOptions::get_local_backend());
+ finish_task_request.__set_task_type(agent_task_req.task_type);
+ finish_task_request.__set_signature(agent_task_req.signature);
+
+ _finish_task(finish_task_request);
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3e4116f420..7287da3f03 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -182,12 +182,7 @@ protected:
bool _register_task_info(const TTaskType::type task_type, int64_t
signature);
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
void _finish_task(const TFinishTaskRequest& finish_task_request);
- uint32_t _get_next_task_index(int32_t thread_count,
std::deque<TAgentTaskRequest>& tasks,
- TPriority::type priority);
- void _push_worker_thread_callback();
- void _publish_version_worker_thread_callback();
- void _clear_transaction_task_worker_thread_callback();
void _alter_tablet_worker_thread_callback();
void _alter_inverted_index_worker_thread_callback();
void _clone_worker_thread_callback();
@@ -279,4 +274,32 @@ public:
DISALLOW_COPY_AND_ASSIGN(DropTableTaskPool);
};
+class PushTaskPool : public TaskWorkerPool {
+public:
+ enum class PushWokerType { LOAD_V2, DELETE };
+ PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type);
+ void _push_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(PushTaskPool);
+
+private:
+ PushWokerType _push_worker_type;
+};
+
+class PublishVersionTaskPool : public TaskWorkerPool {
+public:
+ PublishVersionTaskPool(ExecEnv* env, ThreadModel thread_model);
+ void _publish_version_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(PublishVersionTaskPool);
+};
+
+class ClearTransactionTaskPool : public TaskWorkerPool {
+public:
+ ClearTransactionTaskPool(ExecEnv* env, ThreadModel thread_model);
+ void _clear_transaction_task_worker_thread_callback();
+
+ DISALLOW_COPY_AND_ASSIGN(ClearTransactionTaskPool);
+};
+
} // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 50952f559a..d8fd5c1bbe 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -44,9 +44,9 @@ using namespace ErrorCode;
using std::map;
-EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest&
publish_version_req,
- std::vector<TTabletId>*
error_tablet_ids,
- std::vector<TTabletId>*
succ_tablet_ids)
+EnginePublishVersionTask::EnginePublishVersionTask(
+ const TPublishVersionRequest& publish_version_req,
std::vector<TTabletId>* error_tablet_ids,
+ std::vector<TTabletId>* succ_tablet_ids)
: _total_task_num(0),
_publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
diff --git a/be/src/olap/task/engine_publish_version_task.h
b/be/src/olap/task/engine_publish_version_task.h
index eb90c9d7d8..7c163839bd 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -59,7 +59,7 @@ private:
class EnginePublishVersionTask : public EngineTask {
public:
- EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
+ EnginePublishVersionTask(const TPublishVersionRequest& publish_version_req,
vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids =
nullptr);
~EnginePublishVersionTask() {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]