This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0434c6a738 [refactor-WIP](TaskWorkerPool) add specific classes for 
PUSH, PUBLISH_VERSION, CLEAR_TRANSACTION tasks (#19822)
0434c6a738 is described below

commit 0434c6a7381eca16d460ae48e5dce60b5e6532bf
Author: bobhan1 <[email protected]>
AuthorDate: Sat May 27 22:47:45 2023 +0800

    [refactor-WIP](TaskWorkerPool) add specific classes for PUSH, 
PUBLISH_VERSION, CLEAR_TRANSACTION tasks (#19822)
---
 be/src/agent/agent_server.cpp                    |  25 +-
 be/src/agent/agent_server.h                      |   4 +-
 be/src/agent/task_worker_pool.cpp                | 561 +++++++++++------------
 be/src/agent/task_worker_pool.h                  |  33 +-
 be/src/olap/task/engine_publish_version_task.cpp |   6 +-
 be/src/olap/task/engine_publish_version_task.h   |   2 +-
 6 files changed, 325 insertions(+), 306 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 199770ba4a..9bbe4d862e 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -82,6 +82,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
 #define CREATE_AND_START_THREAD(type, pool_name)
 #endif // BE_TEST
 
+#ifndef BE_TEST
     _create_tablet_workers.reset(
             new CreateTableTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
     _create_tablet_workers->start();
@@ -89,11 +90,21 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
             new DropTableTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
     _drop_tablet_workers->start();
 
-    // Both PUSH and REALTIME_PUSH type use _push_workers
-    CREATE_AND_START_POOL(PUSH, _push_workers);
-    CREATE_AND_START_POOL(PUBLISH_VERSION, _publish_version_workers);
-    CREATE_AND_START_POOL(CLEAR_TRANSACTION_TASK, 
_clear_transaction_task_workers);
-    CREATE_AND_START_POOL(DELETE, _delete_workers);
+    // Both PUSH and REALTIME_PUSH type use _push_load_workers
+    _push_load_workers.reset(new PushTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS,
+                                              
PushTaskPool::PushWokerType::LOAD_V2));
+    _push_load_workers->start();
+    _publish_version_workers.reset(
+            new PublishVersionTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+    _publish_version_workers->start();
+    _clear_transaction_task_workers.reset(
+            new ClearTransactionTaskPool(exec_env, 
TaskWorkerPool::ThreadModel::MULTI_THREADS));
+    _clear_transaction_task_workers->start();
+    _push_delete_workers.reset(new PushTaskPool(exec_env,
+                                                
TaskWorkerPool::ThreadModel::MULTI_THREADS,
+                                                
PushTaskPool::PushWokerType::DELETE));
+    _push_delete_workers->start();
+#endif
     CREATE_AND_START_POOL(ALTER_TABLE, _alter_tablet_workers);
     CREATE_AND_START_POOL(ALTER_INVERTED_INDEX, _alter_inverted_index_workers);
     CREATE_AND_START_POOL(CLONE, _clone_workers);
@@ -185,9 +196,9 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
                 break;
             }
             if (task.push_req.push_type == TPushType::LOAD_V2) {
-                _push_workers->submit_task(task);
+                _push_load_workers->submit_task(task);
             } else if (task.push_req.push_type == TPushType::DELETE) {
-                _delete_workers->submit_task(task);
+                _push_delete_workers->submit_task(task);
             } else {
                 ret_st = Status::InvalidArgument(
                         "task(signature={}, type={}, push_type={}) has wrong 
push_type", signature,
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 3aebc3d1f7..3d98fd025d 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -60,10 +60,10 @@ private:
 
     std::unique_ptr<TaskWorkerPool> _create_tablet_workers;
     std::unique_ptr<TaskWorkerPool> _drop_tablet_workers;
-    std::unique_ptr<TaskWorkerPool> _push_workers;
+    std::unique_ptr<TaskWorkerPool> _push_load_workers;
     std::unique_ptr<TaskWorkerPool> _publish_version_workers;
     std::unique_ptr<TaskWorkerPool> _clear_transaction_task_workers;
-    std::unique_ptr<TaskWorkerPool> _delete_workers;
+    std::unique_ptr<TaskWorkerPool> _push_delete_workers;
     std::unique_ptr<TaskWorkerPool> _alter_tablet_workers;
     std::unique_ptr<TaskWorkerPool> _alter_inverted_index_workers;
     std::unique_ptr<TaskWorkerPool> _push_cooldown_conf_workers;
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index ea8cbc633a..ff49f04ffe 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -144,22 +144,12 @@ void TaskWorkerPool::start() {
         break;
     case TaskWorkerType::PUSH:
     case TaskWorkerType::REALTIME_PUSH:
-        _worker_count =
-                config::push_worker_count_normal_priority + 
config::push_worker_count_high_priority;
-        _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::PUBLISH_VERSION:
-        _worker_count = config::publish_version_worker_count;
-        _cb = 
std::bind<void>(&TaskWorkerPool::_publish_version_worker_thread_callback, this);
         break;
     case TaskWorkerType::CLEAR_TRANSACTION_TASK:
-        _worker_count = config::clear_transaction_task_worker_count;
-        _cb = 
std::bind<void>(&TaskWorkerPool::_clear_transaction_task_worker_thread_callback,
-                              this);
         break;
     case TaskWorkerType::DELETE:
-        _worker_count = config::delete_worker_count;
-        _cb = std::bind<void>(&TaskWorkerPool::_push_worker_thread_callback, 
this);
         break;
     case TaskWorkerType::ALTER_TABLE:
         _worker_count = config::alter_tablet_worker_count;
@@ -341,32 +331,6 @@ void TaskWorkerPool::_finish_task(const 
TFinishTaskRequest& finish_task_request)
     TRACE("finish task");
 }
 
-uint32_t TaskWorkerPool::_get_next_task_index(int32_t thread_count,
-                                              std::deque<TAgentTaskRequest>& 
tasks,
-                                              TPriority::type priority) {
-    int32_t index = -1;
-    std::deque<TAgentTaskRequest>::size_type task_count = tasks.size();
-    for (uint32_t i = 0; i < task_count; ++i) {
-        TAgentTaskRequest task = tasks[i];
-        if (priority == TPriority::HIGH) {
-            if (task.__isset.priority && task.priority == TPriority::HIGH) {
-                index = i;
-                break;
-            }
-        }
-    }
-
-    if (index == -1) {
-        if (priority == TPriority::HIGH) {
-            return index;
-        }
-
-        index = 0;
-    }
-
-    return index;
-}
-
 void TaskWorkerPool::_alter_inverted_index_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
@@ -590,253 +554,6 @@ void TaskWorkerPool::_alter_tablet(const 
TAgentTaskRequest& agent_task_req, int6
     finish_task_request->__set_task_status(status.to_thrift());
 }
 
-void TaskWorkerPool::_push_worker_thread_callback() {
-    // gen high priority worker thread
-    TPriority::type priority = TPriority::NORMAL;
-    int32_t push_worker_count_high_priority = 
config::push_worker_count_high_priority;
-    static uint32_t s_worker_count = 0;
-    {
-        std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
-        if (s_worker_count < push_worker_count_high_priority) {
-            ++s_worker_count;
-            priority = TPriority::HIGH;
-        }
-    }
-
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TPushReq push_req;
-        int32_t index = 0;
-        do {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            index = 
_get_next_task_index(config::push_worker_count_normal_priority +
-                                                 
config::push_worker_count_high_priority,
-                                         _tasks, priority);
-
-            if (index < 0) {
-                // there is no high priority task. notify other thread to 
handle normal task
-                _worker_thread_condition_variable.notify_all();
-                break;
-            }
-
-            agent_task_req = _tasks[index];
-            push_req = agent_task_req.push_req;
-            _tasks.erase(_tasks.begin() + index);
-        } while (false);
-
-        if (index < 0) {
-            // there is no high priority task in queue
-            sleep(1);
-            continue;
-        }
-
-        LOG(INFO) << "get push task. signature=" << agent_task_req.signature
-                  << ", priority=" << priority << " push_type=" << 
push_req.push_type;
-        std::vector<TTabletInfo> tablet_infos;
-
-        EngineBatchLoadTask engine_task(push_req, &tablet_infos);
-        auto status = _env->storage_engine()->execute_task(&engine_task);
-
-        // Return result to fe
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        if (push_req.push_type == TPushType::DELETE) {
-            finish_task_request.__set_request_version(push_req.version);
-        }
-
-        if (status.ok()) {
-            LOG_INFO("successfully execute push task")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", push_req.tablet_id)
-                    .tag("push_type", push_req.push_type);
-            ++_s_report_version;
-            finish_task_request.__set_finish_tablet_infos(tablet_infos);
-        } else {
-            LOG_WARNING("failed to execute push task")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("tablet_id", push_req.tablet_id)
-                    .tag("push_type", push_req.push_type)
-                    .error(status);
-        }
-        finish_task_request.__set_task_status(status.to_thrift());
-        finish_task_request.__set_report_version(_s_report_version);
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-void TaskWorkerPool::_publish_version_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TPublishVersionRequest publish_version_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            publish_version_req = agent_task_req.publish_version_req;
-            _tasks.pop_front();
-        }
-
-        DorisMetrics::instance()->publish_task_request_total->increment(1);
-        VLOG_NOTICE << "get publish version task. signature=" << 
agent_task_req.signature;
-
-        std::vector<TTabletId> error_tablet_ids;
-        std::vector<TTabletId> succ_tablet_ids;
-        uint32_t retry_time = 0;
-        Status status;
-        bool is_task_timeout = false;
-        while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
-            error_tablet_ids.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, 
&error_tablet_ids,
-                                                 &succ_tablet_ids);
-            status = _env->storage_engine()->execute_task(&engine_task);
-            if (status.ok()) {
-                break;
-            } else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
-                int64_t time_elapsed = time(nullptr) - 
agent_task_req.recv_time;
-                if (time_elapsed > PUBLISH_TIMEOUT_SEC) {
-                    LOG(INFO) << "task elapsed " << time_elapsed
-                              << " seconds since it is inserted to queue, it 
is timeout";
-                    is_task_timeout = true;
-                } else {
-                    // version not continuous, put to queue and wait pre 
version publish
-                    // task execute
-                    std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-                    _tasks.push_back(agent_task_req);
-                    _worker_thread_condition_variable.notify_one();
-                }
-                LOG_EVERY_SECOND(INFO) << "wait for previous publish version 
task to be done"
-                                       << "transaction_id: " << 
publish_version_req.transaction_id;
-                break;
-            } else {
-                LOG_WARNING("failed to publish version")
-                        .tag("transaction_id", 
publish_version_req.transaction_id)
-                        .tag("error_tablets_num", error_tablet_ids.size())
-                        .tag("retry_time", retry_time)
-                        .error(status);
-                ++retry_time;
-                std::this_thread::sleep_for(std::chrono::seconds(1));
-            }
-        }
-        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
-            continue;
-        }
-
-        TFinishTaskRequest finish_task_request;
-        if (!status) {
-            DorisMetrics::instance()->publish_task_failed_total->increment(1);
-            // if publish failed, return failed, FE will ignore this error and
-            // check error tablet ids and FE will also republish this task
-            LOG_WARNING("failed to publish version")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("transaction_id", publish_version_req.transaction_id)
-                    .tag("error_tablets_num", error_tablet_ids.size())
-                    .error(status);
-            finish_task_request.__set_error_tablet_ids(error_tablet_ids);
-        } else {
-            if (!config::disable_auto_compaction) {
-                for (int i = 0; i < succ_tablet_ids.size(); i++) {
-                    TabletSharedPtr tablet =
-                            
StorageEngine::instance()->tablet_manager()->get_tablet(
-                                    succ_tablet_ids[i]);
-                    if (tablet != nullptr) {
-                        tablet->publised_count++;
-                        if (tablet->publised_count % 10 == 0) {
-                            StorageEngine::instance()->submit_compaction_task(
-                                    tablet, 
CompactionType::CUMULATIVE_COMPACTION, true);
-                            LOG(INFO) << "trigger compaction succ, tabletid:" 
<< succ_tablet_ids[i]
-                                      << ", publised:" << 
tablet->publised_count;
-                        }
-                    } else {
-                        LOG(WARNING)
-                                << "trigger compaction failed, tabletid:" << 
succ_tablet_ids[i];
-                    }
-                }
-            }
-            LOG_INFO("successfully publish version")
-                    .tag("signature", agent_task_req.signature)
-                    .tag("transaction_id", publish_version_req.transaction_id)
-                    .tag("tablets_num", succ_tablet_ids.size());
-        }
-
-        status.to_thrift(&finish_task_request.task_status);
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_report_version(_s_report_version);
-        finish_task_request.__set_error_tablet_ids(error_tablet_ids);
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
-void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
-    while (_is_work) {
-        TAgentTaskRequest agent_task_req;
-        TClearTransactionTaskRequest clear_transaction_task_req;
-        {
-            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
-            _worker_thread_condition_variable.wait(
-                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
-            if (!_is_work) {
-                return;
-            }
-
-            agent_task_req = _tasks.front();
-            clear_transaction_task_req = 
agent_task_req.clear_transaction_task_req;
-            _tasks.pop_front();
-        }
-        LOG(INFO) << "get clear transaction task. signature=" << 
agent_task_req.signature
-                  << ", transaction_id=" << 
clear_transaction_task_req.transaction_id
-                  << ", partition_id_size=" << 
clear_transaction_task_req.partition_id.size();
-
-        Status status;
-
-        if (clear_transaction_task_req.transaction_id > 0) {
-            // transaction_id should be greater than zero.
-            // If it is not greater than zero, no need to execute
-            // the following clear_transaction_task() function.
-            if (!clear_transaction_task_req.partition_id.empty()) {
-                _env->storage_engine()->clear_transaction_task(
-                        clear_transaction_task_req.transaction_id,
-                        clear_transaction_task_req.partition_id);
-            } else {
-                _env->storage_engine()->clear_transaction_task(
-                        clear_transaction_task_req.transaction_id);
-            }
-            LOG(INFO) << "finish to clear transaction task. signature=" << 
agent_task_req.signature
-                      << ", transaction_id=" << 
clear_transaction_task_req.transaction_id;
-        } else {
-            LOG(WARNING) << "invalid transaction id " << 
clear_transaction_task_req.transaction_id
-                         << ". signature= " << agent_task_req.signature;
-        }
-
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_task_status(status.to_thrift());
-        finish_task_request.__set_backend(BackendOptions::get_local_backend());
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
-    }
-}
-
 void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
