This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f25c2  [Bug] Tablet and Disk report thread not work (#4597)
00f25c2 is described below

commit 00f25c2b77792a0d77d7488d02b96305b9b914ce
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Sep 20 20:51:52 2020 +0800

    [Bug] Tablet and Disk report thread not work (#4597)
    
    The tablet and disk information reporting threads need to report to the FE
    periodically. In addition, these two reporting threads can also be
    triggered by certain events.
    
    The modification in PR #4440 caused these two threads to be triggered only
    by events, so they could no longer report periodically.
---
 be/src/agent/task_worker_pool.cpp | 62 +++++++++++++++++++++------------------
 be/src/agent/task_worker_pool.h   | 36 +++++++++++++++++++++++
 be/src/olap/storage_engine.cpp    |  3 +-
 3 files changed, 71 insertions(+), 30 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 411e294..2aacd4c 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -79,7 +79,8 @@ FrontendServiceClientCache 
TaskWorkerPool::_master_service_client_cache;
 
 TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* 
env,
                                const TMasterInfo& master_info)
-        : _master_info(master_info),
+        : _name(strings::Substitute("TaskWorkerPool.$0", 
TYPE_STRING(_task_worker_type))),
+          _master_info(master_info),
           _agent_utils(new AgentUtils()),
           _master_client(new MasterServerClient(_master_info, 
&_master_service_client_cache)),
           _env(env),
@@ -186,8 +187,7 @@ void TaskWorkerPool::start() {
     }
 
 #ifndef BE_TEST
-    // TODO(yingchun): need a better name
-    ThreadPoolBuilder(strings::Substitute("TaskWorkerPool.$0", 
_task_worker_type))
+    ThreadPoolBuilder(_name)
             .set_min_threads(_worker_count)
             .set_max_threads(_worker_count)
             .build(&_thread_pool);
@@ -233,6 +233,11 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest& 
task) {
     }
 }
 
+void TaskWorkerPool::notify_thread() {
+    _worker_thread_condition_variable.notify_one();
+    LOG(INFO) << "notify task worker pool: " << _name;
+}
+
 bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, 
int64_t signature) {
     lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
     set<int64_t>& signature_set = _s_task_signatures[task_type];
@@ -970,7 +975,7 @@ void 
TaskWorkerPool::_check_consistency_worker_thread_callback() {
                          << ", signature: " << agent_task_req.signature;
             status_code = TStatusCode::RUNTIME_ERROR;
         } else {
-            LOG(INFO) << "check consistency success. status:" << res
+            LOG(INFO) << "check consistency success. status: " << res
                       << ", signature:" << agent_task_req.signature << ", 
checksum:" << checksum;
         }
 
@@ -1007,13 +1012,18 @@ void 
TaskWorkerPool::_report_task_worker_thread_callback() {
 
         if (status != DORIS_SUCCESS) {
             
DorisMetrics::instance()->report_task_requests_failed->increment(1);
-            LOG(WARNING) << "finish report task failed. status:" << status << 
", master host:"
+            LOG(WARNING) << "report task failed. status: " << status << ", 
master host: "
                          << _master_info.network_address.hostname
-                         << "port:" << _master_info.network_address.port;
+                         << "port: " << _master_info.network_address.port;
+        } else {
+            LOG(INFO) << "finish report task. master host: "
+                << _master_info.network_address.hostname
+                << "port: " << _master_info.network_address.port;
         }
     } while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(config::report_task_interval_seconds)));
 }
 
+/// disk state report thread will report disk state at a configurable fix 
interval.
 void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
     StorageEngine::instance()->register_report_listener(this);
 
@@ -1029,15 +1039,11 @@ void 
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
             continue;
         }
 
-        lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
-        while (_is_work && _tasks.empty()) {
-            _worker_thread_condition_variable.wait();
-        }
+        // wait at most report_disk_state_interval_seconds, or being notified
+        
_worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_disk_state_interval_seconds));
         if (!_is_work) {
-            return;
+            break;
         }
-        TAgentTaskRequest agent_task_req = _tasks.front();
-        _tasks.pop_front();
 
         vector<DataDirInfo> data_dir_infos;
         _env->storage_engine()->get_all_data_dir_info(&data_dir_infos, true /* 
update */);
@@ -1062,11 +1068,14 @@ void 
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
 
         if (status != DORIS_SUCCESS) {
             
DorisMetrics::instance()->report_disk_requests_failed->increment(1);
-            LOG(WARNING) << "finish report disk state failed. status:" << 
status << ", master host:"
+            LOG(WARNING) << "report disk state failed. status: " << status << 
", master host: "
                          << _master_info.network_address.hostname
-                         << ", port:" << _master_info.network_address.port;
+                         << ", port: " << _master_info.network_address.port;
+        } else {
+            LOG(INFO) << "finish report disk state. master host: "
+                << _master_info.network_address.hostname
+                << ", port: " << _master_info.network_address.port;
         }
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
     }
     StorageEngine::instance()->deregister_report_listener(this);
 }
