This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 422456c Add warn log when client report be state failed and refactor
some report code (#5342)
422456c is described below
commit 422456c31a74b4c17d83c615656e7ff19015078f
Author: caiconghui <[email protected]>
AuthorDate: Wed Mar 3 17:00:21 2021 +0800
Add warn log when client report be state failed and refactor some report
code (#5342)
There is some redundant code for reporting tasks, disks and tablets in the BE, and
when the FE returns an error status in its report response, no warning log is
emitted to show that the report failed.
Co-authored-by: caiconghui [蔡聪辉] <[email protected]>
---
be/src/agent/task_worker_pool.cpp | 104 ++++++++++++++++++++------------------
be/src/agent/task_worker_pool.h | 20 ++++++++
be/src/olap/tablet_manager.cpp | 6 +--
be/src/olap/tablet_manager.h | 2 +-
4 files changed, 80 insertions(+), 52 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 9e7efc3..abe918d 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1084,21 +1084,7 @@ void
TaskWorkerPool::_report_task_worker_thread_callback() {
lock_guard<Mutex> task_signatures_lock(_s_task_signatures_lock);
request.__set_tasks(_s_task_signatures);
}
-
- DorisMetrics::instance()->report_task_requests_total->increment(1);
- TMasterResult result;
- AgentStatus status = _master_client->report(request, &result);
-
- if (status != DORIS_SUCCESS) {
-
DorisMetrics::instance()->report_task_requests_failed->increment(1);
- LOG(WARNING) << "report task failed. status: " << status
- << ", master host: " <<
_master_info.network_address.hostname
- << "port: " << _master_info.network_address.port;
- } else {
- LOG(INFO) << "finish report task. master host: "
- << _master_info.network_address.hostname
- << " port: " << _master_info.network_address.port;
- }
+ _handle_report(request, ReportType::TASK);
} while (!_stop_background_threads_latch.wait_for(
MonoDelta::FromSeconds(config::report_task_interval_seconds)));
}
@@ -1142,21 +1128,7 @@ void
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
disks[root_path_info.path] = disk;
}
request.__set_disks(disks);
-
- DorisMetrics::instance()->report_disk_requests_total->increment(1);
- TMasterResult result;
- AgentStatus status = _master_client->report(request, &result);
-
- if (status != DORIS_SUCCESS) {
-
DorisMetrics::instance()->report_disk_requests_failed->increment(1);
- LOG(WARNING) << "report disk state failed. status: " << status
- << ", master host: " <<
_master_info.network_address.hostname
- << ", port: " << _master_info.network_address.port;
- } else {
- LOG(INFO) << "finish report disk state. master host: "
- << _master_info.network_address.hostname
- << ", port: " << _master_info.network_address.port;
- }
+ _handle_report(request, ReportType::DISK);
}
StorageEngine::instance()->deregister_report_listener(this);
}
@@ -1185,12 +1157,12 @@ void
TaskWorkerPool::_report_tablet_worker_thread_callback() {
}
request.tablets.clear();
- OLAPStatus report_all_tablets_info_status =
-
StorageEngine::instance()->tablet_manager()->report_all_tablets_info(
+ OLAPStatus build_all_report_tablets_info_status =
+
StorageEngine::instance()->tablet_manager()->build_all_report_tablets_info(
&request.tablets);
- if (report_all_tablets_info_status != OLAP_SUCCESS) {
- LOG(WARNING) << "report get all tablets info failed. status: "
- << report_all_tablets_info_status;
+ if (build_all_report_tablets_info_status != OLAP_SUCCESS) {
+ LOG(WARNING) << "build all report tablets info failed. status: "
+ << build_all_report_tablets_info_status;
continue;
}
int64_t max_compaction_score =
@@ -1198,19 +1170,7 @@ void
TaskWorkerPool::_report_tablet_worker_thread_callback() {
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
request.__set_tablet_max_compaction_score(max_compaction_score);
request.__set_report_version(_s_report_version);
-
- TMasterResult result;
- AgentStatus status = _master_client->report(request, &result);
- if (status != DORIS_SUCCESS) {
-
DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1);
- LOG(WARNING) << "report tablets failed. status: " << status
- << ", master host: " <<
_master_info.network_address.hostname
- << ", port:" << _master_info.network_address.port;
- } else {
- LOG(INFO) << "finish report tablets. master host: "
- << _master_info.network_address.hostname
- << ", port: " << _master_info.network_address.port;
- }
+ _handle_report(request, ReportType::TABLET);
}
StorageEngine::instance()->deregister_report_listener(this);
}
@@ -1566,4 +1526,52 @@ AgentStatus TaskWorkerPool::_move_dir(const TTabletId
tablet_id, const TSchemaHa
return DORIS_SUCCESS;
}
+// Sends one periodic report (selected by `type`: TASK, DISK or TABLET) to the
+// master via _master_client->report(), logs the outcome and updates the request
+// metrics that belong to `type`.
+//
+// A report counts as failed when the RPC itself does not return DORIS_SUCCESS,
+// or when the master answers with a non-OK TStatusCode. In the latter case all
+// error messages returned by the master are included in the warning log.
+void TaskWorkerPool::_handle_report(TReportRequest& request, ReportType type) {
+    TMasterResult result;
+    AgentStatus status = _master_client->report(request, &result);
+    bool is_report_success = false;
+    if (status != DORIS_SUCCESS) {
+        LOG(WARNING) << "report " << TYPE_STRING(type) << " failed. status: " << status
+                     << ", master host: " << _master_info.network_address.hostname
+                     << ", port: " << _master_info.network_address.port;
+    } else if (result.status.status_code != TStatusCode::OK) {
+        // The RPC went through but the master rejected the report: join every
+        // returned error message with ',' so the reason is visible in the log.
+        // A size_t index avoids the signed/unsigned comparison of an `int`.
+        std::stringstream ss;
+        for (size_t i = 0; i < result.status.error_msgs.size(); ++i) {
+            if (i != 0) {
+                ss << ",";
+            }
+            ss << result.status.error_msgs[i];
+        }
+        LOG(WARNING) << "finish report " << TYPE_STRING(type)
+                     << " failed. status:" << result.status.status_code
+                     << ", error msg:" << ss.str()
+                     << ", master host: " << _master_info.network_address.hostname
+                     << ", port: " << _master_info.network_address.port;
+    } else {
+        is_report_success = true;
+        LOG(INFO) << "finish report " << TYPE_STRING(type) << ". master host: "
+                  << _master_info.network_address.hostname
+                  << ", port: " << _master_info.network_address.port;
+    }
+
+    // Every attempt counts towards *_requests_total. Only unsuccessful attempts
+    // (RPC error or non-OK master status) count towards *_requests_failed.
+    switch (type) {
+    case TASK:
+        DorisMetrics::instance()->report_task_requests_total->increment(1);
+        if (!is_report_success) {
+            DorisMetrics::instance()->report_task_requests_failed->increment(1);
+        }
+        break;
+    case DISK:
+        DorisMetrics::instance()->report_disk_requests_total->increment(1);
+        if (!is_report_success) {
+            DorisMetrics::instance()->report_disk_requests_failed->increment(1);
+        }
+        break;
+    case TABLET:
+        // NOTE(review): the tablet-report code this helper replaces incremented
+        // report_all_tablets_requests_failed; confirm report_tablet_requests_*
+        // are the intended (and defined) DorisMetrics counters.
+        DorisMetrics::instance()->report_tablet_requests_total->increment(1);
+        if (!is_report_success) {
+            DorisMetrics::instance()->report_tablet_requests_failed->increment(1);
+        }
+        break;
+    default:
+        break;
+    }
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index fa62b06..cfbd6df 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -72,6 +72,12 @@ public:
UPDATE_TABLET_META_INFO
};
+ // Kind of periodic report that _handle_report() sends to the master. It
+ // selects the label printed in the report logs and the request metrics updated.
+ enum ReportType {
+ TASK,
+ DISK,
+ TABLET
+ };
+
inline const std::string TYPE_STRING(TaskWorkerType type) {
switch (type) {
case CREATE_TABLE:
@@ -125,6 +131,19 @@ public:
}
}
+    // Maps a ReportType to the upper-case label used in report log messages.
+    // Any value that is not one of the enumerators yields "Unknown".
+    inline const std::string TYPE_STRING(ReportType type) {
+        if (type == TASK) {
+            return "TASK";
+        }
+        if (type == DISK) {
+            return "DISK";
+        }
+        if (type == TABLET) {
+            return "TABLET";
+        }
+        return "Unknown";
+    }
+
TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env,
const TMasterInfo& master_info);
virtual ~TaskWorkerPool();
@@ -172,6 +191,7 @@ private:
void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t
signature,
const TTaskType::type task_type, TFinishTaskRequest*
finish_task_request);
+ void _handle_report(TReportRequest& request, ReportType type);
AgentStatus _get_tablet_info(const TTabletId tablet_id, const TSchemaHash
schema_hash,
int64_t signature, TTabletInfo* tablet_info);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9f19cf4..df81c8b 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -946,9 +946,9 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo*
tablet_info) {
return res;
}
-OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId,
TTablet>* tablets_info) {
+OLAPStatus TabletManager::build_all_report_tablets_info(std::map<TTabletId,
TTablet>* tablets_info) {
DCHECK(tablets_info != nullptr);
- LOG(INFO) << "begin to report all tablets info";
+ LOG(INFO) << "begin to build all report tablets info";
// build the expired txn map first, outside the tablet map lock
std::map<TabletInfo, std::vector<int64_t>> expire_txn_map;
@@ -985,7 +985,7 @@ OLAPStatus
TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*
}
}
}
- LOG(INFO) << "success to report all tablets info. tablet_count=" <<
tablets_info->size();
+ LOG(INFO) << "success to build all report tablets info. tablet_count=" <<
tablets_info->size();
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index c988b20..bbb417d 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -115,7 +115,7 @@ public:
// OLAP_ERR_INPUT_PARAMETER_ERROR, if tables is null
OLAPStatus report_tablet_info(TTabletInfo* tablet_info);
- OLAPStatus report_all_tablets_info(std::map<TTabletId, TTablet>*
tablets_info);
+ OLAPStatus build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablets_info);
OLAPStatus start_trash_sweep();
// Prevent schema change executed concurrently.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]