chenhao7253886 closed pull request #451: Support showing IO and CPU consumption for query
URL: https://github.com/apache/incubator-doris/pull/451
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp
index 1a0e96dd..4ecf44ae 100644
--- a/be/src/exec/aggregation_node.cpp
+++ b/be/src/exec/aggregation_node.cpp
@@ -201,6 +201,8 @@ Status AggregationNode::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
+
+ _exec_info->add_cpu_consumpation(batch.num_rows());
// SCOPED_TIMER(_build_timer);
if (VLOG_ROW_IS_ON) {
for (int i = 0; i < batch.num_rows(); ++i) {
diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index b807152f..1b51f9b2 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -236,7 +236,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
while (!_input_eos && _prev_input_row == NULL) {
RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos));
-
+ _exec_info->add_cpu_consumpation(_curr_child_batch->num_rows());
if (_curr_child_batch->num_rows() > 0) {
_prev_input_row = _curr_child_batch->get_row(0);
process_child_batches(state);
@@ -612,6 +612,7 @@ Status AnalyticEvalNode::process_child_batches(RuntimeState* state) {
_prev_child_batch->reset();
_prev_child_batch.swap(_curr_child_batch);
RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos));
+ _exec_info->add_cpu_consumpation(_curr_child_batch->num_rows());
}
return Status::OK;
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 840c875f..cf5bd6a8 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -228,6 +228,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc
break;
}
RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
+ _exec_info->add_cpu_consumpation(output_batch->num_rows());
}
_num_rows_returned += output_batch->num_rows();
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 8b4f8d15..bb64a5d1 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -186,7 +186,7 @@ Status ExecNode::prepare(RuntimeState* state) {
_mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker()));
_expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get()));
_expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
-
+ _exec_info = state->register_current_exec_info(_id, _type);
// TODO chenhao
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));
// TODO(zc):
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index bf2df357..cf183400 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -36,6 +36,7 @@ class Function;
namespace doris {
+class ExecNodeExecInfo;
class Expr;
class ExprContext;
class ObjectPool;
@@ -337,6 +338,8 @@ class ExecNode {
/// reservations pool in Close().
BufferPool::ClientHandle _buffer_pool_client;
+ ExecNodeExecInfo* _exec_info;
+
ExecNode* child(int i) {
return _children[i];
}
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 378f8f5a..646baf98 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -229,6 +229,7 @@ Status HashJoinNode::construct_hash_table(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
bool eos = true;
RETURN_IF_ERROR(child(1)->get_next(state, &build_batch, &eos));
+ _exec_info->add_cpu_consumpation(build_batch.num_rows());
SCOPED_TIMER(_build_timer);
// take ownership of tuple data of build_batch
_build_pool->acquire_data(build_batch.tuple_data_pool(), false);
@@ -380,6 +381,7 @@ Status HashJoinNode::open(RuntimeState* state) {
// seed probe batch and _current_probe_row, etc.
while (true) {
RETURN_IF_ERROR(child(0)->get_next(state, _probe_batch.get(), &_probe_eos));
+ _exec_info->add_cpu_consumpation(_probe_batch->num_rows());
COUNTER_UPDATE(_probe_row_counter, _probe_batch->num_rows());
_probe_batch_pos = 0;
diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc
index da063f92..6a825295 100644
--- a/be/src/exec/new_partitioned_aggregation_node.cc
+++ b/be/src/exec/new_partitioned_aggregation_node.cc
@@ -308,7 +308,7 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
-
+ _exec_info->add_cpu_consumpation(batch.num_rows());
if (UNLIKELY(VLOG_ROW_IS_ON)) {
for (int i = 0; i < batch.num_rows(); ++i) {
TupleRow* row = batch.get_row(i);
@@ -532,7 +532,7 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_));
-
+ _exec_info->add_cpu_consumpation(child_batch_->num_rows());
SCOPED_TIMER(streaming_timer_);
int remaining_capacity[PARTITION_FANOUT];
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e3576b0d..5d65bdc6 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -426,6 +426,7 @@ void OlapScanner::update_counter() {
COUNTER_UPDATE(_parent->_decompressor_timer, _reader->stats().decompress_ns);
COUNTER_UPDATE(_parent->_read_uncompressed_counter, _reader->stats().uncompressed_bytes_read);
COUNTER_UPDATE(_parent->bytes_read_counter(), _reader->stats().bytes_read);
+ _parent->_exec_info->set_io_by_byte(_reader->stats().bytes_read);
COUNTER_UPDATE(_parent->_block_load_timer, _reader->stats().block_load_ns);
COUNTER_UPDATE(_parent->_block_load_counter, _reader->stats().blocks_load);
@@ -433,7 +434,7 @@ void OlapScanner::update_counter() {
COUNTER_UPDATE(_parent->_raw_rows_counter, _reader->stats().raw_rows_read);
// COUNTER_UPDATE(_parent->_filtered_rows_counter, _reader->stats().num_rows_filtered);
-
+
COUNTER_UPDATE(_parent->_vec_cond_timer, _reader->stats().vec_cond_ns);
COUNTER_UPDATE(_parent->_rows_vec_cond_counter, _reader->stats().rows_vec_cond_filtered);
diff --git a/be/src/exec/select_node.cpp b/be/src/exec/select_node.cpp
index b962e5e3..b5627966 100644
--- a/be/src/exec/select_node.cpp
+++ b/be/src/exec/select_node.cpp
@@ -72,6 +72,7 @@ Status SelectNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
return Status::OK;
}
RETURN_IF_ERROR(child(0)->get_next(state, _child_row_batch.get(), &_child_eos));
+ _exec_info->add_cpu_consumpation(_child_row_batch->num_rows());
}
if (copy_rows(row_batch)) {
diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp
index 71a42c85..61bec7ff 100644
--- a/be/src/exec/sort_node.cpp
+++ b/be/src/exec/sort_node.cpp
@@ -144,6 +144,7 @@ Status SortNode::sort_input(RuntimeState* state) {
do {
batch.reset();
RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
+ _exec_info->add_cpu_consumpation(batch.num_rows());
RETURN_IF_ERROR(_sorter->add_batch(&batch));
RETURN_IF_CANCELLED(state);
RETURN_IF_LIMIT_EXCEEDED(state);
diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp
index 5e07c3a3..535e9f9a 100644
--- a/be/src/exec/union_node.cpp
+++ b/be/src/exec/union_node.cpp
@@ -172,6 +172,7 @@ Status UnionNode::get_next_pass_through(RuntimeState* state, RowBatch* row_batch
if (_child_eos) RETURN_IF_ERROR(child(_child_idx)->open(state));
DCHECK_EQ(row_batch->num_rows(), 0);
RETURN_IF_ERROR(child(_child_idx)->get_next(state, row_batch, &_child_eos));
+ _exec_info->add_cpu_consumpation(row_batch->num_rows());
if (_child_eos) {
// Even though the child is at eos, it's not OK to close() it here. Once we close
// the child, the row batches that it produced are invalid. Marking the batch as
@@ -210,6 +211,7 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch
// The first batch from each child is always fetched here.
RETURN_IF_ERROR(child(_child_idx)->get_next(
state, _child_batch.get(), &_child_eos));
+ _exec_info->add_cpu_consumpation(_child_batch->num_rows());
}
while (!row_batch->at_capacity()) {
@@ -224,6 +226,7 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch
// All batches except the first batch from each child are fetched here.
RETURN_IF_ERROR(child(_child_idx)->get_next(
state, _child_batch.get(), &_child_eos));
+ _exec_info->add_cpu_consumpation(_child_batch->num_rows());
// If we fetched an empty batch, go back to the beginning of this while loop, and
// try again.
if (_child_batch->num_rows() == 0) continue;
diff --git a/be/src/runtime/exec_node_exec_info.h b/be/src/runtime/exec_node_exec_info.h
new file mode 100644
index 00000000..ce97e8d1
--- /dev/null
+++ b/be/src/runtime/exec_node_exec_info.h
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_RUNTIME_EXEC_NODE_EXEC_INFO_H
+#define DORIS_BE_SRC_RUNTIME_EXEC_NODE_EXEC_INFO_H
+
+#include "common/atomic.h"
+#include "gen_cpp/PlanNodes_types.h"
+
+namespace doris {
+
+// Provide execution information for the currently running query, to help
+// locate which query causes cluster overload. For now, the data size read by
+// a ScanNode serves as the measure of IO, and the number of rows processed by
+// an ExecNode serves as the measure of CPU.
+// TODO ch: for a more accurate CPU measure, the number of comparisons
+// performed by each ExecNode could be counted.
+class ExecNodeExecInfo {
+public:
+ ExecNodeExecInfo() {
+ }
+
+ ExecNodeExecInfo(int id, TPlanNodeType::type type)
+ : id(id), type(type) {
+ }
+
+ void set_cpu_consumpation(long cpu_consumpation) {
+ this->cpu.store(cpu_consumpation);
+ }
+
+ void add_cpu_consumpation(long added) {
+ this->cpu.add(added);
+ }
+
+ int64_t get_cpu_consumpation() {
+ return this->cpu.load();
+ }
+
+ // Only set by ScanNode.
+ void set_io_by_byte(long raw_data) {
+ this->io.store(raw_data);
+ }
+
+ int64_t get_io_by_byte() {
+ return io.load();
+ }
+
+ int get_id() {
+ return id;
+ }
+
+ TPlanNodeType::type get_type() {
+ return type;
+ }
+private:
+ // ExecNode Id
+ int id;
+ // ExecNode type
+ TPlanNodeType::type type;
+ // Number of rows processed by the ExecNode, as a measure of CPU.
+ AtomicInt64 cpu;
+ // The data size read by the ScanNode, in bytes, as a measure of IO.
+ AtomicInt64 io;
+};
+
+}
+#endif
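
For orientation, a minimal sketch of the intended call pattern, assuming a
hypothetical MyExecNode subclass (the real call sites are in the exec-node
hunks above):

    // Sketch only: register during prepare(), then account rows in get_next().
    Status MyExecNode::prepare(RuntimeState* state) {
        // RuntimeState owns the returned object and frees it in
        // release_current_exec_info().
        _exec_info = state->register_current_exec_info(_id, _type);
        return Status::OK;
    }

    Status MyExecNode::get_next(RuntimeState* state, RowBatch* batch, bool* eos) {
        RETURN_IF_ERROR(child(0)->get_next(state, batch, eos));
        // Rows pulled from a child count toward the CPU measure; scan nodes
        // additionally report bytes read via set_io_by_byte().
        _exec_info->add_cpu_consumpation(batch->num_rows());
        return Status::OK;
    }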
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 42efed7f..a7496bfa 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -247,55 +247,72 @@ void FragmentExecState::coordinator_callback(
}
TReportExecStatusParams params;
- params.protocol_version = FrontendServiceVersion::V1;
params.__set_query_id(_query_id);
params.__set_backend_num(_backend_num);
params.__set_fragment_instance_id(_fragment_instance_id);
exec_status.set_t_status(&params);
params.__set_done(done);
- profile->to_thrift(&params.profile);
- params.__isset.profile = true;
-
- RuntimeState* runtime_state = _executor.runtime_state();
- if (!runtime_state->output_files().empty()) {
- params.__isset.delta_urls = true;
- for (auto& it : runtime_state->output_files()) {
- params.delta_urls.push_back(to_http_path(it));
+ if (profile != nullptr) {
+ params.protocol_version = FrontendServiceVersion::V1;
+ profile->to_thrift(&params.profile);
+ params.__isset.profile = true;
+
+ RuntimeState* runtime_state = _executor.runtime_state();
+ if (!runtime_state->output_files().empty()) {
+ params.__isset.delta_urls = true;
+ for (auto& it : runtime_state->output_files()) {
+ params.delta_urls.push_back(to_http_path(it));
+ }
}
- }
- if (runtime_state->num_rows_load_success() > 0 ||
- runtime_state->num_rows_load_filtered() > 0) {
- params.__isset.load_counters = true;
- // TODO(zc)
- static std::string s_dpp_normal_all = "dpp.norm.ALL";
- static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
-
- params.load_counters.emplace(
- s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success()));
- params.load_counters.emplace(
- s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered()));
- }
- if (!runtime_state->get_error_log_file_path().empty()) {
- params.__set_tracking_url(
- to_load_error_http_path(runtime_state->get_error_log_file_path()));
- }
- if (!runtime_state->export_output_files().empty()) {
- params.__isset.export_files = true;
- params.export_files = runtime_state->export_output_files();
- }
- if (!runtime_state->tablet_commit_infos().empty()) {
- params.__isset.commitInfos = true;
- params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
- for (auto& info : runtime_state->tablet_commit_infos()) {
- params.commitInfos.push_back(info);
+ if (runtime_state->num_rows_load_success() > 0 ||
+ runtime_state->num_rows_load_filtered() > 0) {
+ params.__isset.load_counters = true;
+ // TODO(zc)
+ static std::string s_dpp_normal_all = "dpp.norm.ALL";
+ static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
+
+ params.load_counters.emplace(
+ s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success()));
+ params.load_counters.emplace(
+ s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered()));
}
- }
- DCHECK(runtime_state != NULL);
-
- // Send new errors to coordinator
- runtime_state->get_unreported_errors(&(params.error_log));
- params.__isset.error_log = (params.error_log.size() > 0);
+ if (!runtime_state->get_error_log_file_path().empty()) {
+ params.__set_tracking_url(
+ to_load_error_http_path(runtime_state->get_error_log_file_path()));
+ }
+ if (!runtime_state->export_output_files().empty()) {
+ params.__isset.export_files = true;
+ params.export_files = runtime_state->export_output_files();
+ }
+ if (!runtime_state->tablet_commit_infos().empty()) {
+ params.__isset.commitInfos = true;
+ params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
+ for (auto& info : runtime_state->tablet_commit_infos()) {
+ params.commitInfos.push_back(info);
+ }
+ }
+ DCHECK(runtime_state != NULL);
+ // Send new errors to coordinator
+ runtime_state->get_unreported_errors(&(params.error_log));
+ params.__isset.error_log = (params.error_log.size() > 0);
+ } else {
+ // instance execution consumption report
+ params.protocol_version = FrontendServiceVersion::V2;
+ std::map<int, ExecNodeExecInfo*>* exec_infos = _executor.runtime_state()->get_exec_node_exec_info();
+ std::map<int, ExecNodeExecInfo*>::iterator iterator = exec_infos->begin();
+ int64_t io_by_byte = 0;
+ int64_t cpu_consumpation = 0;
+ for (; iterator != exec_infos->end(); iterator++) {
+ ExecNodeExecInfo* exec_info = iterator->second;
+ io_by_byte += exec_info->get_io_by_byte();
+ cpu_consumpation += exec_info->get_cpu_consumpation();
+ }
+ params.io_by_byte = io_by_byte;
+ params.__isset.io_by_byte = true;
+ params.cpu_consumpation = cpu_consumpation;
+ params.__isset.cpu_consumpation = true;
+ }
TReportExecStatusResult res;
Status rpc_status;
@@ -314,7 +331,6 @@ void FragmentExecState::coordinator_callback(
_executor.cancel();
return;
}
-
coord->reportExecStatus(res, params);
}
@@ -510,12 +526,13 @@ Status FragmentMgr::fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* res
TUniqueId id;
id.__set_hi(p_fragment_id.hi());
id.__set_lo(p_fragment_id.lo());
- PFragmentExecInfo* info = result->add_fragment_exec_info();
+ PInstanceExecInfo* info = result->add_instance_exec_info();
PUniqueId* finst_id = info->mutable_finst_id();
finst_id->set_hi(p_fragment_id.hi());
finst_id->set_lo(p_fragment_id.lo());
bool is_running = false;
+ std::map<int, ExecNodeExecInfo> exec_infos;
std::lock_guard<std::mutex> lock(_lock);
{
auto iter = _fragment_map.find(id);
@@ -524,6 +541,7 @@ Status FragmentMgr::fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* res
continue;
}
is_running = iter->second->executor()->runtime_state()->is_running();
+ iter->second->executor()->runtime_state()->get_current_exec_info(&exec_infos);
}
if (is_running) {
@@ -531,6 +549,16 @@ Status FragmentMgr::fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* res
} else {
info->set_exec_status(PFragmentExecStatus::WAIT);
}
+
+ auto iter = exec_infos.begin();
+ for (; iter != exec_infos.end(); iter++) {
+ ExecNodeExecInfo& exec_info = iter->second;
+ PPlanNodeExecInfo* p_plan_node_exec_info = info->add_plannode_exec_info();
+ p_plan_node_exec_info->set_id(exec_info.get_id());
+ p_plan_node_exec_info->set_type(exec_info.get_type());
+ p_plan_node_exec_info->set_io_by_byte(exec_info.get_io_by_byte());
+ p_plan_node_exec_info->set_cpu_consumpation(exec_info.get_cpu_consumpation());
+ }
}
return Status::OK;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c911fe30..5eb67ec5 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -30,6 +30,7 @@
#include "util/thread_pool.hpp"
#include "util/hash_util.hpp"
#include "http/rest_monitor_iface.h"
+#include "runtime/exec_node_exec_info.h"
namespace doris {
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 401b8254..4b1c5dea 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -409,6 +409,7 @@ void PlanFragmentExecutor::send_report(bool done) {
}
if (!_is_report_success && done && status.ok()) {
+ _report_status_cb(status, nullptr, done || !status.ok());
return;
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 7e2ecf58..fb9a8d44 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -83,6 +83,9 @@ RuntimeState::RuntimeState(const std::string& now)
}
RuntimeState::~RuntimeState() {
+
+ release_current_exec_info();
+
_block_mgr.reset();
_block_mgr2.reset();
// close error log file
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 7a413db1..96451bb4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -33,6 +33,8 @@
#include "common/global_types.h"
#include "util/logging.h"
+#include "util/spinlock.h"
+#include "runtime/exec_node_exec_info.h"
#include "runtime/mem_pool.h"
#include "runtime/thread_resource_mgr.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
@@ -488,6 +490,39 @@ class RuntimeState {
return _is_running;
}
+ std::map<int, ExecNodeExecInfo*>* get_exec_node_exec_info() {
+ return &_exec_infos;
+ }
+
+ // get all ExecNodes resource consumption in Fragment.
+ void get_current_exec_info(std::map<int, ExecNodeExecInfo>* info) {
+ _exec_infos_lock.lock();
+ std::map<int, ExecNodeExecInfo*>::iterator iterator
+ = _exec_infos.begin();
+ while (iterator != _exec_infos.end()) {
+ (*info)[iterator->first] = *(iterator->second);
+ iterator++;
+ }
+ _exec_infos_lock.unlock();
+ }
+
+ // This is used by ExecNode to register an ExecNodeExecInfo to record resource consumption.
+ // RuntimeState is responsible for releasing ExecNodeExecInfo in release_current_exec_info.
+ ExecNodeExecInfo* register_current_exec_info(int plan_node_id, TPlanNodeType::type type) {
+ _exec_infos_lock.lock();
+ std::map<int, ExecNodeExecInfo*>::iterator iterator
+ = _exec_infos.find(plan_node_id);
+ ExecNodeExecInfo* info;
+ if (iterator == _exec_infos.end()) {
+ info = new ExecNodeExecInfo(plan_node_id, type);
+ _exec_infos[plan_node_id] = info;
+ } else {
+ info = iterator->second;
+ }
+ _exec_infos_lock.unlock();
+ return info;
+ }
+
private:
// Allow TestEnv to set block_mgr manually for testing.
friend class TestEnv;
@@ -503,6 +538,15 @@ class RuntimeState {
Status create_error_log_file();
+ // Delete all registered ExecNodeExecInfo objects; called from the destructor.
+ void release_current_exec_info() {
+ std::map<int, ExecNodeExecInfo*>::iterator iterator = _exec_infos.begin();
+ while (iterator != _exec_infos.end()) {
+ delete iterator->second;
+ iterator = _exec_infos.erase(iterator);
+ }
+ }
+
static const int DEFAULT_BATCH_SIZE = 2048;
DescriptorTbl* _desc_tbl;
@@ -634,6 +678,10 @@ class RuntimeState {
/// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
AtomicInt32 _initial_reservation_refcnt;
+ // collect info for show proc "/current_queries"
+ SpinLock _exec_infos_lock;
+ std::map<int, ExecNodeExecInfo*> _exec_infos;
+
// prohibit copies
RuntimeState(const RuntimeState&);
};
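
When an instance reports under protocol V2, the per-node map registered above
is reduced to two per-instance totals. A condensed sketch of that aggregation,
assuming C++11 range-for (the fragment_mgr.cpp hunk above does the same with
explicit iterators):

    int64_t io_by_byte = 0;
    int64_t cpu_consumpation = 0;
    for (auto& entry : *state->get_exec_node_exec_info()) {
        io_by_byte += entry.second->get_io_by_byte();
        cpu_consumpation += entry.second->get_cpu_consumpation();
    }
    // These totals fill TReportExecStatusParams.io_by_byte and
    // TReportExecStatusParams.cpu_consumpation under FrontendServiceVersion::V2.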
diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
index 2b39b87e..d6eb0cc7 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
@@ -17,44 +17,28 @@
package org.apache.doris.common.proc;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.QueryStatisticsItem;
import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.PFetchFragmentExecInfoRequest;
-import org.apache.doris.rpc.PFetchFragmentExecInfosResult;
-import org.apache.doris.rpc.PFragmentExecInfo;
-import org.apache.doris.rpc.PUniqueId;
-import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStatusCode;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/*
* show proc "/current_queries/{query_id}/fragments"
*/
public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
- private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
- .add("FragmentId").add("InstanceId").add("Host").add("ExecState").build();
+ .add("FragmentId").add("InstanceId").add("Host").add("ExecState")
+ .add("IO(Running PlanNode)").add("CPU(Running PlanNode)").build();
+ private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class);
private QueryStatisticsItem item;
public CurrentQueryFragmentProcNode(QueryStatisticsItem item) {
@@ -85,88 +69,31 @@ private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisExceptio
private ProcResult requestFragmentExecInfos() throws AnalysisException {
- // create request and remove redundant rpc
- final Map<TNetworkAddress, Request> requestMap = Maps.newHashMap();
- for (QueryStatisticsItem.FragmentInstanceInfo info : item.getFragmentInstanceInfos()) {
- final TNetworkAddress brpcNetAddress;
- try {
- brpcNetAddress = toBrpcHost(info.getAddress());
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- throw new AnalysisException(e.getMessage());
- }
- Request request = requestMap.get(brpcNetAddress);
- if (request == null) {
- request = new Request(brpcNetAddress);
- requestMap.put(brpcNetAddress, request);
- }
- request.addInstanceId(info.getFragmentId(), new PUniqueId(info.getInstanceId()));
- }
-
- // send request
- final List<Pair<Request, Future<PFetchFragmentExecInfosResult>>> futures = Lists.newArrayList();
- for (TNetworkAddress address : requestMap.keySet()) {
- final Request request = requestMap.get(address);
- final PFetchFragmentExecInfoRequest pbRequest =
- new PFetchFragmentExecInfoRequest(request.getInstanceId());
- try {
- futures.add(Pair.create(request, BackendServiceProxy.getInstance().
- fetchFragmentExecInfosAsync(address, pbRequest)));
- } catch (RpcException e) {
- throw new AnalysisException("exec rpc error");
- }
- }
-
+ final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
+ final CurrentQueryInfoProvider.QueryExecInfo execInfo = provider.getQueryExecInfoFromRemote(item);
final List<List<String>> sortedRowDatas = Lists.newArrayList();
- // get result
- for (Pair<Request, Future<PFetchFragmentExecInfosResult>> pair : futures) {
- TStatusCode code;
- String errMsg = null;
- try {
- final PFetchFragmentExecInfosResult fragmentExecInfoResult
- = pair.second.get(10, TimeUnit.SECONDS);
- code = TStatusCode.findByValue(fragmentExecInfoResult.status.code);
- if (fragmentExecInfoResult.status.msgs != null
- && !fragmentExecInfoResult.status.msgs.isEmpty()) {
- errMsg = fragmentExecInfoResult.status.msgs.get(0);
- }
- if (errMsg == null) {
- for (PFragmentExecInfo info : fragmentExecInfoResult.execInfos) {
- final List<String> rowData = Lists.newArrayList();
- rowData.add(pair.first.getFragmentId(info.instanceId).toString());
- rowData.add(DebugUtil.printId(info.instanceId));
- rowData.add(pair.first.getAddress().getHostname());
- rowData.add(getFragmentExecState(info.execStatus));
- sortedRowDatas.add(rowData);
- }
+ for (CurrentQueryInfoProvider.FragmentExecInfo fragmentExecInfo :
+ execInfo.getFragmentExecInfo()) {
+ for (CurrentQueryInfoProvider.InstanceExecInfo instanceExecInfo :
+ fragmentExecInfo.getInstanceExecInfo()) {
+ long ioByByte = 0;
+ long cpuConsumpation = 0;
+ for (CurrentQueryInfoProvider.PlanNodeExecInfo planNodeExecInfo :
+ instanceExecInfo.getPlanNodeExecInfo()) {
+ ioByByte += planNodeExecInfo.getIoByByte();
+ cpuConsumpation += planNodeExecInfo.getCpuConsumpation();
}
- } catch (ExecutionException e) {
- LOG.warn("catch a execute exception", e);
- code = TStatusCode.THRIFT_RPC_ERROR;
- } catch (InterruptedException e) {
- LOG.warn("catch a interrupt exception", e);
- code = TStatusCode.INTERNAL_ERROR;
- } catch (TimeoutException e) {
- LOG.warn("catch a timeout exception", e);
- code = TStatusCode.TIMEOUT;
- }
-
- if (code != TStatusCode.OK) {
- switch (code) {
- case TIMEOUT:
- errMsg = "query timeout";
- break;
- case THRIFT_RPC_ERROR:
- errMsg = "rpc failed";
- break;
- default:
- errMsg = "exec rpc error";
- }
- throw new AnalysisException(errMsg);
+ final List<String> rowData = Lists.newArrayList();
+ rowData.add(fragmentExecInfo.getFragmentId());
+ rowData.add(instanceExecInfo.getInstanceId());
+ rowData.add(instanceExecInfo.getHost());
+ rowData.add(instanceExecInfo.getExecState());
+ rowData.add(String.valueOf(ioByByte));
+ rowData.add(String.valueOf(cpuConsumpation));
+ sortedRowDatas.add(rowData);
}
}
-
// sort according to explain's fragment index
sortedRowDatas.sort(new Comparator<List<String>>() {
@Override
@@ -182,51 +109,4 @@ public int compare(List<String> l1, List<String> l2) {
return result;
}
- private enum FragmentExecState {
- RUNNING,
- WAIT,
- FINISHED,
- NONE
- }
-
- private String getFragmentExecState(int i) {
- if (i >= FragmentExecState.values().length) {
- // can't run here
- LOG.warn("Fetch uncorrect instance state.");
- return FragmentExecState.NONE.toString();
- }
- return FragmentExecState.values()[i].toString();
- }
-
- private static class Request {
- private final TNetworkAddress address;
- private List<String> fragmentIds;
- private List<PUniqueId> instanceIds;
- private Map<String, String> instanceIdToFragmentId;
-
- public Request(TNetworkAddress address) {
- this.address = address;
- this.fragmentIds = Lists.newArrayList();
- this.instanceIds = Lists.newArrayList();
- this.instanceIdToFragmentId = Maps.newHashMap();
- }
-
- public TNetworkAddress getAddress() {
- return address;
- }
-
- public List<PUniqueId> getInstanceId() {
- return instanceIds;
- }
-
- public void addInstanceId(String fragmentId, PUniqueId instanceId) {
- this.fragmentIds.add(fragmentId);
- this.instanceIds.add(instanceId);
- this.instanceIdToFragmentId.put(DebugUtil.printId(instanceId), fragmentId.toString());
- }
-
- public String getFragmentId(PUniqueId id) {
- return instanceIdToFragmentId.get(DebugUtil.printId(id));
- }
- }
}
diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
new file mode 100644
index 00000000..52869bd6
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -0,0 +1,335 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.qe.QueryStatisticsItem;
+import org.apache.doris.rpc.*;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Provides a running query's PlanNode information, including execution state,
+ * IO consumption and CPU consumption.
+ */
+public class CurrentQueryInfoProvider {
+ private static final Logger LOG = LogManager.getLogger(CurrentQueryInfoProvider.class);
+
+ public CurrentQueryInfoProvider() {
+ }
+
+ public QueryExecInfo getQueryExecInfoFromRemote(QueryStatisticsItem item) throws AnalysisException {
+ return getQueryExecInfoFromRemote(Lists.newArrayList(item)).iterator().next();
+ }
+
+ public Collection<QueryExecInfo> getQueryExecInfoFromRemote(Collection<QueryStatisticsItem> items) throws AnalysisException {
+ final Map<TNetworkAddress, Request> requestMap = Maps.newHashMap();
+ final Map<PUniqueId, QueryExecInfo> queryExecInfoMap = Maps.newHashMap();
+ final Map<TNetworkAddress, TNetworkAddress> brpcAddressMap = Maps.newHashMap();
+ for (QueryStatisticsItem item : items) {
+ final QueryExecInfo queryExecInfo = new QueryExecInfo(item.getQueryId());
+ for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) {
+ // use brpc address
+ TNetworkAddress brpcNetAddress = brpcAddressMap.get(instanceInfo.getAddress());
+ if (brpcNetAddress == null) {
+ try {
+ brpcNetAddress = toBrpcHost(instanceInfo.getAddress());
+ brpcAddressMap.put(instanceInfo.getAddress(), brpcNetAddress);
+ } catch (Exception e) {
+ LOG.warn(e.getMessage());
+ throw new AnalysisException(e.getMessage());
+ }
+ }
+ // merge requests from different queries
+ Request request = requestMap.get(brpcNetAddress);
+ if (request == null) {
+ request = new Request(brpcNetAddress);
+ requestMap.put(brpcNetAddress, request);
+ }
+ final PUniqueId pUId = new PUniqueId(instanceInfo.getInstanceId());
+ request.addInstanceId(pUId);
+
+ // map instance to QueryExecInfo and fragment to instance
+ Preconditions.checkArgument(queryExecInfoMap.get(pUId) == null);
+ queryExecInfoMap.put(pUId, queryExecInfo);
+ queryExecInfo.addMappingInstanceToFragment(instanceInfo.getFragmentId(), pUId);
+ }
+ }
+ return recvResponse(queryExecInfoMap, sendRequest(requestMap));
+ }
+
+ private List<Pair<Request, Future<PFetchFragmentExecInfosResult>>> sendRequest(
+ Map<TNetworkAddress, Request> requestMap) throws AnalysisException {
+ final List<Pair<Request, Future<PFetchFragmentExecInfosResult>>> futures = Lists.newArrayList();
+ for (TNetworkAddress address : requestMap.keySet()) {
+ final Request request = requestMap.get(address);
+ final PFetchFragmentExecInfoRequest pbRequest =
+ new PFetchFragmentExecInfoRequest(request.getAllInstanceIds());
+ try {
+ futures.add(Pair.create(request, BackendServiceProxy.getInstance().
+ fetchFragmentExecInfosAsync(address, pbRequest)));
+ } catch (RpcException e) {
+ throw new AnalysisException("Failed to send request for query execution information.");
+ }
+ }
+ return futures;
+ }
+
+ private void fillQueryExecInfo(TNetworkAddress address,
+ PFetchFragmentExecInfosResult fragmentExecInfoResult,
+ Map<PUniqueId, QueryExecInfo> queryExecInfoMap) {
+ for (PInstanceExecInfo info : fragmentExecInfoResult.execInfos) {
+ final QueryExecInfo queryExecInfo
+ = queryExecInfoMap.get(info.instanceId);
+ final FragmentExecInfo fragmentExecInfo
+ = queryExecInfo.getFragmentWithInstance(info.instanceId);
+ final InstanceExecInfo instanceExecInfo
+ = fragmentExecInfo.createInstance(info.instanceId,
+ FragmentExecState.values()[info.execStatus], address.toString());
+ if (info.execInfos != null) {
+ for (PPlanNodeExecInfo pPlanNodeExecInfo : info.execInfos) {
+ final PlanNodeExecInfo planNodeExecInfo =
+ new PlanNodeExecInfo(
+ pPlanNodeExecInfo.id,
+ pPlanNodeExecInfo.type,
+ pPlanNodeExecInfo.ioByByte,
+ pPlanNodeExecInfo.cpuConsumpation);
+ instanceExecInfo.addPlanNode(planNodeExecInfo);
+ }
+ }
+ }
+ }
+
+ private Collection<QueryExecInfo> recvResponse(Map<PUniqueId, QueryExecInfo> queryExecInfoMap,
+ List<Pair<Request, Future<PFetchFragmentExecInfosResult>>> futures)
+ throws AnalysisException {
+ final String reasonPrefix = "Failed to receive result.";
+ for (Pair<Request, Future<PFetchFragmentExecInfosResult>> pair : futures) {
+ try {
+ final PFetchFragmentExecInfosResult result
+ = pair.second.get(10, TimeUnit.SECONDS);
+ TStatusCode code = TStatusCode.findByValue(result.status.code);
+ String errMsg = null;
+ if (result.status.msgs != null
+ && !result.status.msgs.isEmpty()) {
+ errMsg = result.status.msgs.get(0);
+ }
+
+ if (code == TStatusCode.OK && errMsg == null) {
+ fillQueryExecInfo(pair.first.address, result, queryExecInfoMap);
+ } else {
+ LOG.warn(reasonPrefix + " code: " + code + " errMsg: " + errMsg);
+ throw new AnalysisException(reasonPrefix);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.warn(reasonPrefix + " reason:" + e);
+ throw new AnalysisException(reasonPrefix);
+ }
+
+ }
+ return queryExecInfoMap.values();
+ }
+
+ private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException {
+ final Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(
+ host.getHostname(), host.getPort());
+ if (backend == null) {
+ throw new AnalysisException(new StringBuilder("Backend ")
+ .append(host.getHostname())
+ .append(":")
+ .append(host.getPort())
+ .append(" does not exist")
+ .toString());
+ }
+ if (backend.getBrpcPort() < 0) {
+ throw new AnalysisException("BRPC port is't exist.");
+ }
+ return new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ }
+
+ private enum FragmentExecState {
+ RUNNING,
+ WAIT,
+ FINISHED,
+ NONE
+ }
+
+ public static class QueryExecInfo {
+ private final String queryId;
+ private final Map<PUniqueId, String> instanceIdToFragmentId;
+ private final Map<String, FragmentExecInfo> fragmentExecInfoMap;
+
+ public QueryExecInfo(String queryId) {
+ this.queryId = queryId;
+ this.fragmentExecInfoMap = Maps.newHashMap();
+ this.instanceIdToFragmentId = Maps.newHashMap();
+ }
+
+ public void addMappingInstanceToFragment(String fragmentId, PUniqueId instanceId) {
+ this.instanceIdToFragmentId.put(instanceId, fragmentId.toString());
+ FragmentExecInfo info = fragmentExecInfoMap.get(fragmentId);
+ if (info == null) {
+ info = new FragmentExecInfo(fragmentId);
+ fragmentExecInfoMap.put(fragmentId, info);
+ }
+ }
+
+ public FragmentExecInfo getFragmentWithInstance(PUniqueId pUId) {
+ final String fragmentId = this.instanceIdToFragmentId.get(pUId);
+ return fragmentExecInfoMap.get(fragmentId);
+ }
+
+ public Collection<FragmentExecInfo> getFragmentExecInfo() {
+ return fragmentExecInfoMap.values();
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+ }
+
+ public static class FragmentExecInfo {
+ private final String fragmentId;
+ private final Map<PUniqueId, InstanceExecInfo> instanceExecInfoMap;
+
+ public FragmentExecInfo(String fragmentId) {
+ this.fragmentId = fragmentId;
+ this.instanceExecInfoMap = Maps.newHashMap();
+ }
+
+ public String getFragmentId() {
+ return fragmentId;
+ }
+
+ public InstanceExecInfo createInstance(PUniqueId instanceId, FragmentExecState execState, String host) {
+ InstanceExecInfo instanceExecInfo = instanceExecInfoMap.get(instanceId);
+ if (instanceExecInfo == null) {
+ instanceExecInfo = new InstanceExecInfo(instanceId.toString(), execState, host);
+ instanceExecInfoMap.put(instanceId, instanceExecInfo);
+ }
+ return instanceExecInfo;
+ }
+
+ public Collection<InstanceExecInfo> getInstanceExecInfo() {
+ return instanceExecInfoMap.values();
+ }
+ }
+
+ public static class InstanceExecInfo {
+ private final String instanceId;
+ private final FragmentExecState execState;
+ private final List<PlanNodeExecInfo> planNodeExecInfos;
+ private final String host;
+
+ public InstanceExecInfo(String instanceId, FragmentExecState execState, String host) {
+ this.instanceId = instanceId;
+ this.execState = execState;
+ this.planNodeExecInfos = Lists.newArrayList();
+ this.host = host;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getExecState() {
+ return execState.name();
+ }
+
+ void addPlanNode(PlanNodeExecInfo info) {
+ this.planNodeExecInfos.add(info);
+ }
+
+ public Collection<PlanNodeExecInfo> getPlanNodeExecInfo() {
+ return planNodeExecInfos;
+ }
+ }
+
+ public static class PlanNodeExecInfo {
+ private final int id;
+ private final int type;
+ private final long ioByByte;
+ private final long cpuConsumpation;
+
+ public PlanNodeExecInfo(int id, int type, long ioByByte, long cpuConsumpation) {
+ this.id = id;
+ this.type = type;
+ this.ioByByte = ioByByte;
+ this.cpuConsumpation = cpuConsumpation;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public long getIoByByte() {
+ return ioByByte;
+ }
+
+ public long getCpuConsumpation() {
+ return cpuConsumpation;
+ }
+ }
+
+ private static class Request {
+ private final TNetworkAddress address;
+ private final List<PUniqueId> instanceIds;
+
+ public Request(TNetworkAddress address) {
+ this.address = address;
+ this.instanceIds = Lists.newArrayList();
+ }
+
+ public TNetworkAddress getAddress() {
+ return address;
+ }
+
+ public List<PUniqueId> getAllInstanceIds() {
+ return instanceIds;
+ }
+
+ public void addInstanceId(PUniqueId pUId) {
+ this.instanceIds.add(pUId);
+ }
+ }
+}
diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
index 566a4831..7059b704 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
@@ -17,26 +17,32 @@
package org.apache.doris.common.proc;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.qe.QueryStatisticsItem;
-
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QueryStatisticsItem;
+import org.apache.doris.thrift.TPlanNodeType;
+import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/*
* show proc "/current_queries"
*/
public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
+ private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class);
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
- .add("ConnectionId").add("QueryId").add("Database").add("User").add("ExecTime").build();
+ .add("ConnectionId").add("QueryId").add("Database").add("User")
+ .add("IO(Running PlanNode)").add("CPU(Running PlanNode)").add("ExecTime").build();
- private static final int EXEC_TIME_INDEX = 3;
+ private static final int EXEC_TIME_INDEX = 6;
@Override
public boolean register(String name, ProcNodeInterface node) {
@@ -61,6 +67,7 @@ public ProcResult fetchResult() throws AnalysisException {
final BaseProcResult result = new BaseProcResult();
final Map<String, QueryStatisticsItem> statistic = QeProcessorImpl.INSTANCE.getQueryStatistics();
result.setNames(TITLE_NAMES.asList());
+ final Map<String, QueryExecStatistic> queryScanRowCountMap = getQueryExecInfo();
final List<List<String>> sortedRowData = Lists.newArrayList();
for (QueryStatisticsItem item : statistic.values()) {
final List<String> values = Lists.newArrayList();
@@ -68,6 +75,9 @@ public ProcResult fetchResult() throws AnalysisException {
values.add(item.getQueryId());
values.add(item.getDb());
values.add(item.getUser());
+ final QueryExecStatistic execStatistic = queryScanRowCountMap.get(item.getQueryId());
+ values.add(String.valueOf(execStatistic.getIoByByte()));
+ values.add(String.valueOf(execStatistic.getCpuConsumpation()));
values.add(item.getQueryExecTime());
sortedRowData.add(values);
}
@@ -83,4 +93,43 @@ public int compare(List<String> l1, List<String> l2) {
result.setRows(sortedRowData);
return result;
}
+
+ private Map<String, QueryExecStatistic> getQueryExecInfo() throws AnalysisException {
+ final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
+ final Collection<CurrentQueryInfoProvider.QueryExecInfo> execInfos =
+ provider.getQueryExecInfoFromRemote(QeProcessorImpl.INSTANCE.getQueryStatistics().values());
+ final Map<String, QueryExecStatistic> queryScanRowCountMap = Maps.newHashMap();
+ for (CurrentQueryInfoProvider.QueryExecInfo execInfo : execInfos) {
+ long ioByByte = 0;
+ long cpuConsumpation = 0;
+ for (CurrentQueryInfoProvider.FragmentExecInfo fragmentExecInfo : execInfo.getFragmentExecInfo()) {
+ for (CurrentQueryInfoProvider.InstanceExecInfo instanceExecInfo : fragmentExecInfo.getInstanceExecInfo()) {
+ for (CurrentQueryInfoProvider.PlanNodeExecInfo planNodeExecInfo : instanceExecInfo.getPlanNodeExecInfo()) {
+ ioByByte += planNodeExecInfo.getIoByByte();
+ cpuConsumpation += planNodeExecInfo.getCpuConsumpation();
+ }
+ }
+ }
+ queryScanRowCountMap.put(execInfo.getQueryId(), new QueryExecStatistic(ioByByte, cpuConsumpation));
+ }
+ return queryScanRowCountMap;
+ }
+
+ private static class QueryExecStatistic {
+ private final long ioByByte;
+ private final long cpuConsumpation;
+
+ public QueryExecStatistic(long ioByByte, long cpuConsumpation) {
+ this.ioByByte = ioByByte;
+ this.cpuConsumpation = cpuConsumpation;
+ }
+
+ public long getIoByByte() {
+ return ioByByte;
+ }
+
+ public long getCpuConsumpation() {
+ return cpuConsumpation;
+ }
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 560d538b..15d9ade7 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -92,12 +92,16 @@ private void handlePing() {
ctx.getState().setOk();
}
- private void auditAfterExec(String origStmt, StatementBase parsedStmt) {
+ private void auditAfterExec(String origStmt, StatementBase parsedStmt, QeProcessorImpl.QueryInfo queryInfo) {
// slow query
long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
// query state log
ctx.getAuditBuilder().put("state", ctx.getState());
ctx.getAuditBuilder().put("time", elapseMs);
+ if (queryInfo != null) {
+ ctx.getAuditBuilder().put("IO", queryInfo.getIoByByte());
+ ctx.getAuditBuilder().put("CPU", queryInfo.getCpuConsumpation());
+ }
ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows());
ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId());
@@ -154,11 +158,12 @@ private void handleQuery() {
ctx.getAuditBuilder().put("user", ctx.getQualifiedUser());
ctx.getAuditBuilder().put("db", ctx.getDatabase());
+ QeProcessorImpl.QueryInfo queryInfo = null;
// execute this query.
try {
executor = new StmtExecutor(ctx, stmt);
ctx.setExecutor(executor);
- executor.execute();
+ queryInfo = executor.execute();
// set if this is a QueryStmt
ctx.getState().setQuery(executor.isQueryStmt());
} catch (DdlException e) {
@@ -177,7 +182,7 @@ private void handleQuery() {
// audit after exec
// replace '\n' to '\\n' to make string in one line
- auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt());
+ auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(), queryInfo);
}
// Get the column definitions of a table
diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/src/main/java/org/apache/doris/qe/QeProcessor.java
index a94d4fd4..bd087a66 100644
--- a/fe/src/main/java/org/apache/doris/qe/QeProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/QeProcessor.java
@@ -32,7 +32,7 @@
void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException;
- void unregisterQuery(TUniqueId queryId);
+ QeProcessorImpl.QueryInfo unregisterQuery(TUniqueId queryId);
Map<String, QueryStatisticsItem> getQueryStatistics();
}
diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 1af14eb0..6110eef5 100644
--- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -18,6 +18,8 @@
package org.apache.doris.qe;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.*;
@@ -26,6 +28,7 @@
import org.apache.logging.log4j.Logger;
import java.util.Map;
+import java.util.Set;
public final class QeProcessorImpl implements QeProcessor {
@@ -57,9 +60,9 @@ public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserExceptio
}
@Override
- public void unregisterQuery(TUniqueId queryId) {
+ public QueryInfo unregisterQuery(TUniqueId queryId) {
LOG.info("deregister query id = " + queryId.toString());
- coordinatorMap.remove(queryId);
+ return coordinatorMap.remove(queryId);
}
@Override
@@ -89,7 +92,6 @@ public void unregisterQuery(TUniqueId queryId) {
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) {
LOG.info("ReportExecStatus(): instance_id=" + params.fragment_instance_id.toString()
+ "queryID=" + params.query_id.toString() + " params=" + params);
-
final TReportExecStatusResult result = new TReportExecStatusResult();
final QueryInfo info = coordinatorMap.get(params.query_id);
if (info == null) {
@@ -97,11 +99,22 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params)
LOG.info("ReportExecStatus() runtime error");
return result;
}
- try {
- info.getCoord().updateFragmentExecStatus(params);
- } catch (Exception e) {
- LOG.warn(e.getMessage());
- return result;
+ if (params.protocol_version == FrontendServiceVersion.V1) {
+ try {
+ info.getCoord().updateFragmentExecStatus(params);
+ } catch (Exception e) {
+ LOG.warn(e.getMessage());
+ return result;
+ }
+ } else if (params.protocol_version == FrontendServiceVersion.V2) {
+ if (params.getStatus().getStatus_code() == TStatusCode.OK
+ && params.isDone() && params.isSetIo_by_byte() && params.isSetCpu_consumpation()) {
+ info.addResourceConsumpation(params.getFragment_instance_id(),
+ params.getIo_by_byte(),
+ params.getCpu_consumpation());
+ }
+ } else {
+ Preconditions.checkState(false);
}
result.setStatus(new TStatus(TStatusCode.OK));
return result;
@@ -113,6 +126,9 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params)
private final Coordinator coord;
private final String sql;
private final long startExecTime;
+ private long ioByByte;
+ private long cpuConsumpation;
+ private Set<TUniqueId> reportInstances;
// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {
@@ -125,6 +141,24 @@ public QueryInfo(ConnectContext connectContext, String sql, Coordinator coord) {
this.coord = coord;
this.sql = sql;
this.startExecTime = System.currentTimeMillis();
+ this.ioByByte = 0;
+ this.cpuConsumpation = 0;
+ this.reportInstances = Sets.newHashSet();
+ }
+
+ public void addResourceConsumpation(TUniqueId instanceId, long io, long cpu) {
+ // Record the instance so a repeated report is not double counted.
+ if (!reportInstances.contains(instanceId)) {
+ reportInstances.add(instanceId);
+ ioByByte += io;
+ cpuConsumpation += cpu;
+ }
+ }
+
+ public long getIoByByte() {
+ return ioByByte;
+ }
+
+ public long getCpuConsumpation() {
+ return cpuConsumpation;
}
public ConnectContext getConnectContext() {
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index dcf27cb4..253e8683 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -93,7 +93,6 @@
private String originStmt;
private StatementBase parsedStmt;
private Analyzer analyzer;
- private boolean isRegisterQuery = false;
private RuntimeProfile profile;
private RuntimeProfile summaryProfile;
private volatile Coordinator coord = null;
@@ -193,16 +192,17 @@ public StatementBase getParsedStmt() {
// Execute one statement.
// Exception:
// IOException: talk with client failed.
- public void execute() throws Exception {
+ public QeProcessorImpl.QueryInfo execute() throws Exception {
long beginTimeInNanoSecond = TimeUtils.getStartTime();
context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+ QeProcessorImpl.QueryInfo queryInfo = null;
try {
// analyze this query
analyze();
if (isForwardToMaster()) {
forwardToMaster();
- return;
+ return null;
} else {
LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
}
@@ -227,7 +227,7 @@ public void execute() throws Exception {
throw e;
}
} finally {
- QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
+ queryInfo = QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
}
} else if (parsedStmt instanceof SetStmt) {
@@ -255,6 +255,8 @@ public void execute() throws Exception {
LOG.warn("errors when abort txn", abortTxnException);
}
throw t;
+ } finally {
+ queryInfo = QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
} else if (parsedStmt instanceof DdlStmt) {
handleDdlStmt();
@@ -287,11 +289,8 @@ public void execute() throws Exception {
// ignore kill stmt execute err(not monitor it)
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
}
- } finally {
- if (isRegisterQuery) {
- QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
- }
}
+ return queryInfo;
}
private void forwardToMaster() throws Exception {
@@ -533,7 +532,6 @@ private void handleQueryStmt() throws Exception {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt, coord));
- isRegisterQuery = true;
coord.exec();
@@ -587,7 +585,6 @@ private void handleInsertStmt() throws Exception {
coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
- isRegisterQuery = true;
coord.exec();
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java b/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java
index 1d0b757e..84d60d8a 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java
@@ -28,5 +28,5 @@
@Protobuf(order = 1, required = true)
public PStatus status;
@Protobuf(fieldType = FieldType.OBJECT, order = 2, required = false)
- public List<PFragmentExecInfo> execInfos;
+ public List<PInstanceExecInfo> execInfos;
}
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java b/fe/src/main/java/org/apache/doris/rpc/PInstanceExecInfo.java
similarity index 83%
rename from fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java
rename to fe/src/main/java/org/apache/doris/rpc/PInstanceExecInfo.java
index 8f7bab3e..8cabe99f 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PInstanceExecInfo.java
@@ -17,17 +17,19 @@
package org.apache.doris.rpc;
+
+import com.baidu.bjf.remoting.protobuf.FieldType;
import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
+import java.util.List;
+
@ProtobufClass
-public class PFragmentExecInfo {
+public class PInstanceExecInfo {
@Protobuf(order = 1, required = true)
public PUniqueId instanceId;
@Protobuf(order = 2, required = true)
public int execStatus;
- @Protobuf(order = 3, required = false)
- public int planNodeType;
- @Protobuf(order = 4, required = false)
- public long rowsCount;
+ @Protobuf(fieldType = FieldType.OBJECT, order = 3, required = false)
+ public List<PPlanNodeExecInfo> execInfos;
}
diff --git a/fe/src/main/java/org/apache/doris/rpc/PPlanNodeExecInfo.java b/fe/src/main/java/org/apache/doris/rpc/PPlanNodeExecInfo.java
new file mode 100644
index 00000000..1fc70796
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/rpc/PPlanNodeExecInfo.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
+import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
+
+@ProtobufClass
+public class PPlanNodeExecInfo {
+ @Protobuf(order = 1, required = false)
+ public int id;
+ @Protobuf(order = 2, required = false)
+ public int type;
+ @Protobuf(order = 3, required = false)
+ public long ioByByte;
+ @Protobuf(order = 4, required = false)
+ public long cpuConsumpation;
+}
+
diff --git a/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java b/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java
index c93db47c..da9f6ac5 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java
@@ -23,6 +23,7 @@
@ProtobufClass
public class PUniqueId {
+
public PUniqueId() {}
public PUniqueId(TUniqueId tid) {
hi = tid.getHi();
@@ -33,4 +34,31 @@ public PUniqueId(TUniqueId tid) {
public long hi;
@Protobuf(order = 2, required = true)
public long lo;
+
+ @Override
+ public int hashCode() {
+ return ((int)hi + (int)(hi >> 32) + (int)lo + (int)(lo >> 32));
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append(hi).append(":").append(lo).toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (!(obj instanceof PUniqueId)) {
+ return false;
+ }
+
+ final PUniqueId other = (PUniqueId)obj;
+ if (hi != other.hi || lo != other.lo) {
+ return false;
+ }
+ return true;
+ }
}
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index aaeb4bd7..07bc84fa 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -141,16 +141,22 @@ enum PFragmentExecStatus {
FINISHED = 2;
}
-message PFragmentExecInfo {
+message PPlanNodeExecInfo {
+ optional int32 id = 1;
+ optional int32 type = 2; // same as TPlanNodeType
+ optional int64 io_by_byte = 3;
+ optional int64 cpu_consumpation = 4;
+}
+
+message PInstanceExecInfo {
required PUniqueId finst_id = 1;
required int32 exec_status = 2;
- optional int32 plan_node_type = 3; // same as TPlanNodeType
- optional int64 rows_count = 4;
+ repeated PPlanNodeExecInfo plannode_exec_info = 3;
}
message PFetchFragmentExecInfosResult {
required PStatus status = 1;
- repeated PFragmentExecInfo fragment_exec_info = 2;
+ repeated PInstanceExecInfo instance_exec_info = 2;
}
service PBackendService {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 90277d75..2a1166b7 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -323,7 +323,8 @@ struct TReportExecStatusResult {
// Service Protocol Details
enum FrontendServiceVersion {
- V1
+ V1,
+ V2
}
// The results of an INSERT query, sent to the coordinator as part of
@@ -331,22 +332,22 @@ enum FrontendServiceVersion {
struct TReportExecStatusParams {
1: required FrontendServiceVersion protocol_version
- // required in V1
+ // required in V1 V2
2: optional Types.TUniqueId query_id
// passed into ExecPlanFragment() as TExecPlanFragmentParams.backend_num
- // required in V1
+ // required in V1 V2
3: optional i32 backend_num
- // required in V1
+ // required in V1 V2
4: optional Types.TUniqueId fragment_instance_id
// Status of fragment execution; any error status means it's done.
- // required in V1
+ // required in V1 V2
5: optional Status.TStatus status
// If true, fragment finished executing.
- // required in V1
+ // required in V1 V2
6: optional bool done
// cumulative profile
@@ -367,6 +368,10 @@ struct TReportExecStatusParams {
13: optional list<string> export_files
14: optional list<Types.TTabletCommitInfo> commitInfos
+
+ // V2 required
+ 15: optional i64 io_by_byte;
+ 16: optional i64 cpu_consumpation;
}
struct TFeResult {
@@ -567,7 +572,7 @@ service FrontendService {
TDescribeTableResult describeTable(1:TDescribeTableParams params)
TShowVariableResult showVariables(1:TShowVariableRequest params)
TReportExecStatusResult reportExecStatus(1:TReportExecStatusParams params)
-
+
MasterService.TMasterResult finishTask(1:MasterService.TFinishTaskRequest request)
MasterService.TMasterResult report(1:MasterService.TReportRequest request)
MasterService.TFetchResourceResult fetchResource()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]