@@ -1788,7 +1505,6 @@ CreateTableTaskPool::CreateTableTaskPool(ExecEnv* env, 
ThreadModel thread_model)
 void CreateTableTaskPool::_create_tablet_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TCreateTabletReq create_tablet_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1797,9 +1513,9 @@ void 
CreateTableTaskPool::_create_tablet_worker_thread_callback() {
                 return;
             }
             agent_task_req = _tasks.front();
-            create_tablet_req = agent_task_req.create_tablet_req;
             _tasks.pop_front();
         }
+        const TCreateTabletReq& create_tablet_req = 
agent_task_req.create_tablet_req;
         scoped_refptr<Trace> trace(new Trace);
         MonotonicStopWatch watch;
         watch.start();
@@ -1862,7 +1578,6 @@ DropTableTaskPool::DropTableTaskPool(ExecEnv* env, 
ThreadModel thread_model)
 void DropTableTaskPool::_drop_tablet_worker_thread_callback() {
     while (_is_work) {
         TAgentTaskRequest agent_task_req;
-        TDropTabletReq drop_tablet_req;
         {
             std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
             _worker_thread_condition_variable.wait(
@@ -1872,10 +1587,9 @@ void 
DropTableTaskPool::_drop_tablet_worker_thread_callback() {
             }
 
             agent_task_req = _tasks.front();
-            drop_tablet_req = agent_task_req.drop_tablet_req;
             _tasks.pop_front();
         }
-
+        TDropTabletReq drop_tablet_req = agent_task_req.drop_tablet_req;
         Status status;
         TabletSharedPtr dropped_tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
                 drop_tablet_req.tablet_id, false);
@@ -1912,4 +1626,275 @@ void 
DropTableTaskPool::_drop_tablet_worker_thread_callback() {
     }
 }
 
+PushTaskPool::PushTaskPool(ExecEnv* env, ThreadModel thread_model, 
PushWokerType type)
+        : TaskWorkerPool(
+                  type == PushWokerType::LOAD_V2 ? TaskWorkerType::PUSH : 
TaskWorkerType::DELETE,
+                  env, *env->master_info(), thread_model),
+          _push_worker_type(type) {
+    if (_push_worker_type == PushWokerType::LOAD_V2) {
+        _worker_count =
+                config::push_worker_count_normal_priority + 
config::push_worker_count_high_priority;
+
+    } else {
+        _worker_count = config::delete_worker_count;
+    }
+    _cb = [this]() { _push_worker_thread_callback(); };
+}
+
+void PushTaskPool::_push_worker_thread_callback() {
+    // gen high priority worker thread
+    TPriority::type priority = TPriority::NORMAL;
+    int32_t push_worker_count_high_priority = 
config::push_worker_count_high_priority;
+    if (_push_worker_type == PushWokerType::LOAD_V2) {
+        static uint32_t s_worker_count = 0;
+        std::lock_guard<std::mutex> worker_thread_lock(_worker_thread_lock);
+        if (s_worker_count < push_worker_count_high_priority) {
+            ++s_worker_count;
+            priority = TPriority::HIGH;
+        }
+    }
+
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            if (priority == TPriority::HIGH) {
+                const auto it = std::find_if(
+                        _tasks.cbegin(), _tasks.cend(), [](const 
TAgentTaskRequest& req) {
+                            return req.__isset.priority && req.priority == 
TPriority::HIGH;
+                        });
+
+                if (it == _tasks.cend()) {
+                    // there is no high priority task. notify other thread to 
handle normal task
+                    _worker_thread_condition_variable.notify_all();
+                    sleep(1);
+                    continue;
+                }
+                agent_task_req = *it;
+                _tasks.erase(it);
+            } else {
+                agent_task_req = _tasks.front();
+                _tasks.pop_front();
+            }
+        }
+        TPushReq& push_req = agent_task_req.push_req;
+
+        LOG(INFO) << "get push task. signature=" << agent_task_req.signature
+                  << ", priority=" << priority << " push_type=" << 
push_req.push_type;
+        std::vector<TTabletInfo> tablet_infos;
+
+        EngineBatchLoadTask engine_task(push_req, &tablet_infos);
+        auto status = _env->storage_engine()->execute_task(&engine_task);
+
+        // Return result to fe
+        TFinishTaskRequest finish_task_request;
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_task_type(agent_task_req.task_type);
+        finish_task_request.__set_signature(agent_task_req.signature);
+        if (push_req.push_type == TPushType::DELETE) {
+            finish_task_request.__set_request_version(push_req.version);
+        }
+
+        if (status.ok()) {
+            LOG_INFO("successfully execute push task")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", push_req.tablet_id)
+                    .tag("push_type", push_req.push_type);
+            ++_s_report_version;
+            finish_task_request.__set_finish_tablet_infos(tablet_infos);
+        } else {
+            LOG_WARNING("failed to execute push task")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("tablet_id", push_req.tablet_id)
+                    .tag("push_type", push_req.push_type)
+                    .error(status);
+        }
+        finish_task_request.__set_task_status(status.to_thrift());
+        finish_task_request.__set_report_version(_s_report_version);
+
+        _finish_task(finish_task_request);
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
+PublishVersionTaskPool::PublishVersionTaskPool(ExecEnv* env, ThreadModel 
thread_model)
+        : TaskWorkerPool(TaskWorkerType::PUBLISH_VERSION, env, 
*env->master_info(), thread_model) {
+    _worker_count = config::publish_version_worker_count;
+    _cb = [this]() { _publish_version_worker_thread_callback(); };
+}
+
+void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+        const TPublishVersionRequest& publish_version_req = 
agent_task_req.publish_version_req;
+        DorisMetrics::instance()->publish_task_request_total->increment(1);
+        VLOG_NOTICE << "get publish version task. signature=" << 
agent_task_req.signature;
+
+        std::vector<TTabletId> error_tablet_ids;
+        std::vector<TTabletId> succ_tablet_ids;
+        uint32_t retry_time = 0;
+        Status status;
+        bool is_task_timeout = false;
+        while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+            error_tablet_ids.clear();
+            EnginePublishVersionTask engine_task(publish_version_req, 
&error_tablet_ids,
+                                                 &succ_tablet_ids);
+            status = _env->storage_engine()->execute_task(&engine_task);
+            if (status.ok()) {
+                break;
+            } else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
+                int64_t time_elapsed = time(nullptr) - 
agent_task_req.recv_time;
+                if (time_elapsed > PUBLISH_TIMEOUT_SEC) {
+                    LOG(INFO) << "task elapsed " << time_elapsed
+                              << " seconds since it is inserted to queue, it 
is timeout";
+                    is_task_timeout = true;
+                } else {
+                    // version not continuous, put to queue and wait pre 
version publish
+                    // task execute
+                    std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+                    _tasks.push_back(agent_task_req);
+                    _worker_thread_condition_variable.notify_one();
+                }
+                LOG(INFO) << "wait for previous publish version task to be 
done"
+                          << "transaction_id: " << 
publish_version_req.transaction_id;
+                break;
+            } else {
+                LOG_WARNING("failed to publish version")
+                        .tag("transaction_id", 
publish_version_req.transaction_id)
+                        .tag("error_tablets_num", error_tablet_ids.size())
+                        .tag("retry_time", retry_time)
+                        .error(status);
+                ++retry_time;
+                std::this_thread::sleep_for(std::chrono::seconds(1));
+            }
+        }
+        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
+            continue;
+        }
+
+        TFinishTaskRequest finish_task_request;
+        if (!status) {
+            DorisMetrics::instance()->publish_task_failed_total->increment(1);
+            // if publish failed, return failed, FE will ignore this error and
+            // check error tablet ids and FE will also republish this task
+            LOG_WARNING("failed to publish version")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("transaction_id", publish_version_req.transaction_id)
+                    .tag("error_tablets_num", error_tablet_ids.size())
+                    .error(status);
+            finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+        } else {
+            if (!config::disable_auto_compaction) {
+                for (int i = 0; i < succ_tablet_ids.size(); i++) {
+                    TabletSharedPtr tablet =
+                            
StorageEngine::instance()->tablet_manager()->get_tablet(
+                                    succ_tablet_ids[i]);
+                    if (tablet != nullptr) {
+                        tablet->publised_count++;
+                        if (tablet->publised_count % 10 == 0) {
+                            StorageEngine::instance()->submit_compaction_task(
+                                    tablet, 
CompactionType::CUMULATIVE_COMPACTION, true);
+                            LOG(INFO) << "trigger compaction succ, tabletid:" 
<< succ_tablet_ids[i]
+                                      << ", publised:" << 
tablet->publised_count;
+                        }
+                    } else {
+                        LOG(WARNING)
+                                << "trigger compaction failed, tabletid:" << 
succ_tablet_ids[i];
+                    }
+                }
+            }
+            LOG_INFO("successfully publish version")
+                    .tag("signature", agent_task_req.signature)
+                    .tag("transaction_id", publish_version_req.transaction_id)
+                    .tag("tablets_num", succ_tablet_ids.size());
+        }
+
+        status.to_thrift(&finish_task_request.task_status);
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_task_type(agent_task_req.task_type);
+        finish_task_request.__set_signature(agent_task_req.signature);
+        finish_task_request.__set_report_version(_s_report_version);
+        finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+
+        _finish_task(finish_task_request);
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
+ClearTransactionTaskPool::ClearTransactionTaskPool(ExecEnv* env, ThreadModel 
thread_model)
+        : TaskWorkerPool(TaskWorkerType::CLEAR_TRANSACTION_TASK, env, 
*env->master_info(),
+                         thread_model) {
+    _worker_count = config::clear_transaction_task_worker_count;
+    _cb = [this]() { _clear_transaction_task_worker_thread_callback(); };
+}
+
+void 
ClearTransactionTaskPool::_clear_transaction_task_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> 
worker_thread_lock(_worker_thread_lock);
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || 
!_tasks.empty(); });
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+        const TClearTransactionTaskRequest& clear_transaction_task_req =
+                agent_task_req.clear_transaction_task_req;
+        LOG(INFO) << "get clear transaction task. signature=" << 
agent_task_req.signature
+                  << ", transaction_id=" << 
clear_transaction_task_req.transaction_id
+                  << ", partition_id_size=" << 
clear_transaction_task_req.partition_id.size();
+
+        Status status;
+
+        if (clear_transaction_task_req.transaction_id > 0) {
+            // transaction_id should be greater than zero.
+            // If it is not greater than zero, no need to execute
+            // the following clear_transaction_task() function.
+            if (!clear_transaction_task_req.partition_id.empty()) {
+                _env->storage_engine()->clear_transaction_task(
+                        clear_transaction_task_req.transaction_id,
+                        clear_transaction_task_req.partition_id);
+            } else {
+                _env->storage_engine()->clear_transaction_task(
+                        clear_transaction_task_req.transaction_id);
+            }
+            LOG(INFO) << "finish to clear transaction task. signature=" << 
agent_task_req.signature
+                      << ", transaction_id=" << 
clear_transaction_task_req.transaction_id;
+        } else {
+            LOG(WARNING) << "invalid transaction id " << 
clear_transaction_task_req.transaction_id
+                         << ". signature= " << agent_task_req.signature;
+        }
+
+        TFinishTaskRequest finish_task_request;
+        finish_task_request.__set_task_status(status.to_thrift());
+        finish_task_request.__set_backend(BackendOptions::get_local_backend());
+        finish_task_request.__set_task_type(agent_task_req.task_type);
+        finish_task_request.__set_signature(agent_task_req.signature);
+
+        _finish_task(finish_task_request);
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3e4116f420..7287da3f03 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -182,12 +182,7 @@ protected:
     bool _register_task_info(const TTaskType::type task_type, int64_t 
signature);
     void _remove_task_info(const TTaskType::type task_type, int64_t signature);
     void _finish_task(const TFinishTaskRequest& finish_task_request);
