IMPALA-3342: Add thread counters to monitor plan fragment execution
This change removes total_cpu_timer, which measured CPU time incorrectly,
and adds THREAD_COUNTERS to measure the user and system time spent in plan
fragment execution. The new counters also account for the time spent in the
HDFS/Kudu scanner threads and in blocking joins.
Snippet of a query profile with the newly added PlanFragment THREAD_COUNTERS:
Instance 2b40b101e2626e7a:a3d8f2300000000
- PeakMemoryUsage: 32.02 KB (32784)
- PerHostPeakMemUsage: 430.52 MB (451431312)
- RowsProduced: 1 (1)
- TotalNetworkReceiveTime: 10s379ms
- TotalNetworkSendTime: 0.000ns
- TotalStorageWaitTime: 0.000ns
- TotalWallClockTime: 10s577ms
- SysTime: 8.000ms
- UserTime: 8.000ms
- VoluntaryContextSwitches: 80 (80)
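For reference, a minimal standalone sketch of what the new SysTime/UserTime
counters represent: per-thread user and system CPU time sampled at the start
and end of a scope. This is an illustration only, not Impala's implementation;
the ScopedThreadCpuTimer helper below is hypothetical, and it reads
getrusage(RUSAGE_THREAD) directly instead of going through
ADD_THREAD_COUNTERS / SCOPED_THREAD_COUNTER_MEASUREMENT.
#include <sys/resource.h>  // getrusage, RUSAGE_THREAD (Linux-specific)
#include <cstdio>
// Hypothetical RAII helper: reports the user/sys CPU time consumed by the
// calling thread between construction and destruction, similar in spirit to
// what SCOPED_THREAD_COUNTER_MEASUREMENT accumulates into ThreadCounters.
class ScopedThreadCpuTimer {
 public:
  ScopedThreadCpuTimer() { getrusage(RUSAGE_THREAD, &start_); }
  ~ScopedThreadCpuTimer() {
    rusage end;
    getrusage(RUSAGE_THREAD, &end);
    printf("UserTime: %ldus SysTime: %ldus\n",
        ToMicros(end.ru_utime) - ToMicros(start_.ru_utime),
        ToMicros(end.ru_stime) - ToMicros(start_.ru_stime));
  }
 private:
  static long ToMicros(const timeval& tv) {
    return tv.tv_sec * 1000000L + tv.tv_usec;
  }
  rusage start_;
};
int main() {
  ScopedThreadCpuTimer timer;  // measures only the work done on this thread
  volatile double x = 0;
  for (int i = 0; i < 10000000; ++i) x += i * 0.5;
  return 0;
}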
Change-Id: Ifa88aa6f3371fa42d11ecc122f43c7d83623c300
Reviewed-on: http://gerrit.cloudera.org:8080/4633
Reviewed-by: Bharath Vissapragada <[email protected]>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bb1c6338
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bb1c6338
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bb1c6338
Branch: refs/heads/master
Commit: bb1c63380b8ec14fb6058f3157cad8746463e054
Parents: 8f2bb2f
Author: aphadke <[email protected]>
Authored: Thu Oct 6 22:10:51 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Wed Nov 23 00:56:48 2016 +0000
----------------------------------------------------------------------
be/src/exec/blocking-join-node.cc | 2 +-
be/src/exec/hdfs-scan-node.cc | 2 +-
be/src/exec/kudu-scan-node.cc | 3 +--
be/src/runtime/plan-fragment-executor.cc | 16 +++-------------
be/src/runtime/plan-fragment-executor.h | 3 ---
be/src/runtime/runtime-state.cc | 2 +-
be/src/runtime/runtime-state.h | 14 ++++++++++----
7 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index b114451..0e23727 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -145,9 +145,9 @@ void BlockingJoinNode::Close(RuntimeState* state) {
void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink,
Promise<Status>* status) {
+ SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
Status s;
{
- SCOPED_TIMER(state->total_cpu_timer());
if (build_sink == NULL){
s = ProcessBuildInput(state);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 3798107..eebf075 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -356,7 +356,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
void HdfsScanNode::ScannerThread() {
SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
- SCOPED_TIMER(runtime_state_->total_cpu_timer());
+ SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
// Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
// scanner for finer-grained filtering.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index f98077e..40689ee 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -273,8 +273,7 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
void KuduScanNode::RunScannerThread(const string& name, const string* initial_token) {
DCHECK(initial_token != NULL);
SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
- SCOPED_TIMER(runtime_state_->total_cpu_timer());
-
+ SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
// Set to true if this thread observes that the number of optional threads has been
// exceeded and is exiting early.
bool optional_thread_exiting = false;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 9db4d38..f673f34 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -39,6 +39,7 @@
#include "runtime/row-batch.h"
#include "runtime/runtime-filter-bank.h"
#include "util/container-util.h"
+#include "runtime/runtime-state.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/mem-info.h"
@@ -107,7 +108,6 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
is_prepared_ = true;
// TODO: Break this method up.
- fragment_sw_.Start();
const TPlanFragmentInstanceCtx& fragment_instance_ctx =
request.fragment_instance_ctx;
query_id_ = request.query_ctx.query_id;
@@ -302,6 +302,7 @@ Status PlanFragmentExecutor::Open() {
}
Status PlanFragmentExecutor::OpenInternal() {
+ SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
RETURN_IF_ERROR(
runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
@@ -357,6 +358,7 @@ Status PlanFragmentExecutor::Exec() {
Status PlanFragmentExecutor::ExecInternal() {
RuntimeProfile::Counter* plan_exec_timer =
ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
+ SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
bool exec_tree_complete = false;
do {
Status status;
@@ -467,18 +469,6 @@ void PlanFragmentExecutor::FragmentComplete() {
// Check the atomic flag. If it is set, then a fragment complete report has already
// been sent.
bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
-
- fragment_sw_.Stop();
- int64_t cpu_and_wait_time = fragment_sw_.ElapsedTime();
- fragment_sw_ = MonotonicStopWatch();
- int64_t cpu_time = cpu_and_wait_time
- - runtime_state_->total_storage_wait_timer()->value()
- - runtime_state_->total_network_send_timer()->value()
- - runtime_state_->total_network_receive_timer()->value();
- // Timing is not perfect.
- if (cpu_time < 0) cpu_time = 0;
- runtime_state_->total_cpu_timer()->Add(cpu_time);
-
ReleaseThreadToken();
StopReportThread();
if (send_report) SendReport(true);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 7149272..708e979 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -242,9 +242,6 @@ class PlanFragmentExecutor {
/// of the execution.
RuntimeProfile::Counter* average_thread_tokens_;
- /// Stopwatch for this entire fragment. Started in Prepare(), stopped in Close().
- MonotonicStopWatch fragment_sw_;
-
/// (Atomic) Flag that indicates whether a completed fragment report has been or will
/// be fired. It is initialized to 0 and atomically swapped to 1 when a completed
/// fragment report is about to be fired. Used for reducing the probability that a
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index c2adcb7..40a4946 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -134,7 +134,7 @@ Status RuntimeState::Init(ExecEnv* exec_env) {
DCHECK(resource_pool_ != NULL);
}
- total_cpu_timer_ = ADD_TIMER(runtime_profile(), "TotalCpuTime");
+ total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "");
total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime");
total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 0ada7e4..f86374e 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -251,17 +251,22 @@ class RuntimeState {
bool is_cancelled() const { return is_cancelled_; }
void set_is_cancelled(bool v) { is_cancelled_ = v; }
- RuntimeProfile::Counter* total_cpu_timer() { return total_cpu_timer_; }
RuntimeProfile::Counter* total_storage_wait_timer() {
return total_storage_wait_timer_;
}
+
RuntimeProfile::Counter* total_network_send_timer() {
return total_network_send_timer_;
}
+
RuntimeProfile::Counter* total_network_receive_timer() {
return total_network_receive_timer_;
}
+ RuntimeProfile::ThreadCounters* total_thread_statistics() const {
+ return total_thread_statistics_;
+ }
+
/// Sets query_status_ with err_msg if no error has been set yet.
void SetQueryStatus(const std::string& err_msg) {
boost::lock_guard<SpinLock> l(query_status_lock_);
@@ -351,9 +356,6 @@ class RuntimeState {
RuntimeProfile profile_;
- /// Total CPU time (across all threads), including all wait times.
- RuntimeProfile::Counter* total_cpu_timer_;
-
/// Total time waiting in storage (across all threads)
RuntimeProfile::Counter* total_storage_wait_timer_;
@@ -363,6 +365,9 @@ class RuntimeState {
/// Total time spent receiving over the network (across all threads)
RuntimeProfile::Counter* total_network_receive_timer_;
+ /// Total CPU utilization for all threads in this plan fragment.
+ RuntimeProfile::ThreadCounters* total_thread_statistics_;
+
/// MemTracker that is shared by all fragment instances running on this host.
/// The query mem tracker must be released after the instance_mem_tracker_.
std::shared_ptr<MemTracker> query_mem_tracker_;
@@ -405,6 +410,7 @@ class RuntimeState {
/// prohibit copies
RuntimeState(const RuntimeState&);
+
};
#define RETURN_IF_CANCELLED(state) \