This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new aa164e3cb IMPALA-12176: Improve client fetch metrics
aa164e3cb is described below

commit aa164e3cbc490b69403959f979e55b76c3d0e0ca
Author: Kurt Deschler <[email protected]>
AuthorDate: Thu Jun 1 08:12:03 2023 -0500

    IMPALA-12176: Improve client fetch metrics
    
    This patch makes multiple improvements to query profile and RPC metrics
    to improve observability and allow more detailed analysis of where time
    is being spent by client RPCs.
    
    - A new CreateResultSetTime metric has been added to PLAN_ROOT_SINK node
      in the query profile. This timer isolates the cost to convert fetched
      rows to the client protocol.
    - Read/Write time is now tracked during client RPC execution and added to
      the rpcz JSON output. A checkbox in the /rpcz Web UI page enables
      display of the Read/Write stats.
    - Read and Write time are defined from Thrift callbacks defined in
      apache::thrift::TProcessorEventHandler. Read time includes reading and
      deserializing Thrift RPC args from the transport. Write time includes
      serializing, writing, and flushing Thrift RPC args to the transport.
    - Client RPC cost is tracked on a per-query basis and displayed in the
      server profile as RPCCount, RPCReadTimer, and RPCWriteTimer
    - Accuracy of RPC histograms is changed from milliseconds to microseconds
    
    Testing:
    tests added to test_fetch.py and test_web_pages.py
    
    Change-Id: I986f3f2afac1775274895393969b270cf956b262
    Reviewed-on: http://gerrit.cloudera.org:8080/19966
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/blocking-plan-root-sink.cc |   7 +-
 be/src/exec/buffered-plan-root-sink.cc |   9 ++-
 be/src/exec/plan-root-sink.cc          |   1 +
 be/src/exec/plan-root-sink.h           |   3 +
 be/src/rpc/rpc-trace.cc                | 120 ++++++++++++++++++++++++++++++---
 be/src/rpc/rpc-trace.h                 | 111 +++++++++++++++++++++---------
 be/src/runtime/query-driver.cc         |   4 ++
 be/src/service/client-request-state.cc |  56 +++++++++++++++
 be/src/service/client-request-state.h  |  21 ++++++
 be/src/service/impala-server.cc        |  10 +++
 be/src/service/impala-server.h         |   1 +
 be/src/util/histogram-metric.cc        |   1 +
 be/src/util/metrics-test.cc            |   3 +-
 tests/query_test/test_fetch.py         |  19 +++++-
 tests/webserver/test_web_pages.py      |  20 ++++++
 www/rpcz.tmpl                          |  45 +++++++++++++
 16 files changed, 386 insertions(+), 45 deletions(-)

