IMPALA-7449: Fix network throughput calculation of DataStreamSender. Currently, the network throughput presented in the query profile for DataStreamSender is computed by dividing the total bytes sent by the total network time, which is the sum of the observed network times of all individual RPCs. This is wrong in general and only makes sense if the network throughput is constant. In addition, RPCs are asynchronous and overlap with each other. So, dividing the total bytes sent by the reported network throughput may yield a time that exceeds the wall-clock time, making the figure impossible to interpret.
This change fixes the problem by measuring the network throughput of each individual RPC and uses a summary counter to track avg/min/max of network throughputs instead. Change-Id: I344ac76c0a1a49b4da3d37d2c547f3d5051ebe24 Reviewed-on: http://gerrit.cloudera.org:8080/11241 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2a60655b Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2a60655b Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2a60655b Branch: refs/heads/master Commit: 2a60655b09afaa76d3bf2120c7043eb0b22eefcf Parents: f849eff Author: Michael Ho <[email protected]> Authored: Wed Aug 15 14:25:16 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Aug 21 21:29:07 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/krpc-data-stream-sender.cc | 17 ++++++++++------- be/src/runtime/krpc-data-stream-sender.h | 11 ++++------- 2 files changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/2a60655b/be/src/runtime/krpc-data-stream-sender.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc index e6cea1f..6a2e5b3 100644 --- a/be/src/runtime/krpc-data-stream-sender.cc +++ b/be/src/runtime/krpc-data-stream-sender.cc @@ -394,10 +394,16 @@ void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() { const kudu::Status controller_status = rpc_controller_.status(); if (LIKELY(controller_status.ok())) { DCHECK(rpc_in_flight_batch_ != nullptr); - COUNTER_ADD(parent_->bytes_sent_counter_, - RowBatch::GetSerializedSize(*rpc_in_flight_batch_)); + int64_t row_batch_size = 
RowBatch::GetSerializedSize(*rpc_in_flight_batch_); int64_t network_time = total_time - resp_.receiver_latency_ns(); - COUNTER_ADD(&parent_->total_network_timer_, network_time); + COUNTER_ADD(parent_->bytes_sent_counter_, row_batch_size); + if (LIKELY(network_time > 0)) { + // 'row_batch_size' is bounded by FLAGS_rpc_max_message_size which shouldn't exceed + // max 32-bit signed value so multiplication below should not overflow. + DCHECK_LE(row_batch_size, numeric_limits<int32_t>::max()); + int64_t network_throughput = row_batch_size * NANOS_PER_SEC / network_time; + parent_->network_throughput_counter_->UpdateCounter(network_throughput); + } Status rpc_status = Status::OK(); int32_t status_code = resp_.status().status_code(); if (status_code == TErrorCode::DATASTREAM_RECVR_CLOSED) { @@ -584,7 +590,6 @@ KrpcDataStreamSender::KrpcDataStreamSender(int sender_id, const RowDescriptor* r sender_id_(sender_id), partition_type_(sink.output_partition.type), per_channel_buffer_size_(per_channel_buffer_size), - total_network_timer_(TUnit::TIME_NS, 0), dest_node_id_(sink.dest_node_id), next_unknown_partition_(0) { DCHECK_GT(destinations.size(), 0); @@ -642,9 +647,7 @@ Status KrpcDataStreamSender::Prepare( bytes_sent_time_series_counter_ = ADD_TIME_SERIES_COUNTER(profile(), "BytesSent", bytes_sent_counter_); network_throughput_counter_ = - profile()->AddDerivedCounter("NetworkThroughput", TUnit::BYTES_PER_SECOND, - bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_, - &total_network_timer_)); + ADD_SUMMARY_STATS_COUNTER(profile(), "NetworkThroughput", TUnit::BYTES_PER_SECOND); eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT); uncompressed_bytes_counter_ = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); http://git-wip-us.apache.org/repos/asf/impala/blob/2a60655b/be/src/runtime/krpc-data-stream-sender.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/krpc-data-stream-sender.h 
b/be/src/runtime/krpc-data-stream-sender.h index 6757c2a..65580b1 100644 --- a/be/src/runtime/krpc-data-stream-sender.h +++ b/be/src/runtime/krpc-data-stream-sender.h @@ -207,13 +207,10 @@ class KrpcDataStreamSender : public DataSink { /// Total number of rows sent. RuntimeProfile::Counter* total_sent_rows_counter_ = nullptr; - /// Approximate network throughput for sending row batches. - RuntimeProfile::Counter* network_throughput_counter_ = nullptr; - - /// Aggregated time spent in network (including queuing time in KRPC transfer queue) - /// for transmitting the RPC requests and receiving the responses. Used for computing - /// 'network_throughput_'. Not too meaningful by itself so not shown in profile. - RuntimeProfile::Counter total_network_timer_; + /// Summary of network throughput for sending row batches. Network time also includes + /// queuing time in KRPC transfer queue for transmitting the RPC requests and receiving + /// the responses. + RuntimeProfile::SummaryStatsCounter* network_throughput_counter_ = nullptr; /// Identifier of the destination plan node. PlanNodeId dest_node_id_;