-    uint32_t _get_next_task_index(int32_t thread_count, 
std::deque<TAgentTaskRequest>& tasks,
-                                  TPriority::type priority);
 
-    void _push_worker_thread_callback();
-    void _publish_version_worker_thread_callback();
-    void _clear_transaction_task_worker_thread_callback();
     void _alter_tablet_worker_thread_callback();
     void _alter_inverted_index_worker_thread_callback();
     void _clone_worker_thread_callback();
@@ -279,4 +274,32 @@ public:
     DISALLOW_COPY_AND_ASSIGN(DropTableTaskPool);
 };
 
+class PushTaskPool : public TaskWorkerPool {
+public:
+    enum class PushWorkerType { LOAD_V2, DELETE };
+    PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWorkerType type);
+    void _push_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(PushTaskPool);
+
+private:
+    PushWorkerType _push_worker_type;
+};
+
+class PublishVersionTaskPool : public TaskWorkerPool {
+public:
+    PublishVersionTaskPool(ExecEnv* env, ThreadModel thread_model);
+    void _publish_version_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(PublishVersionTaskPool);
+};
+
+class ClearTransactionTaskPool : public TaskWorkerPool {
+public:
+    ClearTransactionTaskPool(ExecEnv* env, ThreadModel thread_model);
+    void _clear_transaction_task_worker_thread_callback();
+
+    DISALLOW_COPY_AND_ASSIGN(ClearTransactionTaskPool);
+};
+
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 50952f559a..d8fd5c1bbe 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -44,9 +44,9 @@ using namespace ErrorCode;
 
 using std::map;
 
-EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& 
publish_version_req,
-                                                   std::vector<TTabletId>* 
error_tablet_ids,
-                                                   std::vector<TTabletId>* 
succ_tablet_ids)
+EnginePublishVersionTask::EnginePublishVersionTask(
+        const TPublishVersionRequest& publish_version_req, 
std::vector<TTabletId>* error_tablet_ids,
+        std::vector<TTabletId>* succ_tablet_ids)
         : _total_task_num(0),
           _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
diff --git a/be/src/olap/task/engine_publish_version_task.h 
b/be/src/olap/task/engine_publish_version_task.h
index eb90c9d7d8..7c163839bd 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -59,7 +59,7 @@ private:
 
 class EnginePublishVersionTask : public EngineTask {
 public:
-    EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
+    EnginePublishVersionTask(const TPublishVersionRequest& publish_version_req,
                              vector<TTabletId>* error_tablet_ids,
                              std::vector<TTabletId>* succ_tablet_ids = 
nullptr);
     ~EnginePublishVersionTask() {}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to