chenhao7253886 closed pull request #497: Support io and cpu indicates for current query URL: https://github.com/apache/incubator-doris/pull/497
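For orientation before the raw diff: the patch adds per-operator row counters on the backend (ProcessRows, MergeRows, BuildRows, SortRows, MaterializeRows), replaces the fetch_fragment_exec_infos RPC with trigger_profile_report so the frontend can ask backends to push fresh profiles on demand, and adds a CurrentQueryInfoProvider on the frontend that folds those counters into new IO and CPU columns for show proc "/current_queries". The summing step can be sketched in a few lines of Java; this is a hedged simplification, not the shipped code: the real implementation walks a RuntimeProfile tree of Counter objects per exec-node type, while here each node is reduced to a Map<String, Long> and the class name ConsumptionSketch is invented for illustration.

    import java.util.List;
    import java.util.Map;

    // Hedged sketch: approximate a query's CPU cost by summing the row
    // counters this diff introduces or reads, and its IO cost from the
    // scan nodes' CompressedBytesRead counter.
    public class ConsumptionSketch {
        private static final String[] CPU_ROW_COUNTERS = {
                "BuildRows", "ProbeRows", "SortRows", "ProcessRows",
                "MaterializeRows", "MergeRows"};

        // nodeCounters: one counter map per exec node in the query profile.
        public static long totalCpuRows(List<Map<String, Long>> nodeCounters) {
            long rows = 0;
            for (Map<String, Long> counters : nodeCounters) {
                for (String name : CPU_ROW_COUNTERS) {
                    rows += counters.getOrDefault(name, 0L);
                }
            }
            return rows;
        }

        // scanNodeCounters: one counter map per OLAP_SCAN_NODE.
        public static long totalIoBytes(List<Map<String, Long>> scanNodeCounters) {
            long bytes = 0;
            for (Map<String, Long> counters : scanNodeCounters) {
                bytes += counters.getOrDefault("CompressedBytesRead", 0L);
            }
            return bytes;
        }
    }

One difference worth noting: the real HashJoinConsumptionCalculator counts ProbeRows and BuildRows only when both counters are present, whereas this sketch sums whatever exists.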
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index b807152f..99aa5e8b 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -153,7 +153,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) { _mem_pool.reset(new MemPool(mem_tracker())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); - + _process_rows_counter = ADD_COUNTER(runtime_profile(), "ProcessRows", TUnit::UNIT); DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size()); for (int i = 0; i < _evaluators.size(); ++i) { @@ -236,7 +236,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) { while (!_input_eos && _prev_input_row == NULL) { RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos)); - + COUNTER_UPDATE(_process_rows_counter, _curr_child_batch->num_rows()); if (_curr_child_batch->num_rows() > 0) { _prev_input_row = _curr_child_batch->get_row(0); process_child_batches(state); @@ -612,6 +612,7 @@ Status AnalyticEvalNode::process_child_batches(RuntimeState* state) { _prev_child_batch->reset(); _prev_child_batch.swap(_curr_child_batch); RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos)); + COUNTER_UPDATE(_process_rows_counter, _curr_child_batch->num_rows()); } return Status::OK; diff --git a/be/src/exec/analytic_eval_node.h b/be/src/exec/analytic_eval_node.h index 882b3d39..6b92c37e 100644 --- a/be/src/exec/analytic_eval_node.h +++ b/be/src/exec/analytic_eval_node.h @@ -327,6 +327,8 @@ class AnalyticEvalNode : public ExecNode { // Time spent processing the child rows. RuntimeProfile::Counter* _evaluation_timer; + + RuntimeProfile::Counter* _process_rows_counter; }; } diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 840c875f..4c7828c3 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -72,6 +72,7 @@ Status ExchangeNode::prepare(RuntimeState* state) { _num_senders, config::exchg_node_buffer_size_bytes, state->runtime_profile(), _is_merging); if (_is_merging) { + _merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT); RETURN_IF_ERROR(_sort_exec_exprs.prepare( state, _row_descriptor, _row_descriptor, expr_mem_tracker())); // AddExprCtxsToFree(_sort_exec_exprs); @@ -213,7 +214,8 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc state->set_query_state_for_wait(); RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); state->set_query_state_for_running(); - + //TODO chenhao, count only one instance lost others. + COUNTER_UPDATE(_merge_rows_counter, output_batch->num_rows()); while ((_num_rows_skipped < _offset)) { _num_rows_skipped += output_batch->num_rows(); // Throw away rows in the output batch until the offset is skipped.
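The MergeRows counter registered and updated in the hunks above is read back on the frontend by the ExchangeConsumptionCalculator that appears later in this diff. A minimal sketch of that lookup, with Long values standing in for the frontend Counter type:

    import java.util.Map;

    // Hedged sketch of the frontend read side: fetch one named counter from
    // an exchange node's counter map, treating a missing counter as zero.
    final class ExchangeCpuSketch {
        static long mergeRows(Map<String, Long> exchangeNodeCounters) {
            Long merged = exchangeNodeCounters.get("MergeRows");
            return merged == null ? 0 : merged;
        }
    }

Per the TODO in the hunk ("count only one instance lost others"), the update covers only the counting instance, so the resulting CPU figure is best read as a lower bound.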
@@ -228,6 +230,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc break; } RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); + COUNTER_UPDATE(_merge_rows_counter, output_batch->num_rows()); } _num_rows_returned += output_batch->num_rows(); diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h index 8b847895..6450d39f 100644 --- a/be/src/exec/exchange_node.h +++ b/be/src/exec/exchange_node.h @@ -26,6 +26,7 @@ namespace doris { class RowBatch; class DataStreamRecvr; +class RuntimeProfile; // Receiver node for data streams. The data stream receiver is created in Prepare() // and closed in Close(). @@ -106,6 +107,8 @@ class ExchangeNode : public ExecNode { // Number of rows skipped so far. int64_t _num_rows_skipped; + + RuntimeProfile::Counter* _merge_rows_counter; }; }; diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 8b4f8d15..37b6b3a2 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -186,7 +186,6 @@ Status ExecNode::prepare(RuntimeState* state) { _mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker())); _expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get())); _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get())); - // TODO chenhao RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker())); // TODO(zc): diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc index da063f92..72ba8876 100644 --- a/be/src/exec/new_partitioned_aggregation_node.cc +++ b/be/src/exec/new_partitioned_aggregation_node.cc @@ -44,7 +44,6 @@ #include "runtime/tuple_row.h" #include "runtime/tuple.h" #include "udf/udf_internal.h" -#include "util/runtime_profile.h" #include "gen_cpp/Exprs_types.h" #include "gen_cpp/PlanNodes_types.h" @@ -201,6 +200,7 @@ Status NewPartitionedAggregationNode::prepare(RuntimeState* state) { ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT); largest_partition_percent_ = runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT); + _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT); if (is_streaming_preagg_) { runtime_profile()->append_exec_option("Streaming Preaggregation"); streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime"); @@ -308,7 +308,7 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos)); - + COUNTER_UPDATE(_build_rows_counter, batch.num_rows()); if (UNLIKELY(VLOG_ROW_IS_ON)) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); @@ -532,7 +532,6 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_)); - SCOPED_TIMER(streaming_timer_); int remaining_capacity[PARTITION_FANOUT]; diff --git a/be/src/exec/new_partitioned_aggregation_node.h b/be/src/exec/new_partitioned_aggregation_node.h index ac30c298..227cbed4 100644 --- a/be/src/exec/new_partitioned_aggregation_node.h +++ b/be/src/exec/new_partitioned_aggregation_node.h @@ -272,6 +272,8 @@ class NewPartitionedAggregationNode : public ExecNode { /// Time spent processing the child rows RuntimeProfile::Counter* build_timer_; + RuntimeProfile::Counter* _build_rows_counter; + /// Total time spent 
resizing hash tables. RuntimeProfile::Counter* ht_resize_timer_; diff --git a/be/src/exec/new_partitioned_aggregation_node_ir.cc b/be/src/exec/new_partitioned_aggregation_node_ir.cc index 4a5c00cd..57afbc1d 100644 --- a/be/src/exec/new_partitioned_aggregation_node_ir.cc +++ b/be/src/exec/new_partitioned_aggregation_node_ir.cc @@ -23,6 +23,7 @@ #include "runtime/buffered_tuple_stream3.inline.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" +#include "util/runtime_profile.h" using namespace doris; @@ -238,7 +239,7 @@ bool NewPartitionedAggregationNode::TryAddToHashTable( return false; } } - + COUNTER_UPDATE(_build_rows_counter, 1); UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row); return true; } diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp index 71a42c85..3b816e9e 100644 --- a/be/src/exec/sort_node.cpp +++ b/be/src/exec/sort_node.cpp @@ -19,7 +19,7 @@ #include "exec/sort_exec_exprs.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -// #include "runtime/sorted_run_merger.h" +#include "util/runtime_profile.h" namespace doris { @@ -49,6 +49,7 @@ Status SortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); RETURN_IF_ERROR(_sort_exec_exprs.prepare( state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker())); + _sort_rows_counter = ADD_COUNTER(runtime_profile(), "SortRows", TUnit::UNIT); return Status::OK; } @@ -144,6 +145,7 @@ Status SortNode::sort_input(RuntimeState* state) { do { batch.reset(); RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos)); + COUNTER_UPDATE(_sort_rows_counter, batch.num_rows()); RETURN_IF_ERROR(_sorter->add_batch(&batch)); RETURN_IF_CANCELLED(state); RETURN_IF_LIMIT_EXCEEDED(state); diff --git a/be/src/exec/sort_node.h b/be/src/exec/sort_node.h index e1de714a..68dcfd8e 100644 --- a/be/src/exec/sort_node.h +++ b/be/src/exec/sort_node.h @@ -70,6 +70,7 @@ class SortNode : public ExecNode { std::vector<bool> _nulls_first; boost::scoped_ptr<MemPool> _tuple_pool; + RuntimeProfile::Counter* _sort_rows_counter; }; } diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp index 5e07c3a3..a1494cc1 100644 --- a/be/src/exec/union_node.cpp +++ b/be/src/exec/union_node.cpp @@ -77,7 +77,7 @@ Status UnionNode::prepare(RuntimeState* state) { _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); _codegend_union_materialize_batch_fns.resize(_child_expr_lists.size()); - + _materialize_rows_counter = ADD_COUNTER(runtime_profile(), "MaterializeRows", TUnit::UNIT); // Prepare const expr lists. for (const vector<ExprContext*>& exprs : _const_expr_lists) { RETURN_IF_ERROR(Expr::prepare(exprs, state, row_desc(), expr_mem_tracker())); @@ -210,6 +210,7 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch // The first batch from each child is always fetched here. RETURN_IF_ERROR(child(_child_idx)->get_next( state, _child_batch.get(), &_child_eos)); + COUNTER_UPDATE(_materialize_rows_counter, _child_batch->num_rows()); } while (!row_batch->at_capacity()) { @@ -224,6 +225,7 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch // All batches except the first batch from each child are fetched here. RETURN_IF_ERROR(child(_child_idx)->get_next( state, _child_batch.get(), &_child_eos)); + COUNTER_UPDATE(_materialize_rows_counter, _child_batch->num_rows()); // If we fetched an empty batch, go back to the beginning of this while loop, and // try again. 
if (_child_batch->num_rows() == 0) continue; diff --git a/be/src/exec/union_node.h b/be/src/exec/union_node.h index b12fb597..7daf2a62 100644 --- a/be/src/exec/union_node.h +++ b/be/src/exec/union_node.h @@ -100,6 +100,8 @@ class UnionNode : public ExecNode { /// to -1 if no child needs to be closed. int _to_close_child_idx; + RuntimeProfile::Counter* _materialize_rows_counter; + /// END: Members that must be Reset() ///////////////////////////////////////// diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 42efed7f..78358ea3 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -314,7 +314,6 @@ void FragmentExecState::coordinator_callback( _executor.cancel(); return; } - coord->reportExecStatus(res, params); } @@ -501,41 +500,32 @@ void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker is going to exit."; } - -Status FragmentMgr::fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* result, - const PFetchFragmentExecInfoRequest* request) { - int fragment_id_list_size = request->finst_id_size(); - for (int i = 0; i < fragment_id_list_size; i++) { - const PUniqueId& p_fragment_id = request->finst_id(i); - TUniqueId id; - id.__set_hi(p_fragment_id.hi()); - id.__set_lo(p_fragment_id.lo()); - PFragmentExecInfo* info = result->add_fragment_exec_info(); - PUniqueId* finst_id = info->mutable_finst_id(); - finst_id->set_hi(p_fragment_id.hi()); - finst_id->set_lo(p_fragment_id.lo()); - - bool is_running = false; - std::lock_guard<std::mutex> lock(_lock); - { - auto iter = _fragment_map.find(id); - if (iter == _fragment_map.end()) { - info->set_exec_status(PFragmentExecStatus::FINISHED); - continue; +Status FragmentMgr::trigger_profile_report(const PTriggerProfileReportRequest* request) { + if (request->instance_ids_size() > 0) { + for (int i = 0; i < request->instance_ids_size(); i++) { + const PUniqueId& p_fragment_id = request->instance_ids(i); + TUniqueId id; + id.__set_hi(p_fragment_id.hi()); + id.__set_lo(p_fragment_id.lo()); + { + std::lock_guard<std::mutex> lock(_lock); + auto iter = _fragment_map.find(id); + if (iter != _fragment_map.end()) { + iter->second->executor()->report_profile_once(); + } } - is_running = iter->second->executor()->runtime_state()->is_running(); } - - if (is_running) { - info->set_exec_status(PFragmentExecStatus::RUNNING); - } else { - info->set_exec_status(PFragmentExecStatus::WAIT); + } else { + std::lock_guard<std::mutex> lock(_lock); + auto iter = _fragment_map.begin(); + for (; iter != _fragment_map.end(); iter++) { + iter->second->executor()->report_profile_once(); } } - - return Status::OK; + return Status::OK; } + void FragmentMgr::debug(std::stringstream& ss) { // Keep things simple std::lock_guard<std::mutex> lock(_lock); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index c911fe30..b5fc075c 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -54,14 +54,14 @@ class FragmentMgr : public RestMonitorIface { // TODO(zc): report this is over Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb); - Status fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* result, - const PFetchFragmentExecInfoRequest* request); - Status cancel(const TUniqueId& fragment_id); void cancel_worker(); virtual void debug(std::stringstream& ss); + + Status trigger_profile_report(const PTriggerProfileReportRequest* request); + private: void exec_actual(std::shared_ptr<FragmentExecState> exec_state, 
FinishCallback cb); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index d216f9ac..1e769ee6 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -260,7 +260,7 @@ Status PlanFragmentExecutor::open() { // may block // TODO: if no report thread is started, make sure to send a final profile // at end, otherwise the coordinator hangs in case we finish w/ an error - if (_is_report_success && !_report_status_cb.empty() && config::status_report_interval > 0) { + if (!_report_status_cb.empty() && config::status_report_interval > 0) { boost::unique_lock<boost::mutex> l(_report_thread_lock); _report_thread = boost::thread(&PlanFragmentExecutor::report_profile, this); // make sure the thread started up, otherwise report_profile() might get into a race @@ -366,17 +366,21 @@ void PlanFragmentExecutor::report_profile() { boost::get_system_time() + boost::posix_time::seconds(report_fragment_offset); // We don't want to wait longer than it takes to run the entire fragment. _stop_report_thread_cv.timed_wait(l, timeout); - + bool is_report_profile_interval = _is_report_success && config::status_report_interval > 0; while (_report_thread_active) { - boost::system_time timeout = - boost::get_system_time() + boost::posix_time::seconds(config::status_report_interval); - - // timed_wait can return because the timeout occurred or the condition variable - // was signaled. We can't rely on its return value to distinguish between the - // two cases (e.g. there is a race here where the wait timed out but before grabbing - // the lock, the condition variable was signaled). Instead, we will use an external - // flag, _report_thread_active, to coordinate this. - _stop_report_thread_cv.timed_wait(l, timeout); + if (is_report_profile_interval) { + boost::system_time timeout = + boost::get_system_time() + boost::posix_time::seconds(config::status_report_interval); + // timed_wait can return because the timeout occurred or the condition variable + // was signaled. We can't rely on its return value to distinguish between the + // two cases (e.g. there is a race here where the wait timed out but before grabbing + // the lock, the condition variable was signaled). Instead, we will use an external + // flag, _report_thread_active, to coordinate this. + _stop_report_thread_cv.timed_wait(l, timeout); + } else { + // Artificial triggering, such as show proc "/current_queries". + _stop_report_thread_cv.wait(l); + } if (VLOG_FILE_IS_ON) { VLOG_FILE << "Reporting " << (!_report_thread_active ? 
"final " : " ") diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index fe342c6c..299191b5 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -135,6 +135,11 @@ class PlanFragmentExecutor { DataSink* get_sink() { return _sink.get(); } + + void report_profile_once() { + _stop_report_thread_cv.notify_one(); + } + private: ExecEnv* _exec_env; // not owned ExecNode* _plan; // lives in _runtime_state->obj_pool() diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 3b7080cc..2f4b3f34 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -186,17 +186,14 @@ void PInternalServiceImpl<T>::fetch_data( } template<typename T> -void PInternalServiceImpl<T>::fetch_fragment_exec_infos( +void PInternalServiceImpl<T>::trigger_profile_report( google::protobuf::RpcController* controller, - const PFetchFragmentExecInfoRequest* request, - PFetchFragmentExecInfosResult* result, + const PTriggerProfileReportRequest* request, + PTriggerProfileReportResult* result, google::protobuf::Closure* done) { brpc::ClosureGuard closure_guard(done); - auto status = _exec_env->fragment_mgr()->fetch_fragment_exec_infos(result, request); - if (!status.ok()) { - LOG(WARNING) << "fetch fragment exec status failed:" << status.get_error_msg(); - } - status.to_protobuf(result->mutable_status()); + auto st = _exec_env->fragment_mgr()->trigger_profile_report(request); + st.to_protobuf(result->mutable_status()); } template class PInternalServiceImpl<PBackendService>; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 5cc5cb6f..c6406ee5 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -74,10 +74,10 @@ class PInternalServiceImpl : public T { PTabletWriterCancelResult* response, google::protobuf::Closure* done) override; - void fetch_fragment_exec_infos( + void trigger_profile_report( google::protobuf::RpcController* controller, - const PFetchFragmentExecInfoRequest* request, - PFetchFragmentExecInfosResult* result, + const PTriggerProfileReportRequest* request, + PTriggerProfileReportResult* result, google::protobuf::Closure* done) override; private: diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java index 2b39b87e..21ffef73 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java @@ -17,44 +17,29 @@ package org.apache.doris.common.proc; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Pair; -import org.apache.doris.common.util.DebugUtil; import org.apache.doris.qe.QueryStatisticsItem; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.rpc.BackendServiceProxy; -import org.apache.doris.rpc.PFetchFragmentExecInfoRequest; -import org.apache.doris.rpc.PFetchFragmentExecInfosResult; -import org.apache.doris.rpc.PFragmentExecInfo; -import org.apache.doris.rpc.PUniqueId; -import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TStatusCode; - -import 
com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /* * show proc "/current_queries/{query_id}/fragments" */ public class CurrentQueryFragmentProcNode implements ProcNodeInterface { - private static final Logger LOG = LogManager.getLogger(StmtExecutor.class); + private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class); public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("FragmentId").add("InstanceId").add("Host").add("ExecState").build(); + .add("FragmentId").add("InstanceId").add("Host") + .add("IO").add("CPU").build(); private QueryStatisticsItem item; public CurrentQueryFragmentProcNode(QueryStatisticsItem item) { @@ -84,89 +69,21 @@ private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisExceptio } private ProcResult requestFragmentExecInfos() throws AnalysisException { - - // create request and remove redundant rpc - final Map<TNetworkAddress, Request> requestMap = Maps.newHashMap(); - for (QueryStatisticsItem.FragmentInstanceInfo info : item.getFragmentInstanceInfos()) { - final TNetworkAddress brpcNetAddress; - try { - brpcNetAddress = toBrpcHost(info.getAddress()); - } catch (Exception e) { - LOG.warn(e.getMessage()); - throw new AnalysisException(e.getMessage()); - } - Request request = requestMap.get(brpcNetAddress); - if (request == null) { - request = new Request(brpcNetAddress); - requestMap.put(brpcNetAddress, request); - } - request.addInstanceId(info.getFragmentId(), new PUniqueId(info.getInstanceId())); - } - - // send request - final List<Pair<Request, Future<PFetchFragmentExecInfosResult>>> futures = Lists.newArrayList(); - for (TNetworkAddress address : requestMap.keySet()) { - final Request request = requestMap.get(address); - final PFetchFragmentExecInfoRequest pbRequest = - new PFetchFragmentExecInfoRequest(request.getInstanceId()); - try { - futures.add(Pair.create(request, BackendServiceProxy.getInstance(). 
- fetchFragmentExecInfosAsync(address, pbRequest))); - } catch (RpcException e) { - throw new AnalysisException("exec rpc error"); - } - } - + final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); + final Collection<CurrentQueryInfoProvider.InstanceConsumption> instanceConsumptions + = provider.getQueryInstanceConsumption(item); final List<List<String>> sortedRowDatas = Lists.newArrayList(); - // get result - for (Pair<Request, Future<PFetchFragmentExecInfosResult>> pair : futures) { - TStatusCode code; - String errMsg = null; - try { - final PFetchFragmentExecInfosResult fragmentExecInfoResult - = pair.second.get(10, TimeUnit.SECONDS); - code = TStatusCode.findByValue(fragmentExecInfoResult.status.code); - if (fragmentExecInfoResult.status.msgs != null - && !fragmentExecInfoResult.status.msgs.isEmpty()) { - errMsg = fragmentExecInfoResult.status.msgs.get(0); - } - - if (errMsg == null) { - for (PFragmentExecInfo info : fragmentExecInfoResult.execInfos) { - final List<String> rowData = Lists.newArrayList(); - rowData.add(pair.first.getFragmentId(info.instanceId).toString()); - rowData.add(DebugUtil.printId(info.instanceId)); - rowData.add(pair.first.getAddress().getHostname()); - rowData.add(getFragmentExecState(info.execStatus)); - sortedRowDatas.add(rowData); - } - } - } catch (ExecutionException e) { - LOG.warn("catch a execute exception", e); - code = TStatusCode.THRIFT_RPC_ERROR; - } catch (InterruptedException e) { - LOG.warn("catch a interrupt exception", e); - code = TStatusCode.INTERNAL_ERROR; - } catch (TimeoutException e) { - LOG.warn("catch a timeout exception", e); - code = TStatusCode.TIMEOUT; - } - - if (code != TStatusCode.OK) { - switch (code) { - case TIMEOUT: - errMsg = "query timeout"; - break; - case THRIFT_RPC_ERROR: - errMsg = "rpc failed"; - break; - default: - errMsg = "exec rpc error"; - } - throw new AnalysisException(errMsg); - } + for (CurrentQueryInfoProvider.InstanceConsumption instanceConsumption : + instanceConsumptions) { + final List<String> rowData = Lists.newArrayList(); + rowData.add(instanceConsumption.getFragmentId()); + rowData.add(instanceConsumption.getInstanceId().toString()); + rowData.add(instanceConsumption.getAddress().toString()); + rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption())); + rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption())); + sortedRowDatas.add(rowData); } - + // sort according to explain's fragment index sortedRowDatas.sort(new Comparator<List<String>>() { @Override @@ -182,51 +99,4 @@ public int compare(List<String> l1, List<String> l2) { return result; } - private enum FragmentExecState { - RUNNING, - WAIT, - FINISHED, - NONE - } - - private String getFragmentExecState(int i) { - if (i >= FragmentExecState.values().length) { - // can't run here - LOG.warn("Fetch uncorrect instance state."); - return FragmentExecState.NONE.toString(); - } - return FragmentExecState.values()[i].toString(); - } - - private static class Request { - private final TNetworkAddress address; - private List<String> fragmentIds; - private List<PUniqueId> instanceIds; - private Map<String, String> instanceIdToFragmentId; - - public Request(TNetworkAddress address) { - this.address = address; - this.fragmentIds = Lists.newArrayList(); - this.instanceIds = Lists.newArrayList(); - this.instanceIdToFragmentId = Maps.newHashMap(); - } - - public TNetworkAddress getAddress() { - return address; - } - - public List<PUniqueId> getInstanceId() { - return instanceIds; - } - - public void 
addInstanceId(String fragmentId, PUniqueId instanceId) { - this.fragmentIds.add(fragmentId); - this.instanceIds.add(instanceId); - this.instanceIdToFragmentId.put(DebugUtil.printId(instanceId), fragmentId.toString()); - } - - public String getFragmentId(PUniqueId id) { - return instanceIdToFragmentId.get(DebugUtil.printId(id)); - } - } } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java new file mode 100644 index 00000000..f18d9d13 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.util.Counter; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.qe.QueryStatisticsItem; +import org.apache.doris.rpc.*; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Provide running query's PlanNode informations, IO consumption and CPU consumption. + */ +public class CurrentQueryInfoProvider { + private static final Logger LOG = LogManager.getLogger(CurrentQueryInfoProvider.class); + + public CurrentQueryInfoProvider() { + } + + /** + * Firstly send request to trigger profile to report for specified query and wait a while, + * Secondly get Counters from Coordinator's RuntimeProfile and return query's consumption. + * + * @param item + * @return + * @throws AnalysisException + */ + public Consumption getQueryConsumption(QueryStatisticsItem item) throws AnalysisException { + triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false); + return new Consumption(item.getQueryProfile()); + } + + /** + * Same as getQueryConsumption, but this will cause BE to report all queries profile. 
+ * + * @param items + * @return + * @throws AnalysisException + */ + public Map<String, Consumption> getQueryConsumption(Collection<QueryStatisticsItem> items) + throws AnalysisException { + triggerReportAndWait(items, getWaitingTime(items.size()), true); + final Map<String, Consumption> queryConsumptions = Maps.newHashMap(); + for (QueryStatisticsItem item : items) { + queryConsumptions.put(item.getQueryId(), new Consumption(item.getQueryProfile())); + } + return queryConsumptions; + } + + /** + * Return query's instances consumption. + * + * @param item + * @return + * @throws AnalysisException + */ + public Collection<InstanceConsumption> getQueryInstanceConsumption(QueryStatisticsItem item) throws AnalysisException { + triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false); + final Map<String, RuntimeProfile> instanceProfiles = collectInstanceProfile(item.getQueryProfile()); + final List<InstanceConsumption> instanceConsumptions = Lists.newArrayList(); + for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) { + final RuntimeProfile instanceProfile = instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId())); + Preconditions.checkNotNull(instanceProfile); + final InstanceConsumption consumption = + new InstanceConsumption( + instanceInfo.getFragmentId(), + instanceInfo.getInstanceId(), + instanceInfo.getAddress(), + instanceProfile); + instanceConsumptions.add(consumption); + } + return instanceConsumptions; + } + + /** + * Profile trees is query profile -> fragment profile -> instance profile .... + * @param queryProfile + * @return instanceProfiles + */ + private Map<String, RuntimeProfile> collectInstanceProfile(RuntimeProfile queryProfile) { + final Map<String, RuntimeProfile> instanceProfiles = Maps.newHashMap(); + for (RuntimeProfile fragmentProfile : queryProfile.getChildMap().values()) { + for (Map.Entry<String, RuntimeProfile> entry: fragmentProfile.getChildMap().entrySet()) { + Preconditions.checkState(instanceProfiles.put(parseInstanceId(entry.getKey()), entry.getValue()) == null); + } + } + return instanceProfiles; + } + + /** + * Instance profile key is "Instance ${instance_id} (host=$host $port)" + * @param str + * @return + */ + private String parseInstanceId(String str) { + final String[] elements = str.split(" "); + if (elements.length == 4) { + return elements[1]; + } else { + Preconditions.checkState(false); + return ""; + } + } + + private long getWaitingTimeForSingleQuery() { + return getWaitingTime(1); + } + + /** + * @param numOfQuery + * @return unit(ms) + */ + private long getWaitingTime(int numOfQuery) { + final int oneQueryWaitingTime = 100; + final int allQueryMaxWaitingTime = 2000; + final int waitingTime = numOfQuery * oneQueryWaitingTime; + return waitingTime > allQueryMaxWaitingTime ? allQueryMaxWaitingTime : waitingTime; + } + + private void triggerReportAndWait(QueryStatisticsItem item, long waitingTime, boolean allQuery) + throws AnalysisException { + final List<QueryStatisticsItem> items = Lists.newArrayList(item); + triggerReportAndWait(items, waitingTime, allQuery); + } + + private void triggerReportAndWait(Collection<QueryStatisticsItem> items, long waitingTime, boolean allQuery) + throws AnalysisException { + triggerProfileReport(items, allQuery); + try { + Thread.currentThread().sleep(waitingTime); + } catch (InterruptedException e) { + } + } + + /** + * send report profile request. 
+ * @param items + * @param allQuery true:all queries profile will be reported, false:specified queries profile will be reported. + * @throws AnalysisException + */ + private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean allQuery) throws AnalysisException { + final Map<TNetworkAddress, Request> requests = Maps.newHashMap(); + final Map<TNetworkAddress, TNetworkAddress> brpcAddresses = Maps.newHashMap(); + for (QueryStatisticsItem item : items) { + for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) { + // use brpc address + TNetworkAddress brpcNetAddress = brpcAddresses.get(instanceInfo.getAddress()); + if (brpcNetAddress == null) { + try { + brpcNetAddress = toBrpcHost(instanceInfo.getAddress()); + brpcAddresses.put(instanceInfo.getAddress(), brpcNetAddress); + } catch (Exception e) { + LOG.warn(e.getMessage()); + throw new AnalysisException(e.getMessage()); + } + } + // merge different requests + Request request = requests.get(brpcNetAddress); + if (request == null) { + request = new Request(brpcNetAddress); + requests.put(brpcNetAddress, request); + } + // specified query instance which will report. + if (!allQuery) { + final PUniqueId pUId = new PUniqueId(instanceInfo.getInstanceId()); + request.addInstanceId(pUId); + } + } + } + recvResponse(sendRequest(requests)); + } + + private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest( + Map<TNetworkAddress, Request> requests) throws AnalysisException { + final List<Pair<Request, Future<PTriggerProfileReportResult>>> futures = Lists.newArrayList(); + for (TNetworkAddress address : requests.keySet()) { + final Request request = requests.get(address); + final PTriggerProfileReportRequest pbRequest = + new PTriggerProfileReportRequest(request.getInstanceIds()); + try { + futures.add(Pair.create(request, BackendServiceProxy.getInstance(). 
+ triggerProfileReportAsync(address, pbRequest))); + } catch (RpcException e) { + throw new AnalysisException("Sending request fails for query's execution informations."); + } + } + return futures; + } + + private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>>> futures) + throws AnalysisException { + final String reasonPrefix = "Fail to receive result."; + for (Pair<Request, Future<PTriggerProfileReportResult>> pair : futures) { + try { + final PTriggerProfileReportResult result + = pair.second.get(2, TimeUnit.SECONDS); + final TStatusCode code = TStatusCode.findByValue(result.status.code); + if (code != TStatusCode.OK) { + String errMsg = ""; + if (result.status.msgs != null && !result.status.msgs.isEmpty()) { + errMsg = result.status.msgs.get(0); + } + throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress() + + " reason:" + errMsg); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn(reasonPrefix + " reason:" + e.getCause()); + throw new AnalysisException(reasonPrefix); + } + + } + } + + private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException { + final Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort( + host.getHostname(), host.getPort()); + if (backend == null) { + throw new AnalysisException(new StringBuilder("Backend ") + .append(host.getHostname()) + .append(":") + .append(host.getPort()) + .append(" does not exist") + .toString()); + } + if (backend.getBrpcPort() < 0) { + throw new AnalysisException("BRPC port is't exist."); + } + return new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + } + + + public static class Consumption { + private final static String OLAP_SCAN_NODE = "OLAP_SCAN_NODE"; + private final static String HASH_JOIN_NODE = "HASH_JOIN_NODE"; + private final static String HASH_AGGREGATION_NODE = "AGGREGATION_NODE"; + private final static String SORT_NODE = "SORT_NODE"; + private final static String ANALYTIC_EVAL_NODE = "ANALYTIC_EVAL_NODE"; + private final static String UNION_NODE = "UNION_NODE"; + private final static String EXCHANGE_NODE = "EXCHANGE_NODE"; + + protected final List<ConsumptionCalculator> calculators; + + public Consumption(RuntimeProfile profile) { + this.calculators = Lists.newArrayList(); + init(profile); + } + + private void init(RuntimeProfile profile) { + final List<Map<String, Counter>> olapScanCounters = Lists.newArrayList(); + collectNodeProfileCounters(profile, olapScanCounters, OLAP_SCAN_NODE); + calculators.add(new OlapScanNodeConsumptionCalculator(olapScanCounters)); + + final List<Map<String, Counter>> hashJoinCounters = Lists.newArrayList(); + collectNodeProfileCounters(profile, hashJoinCounters, HASH_JOIN_NODE); + calculators.add(new HashJoinConsumptionCalculator(hashJoinCounters)); + + final List<Map<String, Counter>> hashAggCounters = Lists.newArrayList(); + collectNodeProfileCounters(profile, hashAggCounters, HASH_AGGREGATION_NODE); + calculators.add(new HashAggConsumptionCalculator(hashAggCounters)); + + final List<Map<String, Counter>> sortCounters = Lists.newArrayList(); + collectNodeProfileCounters(profile, sortCounters, SORT_NODE); + calculators.add(new SortConsumptionCalculator(sortCounters)); + + final List<Map<String, Counter>> windowsCounters = Lists.newArrayList(); + collectNodeProfileCounters(profile, windowsCounters, ANALYTIC_EVAL_NODE); + calculators.add(new WindowsConsumptionCalculator(windowsCounters)); + + final List<Map<String, Counter>> unionCounters = 
Lists.newArrayList(); + collectNodeProfileCounters(profile, unionCounters, UNION_NODE); + calculators.add(new UnionConsumptionCalculator(unionCounters)); + + final List<Map<String, Counter>> exchangeCounters = Lists.newArrayList(); + collectNodeProfileCounters(profile, exchangeCounters, EXCHANGE_NODE); + calculators.add(new ExchangeConsumptionCalculator(exchangeCounters)); + } + + private void collectNodeProfileCounters(RuntimeProfile profile, + List<Map<String, Counter>> counterMaps, String name) { + for (Map.Entry<String, RuntimeProfile> entry : profile.getChildMap().entrySet()) { + if (name.equals(parsePossibleExecNodeName(entry.getKey()))) { + counterMaps.add(entry.getValue().getCounterMap()); + } + collectNodeProfileCounters(entry.getValue(), counterMaps, name); + } + } + + /** + * ExecNode's RuntimeProfile name is "$node_type_name (id=?)" + * @param str + * @return + */ + private String parsePossibleExecNodeName(String str) { + final String[] elements = str.split(" "); + if (elements.length == 2) { + return elements[0]; + } else { + return ""; + } + } + + public long getTotalCpuConsumption() { + long cpu = 0; + for (ConsumptionCalculator consumption : calculators) { + cpu += consumption.getCpu(); + } + return cpu; + } + + public long getTotalIoConsumption() { + long io = 0; + for (ConsumptionCalculator consumption : calculators) { + io += consumption.getIo(); + } + return io; + } + } + + public static class InstanceConsumption extends Consumption { + private final String fragmentId; + private final TUniqueId instanceId; + private final TNetworkAddress address; + + public InstanceConsumption( + String fragmentId, + TUniqueId instanceId, + TNetworkAddress address, + RuntimeProfile profile) { + super(profile); + this.fragmentId = fragmentId; + this.instanceId = instanceId; + this.address = address; + + } + + public String getFragmentId() { + return fragmentId; + } + + public TUniqueId getInstanceId() { + return instanceId; + } + + public TNetworkAddress getAddress() { + return address; + } + } + + private static abstract class ConsumptionCalculator { + protected final List<Map<String, Counter>> counterMaps; + + public ConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + this.counterMaps = counterMaps; + } + + public long getCpu() { + long cpu = 0; + for (Map<String, Counter> counters : counterMaps) { + cpu += getCpuByRows(counters); + } + return cpu; + } + + public long getIo() { + long io = 0; + for (Map<String, Counter> counters : counterMaps) { + io += getIoByByte(counters); + } + return io; + } + + protected long getCpuByRows(Map<String, Counter> counters) { + return 0; + } + + protected long getIoByByte(Map<String, Counter> counters) { + return 0; + } + } + + private static class OlapScanNodeConsumptionCalculator extends ConsumptionCalculator { + public OlapScanNodeConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getIoByByte(Map<String, Counter> counters) { + final Counter counter = counters.get("CompressedBytesRead"); + return counter == null ? 
0 : counter.getValue(); + } + } + + private static class HashJoinConsumptionCalculator extends ConsumptionCalculator { + public HashJoinConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getCpuByRows(Map<String, Counter> counters) { + final Counter probeCounter = counters.get("ProbeRows"); + final Counter buildCounter = counters.get("BuildRows"); + return probeCounter == null || buildCounter == null ? + 0 : probeCounter.getValue() + buildCounter.getValue(); + } + } + + private static class HashAggConsumptionCalculator extends ConsumptionCalculator { + public HashAggConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getCpuByRows(Map<String, Counter> counters) { + final Counter buildCounter = counters.get("BuildRows"); + return buildCounter == null ? 0 : buildCounter.getValue(); + } + } + + private static class SortConsumptionCalculator extends ConsumptionCalculator { + public SortConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getCpuByRows(Map<String, Counter> counters) { + final Counter sortRowsCounter = counters.get("SortRows"); + return sortRowsCounter == null ? 0 : sortRowsCounter.getValue(); + } + } + + private static class WindowsConsumptionCalculator extends ConsumptionCalculator { + public WindowsConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getCpuByRows(Map<String, Counter> counters) { + final Counter processRowsCounter = counters.get("ProcessRows"); + return processRowsCounter == null ? 0 : processRowsCounter.getValue(); + + } + } + + private static class UnionConsumptionCalculator extends ConsumptionCalculator { + public UnionConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getCpuByRows(Map<String, Counter> counters) { + final Counter materializeRowsCounter = counters.get("MaterializeRows"); + return materializeRowsCounter == null ? 0 : materializeRowsCounter.getValue(); + } + } + + private static class ExchangeConsumptionCalculator extends ConsumptionCalculator { + + public ExchangeConsumptionCalculator(List<Map<String, Counter>> counterMaps) { + super(counterMaps); + } + + @Override + protected long getCpuByRows(Map<String, Counter> counters) { + final Counter mergeRowsCounter = counters.get("MergeRows"); + return mergeRowsCounter == null ? 
0 : mergeRowsCounter.getValue(); + } + } + + private static class Request { + private final TNetworkAddress address; + private final List<PUniqueId> instanceIds; + + public Request(TNetworkAddress address) { + this.address = address; + this.instanceIds = Lists.newArrayList(); + } + + public TNetworkAddress getAddress() { + return address; + } + + public List<PUniqueId> getInstanceIds() { + return instanceIds; + } + + public void addInstanceId(PUniqueId instanceId) { + this.instanceIds.add(instanceId); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index 566a4831..a59c3dbe 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -17,26 +17,29 @@ package org.apache.doris.common.proc; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.qe.QueryStatisticsItem; - import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QueryStatisticsItem; import java.util.Comparator; import java.util.List; import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /* * show proc "/current_queries" */ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { + private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class); public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() - .add("ConnectionId").add("QueryId").add("Database").add("User").add("ExecTime").build(); + .add("ConnectionId").add("QueryId").add("Database").add("User") + .add("IO").add("CPU").add("ExecTime").build(); - private static final int EXEC_TIME_INDEX = 3; + private static final int EXEC_TIME_INDEX = 6; @Override public boolean register(String name, ProcNodeInterface node) { @@ -62,12 +65,19 @@ public ProcResult fetchResult() throws AnalysisException { final Map<String, QueryStatisticsItem> statistic = QeProcessorImpl.INSTANCE.getQueryStatistics(); result.setNames(TITLE_NAMES.asList()); final List<List<String>> sortedRowData = Lists.newArrayList(); + + final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); + final Map<String, CurrentQueryInfoProvider.Consumption> consumptions + = provider.getQueryConsumption(statistic.values()); for (QueryStatisticsItem item : statistic.values()) { final List<String> values = Lists.newArrayList(); values.add(item.getConnId()); values.add(item.getQueryId()); values.add(item.getDb()); values.add(item.getUser()); + final CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId()); + values.add(String.valueOf(consumption.getTotalIoConsumption())); + values.add(String.valueOf(consumption.getTotalCpuConsumption())); values.add(item.getQueryExecTime()); sortedRowData.add(values); } diff --git a/fe/src/main/java/org/apache/doris/common/util/Counter.java b/fe/src/main/java/org/apache/doris/common/util/Counter.java index 41e0c77b..f1fa098d 100644 --- a/fe/src/main/java/org/apache/doris/common/util/Counter.java +++ b/fe/src/main/java/org/apache/doris/common/util/Counter.java @@ -19,29 +19,32 @@ import 
org.apache.doris.thrift.TUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + // Counter means indicators field. The counter's name is key, the counter itself is value. public class Counter { - private long value; - private TUnit type; - + private AtomicLong value; + private AtomicInteger type; + public long getValue() { - return value; + return value.get(); } public void setValue(long newValue) { - value = newValue; + value.set(newValue); } public TUnit getType() { - return type; + return TUnit.findByValue(type.get()); } public void setType(TUnit type) { - this.type = type; + this.type.set(type.getValue()); } public Counter(TUnit type, long value) { - this.type = type; - this.value = value; + this.value = new AtomicLong(value); + this.type = new AtomicInteger(type.getValue()); } } diff --git a/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 047ab76f..f5f359f9 100644 --- a/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -39,18 +39,24 @@ import java.util.Map; import java.util.Set; +/** + * It is accessed by two kinds of thread, one is to create this RuntimeProfile + * , named 'query thread', the other is to call + * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}. + */ public class RuntimeProfile { private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class); private static String ROOT_COUNTER = ""; private Counter counterTotalTime; private double localTimePercent; - + private Map<String, String> infoStrings = Maps.newHashMap(); private List<String> infoStringsDisplayOrder = Lists.newArrayList(); - private Map<String, Counter> counterMap = Maps.newHashMap(); - private Map<String, Set<String> > childCounterMap = Maps.newHashMap(); + // It will be hold by other thread. + private Map<String, Counter> counterMap = Maps.newConcurrentMap(); + private Map<String, Set<String> > childCounterMap = Maps.newHashMap(); private Map<String, RuntimeProfile> childMap = Maps.newHashMap(); private List<Pair<RuntimeProfile, Boolean>> childList = Lists.newArrayList(); @@ -58,7 +64,7 @@ public RuntimeProfile(String name) { this(); - setName(name); + this.name = name; } public RuntimeProfile() { @@ -79,6 +85,10 @@ public Counter getCounterTotalTime() { return childList; } + public Map<String, RuntimeProfile> getChildMap () { + return childMap; + } + public Counter addCounter(String name, TUnit type, String parentCounterName) { Counter counter = this.counterMap.get(name); if (counter != null) { @@ -176,7 +186,7 @@ private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) { // 2. Info Strings // 3. Counters // 4. Children - public void prettyPrint(StringBuilder builder, String prefix) { + public void prettyPrint(StringBuilder builder, String prefix) { Counter counter = this.counterMap.get("TotalTime"); Preconditions.checkState(counter != null); // 1. 
profile name @@ -335,7 +345,7 @@ public void computeTimeInProfile(long total) { } // from bigger to smaller - public void sortChildren() { + public void sortChildren() { Collections.sort(this.childList, new Comparator<Pair<RuntimeProfile, Boolean>>() { @Override public int compare(Pair<RuntimeProfile, Boolean> profile1, Pair<RuntimeProfile, Boolean> profile2) @@ -356,11 +366,12 @@ public void addInfoString(String key, String value) { this.infoStrings.put(key, value); } } - + public void setName(String name) { this.name = name; } - + + // Returns the value to which the specified key is mapped; // or null if this map contains no mapping for the key. public String getInfoString(String key) { diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 60337327..252a9984 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -477,6 +477,7 @@ public void exec() throws Exception { } profileFragmentId += 1; } + attachInstanceProfileToFragmentProfile(); } finally { unlock(); } @@ -1415,22 +1416,6 @@ public void endProfile() { } } - for (int i = 0; i < backendExecStates.size(); ++i) { - if (backendExecStates.get(i) == null) { - continue; - } - BackendExecState backendExecState = backendExecStates.get(i); - backendExecState.profile().computeTimeInProfile(); - - int profileFragmentId = backendExecState.profileFragmentId(); - if (profileFragmentId < 0 || profileFragmentId > fragmentProfile.size()) { - LOG.error("profileFragmentId " + profileFragmentId - + " should be in [0," + fragmentProfile.size() + ")"); - return; - } - fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile()); - } - for (int i = 1; i < fragmentProfile.size(); ++i) { fragmentProfile.get(i).sortChildren(); } @@ -1718,4 +1703,22 @@ public PlanFragment fragment() { } return result; } + + private void attachInstanceProfileToFragmentProfile() { + for (int i = 0; i < backendExecStates.size(); ++i) { + if (backendExecStates.get(i) == null) { + continue; + } + BackendExecState backendExecState = backendExecStates.get(i); + backendExecState.profile().computeTimeInProfile(); + + int profileFragmentId = backendExecState.profileFragmentId(); + if (profileFragmentId < 0 || profileFragmentId > fragmentProfile.size()) { + LOG.error("profileFragmentId " + profileFragmentId + + " should be in [0," + fragmentProfile.size() + ")"); + return; + } + fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile()); + } + } } diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index cf8c3615..8d68667d 100644 --- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -84,8 +84,9 @@ public void unregisterQuery(TUniqueId queryId) { .sql(info.getSql()) .user(context.getQualifiedUser()) .connId(String.valueOf(context.getConnectionId())) - .db(context.getDatabase()).fragmentInstanceInfos(info.getCoord() - .getFragmentInstanceInfos()).build(); + .db(context.getDatabase()) + .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) + .profile(info.getCoord().getQueryProfile()).build(); querySet.put(queryIdStr, item); } return querySet; @@ -95,7 +96,6 @@ public void unregisterQuery(TUniqueId queryId) { public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) { LOG.info("ReportExecStatus(): fragment_instance_id=" + 
DebugUtil.printId(params.fragment_instance_id) + ", query id=" + DebugUtil.printId(params.query_id) + " params=" + params); - final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo info = coordinatorMap.get(params.query_id); if (info == null) { @@ -114,7 +114,6 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) } public static final class QueryInfo { - private final ConnectContext connectContext; private final Coordinator coord; private final String sql; diff --git a/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java index f2c72d78..247ce0a0 100644 --- a/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java +++ b/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -31,16 +32,19 @@ private final String db; private final String connId; private final long queryStartTime; - private List<FragmentInstanceInfo> fragmentInstanceInfos; + private final List<FragmentInstanceInfo> fragmentInstanceInfos; + // root query profile + private final RuntimeProfile queryProfile; private QueryStatisticsItem(Builder builder) { this.queryId = builder.queryId; this.user = builder.user; this.sql = builder.sql; this.db = builder.db; + this.connId = builder.connId; this.queryStartTime = builder.queryStartTime; this.fragmentInstanceInfos = builder.fragmentInstanceInfos; - this.connId = builder.connId; + this.queryProfile = builder.queryProfile; } public String getDb() { @@ -72,6 +76,10 @@ public String getQueryId() { return fragmentInstanceInfos; } + public RuntimeProfile getQueryProfile() { + return queryProfile; + } + public static final class Builder { private String queryId; private String db; @@ -80,6 +88,7 @@ public String getQueryId() { private String connId; private long queryStartTime; private List<FragmentInstanceInfo> fragmentInstanceInfos; + private RuntimeProfile queryProfile; public Builder() { fragmentInstanceInfos = Lists.newArrayList(); @@ -120,6 +129,11 @@ public Builder fragmentInstanceInfos(List<FragmentInstanceInfo> infos) { return this; } + public Builder profile(RuntimeProfile profile) { + this.queryProfile = profile; + return this; + } + public QueryStatisticsItem build() { initDefaultValue(this); return new QueryStatisticsItem(this); @@ -145,6 +159,10 @@ private void initDefaultValue(Builder builder) { if (connId == null) { builder.connId = ""; } + + if (queryProfile == null) { + queryProfile = new RuntimeProfile(""); + } } } diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5f9af92a..53b6c023 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -93,7 +93,6 @@ private String originStmt; private StatementBase parsedStmt; private Analyzer analyzer; - private boolean isRegisterQuery = false; private RuntimeProfile profile; private RuntimeProfile summaryProfile; private volatile Coordinator coord = null; @@ -255,6 +254,8 @@ public void execute() throws Exception { LOG.warn("errors when abort txn", abortTxnException); } throw t; + } finally { + QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } } else if (parsedStmt instanceof DdlStmt) { 
handleDdlStmt(); @@ -287,10 +288,6 @@ public void execute() throws Exception { // ignore kill stmt execute err(not monitor it) context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); } - } finally { - if (isRegisterQuery) { - QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); - } } } @@ -533,7 +530,6 @@ private void handleQueryStmt() throws Exception { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt, coord)); - isRegisterQuery = true; coord.exec(); @@ -587,7 +583,6 @@ private void handleInsertStmt() throws Exception { coord = new Coordinator(context, analyzer, planner); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); - isRegisterQuery = true; coord.exec(); diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 7b14afc8..76da014b 100644 --- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -142,12 +142,11 @@ private synchronized PBackendService getProxy(TNetworkAddress address) { } } - - public Future<PFetchFragmentExecInfosResult> fetchFragmentExecInfosAsync( - TNetworkAddress address, PFetchFragmentExecInfoRequest request) throws RpcException { + public Future<PTriggerProfileReportResult> triggerProfileReportAsync( + TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException { try { final PBackendService service = getProxy(address); - return service.fetchFragmentExecInfosAsync(request); + return service.triggerProfileReport(request); } catch (Throwable e) { LOG.warn("fetch data catch a exception, address={}:{}", address.getHostname(), address.getPort(), e); diff --git a/fe/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/src/main/java/org/apache/doris/rpc/PBackendService.java index 5fe54901..5050debd 100644 --- a/fe/src/main/java/org/apache/doris/rpc/PBackendService.java +++ b/fe/src/main/java/org/apache/doris/rpc/PBackendService.java @@ -35,7 +35,7 @@ attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000) Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request); - @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_fragment_exec_infos", + @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report", attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000) - Future<PFetchFragmentExecInfosResult> fetchFragmentExecInfosAsync(PFetchFragmentExecInfoRequest request); + Future<PTriggerProfileReportResult> triggerProfileReport(PTriggerProfileReportRequest request); } diff --git a/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java b/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java deleted file mode 100644 index 8f7bab3e..00000000 --- a/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.rpc; - -import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; -import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; - -@ProtobufClass -public class PFragmentExecInfo { - @Protobuf(order = 1, required = true) - public PUniqueId instanceId; - @Protobuf(order = 2, required = true) - public int execStatus; - @Protobuf(order = 3, required = false) - public int planNodeType; - @Protobuf(order = 4, required = false) - public long rowsCount; -} diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfoRequest.java b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java similarity index 76% rename from fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfoRequest.java rename to fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java index 78ad30a2..3c948a63 100644 --- a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfoRequest.java +++ b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java @@ -18,20 +18,23 @@ package org.apache.doris.rpc; import com.baidu.bjf.remoting.protobuf.FieldType; -import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; +import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; +import com.google.common.collect.Lists; import java.util.List; @ProtobufClass -public class PFetchFragmentExecInfoRequest extends AttachmentRequest { +public class PTriggerProfileReportRequest extends AttachmentRequest { - public PFetchFragmentExecInfoRequest() { + @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false) + List<PUniqueId> instanceIds; + + public PTriggerProfileReportRequest() { } - public PFetchFragmentExecInfoRequest(List<PUniqueId> finstIds) { - this.finstIds = finstIds; + public PTriggerProfileReportRequest(List<PUniqueId> instanceIds) { + this.instanceIds = Lists.newArrayList(); + this.instanceIds.addAll(instanceIds); } - @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false) - public List<PUniqueId> finstIds; } diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportResult.java similarity index 81% rename from fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java rename to fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportResult.java index 1d0b757e..80e8c6e1 100644 --- a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java +++ b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportResult.java @@ -17,16 +17,14 @@ package org.apache.doris.rpc; -import com.baidu.bjf.remoting.protobuf.FieldType; import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; -import java.util.List; - @ProtobufClass -public class PFetchFragmentExecInfosResult { +public class PTriggerProfileReportResult { @Protobuf(order = 1, required = true) public PStatus status; - @Protobuf(fieldType = FieldType.OBJECT, order = 2, required = false) - public 
List<PFragmentExecInfo> execInfos; + + public PTriggerProfileReportResult() { + } } diff --git a/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java b/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java index c93db47c..1eea967e 100644 --- a/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java +++ b/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java @@ -19,10 +19,12 @@ import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.thrift.TUniqueId; @ProtobufClass public class PUniqueId { + public PUniqueId() {} public PUniqueId(TUniqueId tid) { hi = tid.getHi(); @@ -33,4 +35,34 @@ public PUniqueId(TUniqueId tid) { public long hi; @Protobuf(order = 2, required = true) public long lo; + + @Override + public int hashCode() { + int result = 16; + result = 31 * result + (int)(hi ^ (hi >>> 32)); + result = 31 * result + (int)(lo ^ (lo >>> 32)); + return result; + } + + @Override + public String toString() { + return DebugUtil.printId(this); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof PUniqueId) || obj == null) { + return false; + } + + final PUniqueId other = (PUniqueId)obj; + if (hi != other.hi || lo != other.lo) { + return false; + } + return true; + } } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index aaeb4bd7..71782934 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -131,26 +131,12 @@ message PFetchDataResult { optional bool eos = 3; }; -message PFetchFragmentExecInfoRequest { - repeated PUniqueId finst_id = 1; -}; - -enum PFragmentExecStatus { - RUNNING = 0; - WAIT = 1; - FINISHED = 2; -} - -message PFragmentExecInfo { - required PUniqueId finst_id = 1; - required int32 exec_status = 2; - optional int32 plan_node_type = 3; // same as TPlanNodeType - optional int64 rows_count = 4; +message PTriggerProfileReportRequest { + repeated PUniqueId instance_ids = 1; } -message PFetchFragmentExecInfosResult { +message PTriggerProfileReportResult { required PStatus status = 1; - repeated PFragmentExecInfo fragment_exec_info = 2; } service PBackendService { @@ -161,7 +147,7 @@ service PBackendService { rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult); rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult); - rpc fetch_fragment_exec_infos(PFetchFragmentExecInfoRequest) returns (PFetchFragmentExecInfosResult); + rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult); // NOTE(zc): If you want to add new method here, // you MUST add same method to palo_internal_service.proto }; diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto index 5a91e82b..da9faf69 100644 --- a/gensrc/proto/palo_internal_service.proto +++ b/gensrc/proto/palo_internal_service.proto @@ -33,5 +33,5 @@ service PInternalService { rpc tablet_writer_open(doris.PTabletWriterOpenRequest) returns (doris.PTabletWriterOpenResult); rpc tablet_writer_add_batch(doris.PTabletWriterAddBatchRequest) returns (doris.PTabletWriterAddBatchResult); rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult); - rpc 
fetch_fragment_exec_infos(doris.PFetchFragmentExecInfoRequest) returns (doris.PFetchFragmentExecInfosResult); + rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult); };
----------------------------------------------------------------
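A few notes on the diff follow; the sketches below are illustrations only, not part of the merged change. First, on attachInstanceProfileToFragmentProfile(), which now runs under the coordinator lock at the end of exec() instead of inside endProfile(): its range guard uses '>' where the log message describes the half-open interval [0, fragmentProfile.size()), so an id exactly equal to size() slips past the check and the subsequent fragmentProfile.get(...) throws IndexOutOfBoundsException. A minimal sketch of the tightened guard:

    private void attachInstanceProfileToFragmentProfile() {
        for (BackendExecState backendExecState : backendExecStates) {
            if (backendExecState == null) {
                continue;
            }
            backendExecState.profile().computeTimeInProfile();

            int profileFragmentId = backendExecState.profileFragmentId();
            // '>=' keeps the guard consistent with the half-open range [0, size)
            // that the error message prints; with '>', id == size() gets through.
            if (profileFragmentId < 0 || profileFragmentId >= fragmentProfile.size()) {
                LOG.error("profileFragmentId " + profileFragmentId
                        + " should be in [0," + fragmentProfile.size() + ")");
                return;
            }
            fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile());
        }
    }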
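Second, on the StmtExecutor change: the isRegisterQuery flag is gone, and unregisterQuery() now runs in a finally block scoped to the query branch, so the QeProcessorImpl entry is released on success, on failure, and on the abort-txn path alike. The shape of the resulting lifecycle, with result handling elided:

    QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
            new QeProcessorImpl.QueryInfo(context, originStmt, coord));
    try {
        coord.exec();
        // ... fetch row batches and ship them to the client ...
    } finally {
        // Always drop the bookkeeping entry, whether the query succeeded,
        // threw, or was cancelled mid-flight.
        QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
    }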
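Third, a usage sketch of the renamed RPC from the FE side; the getInstance() accessor, the beAddress and tInstanceId variables, and the explicit wait are assumptions for illustration, not taken from this diff. The point of the rename is that the FE no longer pulls per-instance exec infos; it asks the BE to push a fresh profile report for the listed fragment instances:

    List<PUniqueId> instanceIds = Lists.newArrayList(new PUniqueId(tInstanceId));
    PTriggerProfileReportRequest request = new PTriggerProfileReportRequest(instanceIds);
    // Asynchronous protobuf RPC; @ProtobufRPC binds triggerProfileReport() to the
    // trigger_profile_report method declared in internal_service.proto, with a
    // 10000 ms onceTalkTimeout, which the bound below mirrors.
    Future<PTriggerProfileReportResult> future = BackendServiceProxy.getInstance()
            .triggerProfileReportAsync(beAddress, request);
    PTriggerProfileReportResult result = future.get(10, TimeUnit.SECONDS);
    // result.status carries the PStatus; callers would check it before trusting
    // that the report was actually triggered.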
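Finally, on the new PUniqueId.equals(): the '|| obj == null' clause is dead code, since 'obj instanceof PUniqueId' already evaluates to false when obj is null. An equivalent, slightly tighter form (assuming Java 8+ for Long.hashCode; the hash seed differs from the hand-rolled version's 16, which is harmless as long as hashCode stays consistent with equals):

    @Override
    public int hashCode() {
        // Same per-field mixing as (int) (hi ^ (hi >>> 32)), via the JDK helper.
        return 31 * Long.hashCode(hi) + Long.hashCode(lo);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof PUniqueId)) { // instanceof already rejects null
            return false;
        }
        final PUniqueId other = (PUniqueId) obj;
        return hi == other.hi && lo == other.lo;
    }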