chenhao7253886 closed pull request #497: Support io and cpu indicators for current query
URL: https://github.com/apache/incubator-doris/pull/497

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
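
For orientation before the diff: this change adds IO and CPU columns to the
current-query proc views on the FE, fed by new per-node row counters on the BE.
A hypothetical session sketch (values are illustrative, not taken from this PR):

    mysql> show proc "/current_queries";
    ConnectionId | QueryId | Database | User | IO      | CPU   | ExecTime
    3            | ...     | test_db  | root | 1048576 | 20000 | 1534

IO is a byte count summed from CompressedBytesRead counters; CPU is a row count
summed from the counters added below (BuildRows, SortRows, ProcessRows,
MaterializeRows, MergeRows).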

diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index b807152f..99aa5e8b 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -153,7 +153,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
     _mem_pool.reset(new MemPool(mem_tracker()));
 
     _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
-
+    _process_rows_counter = ADD_COUNTER(runtime_profile(), "ProcessRows", TUnit::UNIT);
     DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size());
 
     for (int i = 0; i < _evaluators.size(); ++i) {
@@ -236,7 +236,7 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
 
     while (!_input_eos && _prev_input_row == NULL) {
        RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos));
-
+        COUNTER_UPDATE(_process_rows_counter, _curr_child_batch->num_rows()); 
         if (_curr_child_batch->num_rows() > 0) {
             _prev_input_row = _curr_child_batch->get_row(0);
             process_child_batches(state);
@@ -612,6 +612,7 @@ Status AnalyticEvalNode::process_child_batches(RuntimeState* state) {
         _prev_child_batch->reset();
         _prev_child_batch.swap(_curr_child_batch);
        RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos));
+        COUNTER_UPDATE(_process_rows_counter, _curr_child_batch->num_rows());
     }
 
     return Status::OK;
diff --git a/be/src/exec/analytic_eval_node.h b/be/src/exec/analytic_eval_node.h
index 882b3d39..6b92c37e 100644
--- a/be/src/exec/analytic_eval_node.h
+++ b/be/src/exec/analytic_eval_node.h
@@ -327,6 +327,8 @@ class AnalyticEvalNode : public ExecNode {
 
     // Time spent processing the child rows.
     RuntimeProfile::Counter* _evaluation_timer;
+
+    RuntimeProfile::Counter* _process_rows_counter;
 };
 
 }
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 840c875f..4c7828c3 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -72,6 +72,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {
             _num_senders, config::exchg_node_buffer_size_bytes,
             state->runtime_profile(), _is_merging);
     if (_is_merging) {
+        _merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT);
         RETURN_IF_ERROR(_sort_exec_exprs.prepare(
                    state, _row_descriptor, _row_descriptor, expr_mem_tracker()));
         // AddExprCtxsToFree(_sort_exec_exprs);
@@ -213,7 +214,8 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batch
     state->set_query_state_for_wait();
     RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
     state->set_query_state_for_running();
-
+    // TODO(chenhao): this counts only one instance and loses the others.
+    COUNTER_UPDATE(_merge_rows_counter, output_batch->num_rows()); 
     while ((_num_rows_skipped < _offset)) {
         _num_rows_skipped += output_batch->num_rows();
         // Throw away rows in the output batch until the offset is skipped.
@@ -228,6 +230,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batch
             break;
         }
         RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
+        COUNTER_UPDATE(_merge_rows_counter, output_batch->num_rows());
     }
 
     _num_rows_returned += output_batch->num_rows();
diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h
index 8b847895..6450d39f 100644
--- a/be/src/exec/exchange_node.h
+++ b/be/src/exec/exchange_node.h
@@ -26,6 +26,7 @@ namespace doris {
 
 class RowBatch;
 class DataStreamRecvr;
+class RuntimeProfile;
 
 // Receiver node for data streams. The data stream receiver is created in Prepare()
 // and closed in Close().
@@ -106,6 +107,8 @@ class ExchangeNode : public ExecNode {
 
     // Number of rows skipped so far.
     int64_t _num_rows_skipped;
+
+    RuntimeProfile::Counter* _merge_rows_counter;
 };
 
 };
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 8b4f8d15..37b6b3a2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -186,7 +186,6 @@ Status ExecNode::prepare(RuntimeState* state) {
     _mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker()));
     _expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get()));
     _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get()));
-
     // TODO chenhao
     RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc(), expr_mem_tracker()));
     // TODO(zc):
diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc
index da063f92..72ba8876 100644
--- a/be/src/exec/new_partitioned_aggregation_node.cc
+++ b/be/src/exec/new_partitioned_aggregation_node.cc
@@ -44,7 +44,6 @@
 #include "runtime/tuple_row.h"
 #include "runtime/tuple.h"
 #include "udf/udf_internal.h"
-#include "util/runtime_profile.h"
 
 #include "gen_cpp/Exprs_types.h"
 #include "gen_cpp/PlanNodes_types.h"
@@ -201,6 +200,7 @@ Status NewPartitionedAggregationNode::prepare(RuntimeState* state) {
       ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
   largest_partition_percent_ =
      runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
+  _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
   if (is_streaming_preagg_) {
     runtime_profile()->append_exec_option("Streaming Preaggregation");
     streaming_timer_ = ADD_TIMER(runtime_profile(), "StreamingTime");
@@ -308,7 +308,7 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(_children[0]->get_next(state, &batch, &eos));
-
+    COUNTER_UPDATE(_build_rows_counter, batch.num_rows());
     if (UNLIKELY(VLOG_ROW_IS_ON)) {
       for (int i = 0; i < batch.num_rows(); ++i) {
         TupleRow* row = batch.get_row(i);
@@ -532,7 +532,6 @@ Status NewPartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     RETURN_IF_ERROR(QueryMaintenance(state));
 
    RETURN_IF_ERROR(child(0)->get_next(state, child_batch_.get(), &child_eos_));
-
     SCOPED_TIMER(streaming_timer_);
 
     int remaining_capacity[PARTITION_FANOUT];
diff --git a/be/src/exec/new_partitioned_aggregation_node.h b/be/src/exec/new_partitioned_aggregation_node.h
index ac30c298..227cbed4 100644
--- a/be/src/exec/new_partitioned_aggregation_node.h
+++ b/be/src/exec/new_partitioned_aggregation_node.h
@@ -272,6 +272,8 @@ class NewPartitionedAggregationNode : public ExecNode {
   /// Time spent processing the child rows
   RuntimeProfile::Counter* build_timer_;
 
+  RuntimeProfile::Counter* _build_rows_counter;
+
   /// Total time spent resizing hash tables.
   RuntimeProfile::Counter* ht_resize_timer_;
 
diff --git a/be/src/exec/new_partitioned_aggregation_node_ir.cc b/be/src/exec/new_partitioned_aggregation_node_ir.cc
index 4a5c00cd..57afbc1d 100644
--- a/be/src/exec/new_partitioned_aggregation_node_ir.cc
+++ b/be/src/exec/new_partitioned_aggregation_node_ir.cc
@@ -23,6 +23,7 @@
 #include "runtime/buffered_tuple_stream3.inline.h"
 #include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
+#include "util/runtime_profile.h"
 
 using namespace doris;
 
@@ -238,7 +239,7 @@ bool NewPartitionedAggregationNode::TryAddToHashTable(
       return false;
     }
   }
-
+  COUNTER_UPDATE(_build_rows_counter, 1);
   UpdateTuple(partition->agg_fn_evals.data(), intermediate_tuple, in_row);
   return true;
 }
diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp
index 71a42c85..3b816e9e 100644
--- a/be/src/exec/sort_node.cpp
+++ b/be/src/exec/sort_node.cpp
@@ -19,7 +19,7 @@
 #include "exec/sort_exec_exprs.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-// #include "runtime/sorted_run_merger.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 
@@ -49,6 +49,7 @@ Status SortNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
     RETURN_IF_ERROR(_sort_exec_exprs.prepare(
             state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker()));
+    _sort_rows_counter = ADD_COUNTER(runtime_profile(), "SortRows", TUnit::UNIT);
     return Status::OK;
 }
 
@@ -144,6 +145,7 @@ Status SortNode::sort_input(RuntimeState* state) {
     do {
         batch.reset();
         RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
+        COUNTER_UPDATE(_sort_rows_counter, batch.num_rows());
         RETURN_IF_ERROR(_sorter->add_batch(&batch));
         RETURN_IF_CANCELLED(state);
         RETURN_IF_LIMIT_EXCEEDED(state);
diff --git a/be/src/exec/sort_node.h b/be/src/exec/sort_node.h
index e1de714a..68dcfd8e 100644
--- a/be/src/exec/sort_node.h
+++ b/be/src/exec/sort_node.h
@@ -70,6 +70,7 @@ class SortNode : public ExecNode {
     std::vector<bool> _nulls_first;
     boost::scoped_ptr<MemPool> _tuple_pool;
 
+    RuntimeProfile::Counter* _sort_rows_counter;
 };
 
 }
diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp
index 5e07c3a3..a1494cc1 100644
--- a/be/src/exec/union_node.cpp
+++ b/be/src/exec/union_node.cpp
@@ -77,7 +77,7 @@ Status UnionNode::prepare(RuntimeState* state) {
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     DCHECK(_tuple_desc != nullptr);
     _codegend_union_materialize_batch_fns.resize(_child_expr_lists.size());
-
+    _materialize_rows_counter = ADD_COUNTER(runtime_profile(), "MaterializeRows", TUnit::UNIT);
     // Prepare const expr lists.
     for (const vector<ExprContext*>& exprs : _const_expr_lists) {
        RETURN_IF_ERROR(Expr::prepare(exprs, state, row_desc(), expr_mem_tracker()));
@@ -210,6 +210,7 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch
             // The first batch from each child is always fetched here.
             RETURN_IF_ERROR(child(_child_idx)->get_next(
                     state, _child_batch.get(), &_child_eos));
+            COUNTER_UPDATE(_materialize_rows_counter, _child_batch->num_rows());
         }
 
         while (!row_batch->at_capacity()) {
@@ -224,6 +225,7 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch
                // All batches except the first batch from each child are fetched here.
                 RETURN_IF_ERROR(child(_child_idx)->get_next(
                         state, _child_batch.get(), &_child_eos));
+                COUNTER_UPDATE(_materialize_rows_counter, _child_batch->num_rows());
                // If we fetched an empty batch, go back to the beginning of this while loop, and
                 // try again.
                 if (_child_batch->num_rows() == 0) continue;
diff --git a/be/src/exec/union_node.h b/be/src/exec/union_node.h
index b12fb597..7daf2a62 100644
--- a/be/src/exec/union_node.h
+++ b/be/src/exec/union_node.h
@@ -100,6 +100,8 @@ class UnionNode : public ExecNode {
     /// to -1 if no child needs to be closed.
     int _to_close_child_idx;
 
+    RuntimeProfile::Counter* _materialize_rows_counter;
+
     /// END: Members that must be Reset()
     /////////////////////////////////////////
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 42efed7f..78358ea3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -314,7 +314,6 @@ void FragmentExecState::coordinator_callback(
                 _executor.cancel();
                 return;
             }
-
             coord->reportExecStatus(res, params);
         }
 
@@ -501,41 +500,32 @@ void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
 }
 
-
-Status FragmentMgr::fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* result,
-                                              const PFetchFragmentExecInfoRequest* request) {
-    int fragment_id_list_size = request->finst_id_size();
-    for (int i = 0; i < fragment_id_list_size; i++) {
-        const PUniqueId& p_fragment_id = request->finst_id(i);
-        TUniqueId id;
-        id.__set_hi(p_fragment_id.hi());
-        id.__set_lo(p_fragment_id.lo()); 
-        PFragmentExecInfo* info = result->add_fragment_exec_info();
-        PUniqueId* finst_id = info->mutable_finst_id();
-        finst_id->set_hi(p_fragment_id.hi());
-        finst_id->set_lo(p_fragment_id.lo()); 
-
-        bool is_running = false; 
-        std::lock_guard<std::mutex> lock(_lock);
-        {
-            auto iter = _fragment_map.find(id);
-            if (iter == _fragment_map.end()) {
-                info->set_exec_status(PFragmentExecStatus::FINISHED);
-                continue;
+Status FragmentMgr::trigger_profile_report(const PTriggerProfileReportRequest* request) {
+    if (request->instance_ids_size() > 0) {
+        for (int i = 0; i < request->instance_ids_size(); i++) {
+            const PUniqueId& p_fragment_id = request->instance_ids(i);
+            TUniqueId id;
+            id.__set_hi(p_fragment_id.hi());
+            id.__set_lo(p_fragment_id.lo());
+            {
+                std::lock_guard<std::mutex> lock(_lock);
+                auto iter = _fragment_map.find(id);
+                if (iter != _fragment_map.end()) {
+                    iter->second->executor()->report_profile_once();
+                }
             }
-            is_running = iter->second->executor()->runtime_state()->is_running();
         }
-
-        if (is_running) {
-            info->set_exec_status(PFragmentExecStatus::RUNNING);
-        } else {
-            info->set_exec_status(PFragmentExecStatus::WAIT);
+    } else {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _fragment_map.begin();
+        for (; iter != _fragment_map.end(); iter++) {
+            iter->second->executor()->report_profile_once();
         }
     }
-
-    return Status::OK;       
+    return Status::OK;
 }
 
+
 void FragmentMgr::debug(std::stringstream& ss) {
     // Keep things simple
     std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c911fe30..b5fc075c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -54,14 +54,14 @@ class FragmentMgr : public RestMonitorIface {
     // TODO(zc): report this is over
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);
 
-    Status fetch_fragment_exec_infos(PFetchFragmentExecInfosResult* result,
-                                     const PFetchFragmentExecInfoRequest* request);
-
     Status cancel(const TUniqueId& fragment_id);
 
     void cancel_worker();
 
     virtual void debug(std::stringstream& ss);
+
+    Status trigger_profile_report(const PTriggerProfileReportRequest* request);
+
 private:
     void exec_actual(std::shared_ptr<FragmentExecState> exec_state,
                      FinishCallback cb);
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index d216f9ac..1e769ee6 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -260,7 +260,7 @@ Status PlanFragmentExecutor::open() {
     // may block
     // TODO: if no report thread is started, make sure to send a final profile
     // at end, otherwise the coordinator hangs in case we finish w/ an error
-    if (_is_report_success && !_report_status_cb.empty() && config::status_report_interval > 0) {
+    if (!_report_status_cb.empty() && config::status_report_interval > 0) {
         boost::unique_lock<boost::mutex> l(_report_thread_lock);
        _report_thread = boost::thread(&PlanFragmentExecutor::report_profile, this);
        // make sure the thread started up, otherwise report_profile() might get into a race
@@ -366,17 +366,21 @@ void PlanFragmentExecutor::report_profile() {
        boost::get_system_time() + boost::posix_time::seconds(report_fragment_offset);
     // We don't want to wait longer than it takes to run the entire fragment.
     _stop_report_thread_cv.timed_wait(l, timeout);
-
+    bool is_report_profile_interval = _is_report_success && config::status_report_interval > 0;
     while (_report_thread_active) {
-        boost::system_time timeout =
-            boost::get_system_time() + boost::posix_time::seconds(config::status_report_interval);
-
-        // timed_wait can return because the timeout occurred or the condition variable
-        // was signaled.  We can't rely on its return value to distinguish between the
-        // two cases (e.g. there is a race here where the wait timed out but before grabbing
-        // the lock, the condition variable was signaled).  Instead, we will use an external
-        // flag, _report_thread_active, to coordinate this.
-        _stop_report_thread_cv.timed_wait(l, timeout);
+        if (is_report_profile_interval) {
+            boost::system_time timeout =
+                boost::get_system_time() + boost::posix_time::seconds(config::status_report_interval);
+            // timed_wait can return because the timeout occurred or the condition variable
+            // was signaled.  We can't rely on its return value to distinguish between the
+            // two cases (e.g. there is a race here where the wait timed out but before grabbing
+            // the lock, the condition variable was signaled).  Instead, we will use an external
+            // flag, _report_thread_active, to coordinate this.
+            _stop_report_thread_cv.timed_wait(l, timeout);
+        } else {
+            // Triggered on demand, e.g. by show proc "/current_queries".
+            _stop_report_thread_cv.wait(l);
+        }
 
         if (VLOG_FILE_IS_ON) {
             VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : 
" ")
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index fe342c6c..299191b5 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -135,6 +135,11 @@ class PlanFragmentExecutor {
     DataSink* get_sink() {
         return _sink.get();
     }
+
+    void report_profile_once() {
+        _stop_report_thread_cv.notify_one();
+    }
+
 private:
     ExecEnv* _exec_env;  // not owned
     ExecNode* _plan;  // lives in _runtime_state->obj_pool()
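
The two hunks above turn the BE report thread from a purely periodic loop into
one that can also be woken on demand via report_profile_once(). A minimal Java
sketch of the same wait/notify pattern (hypothetical names; the real code is
the C++/boost shown above):

    class ReportLoop {
        private final Object lock = new Object();
        private volatile boolean active = true;
        private final boolean periodic;  // like is_report_profile_interval
        private final long intervalMs;   // like config::status_report_interval

        ReportLoop(boolean periodic, long intervalMs) {
            this.periodic = periodic;
            this.intervalMs = intervalMs;
        }

        void run() throws InterruptedException {
            synchronized (lock) {
                while (active) {
                    if (periodic) {
                        lock.wait(intervalMs);  // wake on timeout or signal
                    } else {
                        lock.wait();            // sleep until triggered
                    }
                    sendReport();
                }
            }
        }

        // Analogous to PlanFragmentExecutor::report_profile_once().
        void reportOnce() {
            synchronized (lock) {
                lock.notify();
            }
        }

        void stop() {
            synchronized (lock) {
                active = false;
                lock.notify();
            }
        }

        private void sendReport() { /* push the profile to the FE */ }
    }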
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 3b7080cc..2f4b3f34 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -186,17 +186,14 @@ void PInternalServiceImpl<T>::fetch_data(
 }
 
 template<typename T>
-void PInternalServiceImpl<T>::fetch_fragment_exec_infos(
+void PInternalServiceImpl<T>::trigger_profile_report(
         google::protobuf::RpcController* controller,
-        const PFetchFragmentExecInfoRequest* request,
-        PFetchFragmentExecInfosResult* result,
+        const PTriggerProfileReportRequest* request,
+        PTriggerProfileReportResult* result,
         google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
-    auto status = _exec_env->fragment_mgr()->fetch_fragment_exec_infos(result, request);
-    if (!status.ok()) {
-        LOG(WARNING) << "fetch fragment exec status failed:" << status.get_error_msg();
-    }
-    status.to_protobuf(result->mutable_status());
+    auto st = _exec_env->fragment_mgr()->trigger_profile_report(request);
+    st.to_protobuf(result->mutable_status());
 }
 
 template class PInternalServiceImpl<PBackendService>;
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 5cc5cb6f..c6406ee5 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -74,10 +74,10 @@ class PInternalServiceImpl : public T {
                               PTabletWriterCancelResult* response,
                               google::protobuf::Closure* done) override;
 
-    void fetch_fragment_exec_infos(
+    void trigger_profile_report(
         google::protobuf::RpcController* controller,
-        const PFetchFragmentExecInfoRequest* request,
-        PFetchFragmentExecInfosResult* result,
+        const PTriggerProfileReportRequest* request,
+        PTriggerProfileReportResult* result,
         google::protobuf::Closure* done) override;
 
 private:
diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
index 2b39b87e..21ffef73 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
@@ -17,44 +17,29 @@
 
 package org.apache.doris.common.proc;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.qe.QueryStatisticsItem;
 import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.rpc.BackendServiceProxy;
-import org.apache.doris.rpc.PFetchFragmentExecInfoRequest;
-import org.apache.doris.rpc.PFetchFragmentExecInfosResult;
-import org.apache.doris.rpc.PFragmentExecInfo;
-import org.apache.doris.rpc.PUniqueId;
-import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TStatusCode;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /*
  * show proc "/current_queries/{query_id}/fragments"
  */
 public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
-    private static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
+    private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class);
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("FragmentId").add("InstanceId").add("Host").add("ExecState").build();
+            .add("FragmentId").add("InstanceId").add("Host")
+            .add("IO").add("CPU").build();
     private QueryStatisticsItem item;
 
     public CurrentQueryFragmentProcNode(QueryStatisticsItem item) {
@@ -84,89 +69,21 @@ private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException
     }
 
     private ProcResult requestFragmentExecInfos() throws AnalysisException {
-
-        // create request and remove redundant rpc
-        final Map<TNetworkAddress, Request> requestMap = Maps.newHashMap();
-        for (QueryStatisticsItem.FragmentInstanceInfo info : item.getFragmentInstanceInfos()) {
-            final TNetworkAddress brpcNetAddress;
-            try {
-                brpcNetAddress = toBrpcHost(info.getAddress());
-            } catch (Exception e) {
-                LOG.warn(e.getMessage());
-                throw new AnalysisException(e.getMessage());
-            }
-            Request request = requestMap.get(brpcNetAddress);
-            if (request == null) {
-                request = new Request(brpcNetAddress);
-                requestMap.put(brpcNetAddress, request);
-            }
-            request.addInstanceId(info.getFragmentId(), new PUniqueId(info.getInstanceId()));
-        }
-
-        // send request
-        final List<Pair<Request, Future<PFetchFragmentExecInfosResult>>> futures = Lists.newArrayList();
-        for (TNetworkAddress address : requestMap.keySet()) {
-            final Request request = requestMap.get(address);
-            final PFetchFragmentExecInfoRequest pbRequest =
-                    new PFetchFragmentExecInfoRequest(request.getInstanceId());
-            try {
-                futures.add(Pair.create(request, BackendServiceProxy.getInstance().
-                        fetchFragmentExecInfosAsync(address, pbRequest)));
-            } catch (RpcException e) {
-                throw new AnalysisException("exec rpc error");
-            }
-        }
-
+        final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
+        final Collection<CurrentQueryInfoProvider.InstanceConsumption> instanceConsumptions
+                = provider.getQueryInstanceConsumption(item);
         final List<List<String>> sortedRowDatas = Lists.newArrayList();
-        // get result
-        for (Pair<Request, Future<PFetchFragmentExecInfosResult>> pair : futures) {
-            TStatusCode code;
-            String errMsg = null;
-            try {
-                final PFetchFragmentExecInfosResult fragmentExecInfoResult
-                        = pair.second.get(10, TimeUnit.SECONDS);
-                code = TStatusCode.findByValue(fragmentExecInfoResult.status.code);
-                if (fragmentExecInfoResult.status.msgs != null
-                        && !fragmentExecInfoResult.status.msgs.isEmpty()) {
-                    errMsg = fragmentExecInfoResult.status.msgs.get(0);
-                }
-
-                if (errMsg == null) {
-                    for (PFragmentExecInfo info : fragmentExecInfoResult.execInfos) {
-                        final List<String> rowData = Lists.newArrayList();
-                        rowData.add(pair.first.getFragmentId(info.instanceId).toString());
-                        rowData.add(DebugUtil.printId(info.instanceId));
-                        rowData.add(pair.first.getAddress().getHostname());
-                        rowData.add(getFragmentExecState(info.execStatus));
-                        sortedRowDatas.add(rowData);
-                    }
-                }
-            } catch (ExecutionException e) {
-                LOG.warn("catch a execute exception", e);
-                code = TStatusCode.THRIFT_RPC_ERROR;
-            } catch (InterruptedException e) {
-                LOG.warn("catch a interrupt exception", e);
-                code = TStatusCode.INTERNAL_ERROR;
-            } catch (TimeoutException e) {
-                LOG.warn("catch a timeout exception", e);
-                code = TStatusCode.TIMEOUT;
-            }
-
-            if (code != TStatusCode.OK) {
-                switch (code) {
-                    case TIMEOUT:
-                        errMsg = "query timeout";
-                        break;
-                    case THRIFT_RPC_ERROR:
-                        errMsg = "rpc failed";
-                        break;
-                    default:
-                        errMsg = "exec rpc error";
-                }
-                throw new AnalysisException(errMsg);
-            }
+        for (CurrentQueryInfoProvider.InstanceConsumption instanceConsumption :
+                instanceConsumptions) {
+            final List<String> rowData = Lists.newArrayList();
+            rowData.add(instanceConsumption.getFragmentId());
+            rowData.add(instanceConsumption.getInstanceId().toString());
+            rowData.add(instanceConsumption.getAddress().toString());
+            rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption()));
+            rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption()));
+            sortedRowDatas.add(rowData);
         }
-            
+
         // sort according to explain's fragment index
         sortedRowDatas.sort(new Comparator<List<String>>() {
             @Override
@@ -182,51 +99,4 @@ public int compare(List<String> l1, List<String> l2) {
         return result;
     }
 
-    private enum FragmentExecState {
-        RUNNING,
-        WAIT,
-        FINISHED,
-        NONE
-    }
-
-    private String getFragmentExecState(int i) {
-        if (i >= FragmentExecState.values().length) {
-            // can't run here
-            LOG.warn("Fetch uncorrect instance state.");
-            return FragmentExecState.NONE.toString();
-        }
-        return FragmentExecState.values()[i].toString();
-    }
-
-    private static class Request {
-        private final TNetworkAddress address;
-        private List<String> fragmentIds;
-        private List<PUniqueId> instanceIds;
-        private Map<String, String> instanceIdToFragmentId;
-
-        public Request(TNetworkAddress address) {
-            this.address = address;
-            this.fragmentIds = Lists.newArrayList();
-            this.instanceIds = Lists.newArrayList();
-            this.instanceIdToFragmentId = Maps.newHashMap();
-        }
-
-        public TNetworkAddress getAddress() {
-            return address;
-        }
-
-        public List<PUniqueId> getInstanceId() {
-            return instanceIds;
-        }
-
-        public void addInstanceId(String fragmentId, PUniqueId instanceId) {
-            this.fragmentIds.add(fragmentId);
-            this.instanceIds.add(instanceId);
-            this.instanceIdToFragmentId.put(DebugUtil.printId(instanceId), fragmentId.toString());
-        }
-
-        public String getFragmentId(PUniqueId id) {
-            return instanceIdToFragmentId.get(DebugUtil.printId(id));
-        }
-    }
 }
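
After this change, the per-instance view reports IO and CPU in place of
ExecState. An illustrative (hypothetical) layout:

    mysql> show proc "/current_queries/{query_id}/fragments";
    FragmentId | InstanceId    | Host           | IO      | CPU
    0          | <instance_id> | <backend_host> | 1048576 | 20000

Rows are sorted by fragment id, matching the fragment order in explain output.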
diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
new file mode 100644
index 00000000..f18d9d13
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.proc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.Counter;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.qe.QueryStatisticsItem;
+import org.apache.doris.rpc.*;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Provides a running query's PlanNode information, IO consumption and CPU consumption.
+ */
+public class CurrentQueryInfoProvider {
+    private static final Logger LOG = LogManager.getLogger(CurrentQueryInfoProvider.class);
+
+    public CurrentQueryInfoProvider() {
+    }
+
+    /**
+     * First, send a request that triggers a profile report for the specified query and wait a while;
+     * then read the Counters from the Coordinator's RuntimeProfile and return the query's consumption.
+     *
+     * @param item
+     * @return
+     * @throws AnalysisException
+     */
+    public Consumption getQueryConsumption(QueryStatisticsItem item) throws AnalysisException {
+        triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false);
+        return new Consumption(item.getQueryProfile());
+    }
+
+    /**
+     * Same as getQueryConsumption, but this causes the BEs to report the profiles of all queries.
+     *
+     * @param items
+     * @return
+     * @throws AnalysisException
+     */
+    public Map<String, Consumption> getQueryConsumption(Collection<QueryStatisticsItem> items)
+            throws AnalysisException {
+        triggerReportAndWait(items, getWaitingTime(items.size()), true);
+        final Map<String, Consumption> queryConsumptions = Maps.newHashMap();
+        for (QueryStatisticsItem item : items) {
+            queryConsumptions.put(item.getQueryId(), new Consumption(item.getQueryProfile()));
+        }
+        return queryConsumptions;
+    }
+
+    /**
+     * Return the consumption of each of the query's instances.
+     *
+     * @param item
+     * @return
+     * @throws AnalysisException
+     */
+    public Collection<InstanceConsumption> getQueryInstanceConsumption(QueryStatisticsItem item) throws AnalysisException {
+        triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false);
+        final Map<String, RuntimeProfile> instanceProfiles = collectInstanceProfile(item.getQueryProfile());
+        final List<InstanceConsumption> instanceConsumptions = Lists.newArrayList();
+        for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) {
+            final RuntimeProfile instanceProfile = instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId()));
+            Preconditions.checkNotNull(instanceProfile);
+            final InstanceConsumption consumption =
+                    new InstanceConsumption(
+                            instanceInfo.getFragmentId(),
+                            instanceInfo.getInstanceId(),
+                            instanceInfo.getAddress(),
+                            instanceProfile);
+            instanceConsumptions.add(consumption);
+        }
+        return instanceConsumptions;
+    }
+
+    /**
+     * The profile tree is: query profile -> fragment profile -> instance profile -> ...
+     * @param queryProfile
+     * @return instanceProfiles
+     */
+    private Map<String, RuntimeProfile> collectInstanceProfile(RuntimeProfile queryProfile) {
+        final Map<String, RuntimeProfile> instanceProfiles = Maps.newHashMap();
+        for (RuntimeProfile fragmentProfile : queryProfile.getChildMap().values()) {
+            for (Map.Entry<String, RuntimeProfile> entry: fragmentProfile.getChildMap().entrySet()) {
+                Preconditions.checkState(instanceProfiles.put(parseInstanceId(entry.getKey()), entry.getValue()) == null);
+            }
+        }
+        return instanceProfiles;
+    }
+
+    /**
+     * Instance profile key is "Instance ${instance_id} (host=$host $port)"
+     * @param str
+     * @return
+     */
+    private String parseInstanceId(String str) {
+        final String[] elements = str.split(" ");
+        if (elements.length == 4) {
+            return elements[1];
+        } else {
+            Preconditions.checkState(false);
+            return "";
+        }
+    }
+
+    private long getWaitingTimeForSingleQuery() {
+        return getWaitingTime(1);
+    }
+
+    /**
+     * @param numOfQuery
+     * @return unit(ms)
+     */
+    private long getWaitingTime(int numOfQuery) {
+        final int oneQueryWaitingTime = 100;
+        final int allQueryMaxWaitingTime = 2000;
+        final int waitingTime = numOfQuery * oneQueryWaitingTime;
+        return waitingTime > allQueryMaxWaitingTime ? allQueryMaxWaitingTime : waitingTime;
+    }
+
+    private void triggerReportAndWait(QueryStatisticsItem item, long waitingTime, boolean allQuery)
+            throws AnalysisException {
+        final List<QueryStatisticsItem> items = Lists.newArrayList(item);
+        triggerReportAndWait(items, waitingTime, allQuery);
+    }
+
+    private void triggerReportAndWait(Collection<QueryStatisticsItem> items, long waitingTime, boolean allQuery)
+            throws AnalysisException {
+        triggerProfileReport(items, allQuery);
+        try {
+            Thread.currentThread().sleep(waitingTime);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    /**
+     * Send the profile-report request.
+     * @param items
+     * @param allQuery true: the profiles of all queries are reported; false: only the specified queries' profiles are reported.
+     * @throws AnalysisException
+     */
+    private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean allQuery) throws AnalysisException {
+        final Map<TNetworkAddress, Request> requests = Maps.newHashMap();
+        final Map<TNetworkAddress, TNetworkAddress> brpcAddresses = Maps.newHashMap();
+        for (QueryStatisticsItem item : items) {
+            for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) {
+                // use brpc address
+                TNetworkAddress brpcNetAddress = brpcAddresses.get(instanceInfo.getAddress());
+                if (brpcNetAddress == null) {
+                    try {
+                        brpcNetAddress = toBrpcHost(instanceInfo.getAddress());
+                        brpcAddresses.put(instanceInfo.getAddress(), brpcNetAddress);
+                    } catch (Exception e) {
+                        LOG.warn(e.getMessage());
+                        throw new AnalysisException(e.getMessage());
+                    }
+                }
+                // merge different requests
+                Request request = requests.get(brpcNetAddress);
+                if (request == null) {
+                    request = new Request(brpcNetAddress);
+                    requests.put(brpcNetAddress, request);
+                }
+                // specified query instance which will report.
+                if (!allQuery) {
+                    final PUniqueId pUId = new PUniqueId(instanceInfo.getInstanceId());
+                    request.addInstanceId(pUId);
+                }
+            }
+        }
+        recvResponse(sendRequest(requests));
+    }
+
+    private List<Pair<Request, Future<PTriggerProfileReportResult>>> sendRequest(
+            Map<TNetworkAddress, Request> requests) throws AnalysisException {
+        final List<Pair<Request, Future<PTriggerProfileReportResult>>> futures = Lists.newArrayList();
+        for (TNetworkAddress address : requests.keySet()) {
+            final Request request = requests.get(address);
+            final PTriggerProfileReportRequest pbRequest =
+                    new PTriggerProfileReportRequest(request.getInstanceIds());
+            try {
+                futures.add(Pair.create(request, BackendServiceProxy.getInstance().
+                        triggerProfileReportAsync(address, pbRequest)));
+            } catch (RpcException e) {
+                throw new AnalysisException("Sending request fails for query's 
execution informations.");
+            }
+        }
+        return futures;
+    }
+
+    private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>>> futures)
+            throws AnalysisException {
+        final String reasonPrefix = "Fail to receive result.";
+        for (Pair<Request, Future<PTriggerProfileReportResult>> pair : futures) {
+            try {
+                final PTriggerProfileReportResult result
+                        = pair.second.get(2, TimeUnit.SECONDS);
+                final TStatusCode code = TStatusCode.findByValue(result.status.code);
+                if (code != TStatusCode.OK) {
+                    String errMsg = "";
+                    if (result.status.msgs != null && !result.status.msgs.isEmpty()) {
+                        errMsg = result.status.msgs.get(0);
+                    }
+                    throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress()
+                    + " reason:" + errMsg);
+                }
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                LOG.warn(reasonPrefix + " reason:" + e.getCause());
+                throw new AnalysisException(reasonPrefix);
+            }
+
+        }
+    }
+
+    private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException {
+        final Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(
+                host.getHostname(), host.getPort());
+        if (backend == null) {
+            throw new AnalysisException(new StringBuilder("Backend ")
+                    .append(host.getHostname())
+                    .append(":")
+                    .append(host.getPort())
+                    .append(" does not exist")
+                    .toString());
+        }
+        if (backend.getBrpcPort() < 0) {
+            throw new AnalysisException("BRPC port is't exist.");
+        }
+        return new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+    }
+
+
+    public static class Consumption {
+        private final static String OLAP_SCAN_NODE = "OLAP_SCAN_NODE";
+        private final static String HASH_JOIN_NODE = "HASH_JOIN_NODE";
+        private final static String HASH_AGGREGATION_NODE = "AGGREGATION_NODE";
+        private final static String SORT_NODE = "SORT_NODE";
+        private final static String ANALYTIC_EVAL_NODE = "ANALYTIC_EVAL_NODE";
+        private final static String UNION_NODE = "UNION_NODE";
+        private final static String EXCHANGE_NODE = "EXCHANGE_NODE";
+
+        protected final List<ConsumptionCalculator> calculators;
+
+        public Consumption(RuntimeProfile profile) {
+            this.calculators = Lists.newArrayList();
+            init(profile);
+        }
+
+        private void init(RuntimeProfile profile) {
+            final List<Map<String, Counter>> olapScanCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, olapScanCounters, OLAP_SCAN_NODE);
+            calculators.add(new OlapScanNodeConsumptionCalculator(olapScanCounters));
+
+            final List<Map<String, Counter>> hashJoinCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, hashJoinCounters, HASH_JOIN_NODE);
+            calculators.add(new HashJoinConsumptionCalculator(hashJoinCounters));
+
+            final List<Map<String, Counter>> hashAggCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, hashAggCounters, HASH_AGGREGATION_NODE);
+            calculators.add(new HashAggConsumptionCalculator(hashAggCounters));
+
+            final List<Map<String, Counter>> sortCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, sortCounters, SORT_NODE);
+            calculators.add(new SortConsumptionCalculator(sortCounters));
+
+            final List<Map<String, Counter>> windowsCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, windowsCounters, ANALYTIC_EVAL_NODE);
+            calculators.add(new WindowsConsumptionCalculator(windowsCounters));
+
+            final List<Map<String, Counter>> unionCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, unionCounters, UNION_NODE);
+            calculators.add(new UnionConsumptionCalculator(unionCounters));
+
+            final List<Map<String, Counter>> exchangeCounters = Lists.newArrayList();
+            collectNodeProfileCounters(profile, exchangeCounters, EXCHANGE_NODE);
+            calculators.add(new ExchangeConsumptionCalculator(exchangeCounters));
+        }
+
+        private void collectNodeProfileCounters(RuntimeProfile profile,
+                                                List<Map<String, Counter>> counterMaps, String name) {
+            for (Map.Entry<String, RuntimeProfile> entry : profile.getChildMap().entrySet()) {
+                if (name.equals(parsePossibleExecNodeName(entry.getKey()))) {
+                    counterMaps.add(entry.getValue().getCounterMap());
+                }
+                collectNodeProfileCounters(entry.getValue(), counterMaps, name);
+            }
+        }
+
+        /**
+         * ExecNode's RuntimeProfile name is "$node_type_name (id=?)"
+         * @param str
+         * @return
+         */
+        private String parsePossibleExecNodeName(String str) {
+            final String[] elements = str.split(" ");
+            if (elements.length == 2) {
+                return elements[0];
+            } else {
+                return "";
+            }
+        }
+
+        public long getTotalCpuConsumption() {
+            long cpu = 0;
+            for (ConsumptionCalculator consumption : calculators) {
+                cpu += consumption.getCpu();
+            }
+            return cpu;
+        }
+
+        public long getTotalIoConsumption() {
+            long io = 0;
+            for (ConsumptionCalculator consumption : calculators) {
+                io += consumption.getIo();
+            }
+            return io;
+        }
+    }
+
+    public static class InstanceConsumption extends Consumption {
+        private final String fragmentId;
+        private final TUniqueId instanceId;
+        private final TNetworkAddress address;
+
+        public InstanceConsumption(
+                String fragmentId,
+                TUniqueId instanceId,
+                TNetworkAddress address,
+                RuntimeProfile profile) {
+            super(profile);
+            this.fragmentId = fragmentId;
+            this.instanceId = instanceId;
+            this.address = address;
+
+        }
+
+        public String getFragmentId() {
+            return fragmentId;
+        }
+
+        public TUniqueId getInstanceId() {
+            return instanceId;
+        }
+
+        public TNetworkAddress getAddress() {
+            return address;
+        }
+    }
+
+    private static abstract class ConsumptionCalculator {
+        protected final List<Map<String, Counter>> counterMaps;
+
+        public ConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            this.counterMaps = counterMaps;
+        }
+
+        public long getCpu() {
+            long cpu = 0;
+            for (Map<String, Counter> counters : counterMaps) {
+                cpu += getCpuByRows(counters);
+            }
+            return cpu;
+        }
+
+        public long getIo() {
+            long io = 0;
+            for (Map<String, Counter> counters : counterMaps) {
+                io += getIoByByte(counters);
+            }
+            return io;
+        }
+
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            return 0;
+        }
+
+        protected long getIoByByte(Map<String, Counter> counters) {
+            return 0;
+        }
+    }
+
+    private static class OlapScanNodeConsumptionCalculator extends ConsumptionCalculator {
+        public OlapScanNodeConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getIoByByte(Map<String, Counter> counters) {
+            final Counter counter = counters.get("CompressedBytesRead");
+            return counter == null ? 0 : counter.getValue();
+        }
+    }
+
+    private static class HashJoinConsumptionCalculator extends ConsumptionCalculator {
+        public HashJoinConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            final Counter probeCounter = counters.get("ProbeRows");
+            final Counter buildCounter = counters.get("BuildRows");
+            return probeCounter == null || buildCounter == null ?
+                    0 : probeCounter.getValue() + buildCounter.getValue();
+        }
+    }
+
+    private static class HashAggConsumptionCalculator extends ConsumptionCalculator {
+        public HashAggConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            final Counter buildCounter = counters.get("BuildRows");
+            return buildCounter == null ? 0 : buildCounter.getValue();
+        }
+    }
+
+    private static class SortConsumptionCalculator extends ConsumptionCalculator {
+        public SortConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            final Counter sortRowsCounter = counters.get("SortRows");
+            return sortRowsCounter == null ? 0 : sortRowsCounter.getValue();
+        }
+    }
+
+    private static class WindowsConsumptionCalculator extends ConsumptionCalculator {
+        public WindowsConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            final Counter processRowsCounter = counters.get("ProcessRows");
+            return processRowsCounter == null ? 0 : processRowsCounter.getValue();
+
+        }
+    }
+
+    private static class UnionConsumptionCalculator extends ConsumptionCalculator {
+        public UnionConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            final Counter materializeRowsCounter = counters.get("MaterializeRows");
+            return materializeRowsCounter == null ? 0 : materializeRowsCounter.getValue();
+        }
+    }
+
+    private static class ExchangeConsumptionCalculator extends ConsumptionCalculator {
+
+        public ExchangeConsumptionCalculator(List<Map<String, Counter>> counterMaps) {
+            super(counterMaps);
+        }
+
+        @Override
+        protected long getCpuByRows(Map<String, Counter> counters) {
+            final Counter mergeRowsCounter = counters.get("MergeRows");
+            return mergeRowsCounter == null ? 0 : mergeRowsCounter.getValue();
+        }
+    }
+
+    private static class Request {
+        private final TNetworkAddress address;
+        private final List<PUniqueId> instanceIds;
+
+        public Request(TNetworkAddress address) {
+            this.address = address;
+            this.instanceIds = Lists.newArrayList();
+        }
+
+        public TNetworkAddress getAddress() {
+            return address;
+        }
+
+        public List<PUniqueId> getInstanceIds() {
+            return instanceIds;
+        }
+
+        public void addInstanceId(PUniqueId instanceId) {
+            this.instanceIds.add(instanceId);
+        }
+    }
+}
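
A hedged usage sketch of the provider added above, as the two proc nodes call
it (the QueryStatisticsItem is assumed to come from
QeProcessorImpl.INSTANCE.getQueryStatistics()):

    // Whole-query totals, e.g. for show proc "/current_queries".
    CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
    CurrentQueryInfoProvider.Consumption consumption = provider.getQueryConsumption(item);
    long ioBytes = consumption.getTotalIoConsumption();   // summed CompressedBytesRead
    long cpuRows = consumption.getTotalCpuConsumption();  // summed per-node row counters

    // Per-instance breakdown, e.g. for ".../fragments".
    for (CurrentQueryInfoProvider.InstanceConsumption ic :
            provider.getQueryInstanceConsumption(item)) {
        System.out.println(ic.getFragmentId() + " " + ic.getAddress()
                + " io=" + ic.getTotalIoConsumption()
                + " cpu=" + ic.getTotalCpuConsumption());
    }

Both calls block for roughly getWaitingTime() milliseconds while the BEs push
fresh profiles.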
diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
index 566a4831..a59c3dbe 100644
--- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
+++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
@@ -17,26 +17,29 @@
 
 package org.apache.doris.common.proc;
 
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.qe.QueryStatisticsItem;
-
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QueryStatisticsItem;
 
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 /*
  * show proc "/current_queries"
  */
 public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
+    private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class);
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("ConnectionId").add("QueryId").add("Database").add("User").add("ExecTime").build();
+            .add("ConnectionId").add("QueryId").add("Database").add("User")
+            .add("IO").add("CPU").add("ExecTime").build();
 
-    private static final int EXEC_TIME_INDEX = 3;
+    private static final int EXEC_TIME_INDEX = 6;
 
     @Override
     public boolean register(String name, ProcNodeInterface node) {
@@ -62,12 +65,19 @@ public ProcResult fetchResult() throws AnalysisException {
         final Map<String, QueryStatisticsItem> statistic = QeProcessorImpl.INSTANCE.getQueryStatistics();
         result.setNames(TITLE_NAMES.asList());
         final List<List<String>> sortedRowData = Lists.newArrayList();
+
+        final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
+        final Map<String, CurrentQueryInfoProvider.Consumption> consumptions
+                = provider.getQueryConsumption(statistic.values());
         for (QueryStatisticsItem item : statistic.values()) {
             final List<String> values = Lists.newArrayList();
             values.add(item.getConnId());
             values.add(item.getQueryId());
             values.add(item.getDb());
             values.add(item.getUser());
+            final CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId());
+            values.add(String.valueOf(consumption.getTotalIoConsumption()));
+            values.add(String.valueOf(consumption.getTotalCpuConsumption()));
             values.add(item.getQueryExecTime());
             sortedRowData.add(values);
         }
diff --git a/fe/src/main/java/org/apache/doris/common/util/Counter.java b/fe/src/main/java/org/apache/doris/common/util/Counter.java
index 41e0c77b..f1fa098d 100644
--- a/fe/src/main/java/org/apache/doris/common/util/Counter.java
+++ b/fe/src/main/java/org/apache/doris/common/util/Counter.java
@@ -19,29 +19,32 @@
 
 import org.apache.doris.thrift.TUnit;
 
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 // Counter means indicators field. The counter's name is key, the counter itself is value.
 public class Counter {
-    private long value;
-    private TUnit type;
-    
+    private AtomicLong value;
+    private AtomicInteger type;
+
     public long getValue() {
-        return value;
+        return value.get();
     }
 
     public void setValue(long newValue) {
-        value = newValue;
+        value.set(newValue);
     }
 
     public TUnit getType() {
-        return type;
+        return TUnit.findByValue(type.get());
     }
 
     public void setType(TUnit type) {
-        this.type = type;
+        this.type.set(type.getValue());
     }
 
     public Counter(TUnit type, long value) {
-        this.type = type;
-        this.value = value;
+        this.value = new AtomicLong(value);
+        this.type = new AtomicInteger(type.getValue());
     }
 }
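
The move to AtomicLong/AtomicInteger above matters because, as the
RuntimeProfile change in the next hunk notes, a proc thread may now read a
Counter while the query-side thread is still updating it. A small hypothetical
harness showing the access pattern the atomics make safe:

    import org.apache.doris.thrift.TUnit;

    public class CounterSampleDemo {
        public static void main(String[] args) throws InterruptedException {
            final Counter counter = new Counter(TUnit.UNIT, 0);
            // Writer: stands in for the thread applying BE profile updates.
            Thread writer = new Thread(() -> {
                for (long v = 1; v <= 1_000_000; v++) {
                    counter.setValue(v);
                }
            });
            writer.start();
            // Reader: stands in for CurrentQueryInfoProvider sampling counters.
            // With a plain long field a reader could observe a torn 64-bit
            // value; AtomicLong guarantees every read returns a value that was
            // actually written.
            for (int i = 0; i < 5; i++) {
                System.out.println("sampled: " + counter.getValue());
            }
            writer.join();
        }
    }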
diff --git a/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 047ab76f..f5f359f9 100644
--- a/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -39,18 +39,24 @@
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * A RuntimeProfile is accessed by two kinds of threads: the 'query thread' that
+ * creates it, and the threads that call
+ * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}.
+ */
 public class RuntimeProfile {
     private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class);
     private static String ROOT_COUNTER = "";
     private Counter counterTotalTime;
     private double localTimePercent;
-    
+
     private Map<String, String> infoStrings = Maps.newHashMap();
     private List<String> infoStringsDisplayOrder = Lists.newArrayList();
 
-    private Map<String, Counter> counterMap = Maps.newHashMap();
-    private Map<String, Set<String> > childCounterMap = Maps.newHashMap();
+    // Accessed concurrently by another thread.
+    private Map<String, Counter> counterMap = Maps.newConcurrentMap();
 
+    private Map<String, Set<String> > childCounterMap = Maps.newHashMap();
     private Map<String, RuntimeProfile> childMap = Maps.newHashMap();
     private List<Pair<RuntimeProfile, Boolean>> childList = Lists.newArrayList();
 
@@ -58,7 +64,7 @@
     
     public RuntimeProfile(String name) {
         this();
-        setName(name);
+        this.name = name;
     }
 
     public RuntimeProfile() {
@@ -79,6 +85,10 @@ public Counter getCounterTotalTime() {
         return childList;
     }
 
+    public Map<String, RuntimeProfile> getChildMap() {
+        return childMap;
+    }
+
     public Counter addCounter(String name, TUnit type, String parentCounterName) {
         Counter counter = this.counterMap.get(name); 
         if (counter != null) {
@@ -176,7 +186,7 @@ private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
     //  2. Info Strings
     //  3. Counters
     //  4. Children
-    public void prettyPrint(StringBuilder builder, String prefix) {        
+    public void prettyPrint(StringBuilder builder, String prefix) {
         Counter counter = this.counterMap.get("TotalTime");
         Preconditions.checkState(counter != null);
         // 1. profile name
@@ -335,7 +345,7 @@ public void computeTimeInProfile(long total) {
     }
     
     // from bigger to smaller
-    public void sortChildren() {    
+    public void sortChildren() {
         Collections.sort(this.childList, new Comparator<Pair<RuntimeProfile, Boolean>>() {
             @Override
             public int compare(Pair<RuntimeProfile, Boolean> profile1, Pair<RuntimeProfile, Boolean> profile2)
@@ -356,11 +366,12 @@ public void addInfoString(String key, String value) {
             this.infoStrings.put(key, value);
         }
     }
-    
+
     public void setName(String name) {
         this.name = name;
     }
-    
+
+
     // Returns the value to which the specified key is mapped;
     // or null if this map contains no mapping for the key.
     public String getInfoString(String key) {
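
Making counterMap a ConcurrentMap covers exactly the two-thread pattern the
class comment describes. A sketch of that pattern; getCounterMap() is a
hypothetical accessor used here only for illustration:

    RuntimeProfile profile = new RuntimeProfile("Fragment 0");

    // query thread: registers counters as execution proceeds
    // ("" is ROOT_COUNTER, per the field at the top of the class)
    profile.addCounter("MergeRows", TUnit.UNIT, "");

    // provider thread: a ConcurrentHashMap iterator is weakly consistent, so
    // concurrent inserts never cause a ConcurrentModificationException
    for (Map.Entry<String, Counter> e : profile.getCounterMap().entrySet()) {
        System.out.println(e.getKey() + " = " + e.getValue().getValue());
    }
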
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 60337327..252a9984 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -477,6 +477,7 @@ public void exec() throws Exception {
                 }
                 profileFragmentId += 1;
             }
+            attachInstanceProfileToFragmentProfile();
         } finally {
             unlock();
         }
@@ -1415,22 +1416,6 @@ public void endProfile() {
             }
         }
 
-        for (int i = 0; i < backendExecStates.size(); ++i) {
-            if (backendExecStates.get(i) == null) {
-                continue;
-            }
-            BackendExecState backendExecState = backendExecStates.get(i);
-            backendExecState.profile().computeTimeInProfile();
-
-            int profileFragmentId = backendExecState.profileFragmentId();
-            if (profileFragmentId < 0 || profileFragmentId > fragmentProfile.size()) {
-                LOG.error("profileFragmentId " + profileFragmentId
-                        + " should be in [0," + fragmentProfile.size() + ")");
-                return;
-            }
-            fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile());
-        }
-
         for (int i = 1; i < fragmentProfile.size(); ++i) {
             fragmentProfile.get(i).sortChildren();
         }
@@ -1718,4 +1703,22 @@ public PlanFragment fragment() {
         }
         return result;
     }
+
+    private void attachInstanceProfileToFragmentProfile() {
+        for (int i = 0; i < backendExecStates.size(); ++i) {
+            if (backendExecStates.get(i) == null) {
+                continue;
+            }
+            BackendExecState backendExecState = backendExecStates.get(i);
+            backendExecState.profile().computeTimeInProfile();
+
+            int profileFragmentId = backendExecState.profileFragmentId();
+            if (profileFragmentId < 0 || profileFragmentId >= fragmentProfile.size()) {
+                LOG.error("profileFragmentId " + profileFragmentId
+                        + " should be in [0," + fragmentProfile.size() + ")");
+                return;
+            }
+            fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile());
+        }
+    }
 }
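
The extracted helper is now called inside exec() while the coordinator lock is
still held, so instance profiles are attached to their fragment profiles before
any reporting thread can walk the tree. Reduced to its locking skeleton, the
call site looks like:

    lock();
    try {
        // ... deliver fragment instances to the backends ...
        attachInstanceProfileToFragmentProfile();  // wired up under the lock
    } finally {
        unlock();
    }
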
diff --git a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index cf8c3615..8d68667d 100644
--- a/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -84,8 +84,9 @@ public void unregisterQuery(TUniqueId queryId) {
                     .sql(info.getSql())
                     .user(context.getQualifiedUser())
                     .connId(String.valueOf(context.getConnectionId()))
-                    .db(context.getDatabase()).fragmentInstanceInfos(info.getCoord()
-                            .getFragmentInstanceInfos()).build();
+                    .db(context.getDatabase())
+                    .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
+                    .profile(info.getCoord().getQueryProfile()).build();
             querySet.put(queryIdStr, item);
         }
         return querySet;
@@ -95,7 +96,6 @@ public void unregisterQuery(TUniqueId queryId) {
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) {
         LOG.info("ReportExecStatus(): fragment_instance_id=" + DebugUtil.printId(params.fragment_instance_id)
                 + ", query id=" + DebugUtil.printId(params.query_id) + " params=" + params);
-
         final TReportExecStatusResult result = new TReportExecStatusResult();
         final QueryInfo info = coordinatorMap.get(params.query_id);
         if (info == null) {
@@ -114,7 +114,6 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params)
     }
 
     public static final class QueryInfo {
-
         private final ConnectContext connectContext;
         private final Coordinator coord;
         private final String sql;
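
With the profile threaded through QueryInfo, anything holding a
QueryStatisticsItem can reach the live profile tree. An illustrative consumer
(not part of the patch):

    final Map<String, QueryStatisticsItem> stats =
            QeProcessorImpl.INSTANCE.getQueryStatistics();
    for (QueryStatisticsItem item : stats.values()) {
        final RuntimeProfile root = item.getQueryProfile();
        System.out.println(item.getQueryId() + ": "
                + root.getChildMap().size() + " fragment profiles");
    }
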
diff --git a/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index f2c72d78..247ce0a0 100644
--- a/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 import com.google.common.collect.Lists;
@@ -31,16 +32,19 @@
     private final String db;
     private final String connId;
     private final long queryStartTime;
-    private List<FragmentInstanceInfo> fragmentInstanceInfos;
+    private final List<FragmentInstanceInfo> fragmentInstanceInfos;
+    // root query profile
+    private final RuntimeProfile queryProfile;
 
     private QueryStatisticsItem(Builder builder) {
         this.queryId = builder.queryId;
         this.user = builder.user;
         this.sql = builder.sql;
         this.db = builder.db;
+        this.connId = builder.connId;
         this.queryStartTime = builder.queryStartTime;
         this.fragmentInstanceInfos = builder.fragmentInstanceInfos;
-        this.connId = builder.connId;
+        this.queryProfile = builder.queryProfile;
     }
 
     public String getDb() {
@@ -72,6 +76,10 @@ public String getQueryId() {
         return fragmentInstanceInfos;
     }
 
+    public RuntimeProfile getQueryProfile() {
+        return queryProfile;
+    }
+
     public static final class Builder {
         private String queryId;
         private String db;
@@ -80,6 +88,7 @@ public String getQueryId() {
         private String connId;
         private long queryStartTime;
         private List<FragmentInstanceInfo> fragmentInstanceInfos;
+        private RuntimeProfile queryProfile;
 
         public Builder() {
             fragmentInstanceInfos = Lists.newArrayList();
@@ -120,6 +129,11 @@ public Builder fragmentInstanceInfos(List<FragmentInstanceInfo> infos) {
             return this;
         }
 
+        public Builder profile(RuntimeProfile profile) {
+            this.queryProfile = profile;
+            return this;
+        }
+
         public QueryStatisticsItem build() {
             initDefaultValue(this);
             return new QueryStatisticsItem(this);
@@ -145,6 +159,10 @@ private void initDefaultValue(Builder builder) {
             if (connId == null) {
                 builder.connId = "";
             }
+
+            if (queryProfile == null) {
+                queryProfile = new RuntimeProfile("");
+            }
         }
     }
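
Because initDefaultValue() substitutes an empty RuntimeProfile when none was
supplied, getQueryProfile() can never return null. A minimal sketch with
illustrative values:

    QueryStatisticsItem item = new QueryStatisticsItem.Builder()
            .sql("select 1")
            .user("root")
            .connId("1")
            .db("test_db")
            .build();                     // no .profile(...) call
    // safe: the builder filled in new RuntimeProfile("")
    RuntimeProfile profile = item.getQueryProfile();
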
 
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5f9af92a..53b6c023 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -93,7 +93,6 @@
     private String originStmt;
     private StatementBase parsedStmt;
     private Analyzer analyzer;
-    private boolean isRegisterQuery = false;
     private RuntimeProfile profile;
     private RuntimeProfile summaryProfile;
     private volatile Coordinator coord = null;
@@ -255,6 +254,8 @@ public void execute() throws Exception {
                         LOG.warn("errors when abort txn", abortTxnException);
                     }
                     throw t;
+                } finally {
+                    QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
                 }
             } else if (parsedStmt instanceof DdlStmt) {
                 handleDdlStmt();
@@ -287,10 +288,6 @@ public void execute() throws Exception {
                 // ignore kill stmt execute err(not monitor it)
                 context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
             }
-        } finally {
-            if (isRegisterQuery) {
-                QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
-            }
         }
     }
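
Moving unregistration into a finally block tied to the query branch removes the
isRegisterQuery flag and guarantees the registry entry is cleaned up on every
exit path, including exceptions from coord.exec(). Reduced to its shape:

    QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
            new QeProcessorImpl.QueryInfo(context, originStmt, coord));
    try {
        coord.exec();
        // ... fetch and forward result batches ...
    } finally {
        // always runs, so no flag is needed to remember registration
        QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
    }
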
 
@@ -533,7 +530,6 @@ private void handleQueryStmt() throws Exception {
 
         QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), 
                        new QeProcessorImpl.QueryInfo(context, originStmt, coord));
-        isRegisterQuery = true;
 
         coord.exec();
 
@@ -587,7 +583,6 @@ private void handleInsertStmt() throws Exception {
         coord = new Coordinator(context, analyzer, planner);
 
         QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
-        isRegisterQuery = true;
 
         coord.exec();
 
diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 7b14afc8..76da014b 100644
--- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -142,12 +142,11 @@ private synchronized PBackendService getProxy(TNetworkAddress address) {
         }
     }
 
-
-    public Future<PFetchFragmentExecInfosResult> fetchFragmentExecInfosAsync(
-            TNetworkAddress address, PFetchFragmentExecInfoRequest request) throws RpcException {
+    public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
+            TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException {
         try {
             final PBackendService service = getProxy(address);
-            return service.fetchFragmentExecInfosAsync(request);
+            return service.triggerProfileReport(request);
         } catch (Throwable e) {
             LOG.warn("fetch data catch an exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
diff --git a/fe/src/main/java/org/apache/doris/rpc/PBackendService.java b/fe/src/main/java/org/apache/doris/rpc/PBackendService.java
index 5fe54901..5050debd 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PBackendService.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PBackendService.java
@@ -35,7 +35,7 @@
             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 86400000)
     Future<PFetchDataResult> fetchDataAsync(PFetchDataRequest request);
 
-    @ProtobufRPC(serviceName = "PBackendService", methodName = "fetch_fragment_exec_infos",
+    @ProtobufRPC(serviceName = "PBackendService", methodName = "trigger_profile_report",
             attachmentHandler = ThriftClientAttachmentHandler.class, onceTalkTimeout = 10000)
-    Future<PFetchFragmentExecInfosResult> fetchFragmentExecInfosAsync(PFetchFragmentExecInfoRequest request);
+    Future<PTriggerProfileReportResult> triggerProfileReport(PTriggerProfileReportRequest request);
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java b/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java
deleted file mode 100644
index 8f7bab3e..00000000
--- a/fe/src/main/java/org/apache/doris/rpc/PFragmentExecInfo.java
+++ /dev/null
@@ -1,33 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.rpc;
-
-import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
-
-@ProtobufClass
-public class PFragmentExecInfo {
-    @Protobuf(order = 1, required = true)
-    public PUniqueId instanceId;
-    @Protobuf(order = 2, required = true)
-    public int execStatus;
-    @Protobuf(order = 3, required = false)
-    public int planNodeType;
-    @Protobuf(order = 4, required = false)
-    public long rowsCount;
-}
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfoRequest.java b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
similarity index 76%
rename from fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfoRequest.java
rename to fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
index 78ad30a2..3c948a63 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfoRequest.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportRequest.java
@@ -18,20 +18,23 @@
 package org.apache.doris.rpc;
 
 import com.baidu.bjf.remoting.protobuf.FieldType;
-import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
 import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
+import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 
 @ProtobufClass
-public class PFetchFragmentExecInfoRequest extends AttachmentRequest {
+public class PTriggerProfileReportRequest extends AttachmentRequest {
 
-    public PFetchFragmentExecInfoRequest() {
+    @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false)
+    List<PUniqueId> instanceIds;
+
+    public PTriggerProfileReportRequest() {
     }
 
-    public PFetchFragmentExecInfoRequest(List<PUniqueId> finstIds) {
-        this.finstIds = finstIds;
+    public PTriggerProfileReportRequest(List<PUniqueId> instanceIds) {
+        this.instanceIds = Lists.newArrayList();
+        this.instanceIds.addAll(instanceIds);
     }
-    @Protobuf(fieldType = FieldType.OBJECT, order = 1, required = false)
-    public List<PUniqueId> finstIds;
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportResult.java
similarity index 81%
rename from fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java
rename to fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportResult.java
index 1d0b757e..80e8c6e1 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PFetchFragmentExecInfosResult.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PTriggerProfileReportResult.java
@@ -17,16 +17,14 @@
 
 package org.apache.doris.rpc;
 
-import com.baidu.bjf.remoting.protobuf.FieldType;
 import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
 import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
 
-import java.util.List;
-
 @ProtobufClass
-public class PFetchFragmentExecInfosResult {
+public class PTriggerProfileReportResult {
     @Protobuf(order = 1, required = true)
     public PStatus status;
-    @Protobuf(fieldType = FieldType.OBJECT, order = 2, required = false)
-    public List<PFragmentExecInfo> execInfos;
+
+    public PTriggerProfileReportResult() {
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java b/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java
index c93db47c..1eea967e 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PUniqueId.java
@@ -19,10 +19,12 @@
 
 import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
 import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TUniqueId;
 
 @ProtobufClass
 public class PUniqueId {
+
     public PUniqueId() {}
     public PUniqueId(TUniqueId tid) {
         hi = tid.getHi();
@@ -33,4 +35,34 @@ public PUniqueId(TUniqueId tid) {
     public long hi;
     @Protobuf(order = 2, required = true)
     public long lo;
+
+    @Override
+    public int hashCode() {
+        int result = 16;
+        result = 31 * result + (int)(hi ^ (hi >>> 32));
+        result = 31 * result + (int)(lo ^ (lo >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return DebugUtil.printId(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (!(obj instanceof PUniqueId)) {
+            return false;
+        }
+
+        final PUniqueId other = (PUniqueId)obj;
+        if (hi != other.hi || lo != other.lo) {
+            return false;
+        }
+        return true;
+    }
 }
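
Overriding equals() and hashCode() together is what makes PUniqueId usable as a
hash-map key: ids that compare equal must hash equally, or lookups silently
miss. An illustrative use (java.util.HashMap assumed):

    Map<PUniqueId, Long> rowsByInstance = new HashMap<>();

    PUniqueId id = new PUniqueId();
    id.hi = 42L;
    id.lo = 7L;
    rowsByInstance.put(id, 1000L);

    PUniqueId sameId = new PUniqueId();   // distinct object, same value
    sameId.hi = 42L;
    sameId.lo = 7L;
    boolean found = rowsByInstance.containsKey(sameId);  // true, thanks to both overrides
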
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index aaeb4bd7..71782934 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -131,26 +131,12 @@ message PFetchDataResult {
     optional bool eos = 3;
 };
 
-message PFetchFragmentExecInfoRequest {
-    repeated PUniqueId finst_id = 1;
-};
-
-enum PFragmentExecStatus {
-    RUNNING = 0;
-    WAIT = 1;
-    FINISHED = 2;
-}
-
-message PFragmentExecInfo {
-    required PUniqueId finst_id = 1;
-    required int32 exec_status = 2;
-    optional int32 plan_node_type = 3;  // same as TPlanNodeType
-    optional int64 rows_count = 4;
+message PTriggerProfileReportRequest {
+    repeated PUniqueId instance_ids = 1;
 }
 
-message PFetchFragmentExecInfosResult {
+message PTriggerProfileReportResult {
     required PStatus status = 1;
-    repeated PFragmentExecInfo fragment_exec_info = 2;
 }
 
 service PBackendService {
@@ -161,7 +147,7 @@ service PBackendService {
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
     rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
-    rpc fetch_fragment_exec_infos(PFetchFragmentExecInfoRequest) returns (PFetchFragmentExecInfosResult);
+    rpc trigger_profile_report(PTriggerProfileReportRequest) returns (PTriggerProfileReportResult);
     // NOTE(zc): If you want to add new method here,
     // you MUST add same method to palo_internal_service.proto
 };
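
On the FE side the renamed RPC is driven through BackendServiceProxy. A hedged
sketch of a caller, assuming the proxy, backend address, and instance-id list
come from the surrounding context (exception handling elided; the 10-second
wait mirrors the onceTalkTimeout configured in PBackendService.java):

    PTriggerProfileReportRequest request =
            new PTriggerProfileReportRequest(instanceIds);   // List<PUniqueId>
    Future<PTriggerProfileReportResult> future =
            proxy.triggerProfileReportAsync(address, request);
    PTriggerProfileReportResult result = future.get(10, TimeUnit.SECONDS);
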
diff --git a/gensrc/proto/palo_internal_service.proto b/gensrc/proto/palo_internal_service.proto
index 5a91e82b..da9faf69 100644
--- a/gensrc/proto/palo_internal_service.proto
+++ b/gensrc/proto/palo_internal_service.proto
@@ -33,5 +33,5 @@ service PInternalService {
     rpc tablet_writer_open(doris.PTabletWriterOpenRequest) returns (doris.PTabletWriterOpenResult);
     rpc tablet_writer_add_batch(doris.PTabletWriterAddBatchRequest) returns (doris.PTabletWriterAddBatchResult);
     rpc tablet_writer_cancel(doris.PTabletWriterCancelRequest) returns (doris.PTabletWriterCancelResult);
-    rpc fetch_fragment_exec_infos(doris.PFetchFragmentExecInfoRequest) returns (doris.PFetchFragmentExecInfosResult);
+    rpc trigger_profile_report(doris.PTriggerProfileReportRequest) returns (doris.PTriggerProfileReportResult);
 };


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org