@@ -1088,17 +1097,12 @@ void 
TaskWorkerPool::_report_tablet_worker_thread_callback() {
             continue;
         }
 
-        lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
-        while (_is_work && _tasks.empty()) {
-            _worker_thread_condition_variable.wait();
-        }
+        // wait at most report_tablet_interval_seconds, or being notified
+        
_worker_thread_condition_variable.wait_for(MonoDelta::FromSeconds(config::report_tablet_interval_seconds));
         if (!_is_work) {
-            return;
+            break;
         }
 
-        TAgentTaskRequest agent_task_req = _tasks.front();
-        _tasks.pop_front();
-
         request.tablets.clear();
         OLAPStatus report_all_tablets_info_status =
                 
StorageEngine::instance()->tablet_manager()->report_all_tablets_info(
@@ -1117,12 +1121,14 @@ void 
TaskWorkerPool::_report_tablet_worker_thread_callback() {
         AgentStatus status = _master_client->report(request, &result);
         if (status != DORIS_SUCCESS) {
             
DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1);
-            LOG(WARNING) << "finish report olap table state failed. status:" 
<< status
-                         << ", master host:"
-                         << _master_info.network_address.hostname
+            LOG(WARNING) << "report tablets failed. status: " << status
+                         << ", master host: " << 
_master_info.network_address.hostname
                          << ", port:" << _master_info.network_address.port;
+        } else {
+            LOG(INFO) << "finish report tablets. master host: "
+                << _master_info.network_address.hostname
+                << ", port: " << _master_info.network_address.port;
         }
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
     }
     StorageEngine::instance()->deregister_report_listener(this);
 }
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index d7536d7..7cb1d0a 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -43,6 +43,7 @@ class ThreadPool;
 
 class TaskWorkerPool {
 public:
+    // You need to modify the content in TYPE_STRING at the same time,
     enum TaskWorkerType {
         CREATE_TABLE,
         DROP_TABLE,
@@ -71,6 +72,35 @@ public:
         UPDATE_TABLET_META_INFO
     };
 
+    inline const std::string TYPE_STRING(TaskWorkerType type) {
+        switch(type) {
+            case CREATE_TABLE: return "CREATE_TABLE";
+            case DROP_TABLE: return "DROP_TABLE";
+            case PUSH: return "PUSH";
+            case REALTIME_PUSH: return "REALTIME_PUSH";
+            case PUBLISH_VERSION: return "PUBLISH_VERSION";
+            case CLEAR_ALTER_TASK: return "CLEAR_ALTER_TASK";
+            case CLEAR_TRANSACTION_TASK: return "CLEAR_TRANSACTION_TASK";
+            case DELETE: return "DELETE";
+            case ALTER_TABLE: return "ALTER_TABLE";
+            case QUERY_SPLIT_KEY: return "QUERY_SPLIT_KEY";
+            case CLONE: return "CLONE";
+            case STORAGE_MEDIUM_MIGRATE: return "STORAGE_MEDIUM_MIGRATE";
+            case CHECK_CONSISTENCY: return "CHECK_CONSISTENCY";
+            case REPORT_TASK: return "REPORT_TASK";
+            case REPORT_DISK_STATE: return "REPORT_DISK_STATE";
+            case REPORT_OLAP_TABLE: return "REPORT_OLAP_TABLE";
+            case UPLOAD: return "UPLOAD";
+            case DOWNLOAD: return "DOWNLOAD";
+            case MAKE_SNAPSHOT: return "MAKE_SNAPSHOT";
+            case RELEASE_SNAPSHOT: return "RELEASE_SNAPSHOT";
+            case MOVE: return "MOVE";
+            case RECOVER_TABLET: return "RECOVER_TABLET";
+            case UPDATE_TABLET_META_INFO:  return "UPDATE_TABLET_META_INFO";
+            default: return "Unknown";
+        }
+    }
+
     TaskWorkerPool(
             const TaskWorkerType task_worker_type,
             ExecEnv* env,
@@ -89,6 +119,9 @@ public:
     // * task: the task need callback thread to do
     virtual void submit_task(const TAgentTaskRequest& task);
 
+    // notify the worker. currently for task/disk/tablet report thread
+    void notify_thread();
+
 private:
     bool _register_task_info(const TTaskType::type task_type, int64_t 
signature);
     void _remove_task_info(const TTaskType::type task_type, int64_t signature);
@@ -135,6 +168,9 @@ private:
             bool overwrite,
             std::vector<std::string>* error_msgs);
 
+private:
+    std::string _name;
+
     // Reference to the ExecEnv::_master_info
     const TMasterInfo& _master_info;
     TBackend _backend;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index a07d6b9..689fa8a 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -978,8 +978,7 @@ void 
StorageEngine::deregister_report_listener(TaskWorkerPool* listener) {
 void StorageEngine::notify_listeners() {
     std::lock_guard<std::mutex> l(_report_mtx);
     for (auto& listener : _report_listeners) {
-        TAgentTaskRequest task;
-        listener->submit_task(task);
+        listener->notify_thread();
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to