diff --git a/be/src/exec/blocking-plan-root-sink.cc 
b/be/src/exec/blocking-plan-root-sink.cc
index 84831a654..adedf0d06 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -67,8 +67,11 @@ Status BlockingPlanRootSink::Send(RuntimeState* state, 
RowBatch* batch) {
     if (num_rows_requested_ > 0) num_to_fetch = min(num_to_fetch, 
num_rows_requested_);
     // Debug action before AddBatch is called.
     RETURN_IF_ERROR(DebugAction(state->query_options(), 
"BPRS_BEFORE_ADD_ROWS"));
-    RETURN_IF_ERROR(results_->AddRows(
-        output_expr_evals_, batch, current_batch_row, num_to_fetch));
+    {
+      SCOPED_TIMER(create_result_set_timer_);
+      RETURN_IF_ERROR(results_->AddRows(
+          output_expr_evals_, batch, current_batch_row, num_to_fetch));
+    }
     current_batch_row += num_to_fetch;
     // Prevent expr result allocations from accumulating.
     expr_results_pool_->Clear();
diff --git a/be/src/exec/buffered-plan-root-sink.cc 
b/be/src/exec/buffered-plan-root-sink.cc
index f2f4d52a2..86f48d399 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -233,9 +233,12 @@ Status BufferedPlanRootSink::GetNext(RuntimeState* state, 
QueryResultSet* result
             num_rows_to_read - num_rows_read);
         DCHECK_GE(num_rows_to_fetch, 0);
 
-        // Read rows from 'current_batch_' and add them to 'results'.
-        RETURN_IF_ERROR(results->AddRows(output_expr_evals_, 
current_batch_.get(),
-            current_batch_row_, num_rows_to_fetch));
+        {
+          SCOPED_TIMER(create_result_set_timer_);
+          // Read rows from 'current_batch_' and add them to 'results'.
+          RETURN_IF_ERROR(results->AddRows(output_expr_evals_, 
current_batch_.get(),
+              current_batch_row_, num_rows_to_fetch));
+        }
         num_rows_read += num_rows_to_fetch;
         current_batch_row_ += num_rows_to_fetch;
 
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index e6ace8967..2b13294c6 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -61,6 +61,7 @@ Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker
   rows_sent_rate_ = profile_->AddDerivedCounter("RowsSentRate", 
TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_sent_counter_,
                                                     
profile_->total_time_counter()));
+  create_result_set_timer_ = ADD_TIMER(profile(), "CreateResultSetTime");
   return Status::OK();
 }
 
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index f60e7a967..ede2c8456 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -121,6 +121,9 @@ class PlanRootSink : public DataSink {
   /// Prepare().
   RuntimeProfile::Counter* rows_sent_rate_ = nullptr;
 
+  /// Measures the amount of time spend converting results to the client 
protocol
+  RuntimeProfile::Counter* create_result_set_timer_ = nullptr;
+
  private:
   /// Limit on the number of rows produced by this query, initialized by the 
constructor.
   const int64_t num_rows_produced_limit_;
diff --git a/be/src/rpc/rpc-trace.cc b/be/src/rpc/rpc-trace.cc
index 87bfad5e5..0b487c361 100644
--- a/be/src/rpc/rpc-trace.cc
+++ b/be/src/rpc/rpc-trace.cc
@@ -38,6 +38,12 @@ using namespace strings;
 // Metric key format for rpc call duration metrics.
 const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = 
"rpc-method.$0.call_duration";
 
+// Metric key format for rpc read metrics.
+const string RPC_READ_TIME_DISTRIBUTION_METRIC_KEY = 
"rpc-method.$0.read_duration";
+
+// Metric key format for rpc write metrics.
+const string RPC_WRITE_TIME_DISTRIBUTION_METRIC_KEY = 
"rpc-method.$0.write_duration";
+
 // Singleton class to keep track of all RpcEventHandlers, and to render them 
to a
 // web-based summary page.
 class RpcEventHandlerManager {
@@ -136,6 +142,8 @@ void RpcEventHandler::Reset(const string& method_name) {
   MethodMap::iterator it = method_map_.find(method_name);
   if (it == method_map_.end()) return;
   it->second->processing_time_distribution->Reset();
+  it->second->read_time_distribution->Reset();
+  it->second->write_time_distribution->Reset();
   it->second->num_in_flight.Store(0L);
 }
 
@@ -143,6 +151,8 @@ void RpcEventHandler::ResetAll() {
   lock_guard<mutex> l(method_map_lock_);
   for (const MethodMap::value_type& method: method_map_) {
     method.second->processing_time_distribution->Reset();
+    method.second->read_time_distribution->Reset();
+    method.second->write_time_distribution->Reset();
     method.second->num_in_flight.Store(0L);
   }
 }
@@ -165,6 +175,14 @@ void RpcEventHandler::ToJson(Value* server, Document* 
document) {
         rpc.second->processing_time_distribution->ToHumanReadable();
     Value summary(human_readable.c_str(), document->GetAllocator());
     method.AddMember("summary", summary, document->GetAllocator());
+    const string& human_readable_read =
+        rpc.second->read_time_distribution->ToHumanReadable();
+    Value read_stats(human_readable_read.c_str(), document->GetAllocator());
+    method.AddMember("read", read_stats, document->GetAllocator());
+    const string& human_readable_write =
+        rpc.second->write_time_distribution->ToHumanReadable();
+    Value write_stats(human_readable_write.c_str(), document->GetAllocator());
+    method.AddMember("write", write_stats, document->GetAllocator());
     method.AddMember("in_flight", rpc.second->num_in_flight.Load(),
         document->GetAllocator());
     Value server_name(server_name_.c_str(), document->GetAllocator());
@@ -187,33 +205,119 @@ void* RpcEventHandler::getContext(const char* fn_name, 
void* server_context) {
       const string& rpc_name = 
Substitute(RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY,
           Substitute("$0.$1", server_name_, descriptor->name));
       const TMetricDef& def =
-          MakeTMetricDef(rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_MS);
-      constexpr int32_t SIXTY_MINUTES_IN_MS = 60 * 1000 * 60;
+          MakeTMetricDef(rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_US);
+      constexpr int64_t SIXTY_MINUTES_IN_US = 60LL * 1000000LL * 60LL;
       // Store processing times of up to 60 minutes with 3 sig. fig.
       descriptor->processing_time_distribution =
-          metrics_->RegisterMetric(new HistogramMetric(def, 
SIXTY_MINUTES_IN_MS, 3));
+          metrics_->RegisterMetric(new HistogramMetric(def, 
SIXTY_MINUTES_IN_US, 3));
+
+      const string& read_rpc_name = 
Substitute(RPC_READ_TIME_DISTRIBUTION_METRIC_KEY,
+          Substitute("$0.$1", server_name_, descriptor->name));
+      const TMetricDef& read_def =
+          MakeTMetricDef(read_rpc_name, TMetricKind::HISTOGRAM, 
TUnit::TIME_US);
+      descriptor->read_time_distribution =
+          metrics_->RegisterMetric(new HistogramMetric(read_def, 
SIXTY_MINUTES_IN_US, 3));
+
+      const string& write_rpc_name = 
Substitute(RPC_WRITE_TIME_DISTRIBUTION_METRIC_KEY,
+          Substitute("$0.$1", server_name_, descriptor->name));
+      const TMetricDef& write_def =
+          MakeTMetricDef(write_rpc_name, TMetricKind::HISTOGRAM, 
TUnit::TIME_US);
+      descriptor->write_time_distribution = metrics_->RegisterMetric(
+          new HistogramMetric(write_def, SIXTY_MINUTES_IN_US, 3));
+
       it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
     }
   }
   it->second->num_in_flight.Add(1);
   // TODO: Consider pooling these
   InvocationContext* ctxt_ptr =
-      new InvocationContext(MonotonicMillis(), cnxn_ctx, it->second);
+      new InvocationContext(GetMonoTimeMicros(), cnxn_ctx, it->second);
+  SetThreadRPCContext(ctxt_ptr);
   VLOG_RPC << "RPC call: " << string(fn_name) << "(from "
            << TNetworkAddressToString(ctxt_ptr->cnxn_ctx->network_address) << 
")";
   return reinterpret_cast<void*>(ctxt_ptr);
 }
 
+void RpcEventHandler::freeContext(void* ctx, const char* /* fn_name */) {
+  InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
+  DCHECK(GetThreadRPCContext() == rpc_ctx);
+  SetThreadRPCContext(nullptr);
+  rpc_ctx->UnRegister();
+}
+
+// postWrite callback occurs after RPC write completes
 void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t 
bytes) {
   InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
-  int64_t elapsed_time = MonotonicMillis() - rpc_ctx->start_time_ms;
+  rpc_ctx->write_end_us = GetMonoTimeMicros();
+  const int64_t elapsed_time = rpc_ctx->write_end_us - rpc_ctx->start_time_us;
+  const int64_t write_time = rpc_ctx->write_end_us - rpc_ctx->write_start_us;
   const string& call_name = string(fn_name);
-  // TODO: bytes is always 0, how come?
+  // TODO: bytes is always 0 since TTransport does not track write count.
   VLOG_RPC << "RPC call: " << server_name_ << ":" << call_name << " from "
            << TNetworkAddressToString(rpc_ctx->cnxn_ctx->network_address) << " 
took "
-           << PrettyPrinter::Print(elapsed_time * 1000L * 1000L, 
TUnit::TIME_NS);
+           << PrettyPrinter::Print(elapsed_time * 1000L, TUnit::TIME_NS);
   MethodDescriptor* descriptor = rpc_ctx->method_descriptor;
-  delete rpc_ctx;
   descriptor->num_in_flight.Add(-1);
   descriptor->processing_time_distribution->Update(elapsed_time);
+  descriptor->write_time_distribution->Update(write_time);
+}
+
+// preRead callback occurs before RPC read starts
+void RpcEventHandler::preRead(void* ctx, const char* fn_name) {
+  InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
+  rpc_ctx->read_start_us = GetMonoTimeMicros();
+}
+
+// postRead callback occurs after RPC read completes
+void RpcEventHandler::postRead(void* ctx, const char* fn_name, uint32_t bytes) 
{
+  InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
+  rpc_ctx->read_end_us = GetMonoTimeMicros();
+  int64_t elapsed_time = rpc_ctx->read_end_us - rpc_ctx->start_time_us;
+  rpc_ctx->method_descriptor->read_time_distribution->Update(elapsed_time);
+}
+
+// preWrite callback occurs before RPC read starts
+void RpcEventHandler::preWrite(void* ctx, const char* fn_name) {
+  InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
+  rpc_ctx->write_start_us = GetMonoTimeMicros();
+}
+
+__thread RpcEventHandler::InvocationContext* __rpc_context__;
+
+void RpcEventHandler::SetThreadRPCContext(RpcEventHandler::InvocationContext* 
ctxt_ptr) {
+  __rpc_context__ = ctxt_ptr;
+}
+
+RpcEventHandler::InvocationContext* RpcEventHandler::GetThreadRPCContext() {
+  return __rpc_context__;
+}
+
+void RpcEventHandler::InvocationContext::Register() {
+  DCHECK(refcnt_.Load() >= 1); // Should be registered with RpcEventHandler
+  refcnt_.Add(1);
+}
+
+void RpcEventHandler::InvocationContext::UnRegister() {
+  int32_t newCount = refcnt_.Add(-1);
+  DCHECK (newCount >= 0);
+  if (newCount == 0) {
+    delete this;
+  }
+}
+
+bool RpcEventHandler::InvocationContext::UnRegisterCompleted(
+    uint64_t& read_ns, uint64_t& write_ns) {
+  int32_t newCount = refcnt_.Load();
+  DCHECK (newCount >= 1);
+  if (newCount == 1) {
+    if (read_end_us > read_start_us) {
+      read_ns += ((read_end_us - read_start_us) * 1000L);
+    }
+    if (write_end_us > write_start_us) {
+      write_ns += ((write_end_us - write_start_us) * 1000L);
+    }
+    UnRegister();
+    return true;
+  }
+  return false;
 }
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index c54de76b7..f896d8370 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -36,6 +36,39 @@ class Webserver;
 /// most one RpcEventHandler per ThriftServer. When an Rpc is started, 
getContext() creates
 /// an InvocationContext recording the current time and other metadata for 
that invocation.
 class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
+ private:
+  /// Per-method descriptor
+  struct MethodDescriptor {
+    /// Name of the method
+    std::string name;
+
+    /// Distribution of the time taken to process this RPC.
+    HistogramMetric* processing_time_distribution;
+
+    /// Distribution of the time taken for RPC read.
+    HistogramMetric* read_time_distribution;
+
+    /// Distribution of the time taken for RPC write.
+    HistogramMetric* write_time_distribution;
+
+    /// Number of invocations in flight
+    AtomicInt32 num_in_flight;
+  };
+
+  /// Map from method name to descriptor
+  typedef boost::unordered_map<std::string, MethodDescriptor*> MethodMap;
+
+  /// Protects method_map_
+  std::mutex method_map_lock_;
+
+  /// Map of all methods, populated lazily as they are invoked for the first 
time.
+  MethodMap method_map_;
+
+  /// Name of the server that we listen for events from.
+  std::string server_name_;
+
+  /// Metrics subsystem access
+  MetricGroup* metrics_;
  public:
   RpcEventHandler(const std::string& server_name, MetricGroup* metrics);
 
@@ -44,6 +77,24 @@ class RpcEventHandler : public 
apache::thrift::TProcessorEventHandler {
   /// Thrift servers, it is always a ThriftServer::ConnectionContext*.
   virtual void* getContext(const char* fn_name, void* server_context);
 
+  /// From TProcessorEventHandler, called after all RPC work is completed to 
free cxt
+  virtual void freeContext(void* ctx, const char* fn_name);
+
+  /// From TProcessorEventHandler, called before all bytes were read from the 
calling
+  /// client. 'ctx' is the context returned by getContext(), which is an
+  /// InvocationContext*.
+  virtual void preRead(void* ctx, const char* fn_name);
+
+  /// From TProcessorEventHandler, called after all bytes were read from the 
calling
+  /// client. 'ctx' is the context returned by getContext(), which is an
+  /// InvocationContext*.
+  virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes);
+
+  /// From TProcessorEventHandler, called after all bytes were written to the 
calling
+  /// client. 'ctx' is the context returned by getContext(), which is an
+  /// InvocationContext*.
+  virtual void preWrite(void* ctx, const char* fn_name);
+
   /// From TProcessorEventHandler, called after all bytes were written to the 
calling
   /// client. 'ctx' is the context returned by getContext(), which is an
   /// InvocationContext*.
@@ -76,26 +127,18 @@ class RpcEventHandler : public 
apache::thrift::TProcessorEventHandler {
 
   std::string server_name() const { return server_name_; }
 
- private:
-  /// Per-method descriptor
-  struct MethodDescriptor {
-    /// Name of the method
-    std::string name;
-
-    /// Distribution of the time taken to process this RPC.
-    HistogramMetric* processing_time_distribution;
-
-    /// Number of invocations in flight
-    AtomicInt32 num_in_flight;
-  };
-
-  /// Map from method name to descriptor
-  typedef boost::unordered_map<std::string, MethodDescriptor*> MethodMap;
-
   /// Created per-Rpc invocation
   struct InvocationContext {
-    /// Monotonic milliseconds (typically boot time) when the call started.
-    const int64_t start_time_ms;
+    /// Monotonic microseconds (typically boot time) when the call started.
+    const int64_t start_time_us;
+    /// Monotonic microseconds (typically boot time) when thrift read started.
+    int64_t read_start_us;
+    /// Monotonic microseconds (typically boot time) when thrift read ended.
+    int64_t read_end_us;
+    /// Monotonic microseconds (typically boot time) when thrift write started.
+    int64_t write_start_us;
+    /// Monotonic microseconds (typically boot time) when thrift write ended.
+    int64_t write_end_us;
 
     /// Per-connection information, owned by ThriftServer. The lifetime of 
this struct is
     /// tied to the lifetime of the connection, which is guaranteed to be 
longer than the
@@ -105,22 +148,30 @@ class RpcEventHandler : public 
apache::thrift::TProcessorEventHandler {
     /// Pointer to parent MethodDescriptor, to save a lookup on deletion
     MethodDescriptor* method_descriptor;
 
+    // Reference count for registration and automatic deletion
+    AtomicInt64 refcnt_;
+
+    // Add a reference to this InvocationContext so that it can live longer 
than
+    // the RPC call for stats collection purposes.
+    void Register();
+    // Remove a reference to this InvocationContext and free if there are no
+    // more references
+    void UnRegister();
+    // Unregister and collect stats for this InvocationContext if caller is the
+    // last reference. Returns true if unregistered.
+    bool UnRegisterCompleted(uint64_t& read_ns, uint64_t& write_ns);
+
     InvocationContext(int64_t start_time, const 
ThriftServer::ConnectionContext* cnxn_ctx,
         MethodDescriptor* descriptor)
-        : start_time_ms(start_time), cnxn_ctx(cnxn_ctx), 
method_descriptor(descriptor) { }
+        : start_time_us(start_time),
+          read_start_us(0), read_end_us(0), write_start_us(0), write_end_us(0),
+          cnxn_ctx(cnxn_ctx), method_descriptor(descriptor), refcnt_(1) { }
+  private:
+    ~InvocationContext() {} // Use Unregister() externally
   };
 
-  /// Protects method_map_ and rpc_counter_
-  std::mutex method_map_lock_;
-
-  /// Map of all methods, populated lazily as they are invoked for the first 
time.
-  MethodMap method_map_;
-
-  /// Name of the server that we listen for events from.
-  std::string server_name_;
-
-  /// Metrics subsystem access
-  MetricGroup* metrics_;
+  static InvocationContext* GetThreadRPCContext();
+  static void SetThreadRPCContext(InvocationContext* ctxt_ptr);
 
 };
 
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 954ddd085..161d0dab2 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -291,6 +291,10 @@ void QueryDriver::RetryQueryFromThread(
   retry_request_state->SetBlacklistedExecutorAddresses(
       client_request_state_->GetBlacklistedExecutorAddresses());
 
+  // Copy pending RPCs to the retry request. Whichever query ends up succeding
+  // will reap them.
+  retry_request_state->CopyRPCs(*client_request_state_);
+
   // Run the new query.
   status = retry_request_state->Exec();
   if (!status.ok()) {
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index b7d9ab535..b59c27e1d 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -133,6 +133,9 @@ ClientRequestState::ClientRequestState(const TQueryCtx& 
query_ctx, Frontend* fro
   client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer");
   client_wait_time_stats_ =
       ADD_SUMMARY_STATS_TIMER(server_profile_, "ClientFetchWaitTimeStats");
+  rpc_read_timer_ = ADD_TIMER(server_profile_, "RPCReadTimer");
+  rpc_write_timer_ = ADD_TIMER(server_profile_, "RPCWriteTimer");
+  rpc_count_ = ADD_COUNTER(server_profile_, "RPCCount", TUnit::UNIT);
   bool is_external_fe = session_type() == TSessionType::EXTERNAL_FRONTEND;
   // "Impala Backend Timeline" was specifically chosen to exploit the 
lexicographical
   // ordering defined by the underlying std::map holding the timelines 
displayed in
@@ -181,6 +184,8 @@ ClientRequestState::ClientRequestState(const TQueryCtx& 
query_ctx, Frontend* fro
 
 ClientRequestState::~ClientRequestState() {
   DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!";
+  DCHECK(pending_rpcs_.empty());
+  UnRegisterRemainingRPCs();
 }
 
 Status ClientRequestState::SetResultCache(QueryResultSet* cache,
@@ -1029,6 +1034,7 @@ Status ClientRequestState::ExecShutdownRequest() {
 }
 
 Status ClientRequestState::Finalize(bool check_inflight, const Status* cause) {
+  UnRegisterCompletedRPCs();
   RETURN_IF_ERROR(Cancel(check_inflight, cause, 
/*wait_until_finalized=*/true));
   MarkActive();
   // Make sure we join on wait_thread_ before we finish (and especially before 
this object
@@ -2156,4 +2162,54 @@ TCatalogServiceRequestHeader 
ClientRequestState::GetCatalogServiceRequestHeader(
           query_ctx_.client_request.redacted_stmt : 
query_ctx_.client_request.stmt);
   return header;
 }
+
+void ClientRequestState::RegisterRPC() {
+  RpcEventHandler::InvocationContext* rpc_context =
+      RpcEventHandler::GetThreadRPCContext();
+  // The existence of rpc_context means that this is called from an RPC
+  if (rpc_context) {
+    lock_guard<mutex> l(lock_);
+    if(pending_rpcs_.find(rpc_context) == pending_rpcs_.end()) {
+      rpc_context->Register();
+      pending_rpcs_.insert(rpc_context);
+      rpc_count_->Add(1);
+    }
+  }
+}
+
+void ClientRequestState::UnRegisterCompletedRPCs() {
+  lock_guard<mutex> l(lock_);
+  for (auto iter = pending_rpcs_.begin(); iter != pending_rpcs_.end();) {
+    RpcEventHandler::InvocationContext* rpc_context = *iter;
+    uint64_t read_ns = 0, write_ns = 0;
+    if (rpc_context->UnRegisterCompleted(read_ns, write_ns)) {
+      rpc_read_timer_->Add(read_ns);
+      rpc_write_timer_->Add(write_ns);
+      iter = pending_rpcs_.erase(iter);
+    } else {
+      ++iter;
+    }
+  }
+}
+
+void ClientRequestState::UnRegisterRemainingRPCs() {
+  lock_guard<mutex> l(lock_);
+  for (auto rpc_context: pending_rpcs_) {
+    rpc_context->UnRegister();
+  }
+  pending_rpcs_.clear();
+}
+
+void ClientRequestState::CopyRPCs(ClientRequestState& from_request) {
+  lock_guard<mutex> l_to(lock_);
+  lock_guard<mutex> l_from(from_request.lock_);
+  rpc_read_timer_->Add(from_request.rpc_read_timer_->value());
+  rpc_write_timer_->Add(from_request.rpc_write_timer_->value());
+  rpc_count_->Add(from_request.rpc_count_->value());
+  for (auto rpc_context: from_request.pending_rpcs_) {
+    rpc_context->Register();
+    pending_rpcs_.insert(rpc_context);
+  }
+}
+
 }
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index 9f6f4ad91..2f112dc08 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -21,6 +21,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "exec/catalog-op-executor.h"
+#include "rpc/rpc-trace.h"
 #include "service/child-query.h"
 #include "service/impala-server.h"
 #include "service/query-result-set.h"
@@ -450,6 +451,17 @@ class ClientRequestState {
   /// returned without error.
   void SetPlanningDone() { is_planning_done_.store(true); }
 
+  // Register an RPC context with the query so that RPC metrics can be
+  // accumulated at the query level and shown in the query profile
+  void RegisterRPC();
+  // Unregister any RPCs that have been released by their RPC thread and
+  // accumulate their metrics into the profile timers
+  void UnRegisterCompletedRPCs();
+  // Unregister any remaining RPCs without recording timing in this query.
+  // RPC threads will usually have completed by this point.
+  void UnRegisterRemainingRPCs();
+  // Copy pending RPCs for a retried query
+  void CopyRPCs(ClientRequestState& from_request);
  protected:
   /// Updates the end_time_us_ of this query if it isn't set. The end time is 
determined
   /// when this function is called for the first time, calling it multiple 
times does not
@@ -609,6 +621,15 @@ class ClientRequestState {
   MonotonicStopWatch client_wait_sw_;
   int64_t last_client_wait_time_ = 0;
 
+  // Tracks time spent by client calls reading RPC arguments
+  RuntimeProfile::Counter* rpc_read_timer_;
+  // Tracks time spent by client calls writing RPC results
+  RuntimeProfile::Counter* rpc_write_timer_;
+  // Tracks number of client RPCs
+  RuntimeProfile::Counter* rpc_count_;
+  // Contexts for RPC calls that have not completed and had their stats 
collected
+  std::set<RpcEventHandler::InvocationContext*> pending_rpcs_;
+
   RuntimeProfile::EventSequence* query_events_;
 
   bool is_cancelled_ = false; // if true, Cancel() was called.
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 3e73c0029..78421f6e7 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1507,6 +1507,12 @@ void ImpalaServer::CloseClientRequestState(const 
QueryHandle& query_handle) {
       ImpaladMetrics::QUERY_DURATIONS->Update(duration_ms);
     }
   }
+
+  // Final attempt to capture completed RPC stats
+  query_handle->UnRegisterCompletedRPCs();
+  // Unregister any remaining RPC and discard stats
+  query_handle->UnRegisterRemainingRPCs();
+
   {
     lock_guard<mutex> l(query_handle->session()->lock);
     query_handle->session()->inflight_queries.erase(query_handle->query_id());
@@ -1603,6 +1609,10 @@ Status ImpalaServer::GetActiveQueryHandle(
     return err;
   }
   query_handle->SetHandle(query_driver, 
query_driver->GetActiveClientRequestState());
+  // Update RPC Stats before every call. This is done here to minimize the
+  // pending set size and keep the profile updated while the query is 
executing.
+  (*query_handle)->UnRegisterCompletedRPCs();
+  (*query_handle)->RegisterRPC();
   return Status::OK();
 }
 
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f8764cb90..64ccc5c10 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1023,6 +1023,7 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Returns the active QueryHandle for this query id. The QueryHandle 
contains the
   /// active ClientRequestState. Returns an error Status if the query id 
cannot be found.
+  /// If caller is an RPC thread, RPC context will be registered for time 
tracking
   /// See QueryDriver for a description of active ClientRequestStates.
   Status GetActiveQueryHandle(
       const TUniqueId& query_id, QueryHandle* query_handle);
diff --git a/be/src/util/histogram-metric.cc b/be/src/util/histogram-metric.cc
index 2de1a3272..a4b276e94 100644
--- a/be/src/util/histogram-metric.cc
+++ b/be/src/util/histogram-metric.cc
@@ -151,6 +151,7 @@ string HistogramMetric::HistogramToHumanReadable(T* 
histogram, TUnit::type unit)
   DCHECK(histogram != nullptr);
   stringstream out;
   out << "Count: " << histogram->TotalCount() << ", "
+      << "sum: " << PrettyPrinter::Print(histogram->TotalSum(), unit) << ", "
       << "min / max: " << PrettyPrinter::Print(histogram->MinValue(), unit) << 
" / "
       << PrettyPrinter::Print(histogram->MaxValue(), unit) << ", "
       << "25th %-ile: " << 
PrettyPrinter::Print(histogram->ValueAtPercentile(25), unit)
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 6c1785b04..770ae2c99 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -408,7 +408,8 @@ TEST_F(MetricsTest, HistogramMetrics) {
   EXPECT_EQ(histo_val["min"].GetInt(), 0);
   EXPECT_EQ(histo_val["max"].GetInt(), MAX_VALUE + 1);
 
-  EXPECT_EQ(metric->ToHumanReadable(), "Count: 10002, min / max: 0 / 10s001ms, 
"
+  EXPECT_EQ(metric->ToHumanReadable(),
+      "Count: 10002, sum: 13h53m, min / max: 0 / 10s001ms, "
       "25th %-ile: 2s500ms, 50th %-ile: 5s000ms, 75th %-ile: 7s500ms, "
       "90th %-ile: 9s000ms, 95th %-ile: 9s496ms, 99.9th %-ile: 9s984ms");
 }
diff --git a/tests/query_test/test_fetch.py b/tests/query_test/test_fetch.py
index 16e92a042..22f2b3b38 100644
--- a/tests/query_test/test_fetch.py
+++ b/tests/query_test/test_fetch.py
@@ -22,7 +22,7 @@ from time import sleep
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import extend_exec_option_dimension
 from tests.util.parse_util import parse_duration_string_ms, \
-    get_time_summary_stats_counter
+    parse_duration_string_ns, get_time_summary_stats_counter
 
 
 class TestFetch(ImpalaTestSuite):
@@ -66,6 +66,23 @@ class TestFetch(ImpalaTestSuite):
       materialization_timer = re.search("RowMaterializationTimer: (.*)", 
runtime_profile)
       assert materialization_timer and len(materialization_timer.groups()) == 
1 and \
           parse_duration_string_ms(materialization_timer.group(1)) > 1000
+      assert re.search("RPCCount: [5-9]", runtime_profile)
+
+      rpc_read_timer = re.search("RPCReadTimer: (.*)", runtime_profile)
+      assert rpc_read_timer and len(rpc_read_timer.groups()) == 1
+      rpc_read_ns = parse_duration_string_ns(rpc_read_timer.group(1))
+      assert 0 < rpc_read_ns and rpc_read_ns < 1000000
+
+      rpc_write_timer = re.search("RPCWriteTimer: (.*)", runtime_profile)
+      assert rpc_write_timer and len(rpc_write_timer.groups()) == 1
+      rpc_write_ns = parse_duration_string_ns(rpc_write_timer.group(1))
+      assert 0 < rpc_write_ns and rpc_write_ns < 10000000
+
+      create_result_time = re.search("CreateResultSetTime: (.*)", 
runtime_profile)
+      assert create_result_time and len(create_result_time.groups()) == 1
+      create_result_ms = parse_duration_string_ms(create_result_time.group(1))
+      assert 2400 < create_result_ms and create_result_ms < 2600
+
     finally:
       self.client.close_query(handle)
 
diff --git a/tests/webserver/test_web_pages.py 
b/tests/webserver/test_web_pages.py
index 2ea85dfb2..22d3e2ba3 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -613,6 +613,26 @@ class TestWebPage(ImpalaTestSuite):
       assert any(pattern in t for t in thread_names), \
           "Could not find thread matching '%s'" % pattern
 
+  def test_rpc_read_write_metrics(self):
+    """Test that read/write metrics are exposed in /rpcz"""
+    rpcz = self.get_debug_page(self.RPCZ_URL)
+    hist_time_regex = "[0-9][0-9numsh.]*"
+    rpc_histogram_regex = (
+      "Count: [0-9]+, sum: " + hist_time_regex
+      + ", min / max: " + hist_time_regex + " / " + hist_time_regex
+      + ", 25th %-ile: " + hist_time_regex
+      + ", 50th %-ile: " + hist_time_regex
+      + ", 75th %-ile: " + hist_time_regex
+      + ", 90th %-ile: " + hist_time_regex
+      + ", 95th %-ile: " + hist_time_regex
+      + ", 99.9th %-ile: " + hist_time_regex)
+    assert len(rpcz['servers']) > 0
+    for s in rpcz['servers']:
+      for m in s['methods']:
+        assert re.search(rpc_histogram_regex, m["summary"])
+        assert re.search(rpc_histogram_regex, m["read"])
+        assert re.search(rpc_histogram_regex, m["write"])
+
   def test_krpc_rpcz(self):
     """Test that KRPC metrics are exposed in /rpcz and that they are updated 
when
     executing a query."""
diff --git a/www/rpcz.tmpl b/www/rpcz.tmpl
index 63a2c1b23..82869a9cf 100644
--- a/www/rpcz.tmpl
+++ b/www/rpcz.tmpl
@@ -178,19 +178,34 @@ under the License.
     Reset all
   </button>
 </h2>
+<label>
+<input type="checkbox" id="show_rw_stats" onClick="toggleRWStats()"/>
+  <span id="refresh_on">Show Read/Write stats</span>
+</label>
 {{/servers}}
 {{#servers}}
 
+
+<table width=100%>
+<tr>
+<td>
 <h3><samp>{{name}} </samp>
   <button class="btn btn-warning btn-xs" onClick="reset_server('{{name}}');">
     Reset stats
   </button>
 </h3>
+</td>
+<td align=right>
+</td>
+</tr>
+</table>
 
 <table class='table table-hover table-bordered' id='{{name}}'>
   <tr>
     <th>Method</th>
     <th>Duration summary</th>
+    <th class="rwstats" style="display:none">Read</th>
+    <th class="rwstats" style="display:none">Write</th>
     <th>In Flight</th>
     <th></th>
   </tr>
@@ -198,6 +213,8 @@ under the License.
   <tr>
     <td><samp>{{name}}</samp></td>
     <td>{{summary}}</td>
+    <td class="rwstats">{{read}}</td>
+    <td class="rwstats">{{write}}</td>
     <td>{{in_flight}}</td>
     <td align="center">
       <button class="btn btn-warning btn-xs"
@@ -210,6 +227,11 @@ under the License.
 </table>
 {{/servers}}
 
+<style>
+.rwstats{
+}
+</style>
+
 <script>
 function reset_all() {
   var xhr = new XMLHttpRequest();
@@ -241,6 +263,14 @@ function update_impala_services(json) {
       var row = table.insertRow();
       row.insertCell().innerHTML = "<samp>" + method.name + "</samp>";
       row.insertCell().innerHTML = method.summary;
+      cell = row.insertCell()
+      cell.className= "rwstats";
+      cell.style.display= getRWDisplay();
+      cell.innerHTML = method.read;
+      cell = row.insertCell()
+      cell.className= "rwstats";
+      cell.style.display= getRWDisplay();
+      cell.innerHTML = method.write;
       row.insertCell().innerHTML = method.in_flight;
       var reset_cell = row.insertCell();
       reset_cell.align = "center";
@@ -391,6 +421,21 @@ function toggleRefresh() {
     document.getElementById("refresh_on").textContent = "Auto-refresh off";
   }
 }
+
+function getRWDisplay() {
+  if (document.getElementById('show_rw_stats').checked) {
+    return "";
+  }
+  return "none";
+}
+
+function toggleRWStats() {
+  var rwDisplay = getRWDisplay();
+  for (item of document.getElementsByClassName('rwstats')) {
+    item.style.display = rwDisplay;
+  }
+}
+
 </script>
 
 {{> www/common-footer.tmpl }}


Reply via email to